// source_node.go
  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/common/plugin_manager"
  6. "github.com/emqx/kuiper/common/utils"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/emqx/kuiper/xstream/extensions"
  10. "github.com/go-yaml/yaml"
  11. "sync"
  12. )
  13. type SourceNode struct {
  14. sourceType string
  15. outs map[string]chan<- interface{}
  16. name string
  17. ctx api.StreamContext
  18. options map[string]string
  19. concurrency int
  20. mutex sync.RWMutex
  21. sources []api.Source
  22. statManagers []StatManager
  23. buffer *utils.DynamicChannelBuffer
  24. }
  25. func NewSourceNode(name string, options map[string]string) *SourceNode {
  26. t, ok := options["TYPE"]
  27. if !ok {
  28. t = "mqtt"
  29. }
  30. return &SourceNode{
  31. sourceType: t,
  32. outs: make(map[string]chan<- interface{}),
  33. name: name,
  34. options: options,
  35. ctx: nil,
  36. concurrency: 1,
  37. buffer: utils.NewDynamicChannelBuffer(),
  38. }
  39. }
  40. //Only for mock source, do not use it in production
  41. func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
  42. return &SourceNode{
  43. sources: []api.Source{source},
  44. outs: make(map[string]chan<- interface{}),
  45. name: name,
  46. options: options,
  47. ctx: nil,
  48. concurrency: 1,
  49. buffer: utils.NewDynamicChannelBuffer(),
  50. }
  51. }
  52. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  53. m.ctx = ctx
  54. logger := ctx.GetLogger()
  55. logger.Infof("open source node %s with option %v", m.name, m.options)
  56. go func() {
  57. props := m.getConf(ctx)
  58. if c, ok := props["concurrency"]; ok {
  59. if t, err := common.ToInt(c); err != nil || t <= 0 {
  60. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  61. } else {
  62. m.concurrency = t
  63. }
  64. }
  65. if c, ok := props["bufferLength"]; ok {
  66. if t, err := common.ToInt(c); err != nil || t <= 0 {
  67. logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c)
  68. } else {
  69. m.buffer.SetLimit(t)
  70. }
  71. }
  72. createSource := len(m.sources) == 0
  73. logger.Infof("open source node %d instances", m.concurrency)
  74. for i := 0; i < m.concurrency; i++ { // workers
  75. go func(instance int) {
  76. //Do open source instances
  77. var source api.Source
  78. var err error
  79. if createSource {
  80. source, err = getSource(m.sourceType)
  81. if err != nil {
  82. m.drainError(errCh, err, ctx, logger)
  83. return
  84. }
  85. err = source.Configure(m.options["DATASOURCE"], props)
  86. if err != nil {
  87. m.drainError(errCh, err, ctx, logger)
  88. return
  89. }
  90. m.mutex.Lock()
  91. m.sources = append(m.sources, source)
  92. m.mutex.Unlock()
  93. } else {
  94. logger.Infof("use instance %d of %d sources", instance, len(m.sources))
  95. source = m.sources[instance]
  96. }
  97. stats, err := NewStatManager("source", ctx)
  98. if err != nil {
  99. m.drainError(errCh, err, ctx, logger)
  100. return
  101. }
  102. m.mutex.Lock()
  103. m.statManagers = append(m.statManagers, stats)
  104. m.mutex.Unlock()
  105. if err := source.Open(ctx.WithInstance(instance), func(message map[string]interface{}, meta map[string]interface{}) {
  106. stats.IncTotalRecordsIn()
  107. stats.ProcessTimeStart()
  108. tuple := &xsql.Tuple{Emitter: m.name, Message: message, Timestamp: common.GetNowInMilli(), Metadata: meta}
  109. stats.ProcessTimeEnd()
  110. m.Broadcast(tuple)
  111. stats.IncTotalRecordsOut()
  112. stats.SetBufferLength(int64(m.getBufferLength()))
  113. logger.Debugf("%s consume data %v complete", m.name, tuple)
  114. }); err != nil {
  115. m.drainError(errCh, err, ctx, logger)
  116. return
  117. }
  118. logger.Infof("Start source %s instance %d successfully", m.name, instance)
  119. }(i)
  120. }
  121. for {
  122. select {
  123. case <-ctx.Done():
  124. logger.Infof("source %s done", m.name)
  125. m.close(ctx, logger)
  126. return
  127. case data := <-m.buffer.Out:
  128. //blocking
  129. Broadcast(m.outs, data, ctx)
  130. }
  131. }
  132. }()
  133. }
  134. func getSource(t string) (api.Source, error) {
  135. var s api.Source
  136. var ok bool
  137. switch t {
  138. case "mqtt":
  139. s = &extensions.MQTTSource{}
  140. default:
  141. nf, err := plugin_manager.GetPlugin(t, "sources")
  142. if err != nil {
  143. return nil, err
  144. }
  145. s, ok = nf.(api.Source)
  146. if !ok {
  147. return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
  148. }
  149. }
  150. return s, nil
  151. }
  152. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  153. select {
  154. case errCh <- err:
  155. case <-ctx.Done():
  156. m.close(ctx, logger)
  157. }
  158. return
  159. }
  160. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  161. for _, s := range m.sources {
  162. if err := s.Close(ctx); err != nil {
  163. logger.Warnf("close source fails: %v", err)
  164. }
  165. }
  166. //Reset the states
  167. m.sources = nil
  168. m.statManagers = nil
  169. }
  170. func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
  171. confkey := m.options["CONF_KEY"]
  172. logger := ctx.GetLogger()
  173. confPath := "sources/" + m.sourceType + ".yaml"
  174. if m.sourceType == "mqtt" {
  175. confPath = "mqtt_source.yaml"
  176. }
  177. conf, err := common.LoadConf(confPath)
  178. props := make(map[string]interface{})
  179. if err == nil {
  180. cfg := make(map[string]map[string]interface{})
  181. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  182. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", m.sourceType)
  183. } else {
  184. var ok bool
  185. props, ok = cfg["default"]
  186. if !ok {
  187. logger.Warnf("default conf is not found", confkey)
  188. }
  189. if c, ok := cfg[confkey]; ok {
  190. for k, v := range c {
  191. props[k] = v
  192. }
  193. }
  194. }
  195. } else {
  196. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", m.sourceType)
  197. }
  198. logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
  199. return props
  200. }
  201. func (m *SourceNode) Broadcast(data interface{}) {
  202. m.buffer.In <- data
  203. }
  204. func (m *SourceNode) getBufferLength() int {
  205. return m.buffer.GetLength()
  206. }
  207. func (m *SourceNode) GetName() string {
  208. return m.name
  209. }
  210. func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
  211. if _, ok := m.outs[name]; !ok {
  212. m.outs[name] = output
  213. } else {
  214. return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
  215. }
  216. return nil
  217. }
  218. func (m *SourceNode) GetMetrics() (result [][]interface{}) {
  219. for _, stats := range m.statManagers {
  220. result = append(result, stats.GetMetrics())
  221. }
  222. return result
  223. }