Prechádzať zdrojové kódy

fix(context): Refactor parseDynamicprop to use data template syntax (#1148)

1. Let it use data template syntax by default, but still allow jsonpath to be compatible.
2. The jsonpath syntax will have {$ prefix to avoid conflict with topic starts with $
3. In the next major release, the API must be refactored to separate jsonpath and dataTemplate. And let the context manage all jsonpath, dataTemplate that is using in the rule. Managed in the context makes it possible to be released after rule stopped.

Closes #1147

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 3 rokov pred
rodič
commit
97713b42a4

Rozdielové dáta súboru neboli zobrazené, pretože súbor je príliš veľký
+ 31 - 31
docs/en_US/rules/overview.md


+ 16 - 16
docs/en_US/rules/sinks/edgex.md

@@ -6,22 +6,22 @@ The action is used for publishing output message into EdgeX message bus.
 
 **Also, you need to expose the port number to host server before running the eKuiper server if you want to have the service available to other hosts.**
 
-| Property name | Optional | Description                                                  |
-| ------------- | -------- | ------------------------------------------------------------ |
-| type          | true     | The message bus type, three types of message buses are supported, ``zero``, ``mqtt`` and ``redis``, and ``redis`` is the default value. |
-| protocol      | true     | The protocol. If it's not specified, then use default value ``redis``. |
-| host          | true     | The host of message bus. If not specified, then use default value ``localhost``. |
-| port          | true     | The port of message bus. If not specified, then use default value ``6379``. |
-| connectionSelector | true     | reuse the connection to EdgeX message bus. [more info](../sources/edgex.md#connectionselector)
-| topic         | true     | The topic to be published. The topic is static across all messages. To use dynamic topic, leave this empty and specify the topicPrefix property. Only one of the topic and topicPrefix properties can be specified. If both are not specified, then use default topic value ``application``. |
-| topicPrefix         | true     | The prefix of a dynamic topic to be published. The topic will become a concatenation of `$topicPrefix/$profileName/$deviceName/$sourceName`. |
-| contentType   | true     | The content type of message to be published. If not specified, then use the default value ``application/json``. |
-| messageType   | true     | The EdgeX message model type. To publish the message as an event like EdgeX application service, use `event`. Otherwise, to publish the message as an event request like EdgeX device service or core data service, use `request`. If not specified, then use the default value ``event``. |
-| metadata      | true     | The property is a field name that allows user to specify a field name of SQL  select clause,  the field name should use ``meta(*) AS xxx``  to select all of EdgeX metadata from message. |
-| profileName    | true     | Allows user to specify the profile name in the event structure that are sent from eKuiper. The profileName in the meta take precedence if specified. |
-| deviceName    | true     | Allows user to specify the device name in the event structure that are sent from eKuiper. The deviceName in the meta take precedence if specified.  |
-| sourceName    | true     | Allows user to specify the source name in the event structure that are sent from eKuiper. The sourceName in the meta take precedence if specified. |
-| optional      | true     | If ``mqtt`` message bus type is specified, then some optional values can be specified. Please refer to below for supported optional supported configurations. |
+| Property name      | Optional | Description                                                                                                                                                                                                                                                                                  |
+|--------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| type               | true     | The message bus type, three types of message buses are supported, ``zero``, ``mqtt`` and ``redis``, and ``redis`` is the default value.                                                                                                                                                      |
+| protocol           | true     | The protocol. If it's not specified, then use default value ``redis``.                                                                                                                                                                                                                       |
+| host               | true     | The host of message bus. If not specified, then use default value ``localhost``.                                                                                                                                                                                                             |
+| port               | true     | The port of message bus. If not specified, then use default value ``6379``.                                                                                                                                                                                                                  |
+| connectionSelector | true     | reuse the connection to EdgeX message bus. [more info](../sources/edgex.md#connectionselector)                                                                                                                                                                                               |
+| topic              | true     | The topic to be published. The topic is static across all messages. To use dynamic topic, leave this empty and specify the topicPrefix property. Only one of the topic and topicPrefix properties can be specified. If both are not specified, then use default topic value ``application``. |
+| topicPrefix        | true     | The prefix of a dynamic topic to be published. The topic will become a concatenation of `$topicPrefix/$profileName/$deviceName/$sourceName`.                                                                                                                                                 |
+| contentType        | true     | The content type of message to be published. If not specified, then use the default value ``application/json``.                                                                                                                                                                              |
+| messageType        | true     | The EdgeX message model type. To publish the message as an event like EdgeX application service, use `event`. Otherwise, to publish the message as an event request like EdgeX device service or core data service, use `request`. If not specified, then use the default value ``event``.   |
+| metadata           | true     | The property is a field name that allows user to specify a field name of SQL  select clause,  the field name should use ``meta(*) AS xxx``  to select all of EdgeX metadata from message.                                                                                                    |
+| profileName        | true     | Allows user to specify the profile name in the event structure that are sent from eKuiper. The profileName in the meta take precedence if specified.                                                                                                                                         |
+| deviceName         | true     | Allows user to specify the device name in the event structure that are sent from eKuiper. The deviceName in the meta take precedence if specified.                                                                                                                                           |
+| sourceName         | true     | Allows user to specify the source name in the event structure that are sent from eKuiper. The sourceName in the meta take precedence if specified.                                                                                                                                           |
+| optional           | true     | If ``mqtt`` message bus type is specified, then some optional values can be specified. Please refer to below for supported optional supported configurations.                                                                                                                                |
 
 Below optional configurations are supported, please check MQTT specification for the detailed information.
 

+ 4 - 4
docs/en_US/rules/sinks/memory.md

@@ -2,9 +2,9 @@
 
 The action is used to flush the result into an in-memory topic so that it can be consumed by the [memory source](../sources/memory.md). The topic is like pubsub topic such as mqtt, so that there could be multiple memory sinks which publish to the same topic and multiple memory sources which subscribe to the same topic. The typical usage for memory action is to form [rule pipelines](../rule_pipeline.md).
 
-| Property name      | Optional | Description                                                  |
-| ------------------ | -------- | ------------------------------------------------------------ |
-| topic              | false    | The in-memory topic, such as `analysis/result`                    |
+| Property name | Optional | Description                                    |
+|---------------|----------|------------------------------------------------|
+| topic         | false    | The in-memory topic, such as `analysis/result` |
 
 Below is a sample memory action configuration:
 
@@ -21,7 +21,7 @@ Below is another sample for dynamic topic action:
 ```json
 {
   "memory": {
-    "topic": "$.topic"
+    "topic": "{{.topic}}"
   }
 }
 ```

+ 16 - 16
docs/en_US/rules/sinks/mqtt.md

@@ -2,21 +2,21 @@
 
 The action is used for publish output message into an MQTT server. 
 
-| Property name      | Optional | Description                                                  |
-| ------------------ | -------- | ------------------------------------------------------------ |
-| server             | false    | The broker address of the MQTT server, such as `tcp://127.0.0.1:1883` |
-| topic              | false    | The MQTT topic, such as `analysis/result`                    |
-| clientId           | true     | The client id for MQTT connection. If not specified, an uuid will be used |
-| protocolVersion    | true     | MQTT protocol version. 3.1 (also refer as MQTT 3) or 3.1.1 (also refer as MQTT 4).  If not specified, the default value is 3.1. |
-| qos                | true     | The QoS for message delivery. Only int type value 0 or 1 or 2. |
-| username           | true     | The username for the connection.                             |
-| password           | true     | The password for the connection.                             |
+| Property name      | Optional | Description                                                                                                                                                                                                                                                                                                                                               |
+|--------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| server             | false    | The broker address of the MQTT server, such as `tcp://127.0.0.1:1883`                                                                                                                                                                                                                                                                                     |
+| topic              | false    | The MQTT topic, such as `analysis/result`                                                                                                                                                                                                                                                                                                                 |
+| clientId           | true     | The client id for MQTT connection. If not specified, an uuid will be used                                                                                                                                                                                                                                                                                 |
+| protocolVersion    | true     | MQTT protocol version. 3.1 (also refer as MQTT 3) or 3.1.1 (also refer as MQTT 4).  If not specified, the default value is 3.1.                                                                                                                                                                                                                           |
+| qos                | true     | The QoS for message delivery. Only int type value 0 or 1 or 2.                                                                                                                                                                                                                                                                                            |
+| username           | true     | The username for the connection.                                                                                                                                                                                                                                                                                                                          |
+| password           | true     | The password for the connection.                                                                                                                                                                                                                                                                                                                          |
 | certificationPath  | true     | The certification path. It can be an absolute path, or a relative path. If it is an relative path, then the base path is where you excuting the `kuiperd` command. For example, if you run `bin/kuiperd` from `/var/kuiper`, then the base path is `/var/kuiper`; If you run `./kuiperd` from `/var/kuiper/bin`, then the base path is `/var/kuiper/bin`. |
-| privateKeyPath     | true     | The private key path. It can be either absolute path, or relative path, which is similar to use of certificationPath. |
-| rootCaPath     | true     | The location of root ca path. It can be an absolute path, or a relative path, which is similar to use of certificationPath. |
-| insecureSkipVerify | true     | If InsecureSkipVerify is `true`, TLS accepts any certificate presented by the server and any host name in that certificate.  In this mode, TLS is susceptible to man-in-the-middle attacks. The default value is `false`. The configuration item can only be used with TLS connections. |
-| retained           | true     | If retained is `true`,The broker stores the last retained message and the corresponding QoS for that topic.The default value is `false`.
-| connectionSelector | true     | reuse the connection to mqtt broker. [more info](../sources/mqtt.md#connectionselector)
+| privateKeyPath     | true     | The private key path. It can be either absolute path, or relative path, which is similar to use of certificationPath.                                                                                                                                                                                                                                     |
+| rootCaPath         | true     | The location of root ca path. It can be an absolute path, or a relative path, which is similar to use of certificationPath.                                                                                                                                                                                                                               |
+| insecureSkipVerify | true     | If InsecureSkipVerify is `true`, TLS accepts any certificate presented by the server and any host name in that certificate.  In this mode, TLS is susceptible to man-in-the-middle attacks. The default value is `false`. The configuration item can only be used with TLS connections.                                                                   |
+| retained           | true     | If retained is `true`,The broker stores the last retained message and the corresponding QoS for that topic.The default value is `false`.                                                                                                                                                                                                                  |
+| connectionSelector | true     | reuse the connection to mqtt broker. [more info](../sources/mqtt.md#connectionselector)                                                                                                                                                                                                                                                                   | 
 
 Below is sample configuration for connecting to Azure IoT Hub by using SAS authentication.
 ```json
@@ -53,13 +53,13 @@ Below is another sample configuration for connecting to AWS IoT by using certifi
 
 ## Dynamic Topic
 
-If the result data contains the topic name, we can use it as the property of the mqtt action to achieve dynamic topic support. Assume the selected data has a field named `mytopic`, we can use jsonpath syntax to set it as the property value for `topic` as below:
+If the result data contains the topic name, we can use it as the property of the mqtt action to achieve dynamic topic support. Assume the selected data has a field named `mytopic`, we can use data template syntax to set it as the property value for `topic` as below:
 
 ```json
     {
       "mqtt": {
         "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
-        "topic": "$.mytopic",
+        "topic": "{{.mytopic}}",
         "qos": 1,
         "clientId": "demo_001",
         "certificationPath": "keys/d3807d9fa5-certificate.pem",

+ 3 - 3
docs/en_US/rules/sinks/nop.md

@@ -2,8 +2,8 @@
 
 The action is an Nop sink, the result sent to this sink will be ignored. If specify the `log` property to `true`, then the result will be saved into log file, the log file is at `$eKuiper_install/log/stream.log` by default.
 
-| Property name      | Optional | Description                                                  |
-| ------------------ | -------- | ------------------------------------------------------------ |
-| log             | true | true/false - print the sink result to log or not. By default is `false`, that will not print the result to log file. |
+| Property name | Optional | Description                                                                                                          |
+|---------------|----------|----------------------------------------------------------------------------------------------------------------------|
+| log           | true     | true/false - print the sink result to log or not. By default is `false`, that will not print the result to log file. |
 
 

Rozdielové dáta súboru neboli zobrazené, pretože súbor je príliš veľký
+ 15 - 15
docs/en_US/rules/sinks/rest.md


+ 31 - 31
docs/zh_CN/rules/overview.md

@@ -24,12 +24,12 @@
 
 ## 参数
 
-| 参数名 | 是否可选 | 说明                |
-| ------------- | -------- | ------------------------------------------------------------ |
-| id | 否  | 规则 id |
-| sql        | 否  | 为规则运行的 sql 查询 |
-| actions           | 否   | Sink 动作数组 |
-| options           | 是       | 选项图     |
+| 参数名     | 是否可选 | 说明            |
+|---------|------|---------------|
+| id      | 否    | 规则 id         |
+| sql     | 否    | 为规则运行的 sql 查询 |
+| actions | 否    | Sink 动作数组     |
+| options | 是    | 选项图           |
 
 ### id
 
@@ -43,16 +43,16 @@
 
 当前的选项包括:
 
-| 选项名             | 类型和默认值 | 说明                                                         |
-| ------------------ | ------------ | ------------------------------------------------------------ |
-| isEventTime        | bool:false   | 使用事件时间还是将时间用作事件的时间戳。 如果使用事件时间,则将从有效负载中提取时间戳。 必须通过 [stream](../sqls/streams.md) 定义指定时间戳记。 |
-| lateTolerance      | int64:0      | 在使用事件时间窗口时,可能会出现元素延迟到达的情况。 LateTolerance 可以指定在删除元素之前可以延迟多少时间(单位为 ms)。 默认情况下,该值为0,表示后期元素将被删除。 |
-| concurrency        | int: 1       | 一条规则运行时会根据 sql 语句分解成多个 plan 运行。该参数设置每个 plan 运行的线程数。该参数值大于1时,消息处理顺序可能无法保证。 |
-| bufferLength       | int: 1024    | 指定每个 plan 可缓存消息数。若缓存消息数超过此限制,plan 将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。此选项值越大,则消息吞吐能力越强,但是内存占用也会越多。 |
-| sendMetaToSink     | bool:false   | 指定是否将事件的元数据发送到目标。 如果为 true,则目标可以获取元数据信息。 |
-| sendError  | bool: true | 指定是否将运行时错误发送到目标。如果为 true,则错误会在整个流中传递直到目标。否则,错误会被忽略,仅打印到日志中。 |
-| qos                | int:0        | 指定流的 qos。 值为0对应最多一次; 1对应至少一次,2对应恰好一次。 如果 qos 大于0,将激活检查点机制以定期保存状态,以便可以从错误中恢复规则。 |
-| checkpointInterval | int:300000   | 指定触发检查点的时间间隔(单位为 ms)。 仅当 qos 大于0时才有效。 |
+| 选项名                | 类型和默认值     | 说明                                                                                             |
+|--------------------|------------|------------------------------------------------------------------------------------------------|
+| isEventTime        | bool:false | 使用事件时间还是将时间用作事件的时间戳。 如果使用事件时间,则将从有效负载中提取时间戳。 必须通过 [stream](../sqls/streams.md) 定义指定时间戳记。       |
+| lateTolerance      | int64:0    | 在使用事件时间窗口时,可能会出现元素延迟到达的情况。 LateTolerance 可以指定在删除元素之前可以延迟多少时间(单位为 ms)。 默认情况下,该值为0,表示后期元素将被删除。   |
+| concurrency        | int: 1     | 一条规则运行时会根据 sql 语句分解成多个 plan 运行。该参数设置每个 plan 运行的线程数。该参数值大于1时,消息处理顺序可能无法保证。                      |
+| bufferLength       | int: 1024  | 指定每个 plan 可缓存消息数。若缓存消息数超过此限制,plan 将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。此选项值越大,则消息吞吐能力越强,但是内存占用也会越多。 |
+| sendMetaToSink     | bool:false | 指定是否将事件的元数据发送到目标。 如果为 true,则目标可以获取元数据信息。                                                       |
+| sendError          | bool: true | 指定是否将运行时错误发送到目标。如果为 true,则错误会在整个流中传递直到目标。否则,错误会被忽略,仅打印到日志中。                                    |
+| qos                | int:0      | 指定流的 qos。 值为0对应最多一次; 1对应至少一次,2对应恰好一次。 如果 qos 大于0,将激活检查点机制以定期保存状态,以便可以从错误中恢复规则。                 |
+| checkpointInterval | int:300000 | 指定触发检查点的时间间隔(单位为 ms)。 仅当 qos 大于0时才有效。                                                          |
 
 有关 `qos` 和 `checkpointInterval` 的详细信息,请查看[状态和容错](./state_and_fault_tolerance.md)。
 
@@ -79,18 +79,18 @@
 
 每个动作可以定义自己的属性。当前有以下的公共属性:
 
-| 属性名 | 类型和默认值 | 描述                                                  |
-| ------------- | -------- | ------------------------------------------------------------ |
-| concurrency | int: 1   | 设置运行的线程数。该参数值大于1时,消息发出的顺序可能无法保证。 |
-| bufferLength | int: 1024   | 设置可缓存消息数目。若缓存消息数超过此限制,sink将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。|
-| runAsync        | bool:false   | 设置是否异步运行输出操作以提升性能。请注意,异步运行的情况下,输出结果顺序不能保证。  |
-| retryInterval   | int:1000   | 设置信息发送失败后重试等待时间,单位为毫秒。如果该值的设置 <= 0,那么不会尝试重新发送。 |
-| retryCount | int:3 | 设置信息发送失败后重试次数,如果该值的设置 <= 0,那么不会尝试重新发送。 |
-| cacheLength     | int:1024   | 设置最大消息缓存数量。缓存的消息会一直保留直到消息发送成功。缓存消息将按顺序发送,除非运行在异步或者并发模式下。缓存消息会定期存储到磁盘中。  |
-| cacheSaveInterval  | int:1000   | 设置缓存存储间隔时间。需要注意的是,当规则关闭时,缓存会自动存储。该值越大,则缓存保存开销越小,但系统意外退出时缓存丢失的风险变大。 |
-| omitIfEmpty | bool: false | 如果配置项设置为 true,则当 SELECT 结果为空时,该结果将不提供给目标运算符。 |
-| sendSingle        | true     | 输出消息以数组形式接收,该属性意味着是否将结果一一发送。 如果为false,则输出消息将为`{"result":"${the string of received message}"}`。 例如,`{"result":"[{\"count\":30},"\"count\":20}]"}`。否则,结果消息将与实际字段名称一一对应发送。 对于与上述相同的示例,它将发送 `{"count":30}`,然后发送`{"count":20}`到 RESTful 端点。默认为 false。 |
-| dataTemplate      | true     | [golang 模板](https://golang.org/pkg/html/template)格式字符串,用于指定输出数据格式。 模板的输入是目标消息,该消息始终是映射数组。 如果未指定数据模板,则将数据作为原始输入。 |
+| 属性名               | 类型和默认值      | 描述                                                                                                                                                                                                                                               |
+|-------------------|-------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| concurrency       | int: 1      | 设置运行的线程数。该参数值大于1时,消息发出的顺序可能无法保证。                                                                                                                                                                                                                 |
+| bufferLength      | int: 1024   | 设置可缓存消息数目。若缓存消息数超过此限制,sink将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。                                                                                                                                                                                       |
+| runAsync          | bool:false  | 设置是否异步运行输出操作以提升性能。请注意,异步运行的情况下,输出结果顺序不能保证。                                                                                                                                                                                                       |
+| retryInterval     | int:1000    | 设置信息发送失败后重试等待时间,单位为毫秒。如果该值的设置 <= 0,那么不会尝试重新发送。                                                                                                                                                                                                   |
+| retryCount        | int:3       | 设置信息发送失败后重试次数,如果该值的设置 <= 0,那么不会尝试重新发送。                                                                                                                                                                                                           |
+| cacheLength       | int:1024    | 设置最大消息缓存数量。缓存的消息会一直保留直到消息发送成功。缓存消息将按顺序发送,除非运行在异步或者并发模式下。缓存消息会定期存储到磁盘中。                                                                                                                                                                           |
+| cacheSaveInterval | int:1000    | 设置缓存存储间隔时间。需要注意的是,当规则关闭时,缓存会自动存储。该值越大,则缓存保存开销越小,但系统意外退出时缓存丢失的风险变大。                                                                                                                                                                               |
+| omitIfEmpty       | bool: false | 如果配置项设置为 true,则当 SELECT 结果为空时,该结果将不提供给目标运算符。                                                                                                                                                                                                     |
+| sendSingle        | true        | 输出消息以数组形式接收,该属性意味着是否将结果一一发送。 如果为false,则输出消息将为`{"result":"${the string of received message}"}`。 例如,`{"result":"[{\"count\":30},"\"count\":20}]"}`。否则,结果消息将与实际字段名称一一对应发送。 对于与上述相同的示例,它将发送 `{"count":30}`,然后发送`{"count":20}`到 RESTful 端点。默认为 false。 |
+| dataTemplate      | true        | [golang 模板](https://golang.org/pkg/html/template)格式字符串,用于指定输出数据格式。 模板的输入是目标消息,该消息始终是映射数组。 如果未指定数据模板,则将数据作为原始输入。                                                                                                                                  |
 
 ### 数据模板
 
@@ -171,7 +171,7 @@ eKuiper 扩展了几个可以在模版中使用的函数。
 
 ### 动态属性
 
-有些情况下,用户需要按照数据把结果发送到不同的目标中。例如,根据收到的数据,把计算结果发到不同的 mqtt 主题中。使用基于 jsonpath 格式的动态属性,可以实现这样的功能。在以下的例子中,目标的 topic 属性是一个 jsonpath 格式的字符串从而在运行时会将消息发送到动态的主题中。 
+有些情况下,用户需要按照数据把结果发送到不同的目标中。例如,根据收到的数据,把计算结果发到不同的 mqtt 主题中。使用基于[数据模板](#数据模板)格式的动态属性,可以实现这样的功能。在以下的例子中,目标的 topic 属性是一个数据模板格式的字符串从而在运行时会将消息发送到动态的主题中。 
 
 ```json
 {
@@ -180,10 +180,10 @@ eKuiper 扩展了几个可以在模版中使用的函数。
   "actions": [{
     "mqtt": {
       "sendSingle": true,
-      "topic": "$.topic"
+      "topic": "prefix/{{.topic}}"
     }
   }]
 }
 ```
 
-需要注意的是,上例中的 `sendSingle` 属性已设置。在默认情况下,目标接收到的是数组,使用的 jsonpath 需要采用 `$[0].topic`。
+需要注意的是,上例中的 `sendSingle` 属性已设置。在默认情况下,目标接收到的是数组,使用的 jsonpath 需要采用 `{{index . 0 \"topic\"}}`。

+ 16 - 16
docs/zh_CN/rules/sinks/edgex.md

@@ -6,22 +6,22 @@
 
 **另外,如果你需要在别的主机上对你的端口可以进行访问,你需要在开始运行 eKuiper 服务之前,把端口号映射到主机上。**
 
-| 名称        | 可选 | Description                                                  |
-| ----------- | -------- | ------------------------------------------------------------ |
-| type          | 是    | 消息总线类型,目前支持三种类型的消息总线, `redis`, `zero` 或者 `mqtt`,其中 `redis` 为缺省类型。 |
-| protocol    | 是     | 协议,如未指定,使用缺省值 `tcp` 。  |
-| host        | 是    | 消息总线主机地址,使用缺省值 `*` 。                    |
-| port        | 是    | 消息总线端口号。 如未指定,使用缺省值 `5563` 。              |
-| connectionSelector | 是     | 重用到 EdgeX 消息总线的连接,详细信息,[请参考](../sources/edgex.md#connectionselector)
-| topic       | 是    | 发布的主题名称。该主题为固定值。若不同的消息需要动态指定主题,则将该属性置空,并设置 topicPrefix 属性。这两个属性只能设置一个。若两者都未设置,则使用缺省主题 `application` 。          |
-| topicPrefix | 是     | 发布的主题的前缀。发送的主题将采用动态拼接,格式为`$topicPrefix/$profileName/$deviceName/$sourceName` 。|
-| contentType | 是    | 发布消息的内容类型,如未指定,使用缺省值 `application/json` 。|
-| messageType   | 是   | EdgeX 消息模型类型。若要将消息发送为类似 apllication service 的 event 类型,则应设置为 `event`。否则,若要将消息发送为类似 device service 或者 core data service 的 event request 类型,则应设置为 `request`。如未指定,使用缺省值 ``event`` 。|
-| metadata    | 是    | 该属性为一个字段名称,该字段是 SQL SELECT 子句的一个字段名称,这个字段应该类似于 `meta(*) AS xxx` ,用于选出消息中所有的 EdgeX 元数据 。 |
-| profileName  | 是    | 允许用户指定 Profile 名称,该名称将作为从 eKuiper 中发送出来的 Event 结构体的 profile 名称。若在 metadata 中设置了 profileName 将会优先采用。|
-| deviceName  | 是    | 允许用户指定设备名称,该名称将作为从 eKuiper 中发送出来的 Event 结构体的设备名称。若在 metadata 中设置了 deviceName 将会优先采用。 |
-| sourceName    | 是   | 允许用户指定源名称,该名称将作为从 eKuiper 中发送出来的 Event 结构体的源名称。若在 metadata 中设置了 sourceName 将会优先采用。 |
-| optional      | 是    | 如果指定了 `mqtt` 消息总线,那么还可以指定一下可选的值。请参考以下可选的支持的配置类型。 |
+| 名称                 | 可选  | Description                                                                                                                                                                      |
+|--------------------|-----|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| type               | 是   | 消息总线类型,目前支持三种类型的消息总线, `redis`, `zero` 或者 `mqtt`,其中 `redis` 为缺省类型。                                                                                                                |
+| protocol           | 是   | 协议,如未指定,使用缺省值 `tcp` 。                                                                                                                                                            |
+| host               | 是   | 消息总线主机地址,使用缺省值 `*` 。                                                                                                                                                             |
+| port               | 是   | 消息总线端口号。 如未指定,使用缺省值 `5563` 。                                                                                                                                                     |
+| connectionSelector | 是   | 重用到 EdgeX 消息总线的连接,详细信息,[请参考](../sources/edgex.md#connectionselector)                                                                                                             |
+| topic              | 是   | 发布的主题名称。该主题为固定值。若不同的消息需要动态指定主题,则将该属性置空,并设置 topicPrefix 属性。这两个属性只能设置一个。若两者都未设置,则使用缺省主题 `application` 。                                                                            |
+| topicPrefix        | 是   | 发布的主题的前缀。发送的主题将采用动态拼接,格式为`$topicPrefix/$profileName/$deviceName/$sourceName` 。                                                                                                   |
+| contentType        | 是   | 发布消息的内容类型,如未指定,使用缺省值 `application/json` 。                                                                                                                                        |
+| messageType        | 是   | EdgeX 消息模型类型。若要将消息发送为类似 apllication service 的 event 类型,则应设置为 `event`。否则,若要将消息发送为类似 device service 或者 core data service 的 event request 类型,则应设置为 `request`。如未指定,使用缺省值 ``event`` 。 |
+| metadata           | 是   | 该属性为一个字段名称,该字段是 SQL SELECT 子句的一个字段名称,这个字段应该类似于 `meta(*) AS xxx` ,用于选出消息中所有的 EdgeX 元数据 。                                                                                          |
+| profileName        | 是   | 允许用户指定 Profile 名称,该名称将作为从 eKuiper 中发送出来的 Event 结构体的 profile 名称。若在 metadata 中设置了 profileName 将会优先采用。                                                                              |
+| deviceName         | 是   | 允许用户指定设备名称,该名称将作为从 eKuiper 中发送出来的 Event 结构体的设备名称。若在 metadata 中设置了 deviceName 将会优先采用。                                                                                             |
+| sourceName         | 是   | 允许用户指定源名称,该名称将作为从 eKuiper 中发送出来的 Event 结构体的源名称。若在 metadata 中设置了 sourceName 将会优先采用。                                                                                               |
+| optional           | 是   | 如果指定了 `mqtt` 消息总线,那么还可以指定一下可选的值。请参考以下可选的支持的配置类型。                                                                                                                                 |
 
 以下为支持的可选的配置列表,您可以参考 MQTT 协议规范来获取更详尽的信息。
 

+ 1 - 1
docs/zh_CN/rules/sinks/memory.md

@@ -21,7 +21,7 @@
 ```json
 {
   "memory": {
-    "topic": "$.topic"
+    "topic": "{{.topic}}"
   }
 }
 ```

+ 18 - 17
docs/zh_CN/rules/sinks/mqtt.md

@@ -2,21 +2,22 @@
 
 该操作用于将输出消息发布到 MQTT 服务器中。
 
-| 属性名称 | 是否可选 | 说明                                          |
-| ------------- | -------- | ---------------------------------------------------- |
-| server        | 否    | MQTT  服务器地址,例如 `tcp://127.0.0.1:1883` |
-| topic          | 否    | MQTT 主题,例如 `analysis/result` , 也可设置为动态属性,例如 `$.col`, 将会把结果中的 col 列的值作为主题                   |
-| clientId      | 是     | MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid |
-| protocolVersion   | 是    | MQTT 协议版本。3.1 (也被称为 MQTT 3) 或者 3.1.1 (也被称为 MQTT 4)。 如果未指定,缺省值为 3.1。 |
-| qos               | 是    | 消息转发的服务质量                               |
-| username          | 是    | 连接用户名                            |
-| password          | 是    | 连接密码                             |
-| certificationPath | 是    | 证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 `kuiperd` 命令的路径。比如,如果你在 `/var/kuiper` 中运行 `bin/kuiperd` ,那么父目录为 `/var/kuiper`; 如果运行从 `/var/kuiper/bin` 中运行`./kuiperd`,那么父目录为 `/var/kuiper/bin`。 |
-| privateKeyPath    | 是    | 私钥路径。可以为绝对路径,也可以为相对路径,相对路径的用法与 `certificationPath` 类似。 |
-| rootCaPath    | 是    | 根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径,相对路径的用法与 `certificationPath` 类似。 |
-| insecureSkipVerify | 是     | 如果 InsecureSkipVerify 设置为 `true`, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为`false`。配置项只能用于TLS连接。|
-| retained           | 是     | 如果 retained 设置为 `true`,Broker会存储每个Topic的最后一条保留消息及其Qos。默认值是 `false`   
-| connectionSelector | 是     | 重用到 MQTT Broker 的连接,详细信息,[请参考](../sources/mqtt.md#connectionselector)
+| 属性名称               | 是否可选 | 说明                                                                                                                                                                                        |
+|--------------------|------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| server             | 否    | MQTT  服务器地址,例如 `tcp://127.0.0.1:1883`                                                                                                                                                     |
+| topic              | 否    | MQTT 主题,例如 `analysis/result` , 也可设置为动态属性,例如 `$.col`, 将会把结果中的 col 列的值作为主题                                                                                                                  |
+| clientId           | 是    | MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid                                                                                                                                                          |
+| protocolVersion    | 是    | MQTT 协议版本。3.1 (也被称为 MQTT 3) 或者 3.1.1 (也被称为 MQTT 4)。 如果未指定,缺省值为 3.1。                                                                                                                       |
+| qos                | 是    | 消息转发的服务质量                                                                                                                                                                                 |
+| username           | 是    | 连接用户名                                                                                                                                                                                     |
+| password           | 是    | 连接密码                                                                                                                                                                                      |
+| certificationPath  | 是    | 证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 `kuiperd` 命令的路径。比如,如果你在 `/var/kuiper` 中运行 `bin/kuiperd` ,那么父目录为 `/var/kuiper`; 如果运行从 `/var/kuiper/bin` 中运行`./kuiperd`,那么父目录为 `/var/kuiper/bin`。 |
+| privateKeyPath     | 是    | 私钥路径。可以为绝对路径,也可以为相对路径,相对路径的用法与 `certificationPath` 类似。                                                                                                                                    |
+| rootCaPath         | 是    | 根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径,相对路径的用法与 `certificationPath` 类似。                                                                                                                         |
+| insecureSkipVerify | 是    | 如果 InsecureSkipVerify 设置为 `true`, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为`false`。配置项只能用于TLS连接。                                                                              |
+| retained           | 是    | 如果 retained 设置为 `true`,Broker会存储每个Topic的最后一条保留消息及其Qos。默认值是 `false`                                                                                                                        |
+| connectionSelector | 是    | 重用到 MQTT Broker 的连接,详细信息,[请参考](../sources/mqtt.md#connectionselector)                                                                                                                     |
+
 以下为使用 SAS 连接到 Azure IoT Hub 的样例。
 ```json
     {
@@ -51,13 +52,13 @@
 
 ## 动态主题
 
-若结果数据中包含主题内容,可以将其作为主题属性,从而实现动态主题的需求。假设 SQL 选出的数据包含 `mytopic`, 则可以使用 jsonpath 语法将其设置为 `topic` 属性的值,如下所示:
+若结果数据中包含主题内容,可以将其作为主题属性,从而实现动态主题的需求。假设 SQL 选出的数据包含 `mytopic`, 则可以使用数据模板的语法将其设置为 `topic` 属性的值,如下所示:
 
 ```json
     {
       "mqtt": {
         "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
-        "topic": "$.mytopic",
+        "topic": "{{.mytopic}}",
         "qos": 1,
         "clientId": "demo_001",
         "certificationPath": "keys/d3807d9fa5-certificate.pem",

+ 3 - 3
docs/zh_CN/rules/sinks/nop.md

@@ -2,8 +2,8 @@
 
 该 action 是一个空操作目标,所有发送到此的结果将被忽略。如果指定 `log` 属性为 `true`,那么结果将会保存到日志文件,日志文件缺省保存在  `$eKuiper_install/log/stream.log`。
 
-| 属性名称  | 是否可选 | 说明                                                  |
-| ------------------ | -------- | ------------------------------------------------------------ |
-| log             | true | true/false - 是否将结果打印到日志。缺省为 `false`,这种情况下将不会打印到日志文件。 |
+| 属性名称 | 是否可选 | 说明                                                   |
+|------|------|------------------------------------------------------|
+| log  | true | true/false - 是否将结果打印到日志。缺省为 `false`,这种情况下将不会打印到日志文件。 |
 
 

Rozdielové dáta súboru neboli zobrazené, pretože súbor je príliš veľký
+ 16 - 16
docs/zh_CN/rules/sinks/rest.md


+ 3 - 2
internal/binder/function/funcs_misc.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -255,7 +255,8 @@ func otherCall(name string, args []interface{}) (interface{}, bool) {
 }
 
 func jsonCall(ctx api.StreamContext, name string, args []interface{}) (interface{}, bool) {
-	result, err := ctx.ParseDynamicProp(args[1].(string), args[0])
+	// TO BE REMOVED prefix { to avoid conflict with regular string
+	result, err := ctx.ParseDynamicProp("{"+args[1].(string), args[0])
 	if err != nil {
 		if name == "json_path_exists" {
 			return false, true

+ 51 - 24
internal/topo/context/default.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -15,15 +15,19 @@
 package context
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/connection"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/sirupsen/logrus"
+	"regexp"
 	"strings"
 	"sync"
+	"text/template"
 	"time"
 )
 
@@ -40,7 +44,8 @@ type DefaultContext struct {
 	state    *sync.Map
 	snapshot map[string]interface{}
 	// cache
-	jsonEvalReg sync.Map
+	tpReg sync.Map
+	jpReg sync.Map
 }
 
 func Background() *DefaultContext {
@@ -110,24 +115,45 @@ func (c *DefaultContext) SetError(err error) {
 }
 
 func (c *DefaultContext) ParseDynamicProp(prop string, data interface{}) (interface{}, error) {
-	// If not a json path, just return itself
-	if !strings.HasPrefix(prop, "$") {
-		return prop, nil
-	}
-	var (
-		je  conf.JsonPathEval
-		err error
-	)
-	if raw, ok := c.jsonEvalReg.Load(prop); ok {
-		je = raw.(conf.JsonPathEval)
-	} else {
-		je, err = conf.GetJsonPathEval(prop)
+	re := regexp.MustCompile(`{{(.*?)}}`)
+	if re.Match([]byte(prop)) {
+		var (
+			tp  *template.Template
+			err error
+		)
+		if raw, ok := c.tpReg.Load(prop); ok {
+			tp = raw.(*template.Template)
+		} else {
+			tp, err = transform.GenTp(prop)
+			if err != nil {
+				return fmt.Sprintf("%v", data), err
+			}
+			c.tpReg.Store(prop, tp)
+		}
+		var output bytes.Buffer
+		err = tp.Execute(&output, data)
 		if err != nil {
-			return nil, err
+			return fmt.Sprintf("%v", data), err
 		}
-		c.jsonEvalReg.Store(prop, je)
+		return output.String(), nil
+	} else if strings.HasPrefix(prop, "{$") { //TO BE REMOVED: will be extracted as a new function in the next release
+		var (
+			je  conf.JsonPathEval
+			err error
+		)
+		if raw, ok := c.jpReg.Load(prop); ok {
+			je = raw.(conf.JsonPathEval)
+		} else {
+			je, err = conf.GetJsonPathEval(prop[1:])
+			if err != nil {
+				return nil, err
+			}
+			c.jpReg.Store(prop, je)
+		}
+		return je.Eval(data)
+	} else {
+		return prop, nil
 	}
-	return je.Eval(data)
 }
 
 func (c *DefaultContext) WithMeta(ruleId string, opId string, store api.Store) api.StreamContext {
@@ -136,13 +162,14 @@ func (c *DefaultContext) WithMeta(ruleId string, opId string, store api.Store) a
 		c.GetLogger().Warnf("Initialize context store error for %s: %s", opId, err)
 	}
 	return &DefaultContext{
-		ruleId:      ruleId,
-		opId:        opId,
-		instanceId:  0,
-		ctx:         c.ctx,
-		store:       store,
-		state:       s,
-		jsonEvalReg: sync.Map{},
+		ruleId:     ruleId,
+		opId:       opId,
+		instanceId: 0,
+		ctx:        c.ctx,
+		store:      store,
+		state:      s,
+		tpReg:      sync.Map{},
+		jpReg:      sync.Map{},
 	}
 }
 

+ 28 - 7
internal/topo/context/default_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -116,7 +116,7 @@ func TestDynamicProp(t *testing.T) {
 		r []interface{} // parsed results
 	}{
 		{
-			j: "$.a",
+			j: "{$.a",
 			v: []interface{}{
 				map[string]interface{}{
 					"a": 123,
@@ -137,7 +137,7 @@ func TestDynamicProp(t *testing.T) {
 				nil,
 			},
 		}, {
-			j: "$[0].a",
+			j: "{$[0].a",
 			v: []interface{}{
 				[]map[string]interface{}{{
 					"a": 123,
@@ -161,7 +161,7 @@ func TestDynamicProp(t *testing.T) {
 				"single",
 			},
 		}, {
-			j: "a",
+			j: "$a",
 			v: []interface{}{
 				map[string]interface{}{
 					"a": 123,
@@ -177,9 +177,30 @@ func TestDynamicProp(t *testing.T) {
 				},
 			},
 			r: []interface{}{
-				"a",
-				"a",
-				"a",
+				"$a",
+				"$a",
+				"$a",
+			},
+		}, {
+			j: "devices/{{.a}}",
+			v: []interface{}{
+				map[string]interface{}{
+					"a": 123,
+					"b": "dafds",
+				},
+				map[string]interface{}{
+					"a": "single",
+					"c": 20.2,
+				},
+				map[string]interface{}{
+					"b": "b",
+					"c": "c",
+				},
+			},
+			r: []interface{}{
+				"devices/123",
+				"devices/single",
+				"devices/<no value>",
 			},
 		},
 	}

+ 10 - 5
internal/topo/sink/rest_sink.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
 package sink
 
 import (
+	"encoding/json"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/pkg/cert"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
@@ -280,7 +281,7 @@ func (ms *RestSink) Send(ctx api.StreamContext, v interface{}, logger api.Logger
 	if err != nil {
 		return nil, err
 	}
-	url, ok := temp.(string)
+	u, ok := temp.(string)
 	if !ok {
 		return nil, fmt.Errorf("the value %v of dynamic prop %s for url is not a string", ms.url, temp)
 	}
@@ -292,12 +293,16 @@ func (ms *RestSink) Send(ctx api.StreamContext, v interface{}, logger api.Logger
 		if err != nil {
 			return nil, err
 		}
-		headers, ok = temp.(map[string]string)
+		tstr, ok := temp.(string)
 		if !ok {
-			return nil, fmt.Errorf("the value %v of dynamic prop %s for headers is not a map[string]string", ms.headersTemplate, temp)
+			return nil, fmt.Errorf("the value %v of dynamic prop %s for headersTemplate is not a string", ms.headersTemplate, temp)
+		}
+		err = json.Unmarshal([]byte(tstr), &headers)
+		if err != nil {
+			return nil, fmt.Errorf("rest sink headers template decode error: %v", err)
 		}
 	}
-	return httpx.Send(logger, ms.client, bodyType, method, url, headers, ms.sendSingle, v)
+	return httpx.Send(logger, ms.client, bodyType, method, u, headers, ms.sendSingle, v)
 }
 
 func (ms *RestSink) Close(ctx api.StreamContext) error {

+ 5 - 1
internal/topo/transform/template.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -47,3 +47,7 @@ func GenTransform(dt string) (TransFunc, error) {
 		}
 	}, nil
 }
+
+func GenTp(dt string) (*template.Template, error) {
+	return template.New("sink").Funcs(ct.FuncMap).Parse(dt)
+}

+ 2 - 2
test/rule_pipeline.jmx

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~ Copyright 2021 EMQ Technologies Co., Ltd.
+  ~ Copyright 2022 EMQ Technologies Co., Ltd.
   ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -344,7 +344,7 @@
     &quot;mqtt&quot;: {&#xd;
       &quot;server&quot;: &quot;tcp://${mqtt_srv}:1883&quot;,&#xd;
       &quot;sendSingle&quot;: true,&#xd;
-      &quot;topic&quot;: &quot;$.t&quot;&#xd;
+      &quot;topic&quot;: &quot;{{.t}}&quot;&#xd;
     }&#xd;
   }],&#xd;
   &quot;options&quot;: {&#xd;