Browse Source

update docs

RockyJin 5 years ago
parent
commit
7e981d7fda

+ 27 - 39
docs/en_US/edgex/edgex_rule_engine_tutorial.md

@@ -40,37 +40,23 @@ EdgeX uses [message bus](https://github.com/edgexfoundry/go-mod-messaging) to ex
 
 ## Start to use
 
-### Pull Kuiper Docker and run
+In out tutorial, we will use [Random Integer Device Service](https://github.com/edgexfoundry/device-random) which is shipped in official released EdgeX, and run rules against the data generated by this sample device service.
 
-It's **STRONGLY** recommended to use Docker, since related dependency libraries (such ZeroMQ lib) are already installed in Docker images.
+### Run EdgeX Docker instances
 
-```shell
-docker pull emqx/kuiper:0.3.0
-```
-
-<u>TODO: After offcially releasing of EdgeX Geneva, the Kuiper docker image will be pulled automatically by EdgeX docker composer files. The command will be updated by then.</u>  
+After the EdgeX Geneva is offcially released, you can just follow steps in [this doc](https://fuji-docs.edgexfoundry.org/Ch-QuickStart.html) to start the service. But now since Kuiper has not been official released yet, you have to download Docker composer file from [here](https://github.com/edgexfoundry/developer-scripts/blob/master/releases/nightly-build/compose-files/docker-compose-nexus-mongo-no-secty.yml), and then bring up EdgeX Docker instances. 
 
-**Run Docker**
+```shell
+# wget https://github.com/edgexfoundry/developer-scripts/raw/master/releases/nightly-build/compose-files/docker-compose-nexus-mongo-no-secty.yml
 
-```
-docker run -d --name kuiper emqx/kuiper:0.3.0
+# docker-compose -f ./docker-compose-nexus-redis-no-secty.yml up -d --build
 ```
 
-If the docker instance is failed to start, please use ``docker logs kuiper`` to see the log files.
+After all of the Docker instances are started, you can use ``docker ps`` command to verify all of services are runnings correctly.
 
-Notice 1: The default EdgeX message bus configuration could be updated when bring-up the Docker instance.  As listed in below, override the default configurations for message bus server, port and service server address for getting value descriptors in Kuiper instance.
-
-```shell
-docker run -d --name kuiper -e EDGEX_SERVER=10.211.55.2 -e EDGEX_PORT=5563 -e EDGEX_SERVICE_SERVER=http://10.211.55.2:48080 emqx/kuiper:0.3
 ```
-
-For more detailed supported Docer environment varialbles, please refer to [this link](https://hub.docker.com/r/emqx/kuiper).
-
-*Notice 2: If you'd like to use Kuiper with EdgeX support seperately (without Docker), you could build Kuiper by yourself with ``make pkg_with_edgex`` command.*
-
-### Create a device service
-
-In this tutorial, we use a very simple mock-up device service. Please follow the steps in [this doc](https://fuji-docs.edgexfoundry.org/Ch-GettingStartedSDK-Go.html) to develop and run the random number service.  
+TODO: The docker instance list
+```
 
 ### Create a stream
 
@@ -82,7 +68,7 @@ The next step is to create a stream that can consume data from EdgeX message bus
 
 ```shell
 curl -X POST \
-  http://$your_server:9081/streams \
+  http://$TODO:9081/streams \
   -H 'Content-Type: application/json' \
   -d '{
   "sql": "create stream demo() WITH (FORMAT=\"JSON\", TYPE=\"edgex\")"
@@ -134,7 +120,7 @@ So the below rule will filter all of ``randomnumber`` that is less than 31. The
 
 ```shell
 curl -X POST \
-  http://$your_server:9081/rules \
+  http://$TODO:9081/rules \
   -H 'Content-Type: application/json' \
   -d '{
   "id": "rule1",
@@ -157,7 +143,7 @@ You can create a rule file with any text editor, and copy following contents int
 
 ```
 {
-  "sql": "SELECT * from demo where randomnumber > 30",
+  "sql": "SELECT * from demo",
   "actions": [
     {
       "mqtt": {
@@ -186,13 +172,7 @@ If you want to send analysis result to another sink, please refer to [other sink
 Now you can also take a look at the log file under ``log/stream.log``, see detailed info of rule. 
 
 ```
-time="2020-03-19T10:23:40+08:00" level=info msg="open source node 1 instances" rule=rule1
-time="2020-03-19T10:23:40+08:00" level=info msg="Connect to value descriptor service at: http://localhost:48080/api/v1/valuedescriptor \n"
-time="2020-03-19T10:23:40+08:00" level=info msg="Use configuration for edgex messagebus {{ 0 } {localhost 5563 tcp} zero map[]}\n"
-time="2020-03-19T10:23:40+08:00" level=info msg="Start source demo instance 0 successfully" rule=rule1
-time="2020-03-19T10:23:40+08:00" level=info msg="The connection to edgex messagebus is established successfully." rule=rule1
-time="2020-03-19T10:23:40+08:00" level=info msg="Successfully subscribed to edgex messagebus topic events." rule=rule1
-time="2020-03-19T10:23:40+08:00" level=info msg="The connection to server tcp://broker.emqx.io:1883 was established successfully" rule=rule1
+//TODO
 ```
 
 ### Monitor analysis result
@@ -201,12 +181,7 @@ Since all of the analysis result are published to  ``tcp://broker.emqx.io:1883``
 
 ```shell
 # mosquitto_sub -h broker.emqx.io -t result
-[{"randomnumber":81}]
-[{"randomnumber":87}]
-[{"randomnumber":47}]
-[{"randomnumber":59}]
-[{"randomnumber":81}]
-...
+//TODO
 ```
 
 You'll find that only those randomnumber larger than 30 will be published to ``result`` topic.
@@ -254,6 +229,19 @@ Connecting to 127.0.0.1:20498...
 
 In this tutorial,  we introduce a very simple use of EdgeX Kuiper rule engine. If having any issues regarding to use of Kuiper rule engine, you can open issues in EdgeX or Kuiper Github respository.
 
+### More Excecise 
+
+Current rule does not filter any data that are sent to Kuiper, so how to filter data?  For example,  if you  only concern ``Int32`` field value that are great than 30.  Please [drop rule](../cli/rules.md) and change the SQL in previous rule file  as following.  
+
+```
+{
+  "sql": "SELECT * from demo WHERE Int32 > 30",
+  "actions": [...]
+}
+```
+
+After update the rule file, and then deploy the rule again. Please monitor the result topic of MQTT broker, you will find that only ``Int32`` field value that are great than 30 will be sent out.
+
 #### Extended Reading
 
 - Read [EdgeX source](../rules/sources/edgex.md) for more detailed information of configurations and data type conversion.

+ 8 - 8
docs/en_US/rules/sinks/edgex.md

@@ -51,17 +51,17 @@ In this case, the original metadata value (such as ``id, pushed, created, modifi
 3) The data sent to EdgeX message bus.
 ```
 {
-  "Device": "kuiper", "Created": 1000, …
+  "Device": "kuiper", "Created": 0, …
   "readings": 
   [
-     {"Name": "t1", value: "90" , "Created": 1000 …},
-     {"Name": "humidity", value: "20" , "Created": 1000 …}
+     {"Name": "t1", value: "90" , "Created": 0 …},
+     {"Name": "humidity", value: "20" , "Created": 0 …}
   ]
 }
 ```
 Please notice that, 
-- The device name is changed to ``kuiper``
-- All of metadata for ``Events and Readings`` structure will be updated with new value. ``Created`` field is updated to another value generated by Kuiper (here is ``1000``).
+- The device name of ``Event`` structure is changed to ``kuiper``
+- All of metadata for ``Events and Readings`` structure will be updated with new value. ``Created`` field is updated to another value generated by Kuiper (here is ``0``).
 
 ### Publish result to EdgeX message bus with keeping original metadata
 But for some scenarios, you may want to keep some of original metadata. Such as keep the device name as original value that published to Kuiper (``demo`` in the sample), and also other metadata of readings arrays. In such case, Kuiper is acting as a filter - to filter NOT concerned messages, but still keep original data.
@@ -100,7 +100,7 @@ Below is an example,
 }
 ```
 Please notice that,
-- User need to add ``meta(*) AS edgex_meta`` in the SQL statement, the ``meta(*)`` returns all of metadata.
+- User need to add ``meta(*) AS edgex_meta`` in the SQL clause, the ``meta(*)`` returns all of metadata.
 - In ``edgex`` action, value ``edgex_meta``  is specified for ``metadata`` property. This property specifies which field contains metadata of message.
 
 3) The data sent to EdgeX message bus.
@@ -109,7 +109,7 @@ Please notice that,
   "Device": "demo", "Created": 000, …
   "readings": 
   [
-     {"Name": "t1", value: "90" , "Created": 2000 …},
+     {"Name": "t1", value: "90" , "Created": 0 …},
      {"Name": "humidity", value: "20", "Created":456 …}
   ]
 }
@@ -117,7 +117,7 @@ Please notice that,
 Please notice that,
 - The metadata of ``Events`` structure is still kept, such as ``Device`` & ``Created``.
 - For the reading that can be found in original message, the metadata will be kept. Such as ``humidity`` metadata will be the ``old values`` received from EdgeX message bus.
-- For the reading that can NOT be found in original message,  the metadata will be updated with new value.  Such as metadata of ``t1`` in the sample will fill with default value that generated by Kuiper. 
+- For the reading that can NOT be found in original message,  the metadata will not be set.  Such as metadata of ``t1`` in the sample will fill with default value that generated by Kuiper. 
 - If your SQL has aggregated function, then it does not make sense to keep these metadata, but Kuiper will still fill with metadata from a particular message in the time window. For example, with following SQL, 
 ```SELECT avg(temperature) AS temperature, meta(*) AS edgex_meta FROM ... GROUP BY TUMBLINGWINDOW(ss, 10)```. 
 In this case, there are possibly several messages in the window, the metadata value for ``temperature`` will be filled with value from 1st message that received from bus.

+ 5 - 9
docs/en_US/rules/sources/edgex.md

@@ -21,7 +21,7 @@ The types defined in EdgeX value descriptors will be converted into related [dat
 
 ### Boolean
 
-If  ``Type`` value of ``ValueDescriptor`` is ``B``, ``Bool`` or ``Boolean``, then Kuiper tries to convert to ``boolean`` type. Following values will be converted into ``true``.
+If  ``Type`` value of ``ValueDescriptor`` is ``Bool``, then Kuiper tries to convert to ``boolean`` type. Following values will be converted into ``true``.
 
 - "1", "t", "T", "true", "TRUE", "True" 
 
@@ -31,19 +31,15 @@ Following will be converted into ``false``.
 
 ### Bigint
 
-If  ``Type`` value of ``ValueDescriptor`` is ``I``, ``INT``,  ``INT8`` , ``INT16``, ``INT32``,  ``INT64``,``UINT`` , ``UINT8`` , ``UINT16`` ,  ``UINT32`` , ``UINT64`` then Kuiper tries to convert to ``Bigint`` type. 
+If  ``Type`` value of ``ValueDescriptor`` is ``INT8`` , ``INT16``, ``INT32``,  ``INT64``,``UINT`` , ``UINT8`` , ``UINT16`` ,  ``UINT32`` , ``UINT64`` then Kuiper tries to convert to ``Bigint`` type. 
 
 ### Float
 
-If  ``Type`` value of ``ValueDescriptor`` is ``F``, ``FLOAT``,  ``FLOAT16`` , ``FLOAT32``, ``FLOAT64``then Kuiper tries to convert to ``Float`` type. 
+If  ``Type`` value of ``ValueDescriptor`` is ``FLOAT16`` , ``FLOAT32``, ``FLOAT64``then Kuiper tries to convert to ``Float`` type. 
 
 ### String
 
-If  ``Type`` value of ``ValueDescriptor`` is ``S``, ``String``, then Kuiper tries to convert to ``String`` type. 
-
-### Struct
-
-If  ``Type`` value of ``ValueDescriptor`` is ``J``, ``Json``, then Kuiper tries to convert to ``Struct`` type. 
+If  ``Type`` value of ``ValueDescriptor`` is ``String``, then Kuiper tries to convert to ``String`` type. 
 
 # Global configurations
 
@@ -89,7 +85,7 @@ The base service address for getting value descriptors, the value of ``serviceSe
 
 ## Override the default settings
 
-In some cases, maybe you want to consume message from multiple topics or event bus.  Kuiper supports to specify another configuration, and use the ``CONF_KEY`` to specify the newly created key when you create a stream.
+In some cases, maybe you want to consume message from multiple topics from message bus.  Kuiper supports to specify another configuration, and use the ``CONF_KEY`` to specify the newly created key when you create a stream.
 
 ```yaml
 #Override the global configurations

BIN
docs/zh_CN/edgex/arch_dark.png


BIN
docs/zh_CN/edgex/arch_light.png


BIN
docs/zh_CN/edgex/bus_data.png


BIN
docs/zh_CN/edgex/create_stream.png


+ 87 - 0
docs/zh_CN/edgex/edgex_meta.md

@@ -0,0 +1,87 @@
+# 如何使用 meta 函数抽取在 EdgeX 消息总线中发送的其它信息?
+
+当数据被发布到 EdgeX 消息总线的时候,除了真正的设备发出的值之外,还包含了一些额外的值,例如事件创建的时间,修改时间。有时在数据分析的时候需要这些值,本文描述如何使用 Kuiper 提供的函数来实现这个功能。
+
+## EdgeX 消息总线上收到的消息模型
+
+在 EdgeX 消息总线上收到的数据结构如下,一个 ``Event`` 结构体封装了相关的「元数据」(ID, Pushed, Device, Created, Modified, Origin),以及从设备服务中采集到的实际数据 (在 ``Readings`` 字段中) 。
+
+与``Event`` 类似, ``Reading`` 也包含了一些元数据 (ID, Pushed... 等)。
+
+- Event
+  - ID
+  - Pushed
+  - Device
+  - Created
+  - Modified
+  - Origin
+  - Readings
+    - reading [0]
+      - Id
+      - Pushed
+      - Created
+      - Origin
+      - Modified
+      - Device
+      - Name
+      - Value
+    - reading [1]
+      - ... // The same as in reading[0]
+      - ...
+    - reading [n] ...
+
+## Kuiper 中的 EdgeX 数据模型
+
+那么在 Kuiper 中, EdgeX 数据是如何被管理的?让我们来看个例子。
+
+如下所示,首先用户创建了一个名为 ``events`` 的 EdgeX 流定义(以黄色高亮标示)。
+
+<img src="create_stream.png" style="zoom:50%;" />
+
+其次,如下所示,一条消息被发送到消息总线。
+
+- Device name 为 ``demo``,以绿色高亮标示
+- Reading 名称为 ``temperature`` & ``Humidity`` ,用红色高亮标示
+- 这里有些 ``元数据`` 是没有必要「可见」的,但是这些值在分析的时候可能会被用到,例如``Event`` 结构体中的 ``Created`` 字段。Kuiper 将这些值保存在 Kuiper 消息中的名为 metadata 的字段中,用户在分析阶段可以获取到这些值。
+
+<img src="bus_data.png" style="zoom:50%;" />
+
+最后,提供一条 SQL 用于数据分析,此处请注意,
+
+- FROM 子句中的 ``events`` 为黄色高亮,就是在第一步中定义的流名字。
+- SELECT 中的 ``temperature`` & ``humidity`` 字段为红色高亮,它们是 readings 中的 ``Name`` 字段的值。
+- WHERE 子句中的 ``meta(device)`` 为绿色高亮,用于从 ``Events ``结构体中抽取 ``device`` 字段。该 SQL 语句将过滤所有设备名称不是 ``demo`` 的记录。
+
+<img src="sql.png" style="zoom:50%;" />
+
+以下是使用 ``meta`` 函数抽取别的元数据的一些例子。
+
+1. ``meta(created)``: 000  
+
+   从 Event 结构体中获取 'created' 元数据
+
+2. ``meta(temperature -> created)``: 123 
+
+   从 reading[0] 中获取  'created' 元数据,以 'temperature'  为 key
+
+3. ``meta(humidity -> created)``: 456 
+
+   从 reading[1] 中获取  'created' 元数据,以 'humidity' 为 key
+
+请注意,如果你想从 readings 中获取元数据,你需要使用 ``reading-name -> key`` 操作符来访问这些值。在前述例子中,``temperature`` & ``humidity``  是  ``reading-names``,并且  ``key`` 是 readings 中的字段名字。
+
+但是,如果你从 ``Events`` 中获取元数据,只需直接指定 key,如第一个例子所示。
+
+``meta`` 函数也可以用在 ``SELECT`` 子句中,以下为另外一个例子。请注意,如果在 ``SELECT`` 子句中使用了多个 ``meta`` 函数,你应该使用 ``AS`` 来指定一个别名,否则在前面的字段中的值将会被覆盖(不加别名,都有 meta 作为字段名)。
+
+```sql
+SELECT temperature,humidity, meta(id) AS eid,meta(Created) AS ec, meta(temperature->pushed) AS tpush, meta(temperature->Created) AS tcreated, meta(temperature->Origin) AS torigin, meta(Humidity->Device) AS hdevice, meta(Humidity->Modified) AS hmodified FROM demo WHERE meta(device)="demo2"
+```
+
+## 总结
+
+Kuper 的 ``meta`` 函数可以用于访问元数据,以下列出了所有在 EdgeX 的 ``Events`` 和 ``Reading`` 中支持的 key,
+
+- Events: id, pushed, device, created, modified, origin, correlationid
+- Readning: id, created, modified, origin, pushed, device
+

File diff suppressed because it is too large
+ 250 - 0
docs/zh_CN/edgex/edgex_rule_engine_tutorial.md


BIN
docs/zh_CN/edgex/sql.png


+ 127 - 0
docs/zh_CN/rules/sinks/edgex.md

@@ -0,0 +1,127 @@
+# EdgeX 消息总线目标
+
+该目标用于将消息发送到 EdgeX 消息总线上。
+
+| name        | Optional | Description                                                  |
+| ----------- | -------- | ------------------------------------------------------------ |
+| protocol    | true     | 如未指定,使用缺省值 ``tcp``.                                |
+| host        | true     | 消息总线目标主机地址,使用缺省值 ``*``.                      |
+| port        | true     | 消息总线端口号。 如未指定,使用缺省值 ``5563``.              |
+| topic       | true     | 发布的主题名称,如未指定,使用缺省值 ``events``.             |
+| contentType | true     | 发布消息的内容类型,如未指定,使用缺省值 ``application/json``. |
+| metadata    | true     | 该属性为一个字段名称,该字段是 SQL SELECT 子句的一个字段名称,这个字段应该类似于 ``meta(*) AS xxx`` ,用于选出消息中所有的 EdgeX 元数据. |
+| deviceName  | true     | 允许用户指定设备名称,该名称将作为从 Kuiper 中发送出来的 Event 结构体的设备名称. |
+
+## 例子
+
+### 发布结果到 EdgeX 消息总线,而不保留原有的元数据
+在此情况下,原有的元数据 (例如``Events`` 结构体中的 ``id, pushed, created, modified, origin``,以及``Reading`` 结构体中的  ``id, created, modified, origin, pushed, device`` 不会被保留)。Kuiper 在此情况下作为 EdgeX 的一个单独微服务,它有自己的 ``device name``。 提供了属性 ``deviceName``, 该属性允许用户指定 Kuiper 的设备名称。如下所示,
+
+1) 从 EdgeX 消息总线上的 ``events`` 主题上收到的消息,
+
+```
+{
+  "Device": "demo", "Created": 000, …
+  "readings": 
+  [
+     {"Name": "Temperature", value: "30", "Created":123 …},
+     {"Name": "Humidity", value: "20", "Created":456 …}
+  ]
+}
+```
+2) 使用如下的规则,并且在 ``edgex`` action 中给属性 ``deviceName`` 指定 ``kuiper``。
+
+```json
+{
+  "id": "rule1",
+  "sql": "SELECT temperature * 3 AS t1, humidity FROM events",
+  "actions": [
+    {
+      "edgex": {
+        "protocol": "tcp",
+        "host": "*",
+        "port": 5571,
+        "topic": "application",
+        "deviceName": "kuiper",
+        "contentType": "application/json"
+      }
+    }
+  ]
+}
+```
+3) 发送到 EdgeX 消息总线上的数据。
+
+```
+{
+  "Device": "kuiper", "Created": 0, …
+  "readings": 
+  [
+     {"Name": "t1", value: "90" , "Created": 0 …},
+     {"Name": "humidity", value: "20" , "Created": 0 …}
+  ]
+}
+```
+请注意,
+- Event 结构体中的设备名称( `` Device``)变成了 ``kuiper``
+- ``Events and Readings`` 结构体中的数据被更新为新的值. 字段 ``Created`` 被 Kuiper 更新为新的值 (这里为 ``0``).
+
+### 发布结果到 EdgeX 消息总线,并保留原有的元数据
+但是在某些场景中,你可能需要保留原来的元数据。比如保留发送到 Kuiper 的设备名称,在本例中为 ``demo``, 还有 reading 数组中的其它元数据。在此情况下,Kuiper 更像是一个过滤器 - 将不关心的数据过滤掉,但是依然保留原有的数据。
+
+参考以下的例子,
+
+1) 从 EdgeX 消息总线上的 ``events`` 主题上收到的消息,
+
+```
+{
+  "Device": "demo", "Created": 000, …
+  "readings": 
+  [
+     {"Name": "Temperature", value: "30", "Created":123 …},
+     {"Name": "Humidity", value: "20", "Created":456 …}
+  ]
+}
+```
+2) 使用如下规则,在``edgex`` action 中,为 ``metadata`` 指定值 ``edgex_meta`` 。
+
+```json
+{
+  "id": "rule1",
+  "sql": "SELECT meta(*) AS edgex_meta, temperature * 3 AS t1, humidity FROM events WHERE temperature > 30",
+  "actions": [
+    {
+      "edgex": {
+        "protocol": "tcp",
+        "host": "*",
+        "port": 5571,
+        "topic": "application",
+        "metadata": "edgex_meta",
+        "contentType": "application/json"
+      }
+    }
+  ]
+}
+```
+请注意,
+- 用户需要在 SQL 子句中加 ``meta(*) AS edgex_meta`` ,函数 ``meta(*)`` 返回所有的元数据。
+- 在 ``edgex`` action里, 属性 ``metadata`` 指定值 ``edgex_meta`` 。该属性指定哪个字段包含了元数据。
+
+3) 发送给 EdgeX 消息总线的数据
+
+```
+{
+  "Device": "demo", "Created": 000, …
+  "readings": 
+  [
+     {"Name": "t1", value: "90" , "Created": 0 …},
+     {"Name": "humidity", value: "20", "Created":456 …}
+  ]
+}
+```
+请注意,
+- ``Events`` 结构体的元数据依然保留,例如 ``Device`` & ``Created``.
+- 对于在原有消息中可以找到的 reading,元数据将继续保留。 比如 ``humidity`` 的元数据就是从 EdgeX 消息总线里接收到的``原值 - 或者说是旧值``。
+- 对于在原有消息中无法找到的 reading,元数据将不会被设置。如例子中的``t1`` 的元数据被设置为 Kuiper 产生的缺省值。
+- 如果你的 SQL 包含了聚合函数,那保留原有的元数据就没有意义,但是 Kuiper 还是会使用时间窗口中的某一条记录的元数据。例如,在下面的 SQL 里,
+```SELECT avg(temperature) AS temperature, meta(*) AS edgex_meta FROM ... GROUP BY TUMBLINGWINDOW(ss, 10)```. 
+这种情况下,在时间窗口中可能有几条数据,Kuiper 会使用窗口中的第一条数据的元数据来填充 ``temperature`` 的元数据。

+ 108 - 0
docs/zh_CN/rules/sources/edgex.md

@@ -0,0 +1,108 @@
+
+
+# EdgeX 源
+
+Kuiper 提供了内置的 EdgeX 源支持,它可以被用来订阅来自于[EdgeX 消息总线](https://github.com/edgexfoundry/go-mod-messaging)的数据,并且将数据放入 Kuiper 数据处理流水线中。
+
+## EdgeX 流定义
+
+EdgeX 在 [value descriptors](https://github.com/edgexfoundry/go-mod-core-contracts) 已经定义了数据类型,因此在 Kuiper 中建议采用 schema-less 方式的 EdgeX 流式定义,如下所示。
+
+```shell
+# cd $kuiper_base
+# bin/cli CREATE STREAM demo'() with(format="json", datasource="demo" type="edgex")'
+```
+
+EdgeX 源会试图取得某个字段的类型,
+
+- 如果在 value descriptors 中可找到其数据类型,就将其转换为对应类型;
+- 如果在 value descriptors 中可找不到到其数据类型,将保留原值;
+- 如果类型转换失败,该值将被**丢弃**,并在日志上打印一条告警消息;
+
+在 EdgeX value descriptors 中定义的数据类型,将被转换为 Kuiper 中相应支持的[数据类型](../../sqls/streams.md)。
+
+### Boolean
+
+如果 ``ValueDescriptor`` 中  ``Type`` 的值为 ``Bool`` ,那么 Kuiper 会试着将其转换为 ``boolean`` 类型,以下的值将被转化为 ``true``。
+
+- "1", "t", "T", "true", "TRUE", "True" 
+
+以下值将被转换为 ``false``。
+
+- "0", "f", "F", "false", "FALSE", "False"
+
+### Bigint
+
+如果 ``ValueDescriptor`` 中  ``Type`` 的值为 ``INT8`` , ``INT16``, ``INT32``,  ``INT64`` , ``UINT8`` , ``UINT16`` ,  ``UINT32`` , ``UINT64`` 那么 Kuiper 会试着将其转换为 ``Bigint`` 类型。 
+
+### Float
+
+如果 ``ValueDescriptor`` 中  ``Type`` 的值为 ``FLOAT32``, ``FLOAT64`` ,那么 Kuiper 会试着将其转换为 ``Float`` 类型。 
+
+### String
+
+如果 ``ValueDescriptor`` 中  ``Type`` 的值为 ``String``,那么 Kuiper 会试着将其转换为 ``String`` 类型。 
+
+# 全局配置
+
+EdgeX 源配置文件为 ``$kuiper/etc/sources/edgex.yaml``,以下配置文件内容。
+
+```yaml
+#Global Edgex configurations
+default:
+  protocol: tcp
+  server: localhost
+  port: 5573
+  topic: events
+  serviceServer: http://localhost:48080
+#  optional:
+#    ClientId: client1
+#    Username: user1
+#    Password: password
+```
+
+用户可以在此指定全局的 EdgeX 配置。在 ``default`` 部分中指定的配置将作为所有 EdgeX 源的缺省配置。
+
+## protocol
+
+连接到 EdgeX 消息总线的协议,缺省为 ``tcp``
+
+## server
+
+EdgeX 消息总线的地址,缺省为 ``localhost``
+
+## port
+
+EdgeX 消息总线的端口,缺省为 ``5573``.
+
+## topic
+
+EdgeX 消息总线上监听的主题名称,缺省为 ``events``.
+
+## serviceServer
+
+访问 value descriptors 的基础服务地址,配置项 ``serviceServer`` 的值与 ``/api/v1/valuedescriptor`` 拼接后,用于获取 EdgeX 服务器上定义的所有 value descriptors。
+
+## 重载缺省设置
+
+在某些情况下,你可能想消费来自于多个主题的数据。Kuiper 支持指定别的配置,并且在创建流定义的时候使用 ``CONF_KEY`` 来指定新的配置。
+
+```yaml
+#Override the global configurations
+demo1: #Conf_key
+  protocol: tcp
+  server: 10.211.55.6
+  port: 5571
+  topic: events
+```
+
+如果你有个特定的源需要覆盖缺省的设置,你可以定义一个自定义的配置段。在上面的例子中,我们创建了一个新的配置 ``demo1``,然后你在创建流定义的时候可以使用选项 ``CONF_KEY`` 来使用新的配置 (参考 [流定义规范](../../sqls/streams.md) 获取更多详细信息)。
+
+**例子**
+
+```
+create stream demo1() WITH (FORMAT="JSON", type="edgex", CONF_KEY="demo1");
+```
+
+在自定义的配置中,能够使用的配置项与 ``default`` 部分的是一样的,任何在自定义段中设置的值将覆盖 ``default`` 部分里的配置。
+

+ 15 - 3
xstream/nodes/sink_node.go

@@ -102,6 +102,15 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 				cacheSaveInterval = t
 			}
 		}
+		omitIfEmpty := true
+		if c, ok := m.options["omitIfEmpty"]; ok {
+			if t, ok := c.(bool); !ok {
+				logger.Warnf("invalid type for omitIfEmpty property, should be a bool value 'true/false'.", c)
+			} else {
+				omitIfEmpty = t
+			}
+		}
+
 		m.reset()
 		logger.Infof("open sink node %d instances", m.concurrency)
 		for i := 0; i < m.concurrency; i++ { // workers
@@ -140,9 +149,9 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 					case data := <-cache.Out:
 						stats.SetBufferLength(int64(cache.Length()))
 						if runAsync {
-							go doCollect(sink, data, stats, retryInterval, cache.Complete, ctx)
+							go doCollect(sink, data, stats, retryInterval, omitIfEmpty, cache.Complete, ctx)
 						} else {
-							doCollect(sink, data, stats, retryInterval, cache.Complete, ctx)
+							doCollect(sink, data, stats, retryInterval, omitIfEmpty,cache.Complete, ctx)
 						}
 					case <-ctx.Done():
 						logger.Infof("sink node %s instance %d done", m.name, instance)
@@ -164,7 +173,7 @@ func (m *SinkNode) reset() {
 	m.statManagers = nil
 }
 
-func doCollect(sink api.Sink, item *CacheTuple, stats StatManager, retryInterval int, signalCh chan<- int, ctx api.StreamContext) {
+func doCollect(sink api.Sink, item *CacheTuple, stats StatManager, retryInterval int, omitIfEmpty bool, signalCh chan<- int, ctx api.StreamContext) {
 	stats.IncTotalRecordsIn()
 	stats.ProcessTimeStart()
 	logger := ctx.GetLogger()
@@ -178,6 +187,9 @@ func doCollect(sink api.Sink, item *CacheTuple, stats StatManager, retryInterval
 		outdata = []byte(fmt.Sprintf(`[{"error":"result is not a string but found %#v"}]`, val))
 	}
 	for {
+		if omitIfEmpty && string(outdata) == "[{}]" {
+			break
+		}
 		if err := sink.Collect(ctx, outdata); err != nil {
 			stats.IncTotalExceptions()
 			logger.Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outdata, err)