source_node.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/plugins"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/emqx/kuiper/xstream/extensions"
  9. "github.com/go-yaml/yaml"
  10. "sync"
  11. )
  12. type SourceNode struct {
  13. sourceType string
  14. outs map[string]chan<- interface{}
  15. name string
  16. ctx api.StreamContext
  17. options map[string]string
  18. concurrency int
  19. isMock bool
  20. mutex sync.RWMutex
  21. sources []api.Source
  22. statManagers []StatManager
  23. }
  24. func NewSourceNode(name string, options map[string]string) *SourceNode {
  25. t, ok := options["TYPE"]
  26. if !ok {
  27. t = "mqtt"
  28. }
  29. return &SourceNode{
  30. sourceType: t,
  31. outs: make(map[string]chan<- interface{}),
  32. name: name,
  33. options: options,
  34. ctx: nil,
  35. concurrency: 1,
  36. }
  37. }
  38. //Only for mock source, do not use it in production
  39. func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
  40. return &SourceNode{
  41. sources: []api.Source{source},
  42. outs: make(map[string]chan<- interface{}),
  43. name: name,
  44. options: options,
  45. ctx: nil,
  46. concurrency: 1,
  47. isMock: true,
  48. }
  49. }
  50. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  51. m.ctx = ctx
  52. logger := ctx.GetLogger()
  53. logger.Infof("open source node %s with option %v", m.name, m.options)
  54. go func() {
  55. props := m.getConf(ctx)
  56. if c, ok := props["concurrency"]; ok {
  57. if t, err := common.ToInt(c); err != nil || t <= 0 {
  58. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  59. } else {
  60. m.concurrency = t
  61. }
  62. }
  63. bl := 102400
  64. if c, ok := props["bufferLength"]; ok {
  65. if t, err := common.ToInt(c); err != nil || t <= 0 {
  66. logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c)
  67. } else {
  68. bl = t
  69. }
  70. }
  71. m.reset()
  72. logger.Infof("open source node %d instances", m.concurrency)
  73. for i := 0; i < m.concurrency; i++ { // workers
  74. go func(instance int) {
  75. //Do open source instances
  76. var source api.Source
  77. var err error
  78. if !m.isMock {
  79. source, err = getSource(m.sourceType)
  80. if err != nil {
  81. m.drainError(errCh, err, ctx, logger)
  82. return
  83. }
  84. err = source.Configure(m.options["DATASOURCE"], props)
  85. if err != nil {
  86. m.drainError(errCh, err, ctx, logger)
  87. return
  88. }
  89. m.mutex.Lock()
  90. m.sources = append(m.sources, source)
  91. m.mutex.Unlock()
  92. } else {
  93. logger.Debugf("get source instance %d from %d sources", instance, len(m.sources))
  94. source = m.sources[instance]
  95. }
  96. stats, err := NewStatManager("source", ctx)
  97. if err != nil {
  98. m.drainError(errCh, err, ctx, logger)
  99. return
  100. }
  101. m.mutex.Lock()
  102. m.statManagers = append(m.statManagers, stats)
  103. m.mutex.Unlock()
  104. buffer := NewDynamicChannelBuffer()
  105. buffer.SetLimit(bl)
  106. sourceErrCh := make(chan error)
  107. go source.Open(ctx.WithInstance(instance), buffer.In, sourceErrCh)
  108. logger.Infof("Start source %s instance %d successfully", m.name, instance)
  109. for {
  110. select {
  111. case <-ctx.Done():
  112. logger.Infof("source %s done", m.name)
  113. m.close(ctx, logger)
  114. return
  115. case err := <-sourceErrCh:
  116. m.drainError(errCh, err, ctx, logger)
  117. return
  118. case data := <-buffer.Out:
  119. stats.IncTotalRecordsIn()
  120. stats.ProcessTimeStart()
  121. tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: common.GetNowInMilli(), Metadata: data.Meta()}
  122. stats.ProcessTimeEnd()
  123. //blocking
  124. Broadcast(m.outs, tuple, ctx)
  125. stats.IncTotalRecordsOut()
  126. stats.SetBufferLength(int64(buffer.GetLength()))
  127. logger.Debugf("%s consume data %v complete", m.name, tuple)
  128. }
  129. }
  130. }(i)
  131. }
  132. }()
  133. }
  134. func (m *SourceNode) reset() {
  135. if !m.isMock {
  136. m.sources = nil
  137. }
  138. m.statManagers = nil
  139. }
  140. func doGetSource(t string) (api.Source, error) {
  141. var (
  142. s api.Source
  143. err error
  144. )
  145. switch t {
  146. case "mqtt":
  147. s = &extensions.MQTTSource{}
  148. default:
  149. s, err = plugins.GetSource(t)
  150. if err != nil {
  151. return nil, err
  152. }
  153. }
  154. return s, nil
  155. }
  156. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  157. select {
  158. case errCh <- err:
  159. case <-ctx.Done():
  160. m.close(ctx, logger)
  161. }
  162. return
  163. }
  164. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  165. for _, s := range m.sources {
  166. if err := s.Close(ctx); err != nil {
  167. logger.Warnf("close source fails: %v", err)
  168. }
  169. }
  170. }
  171. func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
  172. confkey := m.options["CONF_KEY"]
  173. logger := ctx.GetLogger()
  174. confPath := "sources/" + m.sourceType + ".yaml"
  175. if m.sourceType == "mqtt" {
  176. confPath = "mqtt_source.yaml"
  177. }
  178. conf, err := common.LoadConf(confPath)
  179. props := make(map[string]interface{})
  180. if err == nil {
  181. cfg := make(map[string]map[string]interface{})
  182. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  183. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", m.sourceType)
  184. } else {
  185. var ok bool
  186. props, ok = cfg["default"]
  187. if !ok {
  188. logger.Warnf("default conf is not found", confkey)
  189. }
  190. if c, ok := cfg[confkey]; ok {
  191. for k, v := range c {
  192. props[k] = v
  193. }
  194. }
  195. }
  196. } else {
  197. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", m.sourceType)
  198. }
  199. logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
  200. return props
  201. }
  202. func (m *SourceNode) GetName() string {
  203. return m.name
  204. }
  205. func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
  206. if _, ok := m.outs[name]; !ok {
  207. m.outs[name] = output
  208. } else {
  209. return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
  210. }
  211. return nil
  212. }
  213. func (m *SourceNode) GetMetrics() (result [][]interface{}) {
  214. for _, stats := range m.statManagers {
  215. result = append(result, stats.GetMetrics())
  216. }
  217. return result
  218. }