source_node.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package nodes
  2. import (
  3. "engine/xstream/api"
  4. "fmt"
  5. )
  6. type SourceNode struct {
  7. source api.Source
  8. outs map[string]chan<- interface{}
  9. name string
  10. ctx api.StreamContext
  11. }
  12. func NewSourceNode(name string, source api.Source) *SourceNode{
  13. return &SourceNode{
  14. source: source,
  15. outs: make(map[string]chan<- interface{}),
  16. name: name,
  17. ctx: nil,
  18. }
  19. }
  20. func (m *SourceNode) Open(ctx api.StreamContext) error {
  21. m.ctx = ctx
  22. logger := ctx.GetLogger()
  23. logger.Debugf("open source node %s", m.name)
  24. return m.source.Open(ctx, func(data interface{}){
  25. m.Broadcast(data)
  26. logger.Debugf("%s consume data %v complete", m.name, data)
  27. })
  28. }
  29. func (m *SourceNode) Broadcast(data interface{}) (err error){
  30. return Broadcast(m.outs, data)
  31. }
  32. func (m *SourceNode) GetName() string{
  33. return m.name
  34. }
  35. func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
  36. if _, ok := m.outs[name]; !ok{
  37. m.outs[name] = output
  38. }else{
  39. return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
  40. }
  41. return nil
  42. }