source_node.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/common/plugin_manager"
  6. "github.com/emqx/kuiper/common/utils"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/emqx/kuiper/xstream/extensions"
  10. "github.com/go-yaml/yaml"
  11. "sync"
  12. )
  13. type SourceNode struct {
  14. sourceType string
  15. outs map[string]chan<- interface{}
  16. name string
  17. ctx api.StreamContext
  18. options map[string]string
  19. concurrency int
  20. mutex sync.RWMutex
  21. sources []api.Source
  22. statManagers []StatManager
  23. buffer *utils.DynamicChannelBuffer
  24. }
  25. func NewSourceNode(name string, options map[string]string) *SourceNode {
  26. t, ok := options["TYPE"]
  27. if !ok {
  28. t = "mqtt"
  29. }
  30. return &SourceNode{
  31. sourceType: t,
  32. outs: make(map[string]chan<- interface{}),
  33. name: name,
  34. options: options,
  35. ctx: nil,
  36. concurrency: 1,
  37. buffer: utils.NewDynamicChannelBuffer(),
  38. }
  39. }
  40. //Only for mock source, do not use it in production
  41. func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
  42. return &SourceNode{
  43. sources: []api.Source{source},
  44. outs: make(map[string]chan<- interface{}),
  45. name: name,
  46. options: options,
  47. ctx: nil,
  48. concurrency: 1,
  49. buffer: utils.NewDynamicChannelBuffer(),
  50. }
  51. }
  52. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  53. m.ctx = ctx
  54. logger := ctx.GetLogger()
  55. logger.Infof("open source node %s with option %v", m.name, m.options)
  56. go func() {
  57. props := m.getConf(ctx)
  58. if c, ok := props["concurrency"]; ok {
  59. if t, err := common.ToInt(c); err != nil || t <= 0 {
  60. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  61. } else {
  62. m.concurrency = t
  63. }
  64. }
  65. if c, ok := props["bufferLength"]; ok {
  66. if t, err := common.ToInt(c); err != nil || t <= 0 {
  67. logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c)
  68. } else {
  69. m.buffer.SetLimit(t)
  70. }
  71. }
  72. createSource := len(m.sources) == 0
  73. logger.Infof("open source node %d instances", m.concurrency)
  74. for i := 0; i < m.concurrency; i++ { // workers
  75. go func(instance int) {
  76. //Do open source instances
  77. var source api.Source
  78. var err error
  79. if createSource {
  80. source, err = getSource(m.sourceType)
  81. if err != nil {
  82. m.drainError(errCh, err, ctx, logger)
  83. return
  84. }
  85. err = source.Configure(m.options["DATASOURCE"], props)
  86. if err != nil {
  87. m.drainError(errCh, err, ctx, logger)
  88. return
  89. }
  90. m.mutex.Lock()
  91. m.sources = append(m.sources, source)
  92. m.mutex.Unlock()
  93. } else {
  94. logger.Debugf("get source instance %d from %d sources", instance, len(m.sources))
  95. source = m.sources[instance]
  96. }
  97. stats, err := NewStatManager("source", ctx)
  98. if err != nil {
  99. m.drainError(errCh, err, ctx, logger)
  100. return
  101. }
  102. m.mutex.Lock()
  103. m.statManagers = append(m.statManagers, stats)
  104. m.mutex.Unlock()
  105. source.Open(ctx.WithInstance(instance), func(message map[string]interface{}, meta map[string]interface{}) {
  106. stats.IncTotalRecordsIn()
  107. stats.ProcessTimeStart()
  108. tuple := &xsql.Tuple{Emitter: m.name, Message: message, Timestamp: common.GetNowInMilli(), Metadata: meta}
  109. stats.ProcessTimeEnd()
  110. m.Broadcast(tuple)
  111. stats.IncTotalRecordsOut()
  112. stats.SetBufferLength(int64(m.getBufferLength()))
  113. logger.Debugf("%s consume data %v complete", m.name, tuple)
  114. }, func(err error) {
  115. m.drainError(errCh, err, ctx, logger)
  116. })
  117. logger.Infof("Start source %s instance %d successfully", m.name, instance)
  118. }(i)
  119. }
  120. for {
  121. select {
  122. case <-ctx.Done():
  123. logger.Infof("source %s done", m.name)
  124. m.close(ctx, logger)
  125. return
  126. case data := <-m.buffer.Out:
  127. //blocking
  128. Broadcast(m.outs, data, ctx)
  129. }
  130. }
  131. }()
  132. }
  133. func getSource(t string) (api.Source, error) {
  134. var s api.Source
  135. var ok bool
  136. switch t {
  137. case "mqtt":
  138. s = &extensions.MQTTSource{}
  139. default:
  140. nf, err := plugin_manager.GetPlugin(t, "sources")
  141. if err != nil {
  142. return nil, err
  143. }
  144. s, ok = nf.(api.Source)
  145. if !ok {
  146. return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
  147. }
  148. }
  149. return s, nil
  150. }
  151. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  152. select {
  153. case errCh <- err:
  154. case <-ctx.Done():
  155. m.close(ctx, logger)
  156. }
  157. return
  158. }
  159. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  160. for _, s := range m.sources {
  161. if err := s.Close(ctx); err != nil {
  162. logger.Warnf("close source fails: %v", err)
  163. }
  164. }
  165. //Reset the states
  166. m.sources = nil
  167. m.statManagers = nil
  168. }
  169. func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
  170. confkey := m.options["CONF_KEY"]
  171. logger := ctx.GetLogger()
  172. confPath := "sources/" + m.sourceType + ".yaml"
  173. if m.sourceType == "mqtt" {
  174. confPath = "mqtt_source.yaml"
  175. }
  176. conf, err := common.LoadConf(confPath)
  177. props := make(map[string]interface{})
  178. if err == nil {
  179. cfg := make(map[string]map[string]interface{})
  180. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  181. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", m.sourceType)
  182. } else {
  183. var ok bool
  184. props, ok = cfg["default"]
  185. if !ok {
  186. logger.Warnf("default conf is not found", confkey)
  187. }
  188. if c, ok := cfg[confkey]; ok {
  189. for k, v := range c {
  190. props[k] = v
  191. }
  192. }
  193. }
  194. } else {
  195. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", m.sourceType)
  196. }
  197. logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
  198. return props
  199. }
  200. func (m *SourceNode) Broadcast(data interface{}) {
  201. m.buffer.In <- data
  202. }
  203. func (m *SourceNode) getBufferLength() int {
  204. return m.buffer.GetLength()
  205. }
  206. func (m *SourceNode) GetName() string {
  207. return m.name
  208. }
  209. func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
  210. if _, ok := m.outs[name]; !ok {
  211. m.outs[name] = output
  212. } else {
  213. return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
  214. }
  215. return nil
  216. }
  217. func (m *SourceNode) GetMetrics() (result [][]interface{}) {
  218. for _, stats := range m.statManagers {
  219. result = append(result, stats.GetMetrics())
  220. }
  221. return result
  222. }