source_node.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/common/plugin_manager"
  6. "github.com/emqx/kuiper/common/utils"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/emqx/kuiper/xstream/extensions"
  10. "github.com/go-yaml/yaml"
  11. "sync"
  12. )
  13. type SourceNode struct {
  14. sourceType string
  15. outs map[string]chan<- interface{}
  16. name string
  17. ctx api.StreamContext
  18. options map[string]string
  19. concurrency int
  20. mutex sync.RWMutex
  21. sources []api.Source
  22. statManagers []*StatManager
  23. buffer *utils.DynamicChannelBuffer
  24. }
  25. func NewSourceNode(name string, options map[string]string) *SourceNode {
  26. t, ok := options["TYPE"]
  27. if !ok {
  28. t = "mqtt"
  29. }
  30. return &SourceNode{
  31. sourceType: t,
  32. outs: make(map[string]chan<- interface{}),
  33. name: name,
  34. options: options,
  35. ctx: nil,
  36. concurrency: 1,
  37. buffer: utils.NewDynamicChannelBuffer(),
  38. }
  39. }
  40. //Only for mock source, do not use it in production
  41. func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
  42. return &SourceNode{
  43. sources: []api.Source{ source},
  44. outs: make(map[string]chan<- interface{}),
  45. name: name,
  46. options: options,
  47. ctx: nil,
  48. concurrency: 1,
  49. buffer: utils.NewDynamicChannelBuffer(),
  50. }
  51. }
  52. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  53. m.ctx = ctx
  54. logger := ctx.GetLogger()
  55. logger.Infof("open source node %s with option %v", m.name, m.options)
  56. go func() {
  57. props := m.getConf(ctx)
  58. if c, ok := props["concurrency"]; ok {
  59. if t, err := common.ToInt(c); err != nil || t <= 0 {
  60. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  61. } else {
  62. m.concurrency = t
  63. }
  64. }
  65. if c, ok := props["bufferLength"]; ok {
  66. if t, err := common.ToInt(c); err != nil || t <= 0 {
  67. logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c)
  68. } else {
  69. m.buffer.SetLimit(t)
  70. }
  71. }
  72. createSource := len(m.sources) == 0
  73. logger.Infof("open source node %d instances", m.concurrency)
  74. for i := 0; i < m.concurrency; i++ { // workers
  75. go func(instance int) {
  76. //Do open source instances
  77. var source api.Source
  78. var err error
  79. if createSource{
  80. source, err = getSource(m.sourceType)
  81. if err != nil {
  82. m.drainError(errCh, err, ctx, logger)
  83. return
  84. }
  85. err = source.Configure(m.options["DATASOURCE"], props)
  86. if err != nil {
  87. m.drainError(errCh, err, ctx, logger)
  88. return
  89. }
  90. m.mutex.Lock()
  91. m.sources = append(m.sources, source)
  92. m.mutex.Unlock()
  93. }else{
  94. source = m.sources[instance]
  95. }
  96. stats, err := NewStatManager("source", ctx)
  97. if err != nil {
  98. m.drainError(errCh, err, ctx, logger)
  99. return
  100. }
  101. m.mutex.Lock()
  102. m.statManagers = append(m.statManagers, stats)
  103. m.mutex.Unlock()
  104. if err := source.Open(ctx.WithInstance(instance), func(message map[string]interface{}, meta map[string]interface{}) {
  105. stats.IncTotalRecordsIn()
  106. stats.ProcessTimeStart()
  107. tuple := &xsql.Tuple{Emitter: m.name, Message: message, Timestamp: common.GetNowInMilli(), Metadata: meta}
  108. stats.ProcessTimeEnd()
  109. m.Broadcast(tuple)
  110. stats.IncTotalRecordsOut()
  111. stats.SetBufferLength(int64(m.getBufferLength()))
  112. logger.Debugf("%s consume data %v complete", m.name, tuple)
  113. }); err != nil {
  114. m.drainError(errCh, err, ctx, logger)
  115. return
  116. }
  117. logger.Infof("Start source %s instance %d successfully", m.name, instance)
  118. }(i)
  119. }
  120. for {
  121. select {
  122. case <-ctx.Done():
  123. logger.Infof("source %s done", m.name)
  124. m.close(ctx, logger)
  125. return
  126. case data := <- m.buffer.Out:
  127. //blocking
  128. Broadcast(m.outs, data, ctx)
  129. }
  130. }
  131. }()
  132. }
  133. func getSource(t string) (api.Source, error) {
  134. var s api.Source
  135. var ok bool
  136. switch t {
  137. case "mqtt":
  138. s = &extensions.MQTTSource{}
  139. default:
  140. nf, err := plugin_manager.GetPlugin(t, "sources")
  141. if err != nil {
  142. return nil, err
  143. }
  144. s, ok = nf.(api.Source)
  145. if !ok {
  146. return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
  147. }
  148. }
  149. return s, nil
  150. }
  151. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  152. select {
  153. case errCh <- err:
  154. case <-ctx.Done():
  155. m.close(ctx, logger)
  156. }
  157. return
  158. }
  159. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  160. for _, s := range m.sources {
  161. if err := s.Close(ctx); err != nil {
  162. logger.Warnf("close source fails: %v", err)
  163. }
  164. }
  165. }
  166. func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
  167. confkey := m.options["CONF_KEY"]
  168. logger := ctx.GetLogger()
  169. confPath := "sources/" + m.sourceType + ".yaml"
  170. if m.sourceType == "mqtt" {
  171. confPath = "mqtt_source.yaml"
  172. }
  173. conf, err := common.LoadConf(confPath)
  174. props := make(map[string]interface{})
  175. if err == nil {
  176. cfg := make(map[string]map[string]interface{})
  177. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  178. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", m.sourceType)
  179. } else {
  180. var ok bool
  181. props, ok = cfg["default"]
  182. if !ok {
  183. logger.Warnf("default conf is not found", confkey)
  184. }
  185. if c, ok := cfg[confkey]; ok {
  186. for k, v := range c {
  187. props[k] = v
  188. }
  189. }
  190. }
  191. } else {
  192. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", m.sourceType)
  193. }
  194. logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
  195. return props
  196. }
  197. func (m *SourceNode) Broadcast(data interface{}) {
  198. m.buffer.In <- data
  199. }
  200. func (m *SourceNode) getBufferLength() int {
  201. return m.buffer.GetLength()
  202. }
  203. func (m *SourceNode) GetName() string {
  204. return m.name
  205. }
  206. func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
  207. if _, ok := m.outs[name]; !ok {
  208. m.outs[name] = output
  209. } else {
  210. return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
  211. }
  212. return nil
  213. }
  214. func (m *SourceNode) GetMetrics() (result [][]interface{}) {
  215. for _, stats := range m.statManagers{
  216. result = append(result, stats.GetMetrics())
  217. }
  218. return result
  219. }