1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
- package nodes
- import (
- "engine/xstream/api"
- )
- type SinkNode struct {
- sink api.Sink
- input chan interface{}
- name string
- ctx api.StreamContext
- }
- func NewSinkNode(name string, sink api.Sink) *SinkNode{
- return &SinkNode{
- sink: sink,
- input: make(chan interface{}, 1024),
- name: name,
- ctx: nil,
- }
- }
- func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
- m.ctx = ctx
- logger := ctx.GetLogger()
- logger.Debugf("open sink node %s", m.name)
- go func() {
- if err := m.sink.Open(ctx); err != nil{
- go func() { result <- err }()
- return
- }
- for {
- select {
- case item := <-m.input:
- if err := m.sink.Collect(ctx, item); err != nil{
- //TODO deal with publish error
- logger.Errorf("sink node %s publish %v error: %v", ctx.GetOpId(), item, err)
- }
- case <-ctx.Done():
- logger.Infof("sink node %s done", m.name)
- if err := m.sink.Close(ctx); err != nil{
- go func() { result <- err }()
- }
- return
- }
- }
- }()
- }
- func (m *SinkNode) GetName() string{
- return m.name
- }
- func (m *SinkNode) GetInput() (chan<- interface{}, string) {
- return m.input, m.name
- }
|