source_node.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package nodes
  2. import (
  3. "engine/common"
  4. "engine/xsql"
  5. "engine/xstream/api"
  6. "fmt"
  7. "github.com/go-yaml/yaml"
  8. "github.com/sirupsen/logrus"
  9. )
  10. type SourceNode struct {
  11. source api.Source
  12. outs map[string]chan<- interface{}
  13. name string
  14. ctx api.StreamContext
  15. options map[string]string
  16. }
  17. func NewSourceNode(name string, source api.Source, options map[string]string) *SourceNode {
  18. return &SourceNode{
  19. source: source,
  20. outs: make(map[string]chan<- interface{}),
  21. name: name,
  22. options: options,
  23. ctx: nil,
  24. }
  25. }
  26. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  27. m.ctx = ctx
  28. logger := ctx.GetLogger()
  29. logger.Debugf("open source node %s with option %v", m.name, m.options)
  30. go func(){
  31. props := getConf(m.options["TYPE"], m.options["CONF_KEY"], ctx)
  32. err := m.source.Configure(m.options["DATASOURCE"], props)
  33. if err != nil{
  34. m.drainError(errCh, err, ctx, logger)
  35. return
  36. }
  37. if err := m.source.Open(ctx, func(message map[string]interface{}, meta map[string]interface{}) {
  38. tuple := &xsql.Tuple{Emitter: m.name, Message: message, Timestamp: common.GetNowInMilli(), Metadata: meta}
  39. m.Broadcast(tuple)
  40. logger.Debugf("%s consume data %v complete", m.name, tuple)
  41. }); err != nil {
  42. m.drainError(errCh, err, ctx, logger)
  43. return
  44. }
  45. for {
  46. select {
  47. case <-ctx.Done():
  48. logger.Infof("source %s done", m.name)
  49. if err := m.source.Close(ctx); err != nil {
  50. logger.Warnf("close source fails: %v", err)
  51. }
  52. return
  53. }
  54. }
  55. }()
  56. }
  57. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger *logrus.Entry) {
  58. select {
  59. case errCh <- err:
  60. case <-ctx.Done():
  61. if err := m.source.Close(ctx); err != nil {
  62. logger.Warnf("close source fails: %v", err)
  63. }
  64. }
  65. return
  66. }
  67. func getConf(t string, confkey string, ctx api.StreamContext) map[string]interface{} {
  68. logger := ctx.GetLogger()
  69. if t == ""{
  70. t = "mqtt"
  71. }
  72. conf, err := common.LoadConf("sources/" + t + ".yaml")
  73. props := make(map[string]interface{})
  74. if err == nil {
  75. cfg := make(map[string]map[string]interface{})
  76. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  77. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", t)
  78. } else {
  79. var ok bool
  80. props, ok = cfg["default"]
  81. if !ok {
  82. logger.Warnf("default conf is not found", confkey)
  83. }
  84. if c, ok := cfg[confkey]; ok {
  85. for k, v := range c {
  86. props[k] = v
  87. }
  88. }
  89. }
  90. } else {
  91. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", t)
  92. }
  93. logger.Debugf("get conf for %s with conf key %s: %v", t, confkey, props)
  94. return props
  95. }
  96. func (m *SourceNode) Broadcast(data interface{}) int{
  97. return Broadcast(m.outs, data, m.ctx)
  98. }
  99. func (m *SourceNode) GetName() string{
  100. return m.name
  101. }
  102. func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
  103. if _, ok := m.outs[name]; !ok{
  104. m.outs[name] = output
  105. }else{
  106. return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
  107. }
  108. return nil
  109. }