sink_node.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package nodes
  import (
  	"sync"

  	"github.com/emqx/kuiper/xstream/api"
  )
  5. type SinkNode struct {
  6. sink api.Sink
  7. input chan interface{}
  8. name string
  9. ctx api.StreamContext
  10. }
  11. func NewSinkNode(name string, sink api.Sink) *SinkNode{
  12. return &SinkNode{
  13. sink: sink,
  14. input: make(chan interface{}, 1024),
  15. name: name,
  16. ctx: nil,
  17. }
  18. }
  19. func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
  20. m.ctx = ctx
  21. logger := ctx.GetLogger()
  22. logger.Debugf("open sink node %s", m.name)
  23. go func() {
  24. if err := m.sink.Open(ctx); err != nil{
  25. go func() {
  26. select{
  27. case result <- err:
  28. case <-ctx.Done():
  29. }
  30. }()
  31. return
  32. }
  33. for {
  34. select {
  35. case item := <-m.input:
  36. go func() {
  37. if err := m.sink.Collect(ctx, item); err != nil{
  38. //TODO deal with publish error
  39. logger.Errorf("sink node %s publish %v error: %v", ctx.GetOpId(), item, err)
  40. }
  41. }()
  42. case <-ctx.Done():
  43. logger.Infof("sink node %s done", m.name)
  44. if err := m.sink.Close(ctx); err != nil{
  45. logger.Warnf("close sink node %s fails: %v", m.name, err)
  46. }
  47. return
  48. }
  49. }
  50. }()
  51. }
  52. func (m *SinkNode) GetName() string{
  53. return m.name
  54. }
  55. func (m *SinkNode) GetInput() (chan<- interface{}, string) {
  56. return m.input, m.name
  57. }