Explorar el Código

feat<stream>: add concurrency support for source

ngjaying hace 5 años
padre
commit
40c854f545
Se han modificado 3 ficheros con 131 adiciones y 108 borrados
  1. 1 0
      etc/mqtt_source.yaml
  2. 34 65
      xsql/processors/xsql_processor.go
  3. 96 43
      xstream/nodes/source_node.go

+ 1 - 0
etc/mqtt_source.yaml

@@ -3,6 +3,7 @@ default:
   qos: 1
   sharedSubscription: true
   servers: [tcp://127.0.0.1:1883]
+  concurrency: 1
   #username: user1
   #password: password
   #certificationPath: /var/kuiper/xyz-certificate.pem

+ 34 - 65
xsql/processors/xsql_processor.go

@@ -3,17 +3,16 @@ package processors
 import (
 	"bytes"
 	"encoding/json"
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common/plugin_manager"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xsql/plans"
 	"github.com/emqx/kuiper/xstream"
 	"github.com/emqx/kuiper/xstream/api"
-	"github.com/emqx/kuiper/xstream/extensions"
 	"github.com/emqx/kuiper/xstream/nodes"
 	"github.com/emqx/kuiper/xstream/operators"
 	"github.com/emqx/kuiper/xstream/sinks"
-	"fmt"
 	"path"
 	"strings"
 )
@@ -35,7 +34,6 @@ func NewStreamProcessor(s, d string) *StreamProcessor {
 	return processor
 }
 
-
 func (p *StreamProcessor) Exec() (result []string, err error) {
 	parser := xsql.NewParser(strings.NewReader(p.statement))
 	stmt, err := xsql.Language.Parse(parser)
@@ -78,12 +76,12 @@ func (p *StreamProcessor) execCreateStream(stmt *xsql.StreamStmt, db common.KeyV
 	err := db.Set(string(stmt.Name), p.statement)
 	if err != nil {
 		return "", fmt.Errorf("Create stream fails: %v.", err)
-	}else{
+	} else {
 		return fmt.Sprintf("Stream %s is created.", stmt.Name), nil
 	}
 }
 
-func (p *StreamProcessor) execShowStream(stmt *xsql.ShowStreamsStatement, db common.KeyValue) ([]string,error) {
+func (p *StreamProcessor) execShowStream(stmt *xsql.ShowStreamsStatement, db common.KeyValue) ([]string, error) {
 	keys, err := db.Keys()
 	if len(keys) == 0 {
 		keys = append(keys, "No stream definitions are found.")
@@ -91,7 +89,7 @@ func (p *StreamProcessor) execShowStream(stmt *xsql.ShowStreamsStatement, db com
 	return keys, err
 }
 
-func (p *StreamProcessor) execDescribeStream(stmt *xsql.DescribeStreamStatement, db common.KeyValue) (string,error) {
+func (p *StreamProcessor) execDescribeStream(stmt *xsql.DescribeStreamStatement, db common.KeyValue) (string, error) {
 	s, f := db.Get(stmt.Name)
 	s1, _ := s.(string)
 	if !f {
@@ -101,7 +99,7 @@ func (p *StreamProcessor) execDescribeStream(stmt *xsql.DescribeStreamStatement,
 	parser := xsql.NewParser(strings.NewReader(s1))
 	stream, err := xsql.Language.Parse(parser)
 	streamStmt, ok := stream.(*xsql.StreamStmt)
-	if !ok{
+	if !ok {
 		return "", fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.", stmt.Name)
 	}
 	var buff bytes.Buffer
@@ -116,7 +114,7 @@ func (p *StreamProcessor) execDescribeStream(stmt *xsql.DescribeStreamStatement,
 	return buff.String(), err
 }
 
-func (p *StreamProcessor) execExplainStream(stmt *xsql.ExplainStreamStatement, db common.KeyValue) (string,error) {
+func (p *StreamProcessor) execExplainStream(stmt *xsql.ExplainStreamStatement, db common.KeyValue) (string, error) {
 	_, f := db.Get(stmt.Name)
 	if !f {
 		return "", fmt.Errorf("Stream %s is not found.", stmt.Name)
@@ -128,12 +126,12 @@ func (p *StreamProcessor) execDropStream(stmt *xsql.DropStreamStatement, db comm
 	err := db.Delete(stmt.Name)
 	if err != nil {
 		return "", fmt.Errorf("Drop stream fails: %v.", err)
-	}else{
+	} else {
 		return fmt.Sprintf("Stream %s is dropped.", stmt.Name), nil
 	}
 }
 
-func GetStream(m *common.SimpleKVStore, name string) (stmt *xsql.StreamStmt, err error){
+func GetStream(m *common.SimpleKVStore, name string) (stmt *xsql.StreamStmt, err error) {
 	s, f := m.Get(name)
 	if !f {
 		return nil, fmt.Errorf("Cannot find key %s. ", name)
@@ -142,13 +140,12 @@ func GetStream(m *common.SimpleKVStore, name string) (stmt *xsql.StreamStmt, err
 	parser := xsql.NewParser(strings.NewReader(s1))
 	stream, err := xsql.Language.Parse(parser)
 	stmt, ok := stream.(*xsql.StreamStmt)
-	if !ok{
+	if !ok {
 		err = fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.", name)
 	}
 	return
 }
 
-
 type RuleProcessor struct {
 	dbDir string
 }
@@ -174,7 +171,7 @@ func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
 	defer store.Close()
 	if err != nil {
 		return nil, err
-	}else{
+	} else {
 		log.Infof("Rule %s is created.", name)
 	}
 
@@ -203,13 +200,13 @@ func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error)
 	}
 	rule.Id = name
 	//validation
-	if name == ""{
+	if name == "" {
 		return nil, fmt.Errorf("Missing rule id.")
 	}
-	if rule.Sql == ""{
+	if rule.Sql == "" {
 		return nil, fmt.Errorf("Missing rule SQL.")
 	}
-	if rule.Actions == nil || len(rule.Actions) == 0{
+	if rule.Actions == nil || len(rule.Actions) == 0 {
 		return nil, fmt.Errorf("Missing rule actions.")
 	}
 	return &rule, nil
@@ -218,17 +215,17 @@ func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error)
 func (p *RuleProcessor) ExecInitRule(rule *api.Rule) (*xstream.TopologyNew, error) {
 	if tp, inputs, err := p.createTopo(rule); err != nil {
 		return nil, err
-	}else{
+	} else {
 		for _, m := range rule.Actions {
 			for name, action := range m {
 				props, ok := action.(map[string]interface{})
 				if !ok {
 					return nil, fmt.Errorf("expect map[string]interface{} type for the action properties, but found %v", action)
 				}
-				if s, err := getSink(name, props); err != nil{
+				if s, err := getSink(name, props); err != nil {
 					return nil, err
-				}else{
-					tp.AddSink(inputs, nodes.NewSinkNode("sink_" + name, s))
+				} else {
+					tp.AddSink(inputs, nodes.NewSinkNode("sink_"+name, s))
 				}
 			}
 		}
@@ -274,14 +271,14 @@ func (p *RuleProcessor) ExecDesc(name string) (string, error) {
 
 func (p *RuleProcessor) ExecShow() (string, error) {
 	keys, err := p.GetAllRules()
-	if err != nil{
+	if err != nil {
 		return "", err
 	}
 	if len(keys) == 0 {
 		keys = append(keys, "No rule definitions are found.")
 	}
 	var result string
-	for _, c := range keys{
+	for _, c := range keys {
 		result = result + fmt.Sprintln(c)
 	}
 	return result, nil
@@ -307,7 +304,7 @@ func (p *RuleProcessor) ExecDrop(name string) (string, error) {
 	err = store.Delete(string(name))
 	if err != nil {
 		return "", err
-	}else{
+	} else {
 		return fmt.Sprintf("Rule %s is dropped.", name), nil
 	}
 }
@@ -317,38 +314,38 @@ func (p *RuleProcessor) createTopo(rule *api.Rule) (*xstream.TopologyNew, []api.
 }
 
 //For test to mock source
-func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.SourceNode) (*xstream.TopologyNew, []api.Emitter, error){
+func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.SourceNode) (*xstream.TopologyNew, []api.Emitter, error) {
 	name := rule.Id
 	sql := rule.Sql
 	var isEventTime bool
 	var lateTol int64
-	if iet, ok := rule.Options["isEventTime"]; ok{
+	if iet, ok := rule.Options["isEventTime"]; ok {
 		isEventTime, ok = iet.(bool)
-		if !ok{
+		if !ok {
 			return nil, nil, fmt.Errorf("Invalid rule option isEventTime %v, bool type is required.", iet)
 		}
 	}
 	if isEventTime {
-		if l, ok := rule.Options["lateTolerance"]; ok{
-			if fl, ok := l.(float64); ok{
+		if l, ok := rule.Options["lateTolerance"]; ok {
+			if fl, ok := l.(float64); ok {
 				lateTol = int64(fl)
-			}else{
+			} else {
 				return nil, nil, fmt.Errorf("Invalid rule option lateTolerance %v, int type is required.", l)
 			}
 		}
 	}
 	shouldCreateSource := sources == nil
 	parser := xsql.NewParser(strings.NewReader(sql))
-	if stmt, err := xsql.Language.Parse(parser); err != nil{
-		return nil, nil, fmt.Errorf("Parse SQL %s error: %s.", sql , err)
-	}else {
+	if stmt, err := xsql.Language.Parse(parser); err != nil {
+		return nil, nil, fmt.Errorf("Parse SQL %s error: %s.", sql, err)
+	} else {
 		if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
 			return nil, nil, fmt.Errorf("SQL %s is not a select statement.", sql)
 		} else {
 			tp := xstream.NewWithName(name)
 			var inputs []api.Emitter
 			streamsFromStmt := xsql.GetStreams(selectStmt)
-			if !shouldCreateSource && len(streamsFromStmt) != len(sources){
+			if !shouldCreateSource && len(streamsFromStmt) != len(sources) {
 				return nil, nil, fmt.Errorf("Invalid parameter sources or streams, the length cannot match the statement, expect %d sources.", len(streamsFromStmt))
 			}
 			store := common.GetSimpleKVStore(path.Join(p.dbDir, "stream"))
@@ -364,15 +361,11 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 					return nil, nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
 				}
 				pp, err := plans.NewPreprocessor(streamStmt, selectStmt.Fields, isEventTime)
-				if err != nil{
+				if err != nil {
 					return nil, nil, err
 				}
-				if shouldCreateSource{
-					src, err := getSource(streamStmt)
-					if err != nil {
-						return nil, nil, fmt.Errorf("fail to get source: %v", err)
-					}
-					node := nodes.NewSourceNode(s, src, streamStmt.Options)
+				if shouldCreateSource {
+					node := nodes.NewSourceNode(s, streamStmt.Options)
 					tp.AddSrc(node)
 					preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
 					tp.AddOperator([]api.Emitter{node}, preprocessorOp)
@@ -446,29 +439,6 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 	}
 }
 
-func getSource(streamStmt *xsql.StreamStmt) (api.Source, error) {
-	t, ok := streamStmt.Options["TYPE"]
-	if !ok{
-		t = "mqtt"
-	}
-	var s api.Source
-	switch t {
-	case "mqtt":
-		s = &extensions.MQTTSource{}
-	default:
-		nf, err := plugin_manager.GetPlugin(t, "sources")
-		if err != nil {
-			return nil, err
-		}
-		s, ok = nf.(api.Source)
-		if !ok {
-			return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
-		}
-	}
-	log.Debugf("Source %s created", t)
-	return s, nil
-}
-
 func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 	log.Tracef("trying to get sink %s with action %v", name, action)
 	var s api.Sink
@@ -492,10 +462,9 @@ func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 	}
 
 	err := s.Configure(action)
-	if err != nil{
+	if err != nil {
 		return nil, err
 	}
 	log.Debugf("Sink %s created", name)
 	return s, nil
 }
-

+ 96 - 43
xstream/nodes/source_node.go

@@ -1,28 +1,40 @@
 package nodes
 
 import (
+	"fmt"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/common/plugin_manager"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/extensions"
 	"github.com/go-yaml/yaml"
+	"sync"
 )
 
 type SourceNode struct {
-	source api.Source
-	outs   map[string]chan<- interface{}
-	name   string
-	ctx    api.StreamContext
-	options map[string]string
+	sourceType  string
+	outs        map[string]chan<- interface{}
+	name        string
+	ctx         api.StreamContext
+	options     map[string]string
+	concurrency int
+
+	mutex   sync.RWMutex
+	sources []api.Source
 }
 
-func NewSourceNode(name string, source api.Source, options map[string]string) *SourceNode {
+func NewSourceNode(name string, options map[string]string) *SourceNode {
+	t, ok := options["TYPE"]
+	if !ok {
+		t = "mqtt"
+	}
 	return &SourceNode{
-		source: source,
-		outs: make(map[string]chan<- interface{}),
-		name: name,
-		options: options,
-		ctx: nil,
+		sourceType:  t,
+		outs:        make(map[string]chan<- interface{}),
+		name:        name,
+		options:     options,
+		ctx:         nil,
+		concurrency: 1,
 	}
 }
 
@@ -30,52 +42,93 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 	m.ctx = ctx
 	logger := ctx.GetLogger()
 	logger.Debugf("open source node %s with option %v", m.name, m.options)
-	go func(){
-		props := getConf(m.options["TYPE"], m.options["CONF_KEY"], ctx)
-		err := m.source.Configure(m.options["DATASOURCE"], props)
-		if err != nil{
-			m.drainError(errCh, err, ctx, logger)
-			return
+	go func() {
+		props := m.getConf(ctx)
+		if c, ok := props["concurrency"]; ok {
+			if f, ok := c.(float64); ok {
+				m.concurrency = int(f)
+			}
 		}
-		if err := m.source.Open(ctx, func(message map[string]interface{}, meta map[string]interface{}) {
-			tuple := &xsql.Tuple{Emitter: m.name, Message: message, Timestamp: common.GetNowInMilli(), Metadata: meta}
-			m.Broadcast(tuple)
-			logger.Debugf("%s consume data %v complete", m.name, tuple)
-		}); err != nil {
-			m.drainError(errCh, err, ctx, logger)
-			return
+
+		for i := 0; i < m.concurrency; i++ { // workers
+			go func() {
+				//Do open source instances
+				source, err := getSource(m.sourceType)
+				if err != nil {
+					m.drainError(errCh, err, ctx, logger)
+					return
+				}
+				err = source.Configure(m.options["DATASOURCE"], props)
+				if err != nil {
+					m.drainError(errCh, err, ctx, logger)
+					return
+				}
+				m.mutex.Lock()
+				m.sources = append(m.sources, source)
+				m.mutex.Unlock()
+				if err := source.Open(ctx, func(message map[string]interface{}, meta map[string]interface{}) {
+					tuple := &xsql.Tuple{Emitter: m.name, Message: message, Timestamp: common.GetNowInMilli(), Metadata: meta}
+					m.Broadcast(tuple)
+					logger.Debugf("%s consume data %v complete", m.name, tuple)
+				}); err != nil {
+					m.drainError(errCh, err, ctx, logger)
+					return
+				}
+			}()
 		}
+
 		for {
 			select {
 			case <-ctx.Done():
 				logger.Infof("source %s done", m.name)
-				if err := m.source.Close(ctx); err != nil {
-					logger.Warnf("close source fails: %v", err)
-				}
+				m.close(ctx, logger)
 				return
 			}
 		}
 	}()
 }
 
+func getSource(t string) (api.Source, error) {
+	var s api.Source
+	var ok bool
+	switch t {
+	case "mqtt":
+		s = &extensions.MQTTSource{}
+	default:
+		nf, err := plugin_manager.GetPlugin(t, "sources")
+		if err != nil {
+			return nil, err
+		}
+		s, ok = nf.(api.Source)
+		if !ok {
+			return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
+		}
+	}
+	return s, nil
+}
+
 func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
 	select {
 	case errCh <- err:
 	case <-ctx.Done():
-		if err := m.source.Close(ctx); err != nil {
+		m.close(ctx, logger)
+	}
+	return
+}
+
+func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
+	for _, s := range m.sources {
+		if err := s.Close(ctx); err != nil {
 			logger.Warnf("close source fails: %v", err)
 		}
 	}
-	return
 }
 
-func getConf(t string, confkey string, ctx api.StreamContext) map[string]interface{} {
+func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
+	confkey := m.options["CONF_KEY"]
 	logger := ctx.GetLogger()
-	if t == ""{
-		t = "mqtt"
-	}
-	confPath := "sources/" + t + ".yaml"
-	if t == "mqtt"{
+	confPath := "sources/" + m.sourceType + ".yaml"
+	if m.sourceType == "mqtt" {
 		confPath = "mqtt_source.yaml"
 	}
 	conf, err := common.LoadConf(confPath)
@@ -83,7 +136,7 @@ func getConf(t string, confkey string, ctx api.StreamContext) map[string]interfa
 	if err == nil {
 		cfg := make(map[string]map[string]interface{})
 		if err := yaml.Unmarshal(conf, &cfg); err != nil {
-			logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", t)
+			logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", m.sourceType)
 		} else {
 			var ok bool
 			props, ok = cfg["default"]
@@ -97,24 +150,24 @@ func getConf(t string, confkey string, ctx api.StreamContext) map[string]interfa
 			}
 		}
 	} else {
-		logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", t)
+		logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", m.sourceType)
 	}
-	logger.Debugf("get conf for %s with conf key %s: %v", t, confkey, props)
+	logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
 	return props
 }
 
-func (m *SourceNode) Broadcast(data interface{}) int{
+func (m *SourceNode) Broadcast(data interface{}) int {
 	return Broadcast(m.outs, data, m.ctx)
 }
 
-func (m *SourceNode) GetName() string{
+func (m *SourceNode) GetName() string {
 	return m.name
 }
 
 func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
-	if _, ok := m.outs[name]; !ok{
+	if _, ok := m.outs[name]; !ok {
 		m.outs[name] = output
-	}else{
+	} else {
 		return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
 	}
 	return nil