sink_node.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package nodes
  2. import (
  3. "engine/xstream/api"
  4. )
  5. type SinkNode struct {
  6. sink api.Sink
  7. input chan interface{}
  8. name string
  9. ctx api.StreamContext
  10. }
  11. func NewSinkNode(name string, sink api.Sink) *SinkNode{
  12. return &SinkNode{
  13. sink: sink,
  14. input: make(chan interface{}, 1024),
  15. name: name,
  16. ctx: nil,
  17. }
  18. }
  19. func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
  20. m.ctx = ctx
  21. logger := ctx.GetLogger()
  22. logger.Debugf("open sink node %s", m.name)
  23. go func() {
  24. if err := m.sink.Open(ctx); err != nil{
  25. go func() { result <- err }()
  26. return
  27. }
  28. for {
  29. select {
  30. case item := <-m.input:
  31. if err := m.sink.Collect(ctx, item); err != nil{
  32. //TODO deal with publish error
  33. logger.Errorf("sink node %s publish %v error: %v", ctx.GetOpId(), item, err)
  34. }
  35. case <-ctx.Done():
  36. logger.Infof("sink node %s done", m.name)
  37. if err := m.sink.Close(ctx); err != nil{
  38. go func() { result <- err }()
  39. }
  40. return
  41. }
  42. }
  43. }()
  44. }
  45. func (m *SinkNode) GetName() string{
  46. return m.name
  47. }
  48. func (m *SinkNode) GetInput() (chan<- interface{}, string) {
  49. return m.input, m.name
  50. }