sink_node.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package nodes
  2. import (
  3. "engine/xstream/api"
  4. )
  5. type SinkNode struct {
  6. sink api.Sink
  7. input chan interface{}
  8. name string
  9. ctx api.StreamContext
  10. }
  11. func NewSinkNode(name string, sink api.Sink) *SinkNode{
  12. return &SinkNode{
  13. sink: sink,
  14. input: make(chan interface{}, 1024),
  15. name: name,
  16. ctx: nil,
  17. }
  18. }
  19. func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
  20. m.ctx = ctx
  21. logger := ctx.GetLogger()
  22. logger.Debugf("open sink node %s", m.name)
  23. go func() {
  24. if err := m.sink.Open(ctx); err != nil{
  25. go func() {
  26. select{
  27. case result <- err:
  28. case <-ctx.Done():
  29. }
  30. }()
  31. return
  32. }
  33. for {
  34. select {
  35. case item := <-m.input:
  36. if err := m.sink.Collect(ctx, item); err != nil{
  37. //TODO deal with publish error
  38. logger.Errorf("sink node %s publish %v error: %v", ctx.GetOpId(), item, err)
  39. }
  40. case <-ctx.Done():
  41. logger.Infof("sink node %s done", m.name)
  42. if err := m.sink.Close(ctx); err != nil{
  43. logger.Warnf("close sink node %s fails: %v", m.name, err)
  44. }
  45. return
  46. }
  47. }
  48. }()
  49. }
  50. func (m *SinkNode) GetName() string{
  51. return m.name
  52. }
  53. func (m *SinkNode) GetInput() (chan<- interface{}, string) {
  54. return m.input, m.name
  55. }