source_node.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package nodes
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/plugins"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/emqx/kuiper/xstream/extensions"
  8. "github.com/go-yaml/yaml"
  9. "sync"
  10. )
  11. type SourceNode struct {
  12. *defaultNode
  13. sourceType string
  14. options map[string]string
  15. isMock bool
  16. mutex sync.RWMutex
  17. sources []api.Source
  18. }
  19. func NewSourceNode(name string, options map[string]string) *SourceNode {
  20. t, ok := options["TYPE"]
  21. if !ok {
  22. t = "mqtt"
  23. }
  24. return &SourceNode{
  25. sourceType: t,
  26. defaultNode: &defaultNode{
  27. name: name,
  28. outputs: make(map[string]chan<- interface{}),
  29. concurrency: 1,
  30. },
  31. options: options,
  32. }
  33. }
  34. //Only for mock source, do not use it in production
  35. func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
  36. return &SourceNode{
  37. sources: []api.Source{source},
  38. defaultNode: &defaultNode{
  39. name: name,
  40. outputs: make(map[string]chan<- interface{}),
  41. concurrency: 1,
  42. },
  43. options: options,
  44. isMock: true,
  45. }
  46. }
  47. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  48. m.ctx = ctx
  49. logger := ctx.GetLogger()
  50. logger.Infof("open source node %s with option %v", m.name, m.options)
  51. go func() {
  52. props := m.getConf(ctx)
  53. if c, ok := props["concurrency"]; ok {
  54. if t, err := common.ToInt(c); err != nil || t <= 0 {
  55. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  56. } else {
  57. m.concurrency = t
  58. }
  59. }
  60. bl := 102400
  61. if c, ok := props["bufferLength"]; ok {
  62. if t, err := common.ToInt(c); err != nil || t <= 0 {
  63. logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c)
  64. } else {
  65. bl = t
  66. }
  67. }
  68. m.reset()
  69. logger.Infof("open source node %d instances", m.concurrency)
  70. for i := 0; i < m.concurrency; i++ { // workers
  71. go func(instance int) {
  72. //Do open source instances
  73. var source api.Source
  74. var err error
  75. if !m.isMock {
  76. source, err = getSource(m.sourceType)
  77. if err != nil {
  78. m.drainError(errCh, err, ctx, logger)
  79. return
  80. }
  81. err = source.Configure(m.options["DATASOURCE"], props)
  82. if err != nil {
  83. m.drainError(errCh, err, ctx, logger)
  84. return
  85. }
  86. m.mutex.Lock()
  87. m.sources = append(m.sources, source)
  88. m.mutex.Unlock()
  89. } else {
  90. logger.Debugf("get source instance %d from %d sources", instance, len(m.sources))
  91. source = m.sources[instance]
  92. }
  93. stats, err := NewStatManager("source", ctx)
  94. if err != nil {
  95. m.drainError(errCh, err, ctx, logger)
  96. return
  97. }
  98. m.mutex.Lock()
  99. m.statManagers = append(m.statManagers, stats)
  100. m.mutex.Unlock()
  101. buffer := NewDynamicChannelBuffer()
  102. buffer.SetLimit(bl)
  103. sourceErrCh := make(chan error)
  104. go source.Open(ctx.WithInstance(instance), buffer.In, sourceErrCh)
  105. logger.Infof("Start source %s instance %d successfully", m.name, instance)
  106. for {
  107. select {
  108. case <-ctx.Done():
  109. logger.Infof("source %s done", m.name)
  110. m.close(ctx, logger)
  111. return
  112. case err := <-sourceErrCh:
  113. m.drainError(errCh, err, ctx, logger)
  114. return
  115. case data := <-buffer.Out:
  116. stats.IncTotalRecordsIn()
  117. stats.ProcessTimeStart()
  118. tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: common.GetNowInMilli(), Metadata: data.Meta()}
  119. stats.ProcessTimeEnd()
  120. //blocking
  121. m.Broadcast(tuple)
  122. stats.IncTotalRecordsOut()
  123. stats.SetBufferLength(int64(buffer.GetLength()))
  124. logger.Debugf("%s consume data %v complete", m.name, tuple)
  125. }
  126. }
  127. }(i)
  128. }
  129. }()
  130. }
  131. func (m *SourceNode) reset() {
  132. if !m.isMock {
  133. m.sources = nil
  134. }
  135. m.statManagers = nil
  136. }
  137. func doGetSource(t string) (api.Source, error) {
  138. var (
  139. s api.Source
  140. err error
  141. )
  142. switch t {
  143. case "mqtt":
  144. s = &extensions.MQTTSource{}
  145. case "httppull":
  146. s = &extensions.HTTPPullSource{}
  147. default:
  148. s, err = plugins.GetSource(t)
  149. if err != nil {
  150. return nil, err
  151. }
  152. }
  153. return s, nil
  154. }
  155. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  156. select {
  157. case errCh <- err:
  158. case <-ctx.Done():
  159. m.close(ctx, logger)
  160. }
  161. return
  162. }
  163. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  164. for _, s := range m.sources {
  165. if err := s.Close(ctx); err != nil {
  166. logger.Warnf("close source fails: %v", err)
  167. }
  168. }
  169. }
  170. func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
  171. confkey := m.options["CONF_KEY"]
  172. logger := ctx.GetLogger()
  173. confPath := "sources/" + m.sourceType + ".yaml"
  174. if m.sourceType == "mqtt" {
  175. confPath = "mqtt_source.yaml"
  176. }
  177. conf, err := common.LoadConf(confPath)
  178. props := make(map[string]interface{})
  179. if err == nil {
  180. cfg := make(map[interface{}]interface{})
  181. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  182. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", m.sourceType)
  183. } else {
  184. def, ok := cfg["default"]
  185. if !ok {
  186. logger.Warnf("default conf is not found", confkey)
  187. } else {
  188. if def1, ok1 := def.(map[interface{}]interface{}); ok1 {
  189. props = common.ConvertMap(def1)
  190. }
  191. if c, ok := cfg[confkey]; ok {
  192. if c1, ok := c.(map[interface{}]interface{}); ok {
  193. c2 := common.ConvertMap(c1)
  194. for k, v := range c2 {
  195. props[k] = v
  196. }
  197. }
  198. }
  199. }
  200. }
  201. } else {
  202. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", m.sourceType)
  203. }
  204. logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
  205. return props
  206. }