Browse Source

feat(sink): Add dynamic properties to various sinks

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 years ago
parent
commit
f4b3dbd539

+ 10 - 0
docs/en_US/rules/sinks/memory.md

@@ -14,4 +14,14 @@ Below is a sample memory action configuration:
     "topic": "devices/result"
     "topic": "devices/result"
   }
   }
 }
 }
+```
+
+Below is another sample for dynamic topic action:
+
+```json
+{
+  "memory": {
+    "topic": "$.topic"
+  }
+}
 ```
 ```

+ 17 - 0
docs/en_US/rules/sinks/mqtt.md

@@ -51,3 +51,20 @@ Below is another sample configuration for connecting to AWS IoT by using certifi
     }
     }
 ```
 ```
 
 
+## Dynamic Topic
+
+If the result data contains the topic name, we can use it as the property of the mqtt action to achieve dynamic topic support. Assume the selected data has a field named `mytopic`, we can use jsonpath syntax to set it as the property value for `topic` as below:
+
+```json
+    {
+      "mqtt": {
+        "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
+        "topic": "$.mytopic",
+        "qos": 1,
+        "clientId": "demo_001",
+        "certificationPath": "keys/d3807d9fa5-certificate.pem",
+        "privateKeyPath": "keys/d3807d9fa5-private.pem.key",
+        "retained": false
+      }
+    }
+```

+ 34 - 0
docs/en_US/rules/sinks/rest.md

@@ -54,3 +54,37 @@ Example for taosdb rest:
   ]
   ]
 }
 }
 ```
 ```
+
+## Configure dynamic properties
+
+There are many scenarios that we need to sink to dynamic url and configurations through REST sink. The properties `method`, `url`,`bodyType` and `headers` support dynamic property through jsonpath syntax. Let's look at an example to modify the previous sample to a dynamic version. Assume we receive data which have metadata like http method and url postfix. We can modify the SQL to fetch these metadata in the result. The rule result will be like:
+
+```json
+{
+  "method":"post",
+  "url":"http://xxx.xxx.xxx.xxx:6041/rest/sql",
+  "temperature": 20,
+  "humidity": 80
+}
+```
+
+Then in the action, we set the `method` and `url` to be the value of the result by using jsonpath syntax as below:
+
+```json
+{"id": "rest2",
+  "sql": "SELECT tele[0]->Tag00001 AS temperature, tele[0]->Tag00002 AS humidity, method, concat(\"http://xxx.xxx.xxx.xxx:6041/rest/sql\", urlPostfix) as url FROM neuron", 
+  "actions": [
+    {
+      "rest": {
+        "bodyType": "text",
+        "dataTemplate": "insert into mqtt.kuiper values (now, {{.temperature}}, {{.humidity}})", 
+        "debugResp": true,
+        "headers": {"Authorization": "Basic cm9vdDp0YW9zZGF0YQ=="},
+        "method": "$.method",
+        "sendSingle": true,
+        "url": "$.url"
+      }
+    }
+  ]
+}
+```

+ 11 - 1
docs/zh_CN/rules/sinks/memory.md

@@ -4,7 +4,7 @@
 
 
 | 属性名称 | 是否可选 | 描述                                 |
 | 属性名称 | 是否可选 | 描述                                 |
 | -------- | -------- | ------------------------------------ |
 | -------- | -------- | ------------------------------------ |
-| topic    | 否       | 内存中的主题,例如 `analysis/result` |
+| topic    | 否       | 内存中的主题,例如 `analysis/result`, 支持动态属性 |
 
 
 下面是一个内存动作配置示例:
 下面是一个内存动作配置示例:
 
 
@@ -14,4 +14,14 @@
     "topic": "devices/result"
     "topic": "devices/result"
   }
   }
 }
 }
+```
+
+下面是动态主题示例:
+
+```json
+{
+  "memory": {
+    "topic": "$.topic"
+  }
+}
 ```
 ```

+ 19 - 1
docs/zh_CN/rules/sinks/mqtt.md

@@ -5,7 +5,7 @@
 | 属性名称 | 是否可选 | 说明                                          |
 | 属性名称 | 是否可选 | 说明                                          |
 | ------------- | -------- | ---------------------------------------------------- |
 | ------------- | -------- | ---------------------------------------------------- |
 | server        | 否    | MQTT  服务器地址,例如 `tcp://127.0.0.1:1883` |
 | server        | 否    | MQTT  服务器地址,例如 `tcp://127.0.0.1:1883` |
-| topic          | 否    | MQTT 主题,例如 `analysis/result`                     |
+| topic          | 否    | MQTT 主题,例如 `analysis/result` , 也可设置为动态属性,例如 `$.col`, 将会把结果中的 col 列的值作为主题                   |
 | clientId      | 是     | MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid |
 | clientId      | 是     | MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid |
 | protocolVersion   | 是    | MQTT 协议版本。3.1 (也被称为 MQTT 3) 或者 3.1.1 (也被称为 MQTT 4)。 如果未指定,缺省值为 3.1。 |
 | protocolVersion   | 是    | MQTT 协议版本。3.1 (也被称为 MQTT 3) 或者 3.1.1 (也被称为 MQTT 4)。 如果未指定,缺省值为 3.1。 |
 | qos               | 是    | 消息转发的服务质量                               |
 | qos               | 是    | 消息转发的服务质量                               |
@@ -49,3 +49,21 @@
     }
     }
 ```
 ```
 
 
+## 动态主题
+
+若结果数据中包含主题内容,可以将其作为主题属性,从而实现动态主题的需求。假设 SQL 选出的数据包含 `mytopic`, 则可以使用 jsonpath 语法将其设置为 `topic` 属性的值,如下所示:
+
+```json
+    {
+      "mqtt": {
+        "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
+        "topic": "$.mytopic",
+        "qos": 1,
+        "clientId": "demo_001",
+        "certificationPath": "keys/d3807d9fa5-certificate.pem",
+        "privateKeyPath": "keys/d3807d9fa5-private.pem.key",
+        "retained": false
+      }
+    }
+```
+

+ 38 - 4
docs/zh_CN/rules/sinks/rest.md

@@ -4,11 +4,11 @@
 
 
 | 属性名称   | 是否可选 | 说明                                                  |
 | 属性名称   | 是否可选 | 说明                                                  |
 | ----------------- | -------- | ------------------------------------------------------------ |
 | ----------------- | -------- | ------------------------------------------------------------ |
-| method            | 是    | RESTful API 的 HTTP 方法。 这是一个不区分大小写的字符串,其值范围为"get","post","put","patch","delete" 和 "head"。 默认值为 "get"。 |
-| url             | 否    | RESTful API 终端地址,例如 `https://www.example.com/api/dummy` |
-| bodyType          | 是    | 消息体的类型。 当前,支持以下类型:"none", "json", "text", "html", "xml", "javascript"  和 "form"。 对于 "get" 和 "head",不需要正文,因此默认值为 "none"。 对于其他 http 方法,默认值为 "json"。对于 "html","xml" 和 "javascript",必须仔细设置 dataTemplate 以确保格式正确。 |
+| method            | 是    | RESTful API 的 HTTP 方法。 这是一个不区分大小写的字符串,其值范围为"get","post","put","patch","delete" 和 "head"。 默认值为 "get",支持动态获取。 |
+| url             | 否    | RESTful API 终端地址,例如 `https://www.example.com/api/dummy`,支持动态获取。 |
+| bodyType          | 是    | 消息体的类型。 当前,支持以下类型:"none", "json", "text", "html", "xml", "javascript"  和 "form"。 对于 "get" 和 "head",不需要正文,因此默认值为 "none"。 对于其他 http 方法,默认值为 "json"。对于 "html","xml" 和 "javascript",必须仔细设置 dataTemplate 以确保格式正确。支持动态获取。 |
 | timeout   | 是    | HTTP 请求超时的时间(毫秒),默认为5000毫秒 |
 | timeout   | 是    | HTTP 请求超时的时间(毫秒),默认为5000毫秒 |
-| headers            | 是    | 要为 HTTP 请求设置的其它 HTTP 头。 |
+| headers            | 是    | 要为 HTTP 请求设置的其它 HTTP 头。支持动态获取。 |
 | debugResp | 是 | 控制是否将响应信息打印到控制台中。 如果将其设置为 `true`,则打印响应。 如果设置为`false`,则跳过打印日志。 默认值为 `false`。 |
 | debugResp | 是 | 控制是否将响应信息打印到控制台中。 如果将其设置为 `true`,则打印响应。 如果设置为`false`,则跳过打印日志。 默认值为 `false`。 |
 | insecureSkipVerify | 是 | 控制是否跳过证书认证。如果被设置为 `true`,那么跳过证书认证;否则进行证书验证。缺省为 `true`。 |
 | insecureSkipVerify | 是 | 控制是否跳过证书认证。如果被设置为 `true`,那么跳过证书认证;否则进行证书验证。缺省为 `true`。 |
 
 
@@ -53,3 +53,37 @@ Text mode
   ]
   ]
 }
 }
 ```
 ```
+
+## 设置动态输出参数
+
+很多情况下,我们需要根据结果数据,决定写入的目的地址和参数。在 REST sink 里,`method`, `url`, `bodyType` 和 `headers` 支持动态参数。动态参数可通过 jsonpath 语法配置。接下来,让我们使用动态参数改写上例。假设我们收到了数据中包含了 http 方法和 url 后缀等元数据。我们可以通过改写 SQL 语句,在输出结果中得到这两个值。规则输出的单条数据类似:
+
+```json
+{
+  "method":"post",
+  "url":"http://xxx.xxx.xxx.xxx:6041/rest/sql",
+  "temperature": 20,
+  "humidity": 80
+}
+```
+
+在规则 action 中,可以通过 jsonpath 语法取得结果数据作为属性变量。如下例子中,`method` 和 `url` 为动态变量。
+
+```json
+{"id": "rest2",
+  "sql": "SELECT tele[0]->Tag00001 AS temperature, tele[0]->Tag00002 AS humidity, method, concat(\"http://xxx.xxx.xxx.xxx:6041/rest/sql\", urlPostfix) as url FROM neuron", 
+  "actions": [
+    {
+      "rest": {
+        "bodyType": "text",
+        "dataTemplate": "insert into mqtt.kuiper values (now, {{.temperature}}, {{.humidity}})", 
+        "debugResp": true,
+        "headers": {"Authorization": "Basic cm9vdDp0YW9zZGF0YQ=="},
+        "method": "$.method",
+        "sendSingle": true,
+        "url": "$.url"
+      }
+    }
+  ]
+}
+```

+ 12 - 6
internal/topo/memory/sink.go

@@ -47,18 +47,24 @@ func (s *sink) Configure(props map[string]interface{}) error {
 
 
 func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 	ctx.GetLogger().Debugf("receive %+v", data)
 	ctx.GetLogger().Debugf("receive %+v", data)
-	var outs []map[string]interface{}
+	tpc, err := ctx.ParseDynamicProp(s.topic, data)
+	if err != nil {
+		return err
+	}
+	topic, ok := tpc.(string)
+	if !ok {
+		return fmt.Errorf("the value %v of dynamic prop %s for topic is not a string", s.topic, tpc)
+	}
 	switch d := data.(type) {
 	switch d := data.(type) {
 	case []map[string]interface{}:
 	case []map[string]interface{}:
-		outs = d
+		for _, el := range d {
+			produce(ctx, topic, el)
+		}
 	case map[string]interface{}:
 	case map[string]interface{}:
-		outs = append(outs, d)
+		produce(ctx, topic, d)
 	default:
 	default:
 		return fmt.Errorf("unrecognized format of %s", data)
 		return fmt.Errorf("unrecognized format of %s", data)
 	}
 	}
-	for _, el := range outs {
-		produce(ctx, s.topic, el)
-	}
 	return nil
 	return nil
 }
 }
 
 

+ 9 - 2
internal/topo/sink/mqtt_sink.go

@@ -241,7 +241,7 @@ func (ms *MQTTSink) Open(ctx api.StreamContext) error {
 	return nil
 	return nil
 }
 }
 
 
-func (ms *MQTTSink) Collect(ctx api.StreamContext, _ interface{}) error {
+func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	logger := ctx.GetLogger()
 	jsonBytes, _, err := ctx.TransformOutput()
 	jsonBytes, _, err := ctx.TransformOutput()
 	if err != nil {
 	if err != nil {
@@ -249,7 +249,14 @@ func (ms *MQTTSink) Collect(ctx api.StreamContext, _ interface{}) error {
 	}
 	}
 	c := ms.conn
 	c := ms.conn
 	logger.Debugf("%s publish %s", ctx.GetOpId(), jsonBytes)
 	logger.Debugf("%s publish %s", ctx.GetOpId(), jsonBytes)
-	if token := c.Publish(ms.tpc, ms.qos, ms.retained, jsonBytes); token.Wait() && token.Error() != nil {
+	tpc, err := ctx.ParseDynamicProp(ms.tpc, item)
+	if err != nil {
+		return err
+	}
+	if tpc, ok := tpc.(string); !ok {
+		return fmt.Errorf("the value %v of dynamic prop %s for topic is not a string", ms.tpc, tpc)
+	}
+	if token := c.Publish(tpc.(string), ms.qos, ms.retained, jsonBytes); token.Wait() && token.Error() != nil {
 		return fmt.Errorf("publish error: %s", token.Error())
 		return fmt.Errorf("publish error: %s", token.Error())
 	}
 	}
 	return nil
 	return nil

+ 48 - 7
internal/topo/sink/rest_sink.go

@@ -30,6 +30,7 @@ type RestSink struct {
 	method             string
 	method             string
 	url                string
 	url                string
 	headers            map[string]string
 	headers            map[string]string
+	headersTemplate    string
 	bodyType           string
 	bodyType           string
 	timeout            int64
 	timeout            int64
 	sendSingle         bool
 	sendSingle         bool
@@ -74,16 +75,19 @@ func (ms *RestSink) Configure(ps map[string]interface{}) error {
 
 
 	temp, ok = ps["headers"]
 	temp, ok = ps["headers"]
 	if ok {
 	if ok {
-		ms.headers = make(map[string]string)
-		if m, ok := temp.(map[string]interface{}); ok {
-			for k, v := range m {
+		switch h := temp.(type) {
+		case map[string]interface{}:
+			ms.headers = make(map[string]string)
+			for k, v := range h {
 				if v1, ok1 := v.(string); ok1 {
 				if v1, ok1 := v.(string); ok1 {
 					ms.headers[k] = v1
 					ms.headers[k] = v1
 				} else {
 				} else {
 					return fmt.Errorf("header value %s for header %s is not a string", v, k)
 					return fmt.Errorf("header value %s for header %s is not a string", v, k)
 				}
 				}
 			}
 			}
-		} else {
+		case string:
+			ms.headersTemplate = h
+		default:
 			return fmt.Errorf("rest sink property headers %v is not a map[string]interface", temp)
 			return fmt.Errorf("rest sink property headers %v is not a map[string]interface", temp)
 		}
 		}
 	}
 	}
@@ -186,7 +190,7 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 	if transed {
 	if transed {
 		d = output
 		d = output
 	}
 	}
-	resp, err := ms.Send(d, logger)
+	resp, err := ms.Send(ctx, d, logger)
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("rest sink fails to send out the data: %s", err)
 		return fmt.Errorf("rest sink fails to send out the data: %s", err)
 	} else {
 	} else {
@@ -213,8 +217,45 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 	return nil
 	return nil
 }
 }
 
 
-func (ms *RestSink) Send(v interface{}, logger api.Logger) (*http.Response, error) {
-	return httpx.Send(logger, ms.client, ms.bodyType, ms.method, ms.url, ms.headers, ms.sendSingle, v)
+func (ms *RestSink) Send(ctx api.StreamContext, v interface{}, logger api.Logger) (*http.Response, error) {
+	temp, err := ctx.ParseDynamicProp(ms.bodyType, v)
+	if err != nil {
+		return nil, err
+	}
+	bodyType, ok := temp.(string)
+	if !ok {
+		return nil, fmt.Errorf("the value %v of dynamic prop %s for bodyType is not a string", ms.bodyType, temp)
+	}
+	temp, err = ctx.ParseDynamicProp(ms.method, v)
+	if err != nil {
+		return nil, err
+	}
+	method, ok := temp.(string)
+	if !ok {
+		return nil, fmt.Errorf("the value %v of dynamic prop %s for method is not a string", ms.method, temp)
+	}
+	temp, err = ctx.ParseDynamicProp(ms.url, v)
+	if err != nil {
+		return nil, err
+	}
+	url, ok := temp.(string)
+	if !ok {
+		return nil, fmt.Errorf("the value %v of dynamic prop %s for url is not a string", ms.url, temp)
+	}
+	var headers map[string]string
+	if ms.headers != nil {
+		headers = ms.headers
+	} else if ms.headersTemplate != "" {
+		temp, err = ctx.ParseDynamicProp(ms.headersTemplate, v)
+		if err != nil {
+			return nil, err
+		}
+		headers, ok = temp.(map[string]string)
+		if !ok {
+			return nil, fmt.Errorf("the value %v of dynamic prop %s for headers is not a map[string]string", ms.headersTemplate, temp)
+		}
+	}
+	return httpx.Send(logger, ms.client, bodyType, method, url, headers, ms.sendSingle, v)
 }
 }
 
 
 func (ms *RestSink) Close(ctx api.StreamContext) error {
 func (ms *RestSink) Close(ctx api.StreamContext) error {