Przeglądaj źródła

feat(extension): source refactor for extension

Extended source will only need to implement open method
ngjaying 5 lat temu
rodzic
commit
1647e5501f

+ 22 - 19
xsql/processors/xsql_processor.go

@@ -7,7 +7,9 @@ import (
 	"engine/xsql"
 	"engine/xsql/plans"
 	"engine/xstream"
+	"engine/xstream/api"
 	"engine/xstream/extensions"
+	"engine/xstream/nodes"
 	"engine/xstream/operators"
 	"engine/xstream/sinks"
 	"fmt"
@@ -156,7 +158,7 @@ func NewRuleProcessor(d string) *RuleProcessor {
 	return processor
 }
 
-func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*xstream.Rule, error) {
+func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
 	rule, err := p.getRuleByJson(name, ruleJson)
 	if err != nil {
 		return nil, err
@@ -176,7 +178,7 @@ func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*xstream.Rule, error)
 	return rule, nil
 }
 
-func (p *RuleProcessor) GetRuleByName(name string) (*xstream.Rule, error) {
+func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
 	db, err := common.DbOpen(path.Join(p.badgerDir, "rule"))
 	if err != nil {
 		return nil, err
@@ -189,8 +191,8 @@ func (p *RuleProcessor) GetRuleByName(name string) (*xstream.Rule, error) {
 	return p.getRuleByJson(name, s)
 }
 
-func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*xstream.Rule, error) {
-	var rule xstream.Rule
+func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error) {
+	var rule api.Rule
 	if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
 		return nil, fmt.Errorf("parse rule %s error : %s", ruleJson, err)
 	}
@@ -208,7 +210,7 @@ func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*xstream.Rule, err
 	return &rule, nil
 }
 
-func (p *RuleProcessor) ExecInitRule(rule *xstream.Rule) (*xstream.TopologyNew, error) {
+func (p *RuleProcessor) ExecInitRule(rule *api.Rule) (*xstream.TopologyNew, error) {
 	if tp, inputs, err := p.createTopo(rule); err != nil {
 		return nil, err
 	}else{
@@ -235,7 +237,7 @@ func (p *RuleProcessor) ExecInitRule(rule *xstream.Rule) (*xstream.TopologyNew,
 }
 
 func (p *RuleProcessor) ExecQuery(ruleid, sql string) (*xstream.TopologyNew, error) {
-	if tp, inputs, err := p.createTopo(&xstream.Rule{Id: ruleid, Sql: sql}); err != nil {
+	if tp, inputs, err := p.createTopo(&api.Rule{Id: ruleid, Sql: sql}); err != nil {
 		return nil, err
 	} else {
 		tp.AddSink(inputs, sinks.NewLogSinkToMemory("sink_log", ruleid))
@@ -306,12 +308,12 @@ func (p *RuleProcessor) ExecDrop(name string) (string, error) {
 	}
 }
 
-func (p *RuleProcessor) createTopo(rule *xstream.Rule) (*xstream.TopologyNew, []xstream.Emitter, error) {
+func (p *RuleProcessor) createTopo(rule *api.Rule) (*xstream.TopologyNew, []api.Emitter, error) {
 	return p.createTopoWithSources(rule, nil)
 }
 
 //For test to mock source
-func (p *RuleProcessor) createTopoWithSources(rule *xstream.Rule, sources []xstream.Source) (*xstream.TopologyNew, []xstream.Emitter, error){
+func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.SourceNode) (*xstream.TopologyNew, []api.Emitter, error){
 	name := rule.Id
 	sql := rule.Sql
 	var isEventTime bool
@@ -340,7 +342,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *xstream.Rule, sources []xstr
 			return nil, nil, fmt.Errorf("sql %s is not a select statement", sql)
 		} else {
 			tp := xstream.NewWithName(name)
-			var inputs []xstream.Emitter
+			var inputs []api.Emitter
 			streamsFromStmt := xsql.GetStreams(selectStmt)
 			if !shouldCreateSource && len(streamsFromStmt) != len(sources){
 				return nil, nil, fmt.Errorf("invalid parameter sources or streams, the length cannot match the statement, expect %d sources", len(streamsFromStmt))
@@ -361,18 +363,19 @@ func (p *RuleProcessor) createTopoWithSources(rule *xstream.Rule, sources []xstr
 					return nil, nil, err
 				}
 				if shouldCreateSource{
-					mqs, err := extensions.NewWithName(string(streamStmt.Name), streamStmt.Options["DATASOURCE"], streamStmt.Options["CONF_KEY"])
+					mqs, err := extensions.NewMQTTSource(streamStmt.Options["DATASOURCE"], streamStmt.Options["CONF_KEY"])
 					if err != nil {
 						return nil, nil, err
 					}
-					tp.AddSrc(mqs)
+					node := nodes.NewSourceNode(string(streamStmt.Name), mqs)
+					tp.AddSrc(node)
 					preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
-					tp.AddOperator([]xstream.Emitter{mqs}, preprocessorOp)
+					tp.AddOperator([]api.Emitter{node}, preprocessorOp)
 					inputs = append(inputs, preprocessorOp)
 				}else{
 					tp.AddSrc(sources[i])
 					preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
-					tp.AddOperator([]xstream.Emitter{sources[i]}, preprocessorOp)
+					tp.AddOperator([]api.Emitter{sources[i]}, preprocessorOp)
 					inputs = append(inputs, preprocessorOp)
 				}
 			}
@@ -386,7 +389,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *xstream.Rule, sources []xstr
 						return nil, nil, err
 					}
 					tp.AddOperator(inputs, wop)
-					inputs = []xstream.Emitter{wop}
+					inputs = []api.Emitter{wop}
 				}
 			}
 
@@ -395,7 +398,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *xstream.Rule, sources []xstr
 				//TODO concurrency setting by command
 				//joinOp.SetConcurrency(3)
 				tp.AddOperator(inputs, joinOp)
-				inputs = []xstream.Emitter{joinOp}
+				inputs = []api.Emitter{joinOp}
 			}
 
 			if selectStmt.Condition != nil {
@@ -403,7 +406,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *xstream.Rule, sources []xstr
 				//TODO concurrency setting by command
 				// filterOp.SetConcurrency(3)
 				tp.AddOperator(inputs, filterOp)
-				inputs = []xstream.Emitter{filterOp}
+				inputs = []api.Emitter{filterOp}
 			}
 
 			var ds xsql.Dimensions
@@ -412,7 +415,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *xstream.Rule, sources []xstr
 				if ds != nil && len(ds) > 0 {
 					aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds}, "aggregate")
 					tp.AddOperator(inputs, aggregateOp)
-					inputs = []xstream.Emitter{aggregateOp}
+					inputs = []api.Emitter{aggregateOp}
 				}
 			}
 
@@ -425,13 +428,13 @@ func (p *RuleProcessor) createTopoWithSources(rule *xstream.Rule, sources []xstr
 			if selectStmt.SortFields != nil {
 				orderOp := xstream.Transform(&plans.OrderPlan{SortFields:selectStmt.SortFields}, "order")
 				tp.AddOperator(inputs, orderOp)
-				inputs = []xstream.Emitter{orderOp}
+				inputs = []api.Emitter{orderOp}
 			}
 
 			if selectStmt.Fields != nil {
 				projectOp := xstream.Transform(&plans.ProjectPlan{Fields: selectStmt.Fields, IsAggregate: xsql.IsAggStatement(selectStmt)}, "project")
 				tp.AddOperator(inputs, projectOp)
-				inputs = []xstream.Emitter{projectOp}
+				inputs = []api.Emitter{projectOp}
 			}
 			return tp, inputs, nil
 		}

+ 12 - 11
xsql/processors/xsql_processor_test.go

@@ -4,7 +4,8 @@ import (
 	"encoding/json"
 	"engine/common"
 	"engine/xsql"
-	"engine/xstream"
+	"engine/xstream/api"
+	"engine/xstream/nodes"
 	"engine/xstream/test"
 	"fmt"
 	"path"
@@ -144,7 +145,7 @@ func dropStreams(t *testing.T){
 	}
 }
 
-func getMockSource(name string, done chan<- struct{}, size int) *test.MockSource{
+func getMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode{
 	var data []*xsql.Tuple
 	switch name{
 	case "demo":
@@ -346,7 +347,7 @@ func getMockSource(name string, done chan<- struct{}, size int) *test.MockSource
 			},
 		}
 	}
-	return test.NewMockSource(data[:size], name, done, false)
+	return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, false))
 }
 
 func TestSingleSQL(t *testing.T) {
@@ -408,7 +409,7 @@ func TestSingleSQL(t *testing.T) {
 	for i, tt := range tests {
 		p := NewRuleProcessor(BadgerDir)
 		parser := xsql.NewParser(strings.NewReader(tt.sql))
-		var sources []xstream.Source
+		var sources []*nodes.SourceNode
 		if stmt, err := xsql.Language.Parse(parser); err != nil{
 			t.Errorf("parse sql %s error: %s", tt.sql , err)
 		}else {
@@ -422,7 +423,7 @@ func TestSingleSQL(t *testing.T) {
 				}
 			}
 		}
-		tp, inputs, err := p.createTopoWithSources(&xstream.Rule{Id:tt.name, Sql: tt.sql}, sources)
+		tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
 		if err != nil{
 			t.Error(err)
 		}
@@ -672,7 +673,7 @@ func TestWindow(t *testing.T) {
 	for i, tt := range tests {
 		p := NewRuleProcessor(BadgerDir)
 		parser := xsql.NewParser(strings.NewReader(tt.sql))
-		var sources []xstream.Source
+		var sources []*nodes.SourceNode
 		if stmt, err := xsql.Language.Parse(parser); err != nil{
 			t.Errorf("parse sql %s error: %s", tt.sql , err)
 		}else {
@@ -686,7 +687,7 @@ func TestWindow(t *testing.T) {
 				}
 			}
 		}
-		tp, inputs, err := p.createTopoWithSources(&xstream.Rule{Id:tt.name, Sql: tt.sql}, sources)
+		tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
 		if err != nil{
 			t.Error(err)
 		}
@@ -779,7 +780,7 @@ func dropEventStreams(t *testing.T){
 	}
 }
 
-func getEventMockSource(name string, done chan<- struct{}, size int) *test.MockSource{
+func getEventMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNode{
 	var data []*xsql.Tuple
 	switch name{
 	case "demoE":
@@ -1008,7 +1009,7 @@ func getEventMockSource(name string, done chan<- struct{}, size int) *test.MockS
 			},
 		}
 	}
-	return test.NewMockSource(data[:size], name, done, true)
+	return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, true))
 }
 
 func TestEventWindow(t *testing.T) {
@@ -1215,7 +1216,7 @@ func TestEventWindow(t *testing.T) {
 	for i, tt := range tests {
 		p := NewRuleProcessor(BadgerDir)
 		parser := xsql.NewParser(strings.NewReader(tt.sql))
-		var sources []xstream.Source
+		var sources []*nodes.SourceNode
 		if stmt, err := xsql.Language.Parse(parser); err != nil{
 			t.Errorf("parse sql %s error: %s", tt.sql , err)
 		}else {
@@ -1229,7 +1230,7 @@ func TestEventWindow(t *testing.T) {
 				}
 			}
 		}
-		tp, inputs, err := p.createTopoWithSources(&xstream.Rule{
+		tp, inputs, err := p.createTopoWithSources(&api.Rule{
 			Id:tt.name, Sql: tt.sql,
 			Options: map[string]interface{}{
 				"isEventTime": true,

+ 54 - 0
xstream/api/stream.go

@@ -0,0 +1,54 @@
+package api
+
+import (
+	"context"
+	"github.com/sirupsen/logrus"
+)
+
+type ConsumeFunc func(data interface{})
+
+type Source interface {
+	Open(context StreamContext, consume ConsumeFunc) error
+}
+
+type Emitter interface {
+	AddOutput(chan<- interface{}, string) error
+}
+
+type Collector interface {
+	GetInput() (chan<- interface{}, string)
+}
+
+type TopNode interface {
+	GetName() string
+}
+
+type Rule struct {
+	Id      string                   `json:"id"`
+	Sql     string                   `json:"sql"`
+	Actions []map[string]interface{} `json:"actions"`
+	Options map[string]interface{}   `json:"options"`
+}
+
+type StreamContext interface {
+	GetContext() context.Context
+	GetLogger()  *logrus.Entry
+	GetRuleId() string
+	GetOpId() string
+}
+
+type SinkConnector interface {
+	Open(context.Context, chan<- error)
+}
+
+type Sink interface {
+	Collector
+	SinkConnector
+}
+
+type Operator interface {
+	Emitter
+	Collector
+	Exec(context context.Context) error
+}
+

+ 40 - 0
xstream/contexts/default.go

@@ -0,0 +1,40 @@
+package contexts
+
+import (
+	"context"
+	"engine/common"
+	"github.com/sirupsen/logrus"
+)
+
+type DefaultContext struct {
+	ruleId string
+	opId   string
+	ctx    context.Context
+	logger *logrus.Entry
+}
+
+func NewDefaultContext(ruleId string, opId string, ctx context.Context) *DefaultContext{
+	c := &DefaultContext{
+		ruleId: ruleId,
+		opId:	opId,
+		ctx:    ctx,
+		logger: common.GetLogger(ctx),
+	}
+	return c
+}
+
+func (c *DefaultContext) GetContext() context.Context {
+	return c.ctx
+}
+
+func (c *DefaultContext) GetLogger() *logrus.Entry {
+	return c.logger
+}
+
+func (c *DefaultContext) GetRuleId() string {
+	return c.ruleId
+}
+
+func (c *DefaultContext) GetOpId() string {
+	return c.opId
+}

+ 8 - 35
xstream/extensions/mqtt_source.go

@@ -5,11 +5,11 @@ import (
 	"encoding/json"
 	"engine/common"
 	"engine/xsql"
+	"engine/xstream/api"
 	"fmt"
 	MQTT "github.com/eclipse/paho.mqtt.golang"
 	"github.com/go-yaml/yaml"
 	"github.com/google/uuid"
-	"os"
 	"strconv"
 	"strings"
 	"time"
@@ -24,11 +24,7 @@ type MQTTSource struct {
 	password string
 
 	schema   map[string]interface{}
-
-	outs  map[string]chan<- interface{}
 	conn MQTT.Client
-	name 		string
-	//ctx context.Context
 }
 
 
@@ -44,15 +40,14 @@ type MQTTConfig struct {
 
 const confName string = "mqtt_source.yaml"
 
-func NewWithName(name string, topic string, confKey string) (*MQTTSource, error) {
+func NewMQTTSource(topic string, confKey string) (*MQTTSource, error) {
 	b := common.LoadConf(confName)
 	var cfg map[string]MQTTConfig
 	if err := yaml.Unmarshal(b, &cfg); err != nil {
 		return nil, err
 	}
 
-	ms := &MQTTSource{tpc: topic, name: name}
-	ms.outs = make(map[string]chan<- interface{})
+	ms := &MQTTSource{tpc: topic}
 	if srvs := cfg[confKey].Servers; srvs != nil && len(srvs) > 1 {
 		return nil, fmt.Errorf("It only support one server in %s section.", confKey)
 	} else if srvs == nil {
@@ -91,34 +86,14 @@ func NewWithName(name string, topic string, confKey string) (*MQTTSource, error)
 	return ms, nil
 }
 
-func fileExists(filename string) bool {
-	info, err := os.Stat(filename)
-	if os.IsNotExist(err) {
-		return false
-	}
-	return !info.IsDir()
-}
-
 func (ms *MQTTSource) WithSchema(schema string) *MQTTSource {
 	return ms
 }
 
-func (ms *MQTTSource) GetName() string {
-	return ms.name
-}
-
-func (ms *MQTTSource) AddOutput(output chan<- interface{}, name string) {
-	if _, ok := ms.outs[name]; !ok{
-		ms.outs[name] = output
-	}else{
-		common.Log.Warnf("fail to add output %s, operator %s already has an output of the same name", name, ms.name)
-	}
-}
-
-func (ms *MQTTSource) Open(ctx context.Context) error {
-	log := common.GetLogger(ctx)
+func (ms *MQTTSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) error {
+	log := ctx.GetLogger()
 	go func() {
-		exeCtx, cancel := context.WithCancel(ctx)
+		exeCtx, cancel := context.WithCancel(ctx.GetContext())
 		opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
 
 		if ms.clientid == "" {
@@ -159,15 +134,13 @@ func (ms *MQTTSource) Open(ctx context.Context) error {
 			meta[xsql.INTERNAL_MQTT_MSG_ID_KEY] = strconv.Itoa(int(msg.MessageID()))
 
 			tuple := &xsql.Tuple{Emitter: ms.tpc, Message:result, Timestamp: common.TimeToUnixMilli(time.Now()), Metadata:meta}
-			for _, out := range ms.outs{
-				out <- tuple
-			}
+			consume(tuple)
 		}
 
 		opts.SetDefaultPublishHandler(h)
 		c := MQTT.NewClient(opts)
 		if token := c.Connect(); token.Wait() && token.Error() != nil {
-			log.Printf("Found error when connecting to %s for %s: %s", ms.srv, ms.name, token.Error())
+			log.Printf("Found error when connecting to %s: %s", ms.srv, token.Error())
 			cancel()
 			return
 		}

+ 21 - 0
xstream/nodes/common_func.go

@@ -0,0 +1,21 @@
+package nodes
+
+import "fmt"
+
+func Broadcast(outputs map[string]chan<- interface{}, val interface{}) (err error) {
+	for n, out := range outputs {
+		select {
+		case out <- val:
+			//All ok
+		default: //TODO channel full strategy?
+			if err != nil {
+				err = fmt.Errorf("%v;channel full for %s", err, n)
+			} else {
+				err = fmt.Errorf("channel full for %s", n)
+			}
+		}
+	}
+	return err
+}
+
+

+ 49 - 0
xstream/nodes/source_node.go

@@ -0,0 +1,49 @@
+package nodes
+
+import (
+	"engine/xstream/api"
+	"fmt"
+)
+
+type SourceNode struct {
+	source api.Source
+	outs   map[string]chan<- interface{}
+	name   string
+	ctx    api.StreamContext
+}
+
+func NewSourceNode(name string, source api.Source) *SourceNode{
+	return &SourceNode{
+		source: source,
+		outs: make(map[string]chan<- interface{}),
+		name: name,
+		ctx: nil,
+	}
+}
+
+func (m *SourceNode) Open(ctx api.StreamContext) error {
+	m.ctx = ctx
+	logger := ctx.GetLogger()
+	logger.Debugf("open source node %s", m.name)
+	return m.source.Open(ctx, func(data interface{}){
+		m.Broadcast(data)
+		logger.Debugf("%s consume data %v complete", m.name, data)
+	})
+}
+
+func (m *SourceNode) Broadcast(data interface{}) (err error){
+	return Broadcast(m.outs, data)
+}
+
+func (m *SourceNode) GetName() string{
+	return m.name
+}
+
+func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
+	if _, ok := m.outs[name]; !ok{
+		m.outs[name] = output
+	}else{
+		return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
+	}
+	return nil
+}

+ 6 - 26
xstream/operators/operations.go

@@ -3,6 +3,7 @@ package operators
 import (
 	"context"
 	"engine/common"
+	"engine/xstream/nodes"
 	"fmt"
 	"sync"
 )
@@ -61,12 +62,13 @@ func (o *UnaryOperator) SetConcurrency(concurr int) {
 	}
 }
 
-func (o *UnaryOperator) AddOutput(output chan<- interface{}, name string) {
+func (o *UnaryOperator) AddOutput(output chan<- interface{}, name string) (err error){
 	if _, ok := o.outputs[name]; !ok{
 		o.outputs[name] = output
 	}else{
-		common.Log.Warnf("fail to add output %s, operator %s already has an output of the same name", name, o.name)
+		return fmt.Errorf("fail to add output %s, operator %s already has an output of the same name", name, o.name)
 	}
+	return nil
 }
 
 func (o *UnaryOperator) GetInput() (chan<- interface{}, string) {
@@ -143,36 +145,13 @@ func (o *UnaryOperator) doOp(ctx context.Context) {
 			switch val := result.(type) {
 			case nil:
 				continue
-			//case api.StreamError:
-			//	fmt.Println( val)
-			//	fmt.Println( val)
-			//	if item := val.Item(); item != nil {
-			//		select {
-			//		case o.output <- *item:
-			//		case <-exeCtx.Done():
-			//			return
-			//		}
-			//	}
-			//	continue
-			//case api.PanicStreamError:
-			//	util.Logfn(o.logf, val)
-			//	autoctx.Err(o.errf, api.StreamError(val))
-			//	panic(val)
-			//case api.CancelStreamError:
-			//	util.Logfn(o.logf, val)
-			//	autoctx.Err(o.errf, api.StreamError(val))
-			//	return
 			case error:
 				log.Println(val)
 				log.Println(val.Error())
 				continue
 
 			default:
-				for _, output := range o.outputs{
-					select {
-					case output <- val:
-					}
-				}
+				nodes.Broadcast(o.outputs, val)
 			}
 
 		// is cancelling
@@ -186,3 +165,4 @@ func (o *UnaryOperator) doOp(ctx context.Context) {
 		}
 	}
 }
+

+ 7 - 8
xstream/operators/window_op.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"engine/common"
 	"engine/xsql"
+	"engine/xstream/nodes"
 	"fmt"
 	"github.com/sirupsen/logrus"
 	"math"
@@ -79,12 +80,13 @@ func (o *WindowOperator) GetName() string {
 	return o.name
 }
 
-func (o *WindowOperator) AddOutput(output chan<- interface{}, name string) {
+func (o *WindowOperator) AddOutput(output chan<- interface{}, name string) error {
 	if _, ok := o.outputs[name]; !ok{
 		o.outputs[name] = output
 	}else{
-		common.Log.Warnf("fail to add output %s, operator %s already has an output of the same name", name, o.name)
+		fmt.Errorf("fail to add output %s, operator %s already has an output of the same name", name, o.name)
 	}
+	return nil
 }
 
 func (o *WindowOperator) GetInput() (chan<- interface{}, string) {
@@ -225,12 +227,9 @@ func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx conte
 		if o.isEventTime{
 			results.Sort()
 		}
-		for _, output := range o.outputs {
-			select {
-			case output <- results:
-				triggered = true
-			default: //TODO need to set buffer
-			}
+		err := nodes.Broadcast(o.outputs, results)
+		if err != nil{
+			triggered = true
 		}
 	}
 

+ 2 - 1
xstream/server/main.go

@@ -5,6 +5,7 @@ import (
 	"engine/common"
 	"engine/xsql/processors"
 	"engine/xstream"
+	"engine/xstream/api"
 	"engine/xstream/sinks"
 	"fmt"
 	"net"
@@ -105,7 +106,7 @@ func (t *Server) CreateRule(rule *common.Rule, reply *string) error{
 	return nil
 }
 
-func (t *Server) createRuleState(rule *xstream.Rule) (*RuleState, error){
+func (t *Server) createRuleState(rule *api.Rule) (*RuleState, error){
 	if tp, err := processor.ExecInitRule(rule); err != nil{
 		return nil, err
 	}else{

+ 26 - 21
xstream/streams.go

@@ -3,16 +3,19 @@ package xstream
 import (
 	"context"
 	"engine/common"
+	"engine/xstream/api"
+	"engine/xstream/contexts"
+	"engine/xstream/nodes"
 	"engine/xstream/operators"
 )
 
 type TopologyNew struct {
-	sources []Source
-	sinks []Sink
+	sources []*nodes.SourceNode
+	sinks []api.Sink
 	ctx context.Context
 	cancel context.CancelFunc
 	drain chan error
-	ops []Operator
+	ops []api.Operator
 	name string
 }
 
@@ -29,12 +32,12 @@ func (s *TopologyNew) Cancel(){
 	s.cancel()
 }
 
-func (s *TopologyNew) AddSrc(src Source) *TopologyNew {
+func (s *TopologyNew) AddSrc(src *nodes.SourceNode) *TopologyNew {
 	s.sources = append(s.sources, src)
 	return s
 }
 
-func (s *TopologyNew) AddSink(inputs []Emitter, snk Sink) *TopologyNew {
+func (s *TopologyNew) AddSink(inputs []api.Emitter, snk api.Sink) *TopologyNew {
 	for _, input := range inputs{
 		input.AddOutput(snk.GetInput())
 	}
@@ -42,7 +45,7 @@ func (s *TopologyNew) AddSink(inputs []Emitter, snk Sink) *TopologyNew {
 	return s
 }
 
-func (s *TopologyNew) AddOperator(inputs []Emitter, operator Operator) *TopologyNew {
+func (s *TopologyNew) AddOperator(inputs []api.Emitter, operator api.Operator) *TopologyNew {
 	for _, input := range inputs{
 		input.AddOutput(operator.GetInput())
 	}
@@ -108,13 +111,14 @@ func (s *TopologyNew) Open() <-chan error {
 
 	// open stream
 	go func() {
-		// open source, if err bail
-		for _, src := range s.sources{
-			if err := src.Open(s.ctx); err != nil {
-				s.drainErr(err)
-				log.Println("Closing stream")
-				return
-			}
+		sinkErr := make(chan error)
+		defer func() {
+			log.Println("Closing sinkErr channel")
+			close(sinkErr)
+		}()
+		// open stream sink, after log sink is ready.
+		for _, snk := range s.sinks{
+			snk.Open(s.ctx, sinkErr)
 		}
 
 		//apply operators, if err bail
@@ -125,15 +129,16 @@ func (s *TopologyNew) Open() <-chan error {
 				return
 			}
 		}
-		sinkErr := make(chan error)
-		defer func() {
-			log.Println("Closing sinkErr channel")
-			close(sinkErr)
-		}()
-		// open stream sink, after log sink is ready.
-		for _, snk := range s.sinks{
-			snk.Open(s.ctx, sinkErr)
+
+		// open source, if err bail
+		for _, node := range s.sources{
+			if err := node.Open(contexts.NewDefaultContext(s.name, node.GetName(), s.ctx)); err != nil {
+				s.drainErr(err)
+				log.Println("Closing stream")
+				return
+			}
 		}
+
 		select {
 		case err := <- sinkErr:
 			log.Println("Closing stream")

+ 7 - 28
xstream/test/mock_source.go

@@ -1,35 +1,31 @@
 package test
 
 import (
-	"context"
 	"engine/common"
 	"engine/xsql"
+	"engine/xstream/api"
 	"time"
 )
 
 type MockSource struct {
-	outs map[string]chan<- interface{}
 	data []*xsql.Tuple
-	name string
 	done chan<- struct{}
 	isEventTime bool
 }
 
 // New creates a new CsvSource
-func NewMockSource(data []*xsql.Tuple, name string, done chan<- struct{}, isEventTime bool) *MockSource {
+func NewMockSource(data []*xsql.Tuple, done chan<- struct{}, isEventTime bool) *MockSource {
 	mock := &MockSource{
 		data: data,
-		name: name,
-		outs: make(map[string]chan<- interface{}),
 		done: done,
 		isEventTime: isEventTime,
 	}
 	return mock
 }
 
-func (m *MockSource) Open(ctx context.Context) (err error) {
-	log := common.GetLogger(ctx)
-	log.Trace("Mocksource starts")
+func (m *MockSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) {
+	log := ctx.GetLogger()
+	log.Trace("mock source starts")
 	go func(){
 		for _, d := range m.data{
 			log.Infof("mock source is sending data %s", d)
@@ -44,16 +40,7 @@ func (m *MockSource) Open(ctx context.Context) (err error) {
 					timer.DoTick(d.Timestamp)
 				}
 			}
-			for _, out := range m.outs{
-				select {
-				case out <- d:
-				case <-ctx.Done():
-					log.Trace("Mocksource stop")
-					return
-//				default:  TODO non blocking must have buffer?
-				}
-				time.Sleep(50 * time.Millisecond)
-			}
+			consume(d)
 			if m.isEventTime{
 				time.Sleep(1000 * time.Millisecond) //Let window run to make sure timers are set
 			}else{
@@ -68,12 +55,4 @@ func (m *MockSource) Open(ctx context.Context) (err error) {
 		m.done <- struct{}{}
 	}()
 	return nil
-}
-
-func (m *MockSource) AddOutput(output chan<- interface{}, name string) {
-	if _, ok := m.outs[name]; !ok{
-		m.outs[name] = output
-	}else{
-		common.Log.Warnf("fail to add output %s, operator %s already has an output of the same name", name, m.name)
-	}
-}
+}

+ 0 - 40
xstream/types.go

@@ -1,40 +0,0 @@
-package xstream
-
-import (
-	"context"
-)
-
-type Emitter interface {
-	AddOutput(chan<- interface{}, string)
-}
-
-type Source interface {
-	Emitter
-	Open(context context.Context) error
-}
-
-type Collector interface {
-	GetInput() (chan<- interface{}, string)
-}
-
-type Sink interface {
-	Collector
-	Open(context.Context, chan<- error)
-}
-
-type Operator interface{
-	Emitter
-	Collector
-	Exec(context context.Context) error
-}
-
-type TopNode interface{
-	GetName() string
-}
-
-type Rule struct{
-	Id string `json:"id"`
-	Sql string `json:"sql"`
-	Actions []map[string]interface{} `json:"actions"`
-	Options map[string]interface{} `json:"options"`
-}