source_node.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package nodes
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "fmt"
  7. "github.com/go-yaml/yaml"
  8. )
  9. type SourceNode struct {
  10. source api.Source
  11. outs map[string]chan<- interface{}
  12. name string
  13. ctx api.StreamContext
  14. options map[string]string
  15. }
  16. func NewSourceNode(name string, source api.Source, options map[string]string) *SourceNode {
  17. return &SourceNode{
  18. source: source,
  19. outs: make(map[string]chan<- interface{}),
  20. name: name,
  21. options: options,
  22. ctx: nil,
  23. }
  24. }
  25. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  26. m.ctx = ctx
  27. logger := ctx.GetLogger()
  28. logger.Debugf("open source node %s with option %v", m.name, m.options)
  29. go func(){
  30. props := getConf(m.options["TYPE"], m.options["CONF_KEY"], ctx)
  31. err := m.source.Configure(m.options["DATASOURCE"], props)
  32. if err != nil{
  33. m.drainError(errCh, err, ctx, logger)
  34. return
  35. }
  36. if err := m.source.Open(ctx, func(message map[string]interface{}, meta map[string]interface{}) {
  37. tuple := &xsql.Tuple{Emitter: m.name, Message: message, Timestamp: common.GetNowInMilli(), Metadata: meta}
  38. m.Broadcast(tuple)
  39. logger.Debugf("%s consume data %v complete", m.name, tuple)
  40. }); err != nil {
  41. m.drainError(errCh, err, ctx, logger)
  42. return
  43. }
  44. for {
  45. select {
  46. case <-ctx.Done():
  47. logger.Infof("source %s done", m.name)
  48. if err := m.source.Close(ctx); err != nil {
  49. logger.Warnf("close source fails: %v", err)
  50. }
  51. return
  52. }
  53. }
  54. }()
  55. }
  56. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  57. select {
  58. case errCh <- err:
  59. case <-ctx.Done():
  60. if err := m.source.Close(ctx); err != nil {
  61. logger.Warnf("close source fails: %v", err)
  62. }
  63. }
  64. return
  65. }
  66. func getConf(t string, confkey string, ctx api.StreamContext) map[string]interface{} {
  67. logger := ctx.GetLogger()
  68. if t == ""{
  69. t = "mqtt"
  70. }
  71. confPath := "sources/" + t + ".yaml"
  72. if t == "mqtt"{
  73. confPath = "mqtt_source.yaml"
  74. }
  75. conf, err := common.LoadConf(confPath)
  76. props := make(map[string]interface{})
  77. if err == nil {
  78. cfg := make(map[string]map[string]interface{})
  79. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  80. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", t)
  81. } else {
  82. var ok bool
  83. props, ok = cfg["default"]
  84. if !ok {
  85. logger.Warnf("default conf is not found", confkey)
  86. }
  87. if c, ok := cfg[confkey]; ok {
  88. for k, v := range c {
  89. props[k] = v
  90. }
  91. }
  92. }
  93. } else {
  94. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", t)
  95. }
  96. logger.Debugf("get conf for %s with conf key %s: %v", t, confkey, props)
  97. return props
  98. }
  99. func (m *SourceNode) Broadcast(data interface{}) int{
  100. return Broadcast(m.outs, data, m.ctx)
  101. }
  102. func (m *SourceNode) GetName() string{
  103. return m.name
  104. }
  105. func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
  106. if _, ok := m.outs[name]; !ok{
  107. m.outs[name] = output
  108. }else{
  109. return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
  110. }
  111. return nil
  112. }