Browse Source

feat(extension): Refactor built-in sources/sinks to comply extension rule

ngjaying 5 years ago
parent
commit
0c6a43738c

+ 16 - 0
common/util.go

@@ -2,6 +2,7 @@ package common
 
 import (
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"github.com/dgraph-io/badger"
 	"github.com/go-yaml/yaml"
@@ -335,3 +336,18 @@ func ToInt(input interface{}) (int, error){
 		return 0, fmt.Errorf("unsupported type %T of %[1]v", input)
 	}
 }
+
+/*
+*   Convert a map into a struct. The output parameter must be a pointer to a struct
+*   The struct can have the json meta data
+ */
+func MapToStruct(input map[string]interface{}, output interface{}) error{
+	// convert map to json
+	jsonString, err := json.Marshal(input)
+	if err != nil{
+		return err
+	}
+
+	// convert json to struct
+	return json.Unmarshal(jsonString, output)
+}

+ 0 - 15
etc/mqtt_source.yaml

@@ -1,15 +0,0 @@
-#Global MQTT configurations
-default:
-  qos: 1
-  sharedSubscription: true
-  servers: [tcp://127.0.0.1:1883]
-  #username: user1
-  #password: password
-  #certificationPath: /var/kuiper/xyz-certificate.pem
-  #privateKeyPath: /var/kuiper/xyz-private.pem.key
-
-
-#Override the global configurations
-demo_conf: #Conf_key
-  qos: 0
-  servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]

xstream/extensions/mqtt_source.yaml → etc/sources/MQTT.yaml


etc/sources/RandomSource.yaml → etc/sources/Random.yaml


+ 2 - 2
examples/testExtension.go

@@ -20,7 +20,7 @@ func main() {
 	processors.NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
 
 
-	demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"RandomSource\")"
+	demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\")"
 	_, err = processors.NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
 	if err != nil{
 		panic(err)
@@ -28,7 +28,7 @@ func main() {
 
 	rp := processors.NewRuleProcessor(BadgerDir)
 	rp.ExecDrop("$$test1")
-	rs, err := rp.ExecCreate("$$test1", "{\"sql\": \"SELECT count FROM ext where ext.count > 3\",\"actions\": [{\"MemorySink\":  {}}]}")
+	rs, err := rp.ExecCreate("$$test1", "{\"sql\": \"SELECT count FROM ext where ext.count > 3\",\"actions\": [{\"memory\":  {}}]}")
 	if err != nil {
 		msg := fmt.Sprintf("failed to create rule: %s.", err)
 		log.Printf(msg)

+ 2 - 2
plugins/sinks/memory_sink.go

@@ -1,4 +1,4 @@
-package sinks
+package main
 
 import "engine/xstream/api"
 
@@ -34,4 +34,4 @@ func (m *memory) Configure(props map[string]interface{}) error {
 	return nil
 }
 
-var MemorySink memory
+var Memory memory

+ 2 - 2
plugins/sources/RandomSource.go

@@ -1,4 +1,4 @@
-package sources
+package main
 
 import (
 	"context"
@@ -62,4 +62,4 @@ func (s *randomSource) Close(ctx api.StreamContext) error{
 	return nil
 }
 
-var RandomSource randomSource
+var Random randomSource

+ 50 - 27
xsql/processors/xsql_processor.go

@@ -18,6 +18,7 @@ import (
 	"path"
 	"plugin"
 	"strings"
+	"unicode"
 )
 
 var log = common.Log
@@ -218,7 +219,11 @@ func (p *RuleProcessor) ExecInitRule(rule *api.Rule) (*xstream.TopologyNew, erro
 	}else{
 		for _, m := range rule.Actions {
 			for name, action := range m {
-				if s, err := getSink(name, action); err != nil{
+				props, ok := action.(map[string]interface{})
+				if !ok {
+					return nil, fmt.Errorf("expect map[string]interface{} type for the action properties, but found %v", action)
+				}
+				if s, err := getSink(name, props); err != nil{
 					return nil, err
 				}else{
 					tp.AddSink(inputs, nodes.NewSinkNode("sink_" + name, s))
@@ -439,32 +444,33 @@ func getSource(streamStmt *xsql.StreamStmt) (api.Source, error) {
 	if !ok{
 		t = "mqtt"
 	}
+	t = ucFirst(t)
+	var s api.Source
 	switch t {
-	case "mqtt":
-		mqs, err := extensions.NewMQTTSource(streamStmt.Options["DATASOURCE"], streamStmt.Options["CONF_KEY"])
-		if err != nil {
-			return nil, err
-		}
-		log.Tracef("Source mqtt created")
-		return mqs, nil
+	case "Mqtt":
+		s = &extensions.MQTTSource{}
+		log.Debugf("Source mqtt created")
 	default:
 		nf, err := getPlugin(t, "sources")
 		if err != nil {
 			return nil, err
 		}
-		s, ok := nf.(api.Source)
+		s, ok = nf.(api.Source)
 		if !ok {
 			return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
 		}
-		props := getConf(t, streamStmt.Options["CONF_KEY"])
-		s.Configure(streamStmt.Options["DATASOURCE"], props)
-		log.Tracef("Source %s created", t)
-		return s, nil
 	}
+	props := getConf(t, streamStmt.Options["CONF_KEY"])
+	err := s.Configure(streamStmt.Options["DATASOURCE"], props)
+	if err != nil{
+		return nil, err
+	}
+	log.Debugf("Source %s created", t)
+	return s, nil
 }
 
 func getConf(t string, confkey string) map[string]interface{} {
-	conf, err := common.LoadConf(t + ".yaml")
+	conf, err := common.LoadConf("sources/" + t + ".yaml")
 	props := make(map[string]interface{})
 	if err == nil {
 		cfg := make(map[string]map[string]interface{})
@@ -472,10 +478,14 @@ func getConf(t string, confkey string) map[string]interface{} {
 			log.Warnf("fail to parse yaml for source %s. Return an empty configuration", t)
 		} else {
 			var ok bool
-			props, ok = cfg[confkey]
+			props, ok = cfg["default"]
 			if !ok {
-				log.Warnf("conf for conf_key %s not found, use default conf instead", confkey)
-				props = cfg["default"]
+				log.Warnf("default conf is not found", confkey)
+			}
+			if c, ok := cfg[confkey]; ok {
+				for k, v := range c {
+					props[k] = v
+				}
 			}
 		}
 	} else {
@@ -498,26 +508,39 @@ func getPlugin(t string, ptype string) (plugin.Symbol, error) {
 	return nf, nil
 }
 
-func getSink(name string, action interface{}) (api.Sink, error) {
+func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 	log.Tracef("trying to get sink %s with action %v", name, action)
+	var s api.Sink
+	name = ucFirst(name)
 	switch name {
-	case "log":
-		return sinks.NewLogSink(), nil
-	case "mqtt":
-		return sinks.NewMqttSink(action)
+	case "Log":
+		s = sinks.NewLogSink()
+	case "Mqtt":
+		s = &sinks.MQTTSink{}
 	default:
 		nf, err := getPlugin(name, "sinks")
 		if err != nil {
 			return nil, err
 		}
-		s, ok := nf.(api.Sink)
+		var ok bool
+		s, ok = nf.(api.Sink)
 		if !ok {
 			return nil, fmt.Errorf("exported symbol %s is not type of api.Sink", name)
 		}
-		props := getConf(name, "default")
-		s.Configure(props)
-		log.Tracef("Sink %s created", name)
-		return s, nil
 	}
+
+	err := s.Configure(action)
+	if err != nil{
+		return nil, err
+	}
+	log.Debugf("Sink %s created", name)
+	return s, nil
+}
+
+func ucFirst(str string) string {
+	for i, v := range str {
+		return string(unicode.ToUpper(v)) + str[i+1:]
+	}
+	return ""
 }
 

+ 27 - 77
xstream/extensions/mqtt_source.go

@@ -8,7 +8,6 @@ import (
 	"engine/xstream/api"
 	"fmt"
 	MQTT "github.com/eclipse/paho.mqtt.golang"
-	"github.com/go-yaml/yaml"
 	"github.com/google/uuid"
 	"strconv"
 	"strings"
@@ -30,94 +29,45 @@ type MQTTSource struct {
 
 
 type MQTTConfig struct {
-	Qos string `yaml:"qos"`
-	Sharedsubscription string `yaml:"sharedSubscription"`
-	Servers []string `yaml:"servers"`
-	Clientid string `yaml:"clientid"`
-	PVersion string `yaml:"protocolVersion"`
-	Uname string `yaml:"username"`
-	Password string `yaml:"password"`
-	Certification string `yaml:"certificationPath"`
-	PrivateKPath string `yaml:"privateKeyPath"`
+	Qos int `json:"qos"`
+	Sharedsubscription bool `json:"sharedSubscription"`
+	Servers []string `json:"servers"`
+	Clientid string `json:"clientid"`
+	PVersion string `json:"protocolVersion"`
+	Uname string `json:"username"`
+	Password string `json:"password"`
+	Certification string `json:"certificationPath"`
+	PrivateKPath string `json:"privateKeyPath"`
 }
 
-const confName string = "mqtt_source.yaml"
+func (ms *MQTTSource) WithSchema(schema string) *MQTTSource {
+	return ms
+}
 
-func NewMQTTSource(topic string, confKey string) (*MQTTSource, error) {
-	b, err := common.LoadConf(confName)
+func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) error {
+	cfg := &MQTTConfig{}
+	err := common.MapToStruct(props, cfg)
 	if err != nil {
-		common.Log.Fatal(err)
+		return fmt.Errorf("read properties %v fail with error: %v", props, err)
 	}
-	var cfg map[string]MQTTConfig
-	if err := yaml.Unmarshal(b, &cfg); err != nil {
-		return nil, err
-	}
-
-	ms := &MQTTSource{tpc: topic}
-	if srvs := cfg[confKey].Servers; srvs != nil && len(srvs) > 1 {
-		return nil, fmt.Errorf("It only support one server in %s section.", confKey)
-	} else if srvs == nil {
-		srvs = cfg["default"].Servers
-		if srvs != nil && len(srvs) == 1 {
-			ms.srv = srvs[0]
-		} else {
-			return nil, fmt.Errorf("Wrong configuration in default section!")
-		}
-	} else {
+	ms.tpc = topic
+	if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
 		ms.srv = srvs[0]
-	}
-
-	if cid := cfg[confKey].Clientid; cid != "" {
-		ms.clientid = cid
-	} else {
-		ms.clientid = cfg["default"].Clientid
-	}
-
-	var pversion uint = 3
-	if pv := cfg[confKey].PVersion; pv != "" {
-		if pv == "3.1.1" {
-			pversion = 4
-		}
-	} else {
-		pv = cfg["default"].PVersion
-		if pv == "3.1.1" {
-			pversion = 4
-		}
-	}
-	ms.pVersion = pversion
-
-	if uname := cfg[confKey].Uname; uname != "" {
-		ms.uName = strings.Trim(uname, " ")
-	} else {
-		ms.uName = cfg["default"].Uname
-	}
-
-	if password := cfg[confKey].Password; password != "" {
-		ms.password = strings.Trim(password, " ")
 	} else {
-		ms.password = cfg["default"].Password
+		return fmt.Errorf("missing server property")
 	}
 
-	if cpath := cfg[confKey].Certification; cpath != "" {
-		ms.certPath = cpath
-	} else {
-		ms.certPath = cfg["default"].Certification
-	}
+	ms.clientid = cfg.Clientid
 
-	if pkpath := cfg[confKey].PrivateKPath; pkpath != "" {
-		ms.pkeyPath = pkpath
-	} else {
-		ms.pkeyPath = cfg["default"].PrivateKPath
+	ms.pVersion = 3
+	if cfg.PVersion == "3.1.1" {
+		ms.pVersion = 4
 	}
 
-	return ms, nil
-}
-
-func (ms *MQTTSource) WithSchema(schema string) *MQTTSource {
-	return ms
-}
-
-func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) error {
+	ms.uName = cfg.Uname
+	ms.password = strings.Trim(cfg.PVersion, " ")
+	ms.certPath = cfg.Certification
+	ms.pkeyPath = cfg.PrivateKPath
 	return nil
 }
 

+ 13 - 13
xstream/sinks/mqtt_sink.go

@@ -23,23 +23,19 @@ type MQTTSink struct {
 	conn MQTT.Client
 }
 
-func NewMqttSink(properties interface{}) (*MQTTSink, error) {
-	ps, ok := properties.(map[string]interface{})
-	if !ok {
-		return nil, fmt.Errorf("expect map[string]interface{} type for the mqtt sink properties")
-	}
+func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 	srv, ok := ps["server"]
 	if !ok {
-		return nil, fmt.Errorf("mqtt sink is missing property server")
+		return fmt.Errorf("mqtt sink is missing property server")
 	}
 	tpc, ok := ps["topic"]
 	if !ok {
-		return nil, fmt.Errorf("mqtt sink is missing property topic")
+		return fmt.Errorf("mqtt sink is missing property topic")
 	}
 	clientid, ok := ps["clientId"]
 	if !ok{
 		if uuid, err := uuid.NewUUID(); err != nil {
-			return nil, fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
+			return fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
 		}else{
 			clientid = uuid.String()
 		}
@@ -53,7 +49,7 @@ func NewMqttSink(properties interface{}) (*MQTTSink, error) {
 		} else if v == "3.1.1" {
 			pVersion = 4
 		} else {
-			return nil, fmt.Errorf("Unknown protocol version {0}, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4).", pVersionStr)
+			return fmt.Errorf("unknown protocol version %s, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4)", pVersionStr)
 		}
 	}
 
@@ -89,11 +85,15 @@ func NewMqttSink(properties interface{}) (*MQTTSink, error) {
 		}
 	}
 
-	ms := &MQTTSink{srv: srv.(string), tpc: tpc.(string), clientid: clientid.(string), pVersion:pVersion, uName:uName, password:password, certPath:certPath, pkeyPath:pKeyPath}
-	return ms, nil
-}
+	ms.srv = srv.(string)
+	ms.tpc = tpc.(string)
+	ms.clientid = clientid.(string)
+	ms.pVersion = pVersion
+	ms.uName = uName
+	ms.password = password
+	ms.certPath = certPath
+	ms.pkeyPath = pKeyPath
 
-func (ms *MQTTSink) Configure(props map[string]interface{}) error {
 	return nil
 }