source_node.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/plugins"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/emqx/kuiper/xstream/extensions"
  9. "github.com/go-yaml/yaml"
  10. "sync"
  11. )
  12. type SourceNode struct {
  13. sourceType string
  14. outs map[string]chan<- interface{}
  15. name string
  16. ctx api.StreamContext
  17. options map[string]string
  18. concurrency int
  19. isMock bool
  20. mutex sync.RWMutex
  21. sources []api.Source
  22. statManagers []StatManager
  23. }
  24. func NewSourceNode(name string, options map[string]string) *SourceNode {
  25. t, ok := options["TYPE"]
  26. if !ok {
  27. t = "mqtt"
  28. }
  29. return &SourceNode{
  30. sourceType: t,
  31. outs: make(map[string]chan<- interface{}),
  32. name: name,
  33. options: options,
  34. ctx: nil,
  35. concurrency: 1,
  36. }
  37. }
  38. //Only for mock source, do not use it in production
  39. func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
  40. return &SourceNode{
  41. sources: []api.Source{source},
  42. outs: make(map[string]chan<- interface{}),
  43. name: name,
  44. options: options,
  45. ctx: nil,
  46. concurrency: 1,
  47. isMock: true,
  48. }
  49. }
  50. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  51. m.ctx = ctx
  52. logger := ctx.GetLogger()
  53. logger.Infof("open source node %s with option %v", m.name, m.options)
  54. go func() {
  55. props := m.getConf(ctx)
  56. if c, ok := props["concurrency"]; ok {
  57. if t, err := common.ToInt(c); err != nil || t <= 0 {
  58. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  59. } else {
  60. m.concurrency = t
  61. }
  62. }
  63. bl := 102400
  64. if c, ok := props["bufferLength"]; ok {
  65. if t, err := common.ToInt(c); err != nil || t <= 0 {
  66. logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c)
  67. } else {
  68. bl = t
  69. }
  70. }
  71. m.reset()
  72. logger.Infof("open source node %d instances", m.concurrency)
  73. for i := 0; i < m.concurrency; i++ { // workers
  74. go func(instance int) {
  75. //Do open source instances
  76. var source api.Source
  77. var err error
  78. if !m.isMock {
  79. source, err = getSource(m.sourceType)
  80. if err != nil {
  81. m.drainError(errCh, err, ctx, logger)
  82. return
  83. }
  84. err = source.Configure(m.options["DATASOURCE"], props)
  85. if err != nil {
  86. m.drainError(errCh, err, ctx, logger)
  87. return
  88. }
  89. m.mutex.Lock()
  90. m.sources = append(m.sources, source)
  91. m.mutex.Unlock()
  92. } else {
  93. logger.Debugf("get source instance %d from %d sources", instance, len(m.sources))
  94. source = m.sources[instance]
  95. }
  96. stats, err := NewStatManager("source", ctx)
  97. if err != nil {
  98. m.drainError(errCh, err, ctx, logger)
  99. return
  100. }
  101. m.mutex.Lock()
  102. m.statManagers = append(m.statManagers, stats)
  103. m.mutex.Unlock()
  104. buffer := NewDynamicChannelBuffer()
  105. buffer.SetLimit(bl)
  106. sourceErrCh := make(chan error)
  107. go source.Open(ctx.WithInstance(instance), buffer.In, sourceErrCh)
  108. logger.Infof("Start source %s instance %d successfully", m.name, instance)
  109. for {
  110. select {
  111. case <-ctx.Done():
  112. logger.Infof("source %s done", m.name)
  113. m.close(ctx, logger)
  114. return
  115. case err := <-sourceErrCh:
  116. m.drainError(errCh, err, ctx, logger)
  117. return
  118. case data := <-buffer.Out:
  119. stats.IncTotalRecordsIn()
  120. stats.ProcessTimeStart()
  121. tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: common.GetNowInMilli(), Metadata: data.Meta()}
  122. stats.ProcessTimeEnd()
  123. //blocking
  124. Broadcast(m.outs, tuple, ctx)
  125. stats.IncTotalRecordsOut()
  126. stats.SetBufferLength(int64(buffer.GetLength()))
  127. logger.Debugf("%s consume data %v complete", m.name, tuple)
  128. }
  129. }
  130. }(i)
  131. }
  132. }()
  133. }
  134. func (m *SourceNode) reset() {
  135. if !m.isMock {
  136. m.sources = nil
  137. }
  138. m.statManagers = nil
  139. }
  140. func doGetSource(t string) (api.Source, error) {
  141. var s api.Source
  142. var ok bool
  143. switch t {
  144. case "mqtt":
  145. s = &extensions.MQTTSource{}
  146. default:
  147. nf, err := plugins.GetPlugin(t, plugins.SOURCE)
  148. if err != nil {
  149. return nil, err
  150. }
  151. s, ok = nf.(api.Source)
  152. if !ok {
  153. return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
  154. }
  155. }
  156. return s, nil
  157. }
  158. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  159. select {
  160. case errCh <- err:
  161. case <-ctx.Done():
  162. m.close(ctx, logger)
  163. }
  164. return
  165. }
  166. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  167. for _, s := range m.sources {
  168. if err := s.Close(ctx); err != nil {
  169. logger.Warnf("close source fails: %v", err)
  170. }
  171. }
  172. }
  173. func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
  174. confkey := m.options["CONF_KEY"]
  175. logger := ctx.GetLogger()
  176. confPath := "sources/" + m.sourceType + ".yaml"
  177. if m.sourceType == "mqtt" {
  178. confPath = "mqtt_source.yaml"
  179. }
  180. conf, err := common.LoadConf(confPath)
  181. props := make(map[string]interface{})
  182. if err == nil {
  183. cfg := make(map[string]map[string]interface{})
  184. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  185. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", m.sourceType)
  186. } else {
  187. var ok bool
  188. props, ok = cfg["default"]
  189. if !ok {
  190. logger.Warnf("default conf is not found", confkey)
  191. }
  192. if c, ok := cfg[confkey]; ok {
  193. for k, v := range c {
  194. props[k] = v
  195. }
  196. }
  197. }
  198. } else {
  199. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", m.sourceType)
  200. }
  201. logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
  202. return props
  203. }
  204. func (m *SourceNode) GetName() string {
  205. return m.name
  206. }
  207. func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
  208. if _, ok := m.outs[name]; !ok {
  209. m.outs[name] = output
  210. } else {
  211. return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
  212. }
  213. return nil
  214. }
  215. func (m *SourceNode) GetMetrics() (result [][]interface{}) {
  216. for _, stats := range m.statManagers {
  217. result = append(result, stats.GetMetrics())
  218. }
  219. return result
  220. }