Ver código fonte

feat(extension): support to load source as a plugin

Refactor api.Source and api.Sink to support configure
Consume func now receive a message and a meta
ngjaying 5 anos atrás
pai
commit
5e865d2551

+ 17 - 5
common/util.go

@@ -10,6 +10,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"time"
 )
 
 const (
@@ -49,18 +50,18 @@ func (l *logRedirect) Debugf(f string, v ...interface{}) {
 	Log.Debug(fmt.Sprintf(f, v...))
 }
 
-func LoadConf(confName string) []byte {
+func LoadConf(confName string) ([]byte, error) {
 	confDir, err := GetConfLoc()
 	if err != nil {
-		Log.Fatal(err)
+		return nil, err
 	}
 
 	file := confDir + confName
 	b, err := ioutil.ReadFile(file)
 	if err != nil {
-		Log.Fatal(err)
+		return nil, err
 	}
-	return b
+	return b, nil
 }
 
 type XStreamConf struct {
@@ -76,7 +77,10 @@ func init(){
 		DisableColors: true,
 		FullTimestamp: true,
 	})
-	b := LoadConf(StreamConf)
+	b, err := LoadConf(StreamConf)
+	if err != nil {
+		Log.Fatal(err)
+	}
 	var cfg map[string]XStreamConf
 	if err := yaml.Unmarshal(b, &cfg); err != nil {
 		Log.Fatal(err)
@@ -272,6 +276,14 @@ func GetTimer(duration int) Timer {
 	}
 }
 
+func GetNowInMilli() int64{
+	if IsTesting {
+		return GetMockNow()
+	}else{
+		return TimeToUnixMilli(time.Now())
+	}
+}
+
 func ProcessPath(p string) (string, error) {
 	if abs, err := filepath.Abs(p); err != nil {
 		return "", nil

+ 50 - 5
xsql/processors/xsql_processor.go

@@ -14,7 +14,9 @@ import (
 	"engine/xstream/sinks"
 	"fmt"
 	"github.com/dgraph-io/badger"
+	"github.com/go-yaml/yaml"
 	"path"
+	"plugin"
 	"strings"
 )
 
@@ -363,16 +365,16 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 					return nil, nil, err
 				}
 				if shouldCreateSource{
-					mqs, err := extensions.NewMQTTSource(streamStmt.Options["DATASOURCE"], streamStmt.Options["CONF_KEY"])
+					src, err := getSource(streamStmt)
 					if err != nil {
-						return nil, nil, err
+						return nil, nil, fmt.Errorf("fail to get source: %v", err)
 					}
-					node := nodes.NewSourceNode(string(streamStmt.Name), mqs)
+					node := nodes.NewSourceNode(s, src)
 					tp.AddSrc(node)
 					preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
 					tp.AddOperator([]api.Emitter{node}, preprocessorOp)
 					inputs = append(inputs, preprocessorOp)
-				}else{
+				} else {
 					tp.AddSrc(sources[i])
 					preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
 					tp.AddOperator([]api.Emitter{sources[i]}, preprocessorOp)
@@ -426,7 +428,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			}
 
 			if selectStmt.SortFields != nil {
-				orderOp := xstream.Transform(&plans.OrderPlan{SortFields:selectStmt.SortFields}, "order")
+				orderOp := xstream.Transform(&plans.OrderPlan{SortFields: selectStmt.SortFields}, "order")
 				tp.AddOperator(inputs, orderOp)
 				inputs = []api.Emitter{orderOp}
 			}
@@ -441,3 +443,46 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 	}
 }
 
+func getSource(streamStmt *xsql.StreamStmt) (api.Source, error) {
+	if t, ok := streamStmt.Options["TYPE"]; ok{
+		mod := "plugins/" + t + ".so"
+		plug, err := plugin.Open(mod)
+		if err != nil {
+			return nil, fmt.Errorf("cannot open %s: %v", mod, err)
+		}
+		nf, err := plug.Lookup(t)
+		if err != nil{
+			return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
+		}
+		s, ok := nf.(api.Source)
+		if !ok {
+			return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
+		}
+		conf, err := common.LoadConf(t + ".yaml")
+		props := make(map[string]interface{})
+		if err == nil{
+			cfg := make(map[string]map[string]interface{})
+			if err := yaml.Unmarshal(conf, &cfg); err != nil {
+				log.Warnf("fail to parse yaml for source %s. Return an empty configuration", t)
+			}else{
+				props, ok = cfg[streamStmt.Options["CONF_KEY"]]
+				if !ok {
+					log.Warnf("conf for conf_key %s not found, use default conf instead", streamStmt.Options["CONF_KEY"])
+					props = cfg["default"]
+				}
+			}
+		}else{
+			log.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", t)
+		}
+		s.Configure(streamStmt.Options["DATASOURCE"], props)
+		log.Tracef("Source %s created", t)
+		return s, nil
+	}else{
+		mqs, err := extensions.NewMQTTSource(streamStmt.Options["DATASOURCE"], streamStmt.Options["CONF_KEY"])
+		if err != nil {
+			return nil, err
+		}
+		return mqs, nil
+	}
+}
+

+ 4 - 1
xstream/api/stream.go

@@ -2,10 +2,11 @@ package api
 
 import (
 	"context"
+	"engine/xsql"
 	"github.com/sirupsen/logrus"
 )
 
-type ConsumeFunc func(data interface{})
+type ConsumeFunc func(xsql.Message, xsql.Metadata)
 
 type Closable interface {
 	Close(StreamContext) error
@@ -14,12 +15,14 @@ type Closable interface {
 type Source interface {
 	//Should be sync function for normal case. The container will run it in go func
 	Open(StreamContext, ConsumeFunc) error
+	Configure(string, map[string]interface{}) error
 	Closable
 }
 
 type Sink interface {
 	//Should be sync function for normal case. The container will run it in go func
 	Open(StreamContext) error
+	Configure(map[string]interface{}) error
 	Collect(StreamContext, interface{}) error
 	Closable
 }

+ 4 - 1
xstream/cli/main.go

@@ -45,7 +45,10 @@ func main() {
 	//		Usage: "the name of stream",
 	//	}}
 
-	b := common.LoadConf(clientYaml)
+	b, err := common.LoadConf(clientYaml)
+	if err != nil {
+		common.Log.Fatal(err)
+	}
 	var cfg map[string]clientConf
 	var config *clientConf
 	if err := yaml.Unmarshal(b, &cfg); err != nil {

+ 5 - 0
xstream/collectors/func.go

@@ -25,6 +25,11 @@ func Func(f CollectorFunc) *FuncCollector {
 	return &FuncCollector{f: f}
 }
 
+func (c *FuncCollector) Configure(props map[string]interface{}) error{
+	//do nothing
+	return nil
+}
+
 // Open is the starting point that starts the collector
 func (c *FuncCollector) Open(ctx api.StreamContext) error {
 	log := ctx.GetLogger()

+ 8 - 5
xstream/extensions/mqtt_source.go

@@ -12,7 +12,6 @@ import (
 	"github.com/google/uuid"
 	"strconv"
 	"strings"
-	"time"
 )
 
 type MQTTSource struct {
@@ -45,7 +44,10 @@ type MQTTConfig struct {
 const confName string = "mqtt_source.yaml"
 
 func NewMQTTSource(topic string, confKey string) (*MQTTSource, error) {
-	b := common.LoadConf(confName)
+	b, err := common.LoadConf(confName)
+	if err != nil {
+		common.Log.Fatal(err)
+	}
 	var cfg map[string]MQTTConfig
 	if err := yaml.Unmarshal(b, &cfg); err != nil {
 		return nil, err
@@ -115,6 +117,9 @@ func (ms *MQTTSource) WithSchema(schema string) *MQTTSource {
 	return ms
 }
 
+func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) error {
+	return nil
+}
 
 func (ms *MQTTSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) error {
 	log := ctx.GetLogger()
@@ -173,9 +178,7 @@ func (ms *MQTTSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) error
 		meta := make(map[string]interface{})
 		meta[xsql.INTERNAL_MQTT_TOPIC_KEY] = msg.Topic()
 		meta[xsql.INTERNAL_MQTT_MSG_ID_KEY] = strconv.Itoa(int(msg.MessageID()))
-
-		tuple := &xsql.Tuple{Emitter: ms.tpc, Message:result, Timestamp: common.TimeToUnixMilli(time.Now()), Metadata:meta}
-		consume(tuple)
+		consume(result, meta)
 	}
 	//TODO error listener?
 	opts.SetDefaultPublishHandler(h)

+ 6 - 3
xstream/nodes/source_node.go

@@ -1,6 +1,8 @@
 package nodes
 
 import (
+	"engine/common"
+	"engine/xsql"
 	"engine/xstream/api"
 	"fmt"
 )
@@ -26,9 +28,10 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 	logger := ctx.GetLogger()
 	logger.Debugf("open source node %s", m.name)
 	go func(){
-		if err := m.source.Open(ctx, func(data interface{}){
-			m.Broadcast(data)
-			logger.Debugf("%s consume data %v complete", m.name, data)
+		if err := m.source.Open(ctx, func(message xsql.Message, meta xsql.Metadata){
+			tuple := &xsql.Tuple{Emitter: m.name, Message:message, Timestamp: common.GetNowInMilli(), Metadata:meta}
+			m.Broadcast(tuple)
+			logger.Debugf("%s consume data %v complete", m.name, tuple)
 		}); err != nil{
 			select {
 			case errCh <- err:

+ 3 - 1
xstream/sinks/mqtt_sink.go

@@ -93,7 +93,9 @@ func NewMqttSink(properties interface{}) (*MQTTSink, error) {
 	return ms, nil
 }
 
-
+func (ms *MQTTSink) Configure(props map[string]interface{}) error {
+	return nil
+}
 
 func (ms *MQTTSink) Open(ctx api.StreamContext) error {
 	log := ctx.GetLogger()

+ 5 - 0
xstream/test/mock_sink.go

@@ -36,6 +36,11 @@ func (m *MockSink) Close(ctx api.StreamContext) error {
 	return nil
 }
 
+
+func (m *MockSink) Configure(props map[string]interface{}) error {
+	return nil
+}
+
 func (m *MockSink) GetResults() [][]byte {
 	return m.results
 }

+ 5 - 1
xstream/test/mock_source.go

@@ -41,7 +41,7 @@ func (m *MockSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err e
 					timer.DoTick(d.Timestamp)
 				}
 			}
-			consume(d)
+			consume(d.Message, nil)
 			if m.isEventTime{
 				time.Sleep(1000 * time.Millisecond) //Let window run to make sure timers are set
 			}else{
@@ -60,4 +60,8 @@ func (m *MockSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err e
 
 func (m *MockSource) Close(ctx api.StreamContext) error{
 	return nil
+}
+
+func (m *MockSource) Configure(topic string, props map[string]interface{}) error {
+	return nil
 }