source_node.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package nodes
  2. import (
  3. "engine/common"
  4. "engine/xsql"
  5. "engine/xstream/api"
  6. "fmt"
  7. )
  8. type SourceNode struct {
  9. source api.Source
  10. outs map[string]chan<- interface{}
  11. name string
  12. ctx api.StreamContext
  13. }
  14. func NewSourceNode(name string, source api.Source) *SourceNode{
  15. return &SourceNode{
  16. source: source,
  17. outs: make(map[string]chan<- interface{}),
  18. name: name,
  19. ctx: nil,
  20. }
  21. }
  22. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  23. m.ctx = ctx
  24. logger := ctx.GetLogger()
  25. logger.Debugf("open source node %s", m.name)
  26. go func(){
  27. if err := m.source.Open(ctx, func(message map[string]interface{}, meta map[string]interface{}){
  28. tuple := &xsql.Tuple{Emitter: m.name, Message:message, Timestamp: common.GetNowInMilli(), Metadata:meta}
  29. m.Broadcast(tuple)
  30. logger.Debugf("%s consume data %v complete", m.name, tuple)
  31. }); err != nil{
  32. select {
  33. case errCh <- err:
  34. case <-ctx.Done():
  35. if err := m.source.Close(ctx); err != nil{
  36. go func() { errCh <- err }()
  37. }
  38. }
  39. }
  40. for {
  41. select {
  42. case <-ctx.Done():
  43. logger.Infof("source %s done", m.name)
  44. if err := m.source.Close(ctx); err != nil{
  45. go func() { errCh <- err }()
  46. }
  47. return
  48. }
  49. }
  50. }()
  51. }
  52. func (m *SourceNode) Broadcast(data interface{}) int{
  53. return Broadcast(m.outs, data, m.ctx)
  54. }
  55. func (m *SourceNode) GetName() string{
  56. return m.name
  57. }
  58. func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
  59. if _, ok := m.outs[name]; !ok{
  60. m.outs[name] = output
  61. }else{
  62. return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
  63. }
  64. return nil
  65. }