source_node.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/common/plugin_manager"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/emqx/kuiper/xstream/extensions"
  9. "github.com/go-yaml/yaml"
  10. "sync"
  11. )
  12. type SourceNode struct {
  13. sourceType string
  14. outs map[string]chan<- interface{}
  15. name string
  16. ctx api.StreamContext
  17. options map[string]string
  18. concurrency int
  19. mutex sync.RWMutex
  20. sources []api.Source
  21. statManagers []*StatManager
  22. }
  23. func NewSourceNode(name string, options map[string]string) *SourceNode {
  24. t, ok := options["TYPE"]
  25. if !ok {
  26. t = "mqtt"
  27. }
  28. return &SourceNode{
  29. sourceType: t,
  30. outs: make(map[string]chan<- interface{}),
  31. name: name,
  32. options: options,
  33. ctx: nil,
  34. concurrency: 1,
  35. }
  36. }
  37. //Only for mock source, do not use it in production
  38. func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
  39. return &SourceNode{
  40. sources: []api.Source{ source},
  41. outs: make(map[string]chan<- interface{}),
  42. name: name,
  43. options: options,
  44. ctx: nil,
  45. concurrency: 1,
  46. }
  47. }
  48. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  49. m.ctx = ctx
  50. logger := ctx.GetLogger()
  51. logger.Infof("open source node %s with option %v", m.name, m.options)
  52. go func() {
  53. props := m.getConf(ctx)
  54. if c, ok := props["concurrency"]; ok {
  55. if t, err := common.ToInt(c); err != nil {
  56. logger.Warnf("invalid type for concurrency property, should be int but found %t", c)
  57. } else {
  58. m.concurrency = t
  59. }
  60. }
  61. createSource := len(m.sources) == 0
  62. logger.Infof("open source node %d instances", m.concurrency)
  63. for i := 0; i < m.concurrency; i++ { // workers
  64. go func(instance int) {
  65. //Do open source instances
  66. var source api.Source
  67. var err error
  68. if createSource{
  69. source, err = getSource(m.sourceType)
  70. if err != nil {
  71. m.drainError(errCh, err, ctx, logger)
  72. return
  73. }
  74. err = source.Configure(m.options["DATASOURCE"], props)
  75. if err != nil {
  76. m.drainError(errCh, err, ctx, logger)
  77. return
  78. }
  79. m.mutex.Lock()
  80. m.sources = append(m.sources, source)
  81. m.mutex.Unlock()
  82. }else{
  83. source = m.sources[instance]
  84. }
  85. stats, err := NewStatManager("source", ctx)
  86. if err != nil {
  87. m.drainError(errCh, err, ctx, logger)
  88. return
  89. }
  90. m.mutex.Lock()
  91. m.statManagers = append(m.statManagers, stats)
  92. m.mutex.Unlock()
  93. outputCount := len(m.outs)
  94. if err := source.Open(ctx.WithInstance(instance), func(message map[string]interface{}, meta map[string]interface{}) {
  95. stats.IncTotalRecordsIn()
  96. stats.ProcessTimeStart()
  97. tuple := &xsql.Tuple{Emitter: m.name, Message: message, Timestamp: common.GetNowInMilli(), Metadata: meta}
  98. c := m.Broadcast(tuple)
  99. if c == outputCount {
  100. stats.ProcessTimeEnd()
  101. stats.IncTotalRecordsOut()
  102. } else {
  103. logger.Warnf("broadcast to %d outputs but expect %d", c, outputCount)
  104. stats.IncTotalExceptions()
  105. }
  106. logger.Debugf("%s consume data %v complete", m.name, tuple)
  107. }); err != nil {
  108. m.drainError(errCh, err, ctx, logger)
  109. return
  110. }
  111. logger.Infof("Start source %s instance %d successfully", m.name, instance)
  112. }(i)
  113. }
  114. for {
  115. select {
  116. case <-ctx.Done():
  117. logger.Infof("source %s done", m.name)
  118. m.close(ctx, logger)
  119. return
  120. }
  121. }
  122. }()
  123. }
  124. func getSource(t string) (api.Source, error) {
  125. var s api.Source
  126. var ok bool
  127. switch t {
  128. case "mqtt":
  129. s = &extensions.MQTTSource{}
  130. default:
  131. nf, err := plugin_manager.GetPlugin(t, "sources")
  132. if err != nil {
  133. return nil, err
  134. }
  135. s, ok = nf.(api.Source)
  136. if !ok {
  137. return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
  138. }
  139. }
  140. return s, nil
  141. }
  142. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  143. select {
  144. case errCh <- err:
  145. case <-ctx.Done():
  146. m.close(ctx, logger)
  147. }
  148. return
  149. }
  150. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  151. for _, s := range m.sources {
  152. if err := s.Close(ctx); err != nil {
  153. logger.Warnf("close source fails: %v", err)
  154. }
  155. }
  156. }
  157. func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
  158. confkey := m.options["CONF_KEY"]
  159. logger := ctx.GetLogger()
  160. confPath := "sources/" + m.sourceType + ".yaml"
  161. if m.sourceType == "mqtt" {
  162. confPath = "mqtt_source.yaml"
  163. }
  164. conf, err := common.LoadConf(confPath)
  165. props := make(map[string]interface{})
  166. if err == nil {
  167. cfg := make(map[string]map[string]interface{})
  168. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  169. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", m.sourceType)
  170. } else {
  171. var ok bool
  172. props, ok = cfg["default"]
  173. if !ok {
  174. logger.Warnf("default conf is not found", confkey)
  175. }
  176. if c, ok := cfg[confkey]; ok {
  177. for k, v := range c {
  178. props[k] = v
  179. }
  180. }
  181. }
  182. } else {
  183. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", m.sourceType)
  184. }
  185. logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
  186. return props
  187. }
  188. func (m *SourceNode) Broadcast(data interface{}) int {
  189. return Broadcast(m.outs, data, m.ctx)
  190. }
  191. func (m *SourceNode) GetName() string {
  192. return m.name
  193. }
  194. func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
  195. if _, ok := m.outs[name]; !ok {
  196. m.outs[name] = output
  197. } else {
  198. return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
  199. }
  200. return nil
  201. }
  202. func (m *SourceNode) GetMetrics() map[string]interface{} {
  203. result := make(map[string]interface{})
  204. for _, stats := range m.statManagers{
  205. for k, v := range stats.GetMetrics(){
  206. result[k] = v
  207. }
  208. }
  209. return result
  210. }