瀏覽代碼

feat(kubeEdge):KubeEdge (MQTT) source #345 (#369)

* feat(kubeEdge):KubeEdge (MQTT) source #345

* feat(kubeEdge):KubeEdge (MQTT) source #345

* feat(kubeEdge):KubeEdge (MQTT) source #345

* feat(kubeEdge):KubeEdge (MQTT) source #345

* feat(kubeEdge):KubeEdge (MQTT) source #345
EMQmyd 4 年之前
父節點
當前提交
29eb9cf73c

+ 8 - 0
docs/en_US/rules/sources/mqtt.md

@@ -57,6 +57,14 @@ The location of certification path. It can be an absolute path, or a relative pa
 
 The location of private key path. It can be an absolute path, or a relative path.  For more detailed information, please refer to ``certificationPath``. Such as ``d3807d9fa5-private.pem.key``.
 
+### kubeedgeVersion
+
+kubeedge 版本号,不同的版本号对应的文件内容不同。
+
+### kubeedgeModelFile
+
+kubeedge 模版文件名,文件指定放在 etc/sources 文件夹中
+
 ### bufferLength
 specify the maximum number of messages to be buffered in the memory. This is used to avoid the extra large memory usage that would cause out of memory error. Notice that the memory usage will be varied to the actual buffer. Increase the length here won't increase the initial memory allocation so it is safe to set a large buffer length. The default value is 102400, that is if each payload size is about 100 bytes, the maximum buffer size will be about 102400 * 100B ~= 10MB.
 

+ 86 - 76
docs/zh_CN/rules/sources/mqtt.md

@@ -1,76 +1,86 @@
-# MQTT源
-
-Kuiper 为 MQTT 源流提供了内置支持,流可以订阅来自 MQTT 代理的消息并输入Kuiper 处理管道。 MQTT 源的配置文件位于 `$kuiper/etc/mqtt_source.yaml`。 以下是文件格式。
-
-```yaml
-#全局MQTT配置
-default:
-  qos: 1
-  sharedsubscription: true
-  servers: [tcp://127.0.0.1:1883]
-  concurrency: 1
-  #username: user1
-  #password: password
-  #certificationPath: /var/kuiper/xyz-certificate.pem
-  #privateKeyPath: /var/kuiper/xyz-private.pem.key
-
-
-#重载全局配置
-demo: #Conf_key
-  qos: 0
-  servers: [tcp://10.211.55.6:1883]
-```
-
-## 全局 MQTT 配置
-
-用户可以在此处指定全局 MQTT 设置。`default` 部分中指定的配置项将用作所有MQTT 连接的默认设置。
-
-### qos
-
-默认订阅QoS级别。
-
-### concurrency
-设置运行的协程数,默认值为1。如果设置协程数大于1,必须使用共享订阅模式。
-
-### sharedsubscription
-
-是否使用共享订阅模式。 如果使用共享订阅模式,那么多个 Kuiper 进程可以进行负载平衡。
-
-### servers
-
-MQTT 消息代理的服务器列表。 当前,只能指定一个服务器。
-
-### username
-
-MQTT 连接用户名。如果指定了 `certificationPath`  或者 `privateKeyPath`,那么该项配置不会被使用。
-
-### password
-
-MQTT 连接密码。如果指定了 `certificationPath` 或者 `privateKeyPath`,那么该项配置不会被使用。
-
-### certificationPath
-
-证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 `server` 命令的路径。比如,如果你在 `/var/kuiper` 中运行 `bin/server` ,那么父目录为 `/var/kuiper`; 如果运行从`/var/kuiper/bin`中运行`./server`,那么父目录为 `/var/kuiper/bin`。 比如  `d3807d9fa5-certificate.pem`。
-
-### privateKeyPath
-
-私钥路径。可以为绝对路径,也可以为相对路径。更详细的信息,请参考 `certificationPath`,比如 `d3807d9fa5-private.pem.key`。
-
-### bufferLength
-
-指定最大缓存消息数目。该参数主要用于防止内存溢出。实际内存用量会根据当前缓存消息数目动态变化。增大该参数不会增加初始内存分配量,因此设置较大的数值是安全的。该参数默认值为102400;如果每条消息为100字节,则默认情况下,缓存最大占用内存量为102400 * 100B ~= 10MB. 
-
-## 重载默认设置
-
-如果您有一个特定连接需要重载默认设置,则可以创建一个自定义模块。 在上一个示例中,我们创建一个名为 `demo` 的特定设置。 然后,您可以在创建流定义时使用选项 `CONF_KEY` 指定配置(有关更多信息,请参见 [stream specs](../../sqls/streams.md) )。
-
-**示例**
-
-```
-demo (
-		...
-	) WITH (DATASOURCE="test/", FORMAT="JSON", KEY="USERID", CONF_KEY="demo");
-```
-
-这些特定设置使用的配置键与 `default` 设置中的配置键相同,在特定设置中指定的任何值都将覆盖 `default` 部分中的值。
-
+# MQTT源
+
+Kuiper 为 MQTT 源流提供了内置支持,流可以订阅来自 MQTT 代理的消息并输入Kuiper 处理管道。 MQTT 源的配置文件位于 `$kuiper/etc/mqtt_source.yaml`。 以下是文件格式。
+
+```yaml
+#全局MQTT配置
+default:
+  qos: 1
+  sharedsubscription: true
+  servers: [tcp://127.0.0.1:1883]
+  concurrency: 1
+  #username: user1
+  #password: password
+  #certificationPath: /var/kuiper/xyz-certificate.pem
+  #privateKeyPath: /var/kuiper/xyz-private.pem.key
+  kubeedgeVersion: "1.0"
+  kubeedgeModelFile: "mqtt_model.json"
+
+
+#重载全局配置
+demo: #Conf_key
+  qos: 0
+  servers: [tcp://10.211.55.6:1883]
+```
+
+## 全局 MQTT 配置
+
+用户可以在此处指定全局 MQTT 设置。`default` 部分中指定的配置项将用作所有MQTT 连接的默认设置。
+
+### qos
+
+默认订阅QoS级别。
+
+### concurrency
+设置运行的协程数,默认值为1。如果设置协程数大于1,必须使用共享订阅模式。
+
+### sharedsubscription
+
+是否使用共享订阅模式。 如果使用共享订阅模式,那么多个 Kuiper 进程可以进行负载平衡。
+
+### servers
+
+MQTT 消息代理的服务器列表。 当前,只能指定一个服务器。
+
+### username
+
+MQTT 连接用户名。如果指定了 `certificationPath`  或者 `privateKeyPath`,那么该项配置不会被使用。
+
+### password
+
+MQTT 连接密码。如果指定了 `certificationPath` 或者 `privateKeyPath`,那么该项配置不会被使用。
+
+### certificationPath
+
+证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 `server` 命令的路径。比如,如果你在 `/var/kuiper` 中运行 `bin/server` ,那么父目录为 `/var/kuiper`; 如果运行从`/var/kuiper/bin`中运行`./server`,那么父目录为 `/var/kuiper/bin`。 比如  `d3807d9fa5-certificate.pem`。
+
+### privateKeyPath
+
+私钥路径。可以为绝对路径,也可以为相对路径。更详细的信息,请参考 `certificationPath`,比如 `d3807d9fa5-private.pem.key`。
+
+### kubeedgeVersion
+
+kubeedge 版本号,不同的版本号对应的文件内容不同。
+
+### kubeedgeModelFile
+
+kubeedge 模版文件名,文件指定放在 etc/sources 文件夹中
+
+### bufferLength
+
+指定最大缓存消息数目。该参数主要用于防止内存溢出。实际内存用量会根据当前缓存消息数目动态变化。增大该参数不会增加初始内存分配量,因此设置较大的数值是安全的。该参数默认值为102400;如果每条消息为100字节,则默认情况下,缓存最大占用内存量为102400 * 100B ~= 10MB. 
+
+## 重载默认设置
+
+如果您有一个特定连接需要重载默认设置,则可以创建一个自定义模块。 在上一个示例中,我们创建一个名为 `demo` 的特定设置。 然后,您可以在创建流定义时使用选项 `CONF_KEY` 指定配置(有关更多信息,请参见 [stream specs](../../sqls/streams.md) )。
+
+**示例**
+
+```
+demo (
+		...
+	) WITH (DATASOURCE="test/", FORMAT="JSON", KEY="USERID", CONF_KEY="demo");
+```
+
+这些特定设置使用的配置键与 `default` 设置中的配置键相同,在特定设置中指定的任何值都将覆盖 `default` 部分中的值。
+

+ 2 - 2
etc/mqtt_source.yaml

@@ -8,9 +8,9 @@ default:
   #password: password
   #certificationPath: /var/kuiper/xyz-certificate.pem
   #privateKeyPath: /var/kuiper/xyz-private.pem.key
+  #kubeedgeVersion: 
+  #kubeedgeModelFile: ""
 
-
-#Override the global configurations
 demo_conf: #Conf_key
   qos: 0
   servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]

+ 26 - 3
xstream/extensions/mqtt_source.go

@@ -8,6 +8,7 @@ import (
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/google/uuid"
+	"path"
 	"strconv"
 	"strings"
 )
@@ -22,6 +23,7 @@ type MQTTSource struct {
 	certPath string
 	pkeyPath string
 
+	model  modelVersion
 	schema map[string]interface{}
 	conn   MQTT.Client
 }
@@ -36,6 +38,8 @@ type MQTTConfig struct {
 	Password           string   `json:"password"`
 	Certification      string   `json:"certificationPath"`
 	PrivateKPath       string   `json:"privateKeyPath"`
+	KubeedgeModelFile  string   `json:"kubeedgeModelFile"`
+	KubeedgeVersion    string   `json:"kubeedgeVersion"`
 }
 
 func (ms *MQTTSource) WithSchema(schema string) *MQTTSource {
@@ -66,6 +70,18 @@ func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) erro
 	ms.password = strings.Trim(cfg.Password, " ")
 	ms.certPath = cfg.Certification
 	ms.pkeyPath = cfg.PrivateKPath
+
+	if 0 != len(cfg.KubeedgeModelFile) {
+		conf, err := common.LoadConf(path.Join("sources", cfg.KubeedgeModelFile))
+		if nil != err {
+			return err
+		}
+		ms.model = modelFactory(cfg.KubeedgeVersion)
+		if err = json.Unmarshal(conf, ms.model); err != nil {
+			ms.model = nil
+			return err
+		}
+	}
 	return nil
 }
 
@@ -120,7 +136,7 @@ func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTupl
 	opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
 		log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
 		reconn = true
-		subscribe(ms.tpc, client, ctx, consumer)
+		subscribe(ms.tpc, client, ctx, consumer, ms.model)
 	})
 
 	opts.SetOnConnectHandler(func(client MQTT.Client) {
@@ -135,11 +151,11 @@ func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTupl
 	}
 	log.Infof("The connection to server %s was established successfully", ms.srv)
 	ms.conn = c
-	subscribe(ms.tpc, c, ctx, consumer)
+	subscribe(ms.tpc, c, ctx, consumer, ms.model)
 	log.Infof("Successfully subscribe to topic %s", ms.srv+": "+ms.clientid)
 }
 
-func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consumer chan<- api.SourceTuple) {
+func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consumer chan<- api.SourceTuple, model modelVersion) {
 	log := ctx.GetLogger()
 	h := func(client MQTT.Client, msg MQTT.Message) {
 		log.Debugf("instance %d received %s", ctx.GetInstanceId(), msg.Payload())
@@ -154,6 +170,13 @@ func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consumer
 		meta["topic"] = msg.Topic()
 		meta["messageid"] = strconv.Itoa(int(msg.MessageID()))
 
+		if nil != model {
+			sliErr := model.checkType(result, msg.Topic())
+			for _, v := range sliErr {
+				log.Errorf(v)
+			}
+		}
+
 		select {
 		case consumer <- api.NewDefaultSourceTuple(result, meta):
 			log.Debugf("send data to source node")

+ 151 - 0
xstream/extensions/mqtt_type.go

@@ -0,0 +1,151 @@
+package extensions
+
+import (
+	"fmt"
+	"reflect"
+	"strconv"
+	"strings"
+)
+
+type (
+	property struct {
+		Name     string `json:"name"`
+		DataType string `json:"dataType"`
+	}
+	device struct {
+		Name       string      `json:"name"`
+		Properties []*property `json:"properties"`
+	}
+	deviceModel struct {
+		Devices []*device `json:"deviceModels"`
+	}
+)
+
+type modelVersion interface {
+	checkType(map[string]interface{}, string) []string
+}
+
+func modelFactory(version string) modelVersion {
+	return new(deviceModel)
+}
+func (this *property) getName() string {
+	return this.Name
+}
+func (this *property) getDataType() string {
+	return this.DataType
+}
+func (this *device) getName() string {
+	return this.Name
+}
+func (this *device) findDataType(name string) string {
+	for _, v := range this.Properties {
+		if strings.ToLower(v.getName()) == strings.ToLower(name) {
+			return v.getDataType()
+		}
+	}
+	return ""
+}
+func (this *deviceModel) findDataType(deviceId, dataName string) string {
+	for _, v := range this.Devices {
+		if strings.ToLower(v.getName()) == strings.ToLower(deviceId) {
+			return v.findDataType(dataName)
+		}
+	}
+	return ""
+}
+func boolToInt(b bool) int {
+	if b {
+		return 1
+	}
+	return 0
+}
+func intToBool(i int) bool {
+	if 0 == i {
+		return false
+	}
+	return true
+}
+func changeType(modelType string, data interface{}) (interface{}, string) {
+	dataType := reflect.TypeOf(data).Kind()
+	switch dataType {
+	case reflect.Bool:
+		b, _ := data.(bool)
+		switch modelType {
+		case "int":
+			data = boolToInt(b)
+		case "bool":
+			return data, ""
+		default:
+			return data, fmt.Sprintf("not support modelType : %s", modelType)
+		}
+	case reflect.Int:
+		i, _ := data.(int)
+		switch modelType {
+		case "int":
+			return data, ""
+		case "float":
+			data = float64(i)
+		case "boolean":
+			data = intToBool(i)
+		case "string":
+			data = strconv.Itoa(i)
+		default:
+			return data, fmt.Sprintf("not support modelType : %s", modelType)
+		}
+	case reflect.String:
+		s, _ := data.(string)
+		switch modelType {
+		case "string":
+			return data, ""
+		case "float":
+			data, _ = strconv.ParseFloat(s, 64)
+		case "int":
+			data, _ = strconv.Atoi(s)
+		default:
+			return data, fmt.Sprintf("not support modelType : %s", modelType)
+		}
+	case reflect.Float64:
+		f, _ := data.(float64)
+		switch modelType {
+		case "double", "float":
+			return data, ""
+		case "int":
+			data = int(f)
+		case "string":
+			data = strconv.FormatFloat(f, 'f', -1, 64)
+		default:
+			return data, fmt.Sprintf("not support modelType : %s", modelType)
+		}
+	default:
+		return data, fmt.Sprintf("not support type : %v", dataType)
+	}
+	return data, ""
+}
+func topicToDeviceid(topic string) string {
+	sliStr := strings.Split(topic, `/`)
+	if 4 > len(sliStr) {
+		return ""
+	}
+	return sliStr[3]
+}
+func (this *deviceModel) checkType(m map[string]interface{}, topic string) []string {
+	var sliErr []string
+	strErr := ""
+	for k, v := range m {
+		deviceid := topicToDeviceid(topic)
+		if 0 == len(deviceid) {
+			sliErr = append(sliErr, fmt.Sprintf("not find deviceid : %s", topic))
+			continue
+		}
+		modelType := this.findDataType(deviceid, k)
+		if 0 == len(modelType) {
+			continue
+		}
+		m[k], strErr = changeType(modelType, v)
+		if 0 != len(strErr) {
+			sliErr = append(sliErr, strErr)
+			delete(m, k)
+		}
+	}
+	return sliErr
+}

+ 36 - 0
xstream/extensions/mqtt_type_test.go

@@ -0,0 +1,36 @@
+package extensions
+
+import (
+	"encoding/json"
+	"reflect"
+	"testing"
+)
+
+func TestCheckType(t *testing.T) {
+	strModel := `{"deviceModels":[{"name":"device1","properties":[{"name":"temperature","dataType":"int"},{"name":"humidity","dataType":"int"}]},{"name":"device2","properties":[{"name":"temperature","dataType":"string"},{"name":"humidity","dataType":"string"}]}]}`
+	mode := new(deviceModel)
+	json.Unmarshal([]byte(strModel), mode)
+
+	datas := []map[string]interface{}{
+		{"temperature": 1, "humidity": 1},
+		{"temperature": "1", "humidity": "1"},
+		{"temperature": 1, "humidity": "1"},
+		{"temperature": "1", "humidity": 1},
+	}
+
+	topics := []string{`$ke/events/device/device1/data/update`, `$ke/events/device/device2/data/update`}
+
+	for _, topic := range topics {
+		for _, data := range datas {
+			mode.checkType(data, topic)
+			for k, v := range data {
+				deviceid := topicToDeviceid(topic)
+				modelType := mode.findDataType(deviceid, k)
+				dataType := reflect.TypeOf(v).String()
+				if modelType != dataType {
+					t.Errorf("data:%s=%s mode:%s=%s\n", k, dataType, k, modelType)
+				}
+			}
+		}
+	}
+}