RockyJin 4 éve
szülő
commit
c2627e5aa2

BIN
deploy/docker/conf_util


+ 136 - 0
deploy/docker/conf_util.go

@@ -0,0 +1,136 @@
+package main
+
+import (
+	"fmt"
+	"github.com/go-yaml/yaml"
+	"io/ioutil"
+	"os"
+	"strings"
+)
+
+var fileMap = map[string]string{
+	//"edgex":       "/kuiper/etc/sources/edgex.yaml",
+	"edgex":       "/tmp/edgex.yaml",
+	"mqtt_source": "/kuiper/etc/mqtt_source.yaml",
+	"kuiper":      "/kuiper/etc/kuiper.yaml",
+}
+
+var file_keys_map = map[string]map[string]string{
+	"edgex": {
+		"CLIENTID":          "ClientId",
+		"USERNAME":          "Username",
+		"PASSWORD":          "Password",
+		"QOS":               "Qos",
+		"KEEPALIVE":         "KeepAlive",
+		"RETAINED":          "Retained",
+		"CONNECTIONPAYLOAD": "ConnectionPayload",
+		"CERTFILE":          "CertFile",
+		"KEYFILE":           "KeyFile",
+		"CERTPEMBLOCK":      "CertPEMBlock",
+		"KEYPEMBLOCK":       "KeyPEMBlock",
+		"SKIPCERTVERIFY":    "SkipCertVerify",
+	},
+	"mqtt_source": {
+		"SHAREDSUBSCRIPTION": "sharedSubscription",
+		"CERTIFICATIONPATH":  "certificationPath",
+		"PRIVATEKEYPATH":     "privateKeyPath",
+	},
+	"kuiper": {
+		"CONSOLELOG":     "consoleLog",
+		"FILELOG":        "fileLog",
+		"RESTPORT":       "restPort",
+		"PROMETHEUSPORT": "prometheusPort",
+	},
+}
+
+func main() {
+	files := make(map[string]map[interface{}]interface{})
+	ProcessEnv(files, os.Environ())
+	for f, v := range files {
+		if bs, err := yaml.Marshal(v); err != nil {
+			fmt.Println(err)
+		} else {
+			message := fmt.Sprintf("-------------------\nConf file %s: \n %s", f, string(bs))
+			fmt.Println(message)
+		}
+	}
+}
+
+func ProcessEnv(files map[string]map[interface{}]interface{}, vars []string) {
+	for _, e := range vars {
+		pair := strings.SplitN(e, "=", 2)
+		if len(pair) != 2 {
+			fmt.Printf("invalid env %s, skip it.\n", e)
+			continue
+		}
+
+		valid := false
+		for k, _ := range fileMap {
+			if strings.HasPrefix(pair[0], strings.ToUpper(k)) {
+				valid = true
+				break
+			}
+		}
+		if !valid {
+			continue
+		} else {
+			fmt.Printf("Find env: %s, start to handle it.\n", e)
+		}
+
+		env_v := strings.ReplaceAll(pair[0], "__", "+")
+		keys := strings.Split(env_v, "_")
+		for i, v := range keys {
+			keys[i] = strings.ReplaceAll(v, "+", "_")
+		}
+
+		if len(keys) < 2 {
+			fmt.Printf("not concerned env %s, skip it.\n", e)
+			continue
+		} else {
+			k := strings.ToLower(keys[0])
+			if v, ok := files[k]; !ok {
+				if data, err := ioutil.ReadFile(fileMap[k]); err != nil {
+					fmt.Printf("%s\n", err)
+				} else {
+					m := make(map[interface{}]interface{})
+					err = yaml.Unmarshal([]byte(data), &m)
+					if err != nil {
+						fmt.Println(err)
+					}
+					files[k] = m
+					Handle(k, m, keys[1:], pair[1])
+
+				}
+			} else {
+				Handle(k, v, keys[1:], pair[1])
+			}
+		}
+	}
+}
+
+func Handle(file string, conf map[interface{}]interface{}, skeys []string, val string) {
+	key := getKey(file, skeys[0])
+	if len(skeys) == 1 {
+		conf[key] = val
+	} else if len(skeys) >= 2 {
+		if v, ok := conf[key]; ok {
+			if v1, ok1 := v.(map[interface{}]interface{}); ok1 {
+				Handle(file, v1, skeys[1:], val)
+			} else {
+				fmt.Printf("Not expected map: %v\n", v)
+			}
+		} else {
+			v1 := make(map[interface{}]interface{})
+			conf[key] = v1
+			Handle(file, v1, skeys[1:], val)
+		}
+	}
+}
+
+func getKey(file string, key string) string{
+	if m, ok := file_keys_map[file][key]; ok {
+		return m
+	} else {
+		return strings.ToLower(key)
+	}
+}

+ 155 - 0
deploy/docker/conf_util_test.go

@@ -0,0 +1,155 @@
+package main
+
+import (
+	"reflect"
+	"testing"
+)
+
+func TestHandle(t *testing.T) {
+	var tests = []struct {
+		config map[interface{}]interface{}
+		skeys  []string
+		val    string
+		exp    map[interface{}]interface{}
+	}{
+		{
+			config: map[interface{}]interface{}{
+				"default": map[interface{}]interface{} {
+					"protocol": "tcp",
+					"port": 5563,
+					"optional": map[interface{}] interface{} {
+						"ClientId": "client1",
+					},
+				},
+			},
+			skeys:[]string{"default", "protocol"},
+			val: "ssl",
+			exp: map[interface{}]interface{}{
+				"default": map[interface{}]interface{} {
+					"protocol": "ssl",
+					"port": 5563,
+					"optional": map[interface{}] interface{} {
+						"ClientId": "client1",
+					},
+				},
+			},
+		},
+
+		{
+			config: map[interface{}]interface{}{
+				"default": map[interface{}]interface{} {
+					"protocol": "tcp",
+					"port": 5563,
+					"optional": map[interface{}] interface{} {
+						"ClientId": "client1",
+					},
+				},
+			},
+			skeys:[]string{"default", "optional", "CLIENTID"},
+			val: "client2",
+			exp: map[interface{}]interface{}{
+				"default": map[interface{}]interface{} {
+					"protocol": "tcp",
+					"port": 5563,
+					"optional": map[interface{}] interface{} {
+						"ClientId": "client2",
+					},
+				},
+			},
+		},
+
+		{
+			config: map[interface{}]interface{}{
+				"default": map[interface{}]interface{} {
+					"protocol": "tcp",
+					"port": 5563,
+					"optional": map[interface{}] interface{} {
+						"ClientId": "client1",
+					},
+				},
+			},
+			skeys:[]string{"default", "optional", "KEEPALIVE"},
+			val: "5000",
+			exp: map[interface{}]interface{}{
+				"default": map[interface{}]interface{} {
+					"protocol": "tcp",
+					"port": 5563,
+					"optional": map[interface{}] interface{} {
+						"ClientId": "client1",
+						"KeepAlive": "5000",
+					},
+				},
+			},
+		},
+
+		{
+			config: map[interface{}]interface{}{
+				"default": map[interface{}]interface{} {
+					"protocol": "tcp",
+					"port": 5563,
+					"optional": map[interface{}] interface{} {
+						"ClientId": "client1",
+					},
+				},
+			},
+			skeys:[]string{"application_conf", "test"},
+			val: "ssl",
+			exp: map[interface{}]interface{}{
+				"default": map[interface{}]interface{} {
+					"protocol": "tcp",
+					"port": 5563,
+					"optional": map[interface{}] interface{} {
+						"ClientId": "client1",
+					},
+				},
+				"application_conf": map[interface{}]interface{} {
+					"test": "ssl",
+				},
+			},
+		},
+	}
+
+	for i, tt := range tests {
+		Handle("edgex", tt.config, tt.skeys, tt.val)
+		if !reflect.DeepEqual(tt.exp, tt.config) {
+			t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.exp, tt.config)
+		}
+	}
+}
+
+func TestProcessEnv(t *testing.T) {
+	fileMap["edgex"] = "test/edgex.yaml"
+	var tests = []struct {
+		vars []string
+		file string
+		expt map[interface{}]interface{}
+	}{
+		{
+			vars: []string{
+				"EDGEX_DEFAULT_TYPE=zmq",
+				"EDGEX_DEFAULT_OPTIONAL_CLIENTID=clientid_0000",
+				"EDGEX_APPLICATION__CONF_PROTOCOL=ssl",
+			},
+			file: "edgex",
+			expt: map[interface{}]interface{}{
+				"default": map[interface {}]interface{} {
+					"protocol": "tcp",
+					"type": "zmq",
+					"optional": map[interface{}] interface{} {
+						"ClientId": "clientid_0000",
+					},
+				},
+				"application_conf": map[interface{}]interface{} {
+					"protocol":"ssl",
+				},
+			},
+		},
+	}
+	files := make(map[string]map[interface{}]interface{})
+	for i, tt := range tests {
+		ProcessEnv(files, tt.vars)
+		if !reflect.DeepEqual(tt.expt, files[tt.file]) {
+			t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.expt, files[tt.file])
+		}
+	}
+}

+ 22 - 0
deploy/docker/test/edgex.yaml

@@ -0,0 +1,22 @@
+#Global Edgex configurations
+default:
+  protocol: tcp
+#  Below is optional configurations settings for mqtt
+#  type: mqtt
+#  optional:
+#    ClientId: client1
+#    Username: user1
+#    Password: password
+#    Qos: 1
+#    KeepAlive: 5000
+#    Retained: true
+#    ConnectionPayload:
+#    CertFile:
+#    KeyFile:
+#    CertPEMBlock:
+#    KeyPEMBlock:
+#    SkipCertVerify: true/false
+
+#Override the global configurations
+application_conf: #Conf_key
+  protocol: tcp

+ 1 - 1
docs/en_US/rules/sinks/edgex.md

@@ -14,7 +14,7 @@ The action is used for publish output message into EdgeX message bus.
 | type          | true     | The message bus type, two types of message buses are supported, ``zero`` or ``mqtt``, and ``zero`` is the default value. |
 | optional      | true     | If ``mqtt`` message bus type is specified, then some optional values can be specified. Please refer to below for supported optional supported configurations. |
 
-Please notice that all of values in optional are **<u>string type</u>**, so values for these configurations should be string - such as ``KeepAlive: "5000"``. Below optional configurations are supported, please check MQTT specification for the detailed information.
+Below optional configurations are supported, please check MQTT specification for the detailed information.
 
 - optional
   - ClientId

+ 1 - 1
docs/zh_CN/rules/sinks/edgex.md

@@ -14,7 +14,7 @@
 | type          | true     | 消息总线类型,目前支持两种类型的消息总线, ``zero`` 或者 ``mqtt``,其中 ``zero`` 为缺省类型。 |
 | optional      | true     | 如果指定了 ``mqtt`` 消息总线,那么还可以指定一下可选的值。请参考以下可选的支持的配置类型。 |
 
-请注意,所有在可选的配置项里指定的值都必须为**<u>字符类型</u>**,因此这里出现的所有的配置应该是字符类型的 - 例如 ``KeepAlive: "5000"``。以下为支持的可选的配置列表,您可以参考 MQTT 协议规范来获取更详尽的信息。
+以下为支持的可选的配置列表,您可以参考 MQTT 协议规范来获取更详尽的信息。
 
 - optional
   - ClientId

+ 12 - 12
etc/sources/edgex.yaml

@@ -8,18 +8,18 @@ default:
 #  Below is optional configurations settings for mqtt
 #  type: mqtt
 #  optional:
-#    ClientId: "client1"
-#    Username: "user1"
-#    Password: "password"
-#    Qos: "1"
-#    KeepAlive: "5000"
-#    Retained: "true/false"
-#    ConnectionPayload: ""
-#    CertFile: ""
-#    KeyFile: ""
-#    CertPEMBlock: ""
-#    KeyPEMBlock: ""
-#    SkipCertVerify: "true/false"
+#    ClientId: client1
+#    Username: user1
+#    Password: password
+#    Qos: 1
+#    KeepAlive: 5000
+#    Retained: true/false
+#    ConnectionPayload:
+#    CertFile:
+#    KeyFile:
+#    CertPEMBlock:
+#    KeyPEMBlock:
+#    SkipCertVerify: true/false
 
 #Override the global configurations
 application_conf: #Conf_key

+ 20 - 2
xstream/extensions/edgex_source.go

@@ -71,8 +71,11 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 		if ops1, ok1 := ops.(map[interface{}]interface{}); ok1 {
 			for k, v := range ops1 {
 				k1 := k.(string)
-				v1 := v.(string)
-				optional[k1] = v1
+				if cv, ok := castToString(v); ok {
+					optional[k1] = cv
+				} else {
+					common.Log.Infof("Cannot convert configuration %s: %s to string type.\n", k, v)
+				}
 			}
 		}
 		mbconf.Optional = optional
@@ -87,6 +90,21 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 
 }
 
+func castToString(v interface{}) (result string, ok bool) {
+	switch v := v.(type) {
+	case int:
+		return strconv.Itoa(v), true
+	case string:
+		return v, true
+	case bool:
+		return strconv.FormatBool(v), true
+	case float64, float32:
+		return fmt.Sprintf("%.2f", v), true
+	default:
+		return "", false
+	}
+}
+
 func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
 	log := ctx.GetLogger()
 	if err := es.client.Connect(); err != nil {

+ 15 - 0
xstream/extensions/edgex_source_test.go

@@ -178,3 +178,18 @@ func TestWrongValue(t *testing.T) {
 		}
 	}
 }
+
+func TestCastToString(t *testing.T) {
+	if v, ok := castToString(12); v != "12" || !ok {
+		t.Errorf("Failed to cast int.")
+	}
+	if v, ok := castToString(true); v != "true" || !ok {
+		t.Errorf("Failed to cast bool.")
+	}
+	if v, ok := castToString("hello"); v != "hello" || !ok {
+		t.Errorf("Failed to cast string.")
+	}
+	if v, ok := castToString(12.3); v != "12.30" || !ok {
+		t.Errorf("Failed to cast float.")
+	}
+}