source_node.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package nodes
  2. import (
  3. "engine/xstream/api"
  4. "fmt"
  5. )
  6. type SourceNode struct {
  7. source api.Source
  8. outs map[string]chan<- interface{}
  9. name string
  10. ctx api.StreamContext
  11. }
  12. func NewSourceNode(name string, source api.Source) *SourceNode{
  13. return &SourceNode{
  14. source: source,
  15. outs: make(map[string]chan<- interface{}),
  16. name: name,
  17. ctx: nil,
  18. }
  19. }
  20. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  21. m.ctx = ctx
  22. logger := ctx.GetLogger()
  23. logger.Debugf("open source node %s", m.name)
  24. go func(){
  25. if err := m.source.Open(ctx, func(data interface{}){
  26. m.Broadcast(data)
  27. logger.Debugf("%s consume data %v complete", m.name, data)
  28. }); err != nil{
  29. select {
  30. case errCh <- err:
  31. case <-ctx.Done():
  32. if err := m.source.Close(ctx); err != nil{
  33. go func() { errCh <- err }()
  34. }
  35. }
  36. }
  37. for {
  38. select {
  39. case <-ctx.Done():
  40. logger.Infof("source %s done", m.name)
  41. if err := m.source.Close(ctx); err != nil{
  42. go func() { errCh <- err }()
  43. }
  44. return
  45. }
  46. }
  47. }()
  48. }
  49. func (m *SourceNode) Broadcast(data interface{}) int{
  50. return Broadcast(m.outs, data, m.ctx)
  51. }
  52. func (m *SourceNode) GetName() string{
  53. return m.name
  54. }
  55. func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
  56. if _, ok := m.outs[name]; !ok{
  57. m.outs[name] = output
  58. }else{
  59. return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
  60. }
  61. return nil
  62. }