12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- package nodes
- import (
- "engine/xstream/api"
- )
- type SinkNode struct {
- sink api.Sink
- input chan interface{}
- name string
- ctx api.StreamContext
- }
- func NewSinkNode(name string, sink api.Sink) *SinkNode{
- return &SinkNode{
- sink: sink,
- input: make(chan interface{}, 1024),
- name: name,
- ctx: nil,
- }
- }
- func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
- m.ctx = ctx
- logger := ctx.GetLogger()
- logger.Debugf("open sink node %s", m.name)
- go func() {
- if err := m.sink.Open(ctx); err != nil{
- go func() {
- select{
- case result <- err:
- case <-ctx.Done():
- }
- }()
- return
- }
- for {
- select {
- case item := <-m.input:
- if err := m.sink.Collect(ctx, item); err != nil{
- //TODO deal with publish error
- logger.Errorf("sink node %s publish %v error: %v", ctx.GetOpId(), item, err)
- }
- case <-ctx.Done():
- logger.Infof("sink node %s done", m.name)
- if err := m.sink.Close(ctx); err != nil{
- logger.Warnf("close sink node %s fails: %v", m.name, err)
- }
- return
- }
- }
- }()
- }
- func (m *SinkNode) GetName() string{
- return m.name
- }
- func (m *SinkNode) GetInput() (chan<- interface{}, string) {
- return m.input, m.name
- }
|