source_node.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/common/plugin_manager"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/emqx/kuiper/xstream/extensions"
  9. "github.com/go-yaml/yaml"
  10. "sync"
  11. )
  12. type SourceNode struct {
  13. sourceType string
  14. outs map[string]chan<- interface{}
  15. name string
  16. ctx api.StreamContext
  17. options map[string]string
  18. concurrency int
  19. mutex sync.RWMutex
  20. sources []api.Source
  21. }
  22. func NewSourceNode(name string, options map[string]string) *SourceNode {
  23. t, ok := options["TYPE"]
  24. if !ok {
  25. t = "mqtt"
  26. }
  27. return &SourceNode{
  28. sourceType: t,
  29. outs: make(map[string]chan<- interface{}),
  30. name: name,
  31. options: options,
  32. ctx: nil,
  33. concurrency: 1,
  34. }
  35. }
  36. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  37. m.ctx = ctx
  38. logger := ctx.GetLogger()
  39. logger.Debugf("open source node %s with option %v", m.name, m.options)
  40. go func() {
  41. props := m.getConf(ctx)
  42. if c, ok := props["concurrency"]; ok {
  43. if f, ok := c.(float64); ok {
  44. m.concurrency = int(f)
  45. }
  46. }
  47. for i := 0; i < m.concurrency; i++ { // workers
  48. go func() {
  49. //Do open source instances
  50. source, err := getSource(m.sourceType)
  51. if err != nil {
  52. m.drainError(errCh, err, ctx, logger)
  53. return
  54. }
  55. err = source.Configure(m.options["DATASOURCE"], props)
  56. if err != nil {
  57. m.drainError(errCh, err, ctx, logger)
  58. return
  59. }
  60. m.mutex.Lock()
  61. m.sources = append(m.sources, source)
  62. m.mutex.Unlock()
  63. if err := source.Open(ctx, func(message map[string]interface{}, meta map[string]interface{}) {
  64. tuple := &xsql.Tuple{Emitter: m.name, Message: message, Timestamp: common.GetNowInMilli(), Metadata: meta}
  65. m.Broadcast(tuple)
  66. logger.Debugf("%s consume data %v complete", m.name, tuple)
  67. }); err != nil {
  68. m.drainError(errCh, err, ctx, logger)
  69. return
  70. }
  71. }()
  72. }
  73. for {
  74. select {
  75. case <-ctx.Done():
  76. logger.Infof("source %s done", m.name)
  77. m.close(ctx, logger)
  78. return
  79. }
  80. }
  81. }()
  82. }
  83. func getSource(t string) (api.Source, error) {
  84. var s api.Source
  85. var ok bool
  86. switch t {
  87. case "mqtt":
  88. s = &extensions.MQTTSource{}
  89. default:
  90. nf, err := plugin_manager.GetPlugin(t, "sources")
  91. if err != nil {
  92. return nil, err
  93. }
  94. s, ok = nf.(api.Source)
  95. if !ok {
  96. return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
  97. }
  98. }
  99. return s, nil
  100. }
  101. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  102. select {
  103. case errCh <- err:
  104. case <-ctx.Done():
  105. m.close(ctx, logger)
  106. }
  107. return
  108. }
  109. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  110. for _, s := range m.sources {
  111. if err := s.Close(ctx); err != nil {
  112. logger.Warnf("close source fails: %v", err)
  113. }
  114. }
  115. }
  116. func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
  117. confkey := m.options["CONF_KEY"]
  118. logger := ctx.GetLogger()
  119. confPath := "sources/" + m.sourceType + ".yaml"
  120. if m.sourceType == "mqtt" {
  121. confPath = "mqtt_source.yaml"
  122. }
  123. conf, err := common.LoadConf(confPath)
  124. props := make(map[string]interface{})
  125. if err == nil {
  126. cfg := make(map[string]map[string]interface{})
  127. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  128. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", m.sourceType)
  129. } else {
  130. var ok bool
  131. props, ok = cfg["default"]
  132. if !ok {
  133. logger.Warnf("default conf is not found", confkey)
  134. }
  135. if c, ok := cfg[confkey]; ok {
  136. for k, v := range c {
  137. props[k] = v
  138. }
  139. }
  140. }
  141. } else {
  142. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", m.sourceType)
  143. }
  144. logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
  145. return props
  146. }
  147. func (m *SourceNode) Broadcast(data interface{}) int {
  148. return Broadcast(m.outs, data, m.ctx)
  149. }
  150. func (m *SourceNode) GetName() string {
  151. return m.name
  152. }
  153. func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
  154. if _, ok := m.outs[name]; !ok {
  155. m.outs[name] = output
  156. } else {
  157. return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
  158. }
  159. return nil
  160. }