Browse Source

Merge pull request #238 from emqx/develop

0.3.2 version
jinfahua 5 years atrás
parent
commit
770ba3f65e

+ 4 - 4
.github/workflows/build_packages.yaml

@@ -27,9 +27,9 @@ jobs:
           run: |
             mkdir -p $HOME/.docker
             echo '{ "experimental": "enabled" }' | tee $HOME/.docker/config.json
-            echo '{ "experimental": true, "storage-driver": "overlay2", "max-concurrent-downloads": 50, "max-concurrent-uploads": 50 }' | sudo tee /etc/docker/daemon.json
+            echo '{ "experimental": true, "storage-driver": "overlay2", "max-concurrent-downloads": 50, "max-concurrent-uploads": 50, "graph": "/mnt/docker" }' | sudo tee /etc/docker/daemon.json
             sudo systemctl restart docker
-            docker version
+            docker info
             docker buildx create --use --name mybuild
         - name: build
           run: |
@@ -47,7 +47,7 @@ jobs:
         - uses: actions/checkout@v2
         - uses: actions/setup-go@v1
           with:
-            go-version: '1.11.5'
+            go-version: '1.13.10'
         - name: prepare
           run: |
               /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
@@ -80,7 +80,7 @@ jobs:
           run: |
             mkdir -p $HOME/.docker
             echo '{ "experimental": "enabled" }' | tee $HOME/.docker/config.json
-            echo '{ "experimental": true, "storage-driver": "overlay2", "max-concurrent-downloads": 50, "max-concurrent-uploads": 50 }' | sudo tee /etc/docker/daemon.json
+            echo '{ "experimental": true, "storage-driver": "overlay2", "max-concurrent-downloads": 50, "max-concurrent-uploads": 50, "graph": "/mnt/docker" }' | sudo tee /etc/docker/daemon.json
             sudo systemctl restart docker
             docker version
             docker buildx create --use --name mybuild

+ 2 - 2
.github/workflows/run_fvt_tests.yaml

@@ -15,7 +15,7 @@ jobs:
         steps:
         - uses: actions/setup-go@v1
           with:
-            go-version: '1.11.5'
+            go-version: '1.13.10'
         - uses: actions/setup-java@v1
           with:
             java-version: '8' # The JDK version to make available on the path.
@@ -77,7 +77,7 @@ jobs:
         steps:
         - uses: actions/setup-go@v1
           with:
-            go-version: '1.11.5'
+            go-version: '1.13.10'
         - uses: actions/setup-java@v1
           with:
             java-version: '8' # The JDK version to make available on the path.

+ 1 - 1
Dockerfile

@@ -1,4 +1,4 @@
-FROM golang:1.13.4 AS builder
+FROM golang:1.13.10 AS builder
 
 COPY . /go/kuiper
 

+ 1 - 1
README-CN.md

@@ -121,7 +121,7 @@ Kuiper 可以运行在各类物联网的边缘使用场景中,比如工业物
 
 #### 准备
 
-+ Go version >= 1.11
++ Go version >= 1.13
 
 #### 编译
 

+ 1 - 1
README.md

@@ -121,7 +121,7 @@ It can be run at various IoT edge use scenarios, such as real-time processing of
 
 #### Preparation
 
-- Go version >= 1.11
+- Go version >= 1.13
 
 #### Compile
 

+ 2 - 2
deploy/chart/kuiper/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 0.3.1
+version: 0.3.2
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 0.3.1
+appVersion: 0.3.2

+ 1 - 1
deploy/docker/Dockerfile

@@ -1,4 +1,4 @@
-FROM golang:1.13.4-alpine AS builder
+FROM golang:1.13.10-alpine AS builder
 
 COPY . /go/kuiper
 

+ 1 - 1
deploy/docker/Dockerfile-dev

@@ -1,4 +1,4 @@
-FROM golang:1.13.4-alpine AS builder
+FROM golang:1.13.10-alpine AS builder
 
 COPY . /go/kuiper
 

+ 12 - 5
docs/en_US/cli/rules.md

@@ -51,7 +51,7 @@ Below is the contents of ``rule.txt``.
 
 ## show rules
 
-The command is used for displaying all of rules defined in the server.
+The command is used for displaying all of rules defined in the server with a brief status.
 
 ```shell
 show rules
@@ -61,8 +61,16 @@ Sample:
 
 ```shell
 # bin/cli show rules
-rule1
-rule2
+[
+  {
+    "id": "rule1",
+    "status": "Running"
+  },
+  {
+     "id": "rule2",
+     "status": "Stopped: canceled by error."
+  }
+]
 ```
 
 ## describe a rule
@@ -156,7 +164,7 @@ Rule rule1 was restarted.
 ## get the status of a rule
 
 The command is used to get the status of the rule. If the rule is running, the metrics will be retrieved realtime. The status can be
-- running with metrics: $metrics
+- $metrics
 - stopped: $reason
 
 ```shell
@@ -167,7 +175,6 @@ Sample:
 
 ```shell
 # bin/cli getstatus rule rule1
-running with metrics:
 {
     "source_demo_0_records_in_total":5,
     "source_demo_0_records_out_total":5,

BIN
docs/en_US/edgex/arch_dark.png


BIN
docs/en_US/edgex/arch_light.png


+ 31 - 30
docs/en_US/edgex/edgex_rule_engine_tutorial.md

@@ -44,11 +44,9 @@ In out tutorial, we will use [Random Integer Device Service](https://github.com/
 
 ### Run EdgeX Docker instances
 
-After the EdgeX Geneva is offcially released, you can just follow steps in [this doc](https://fuji-docs.edgexfoundry.org/Ch-QuickStart.html) to start the service. But now since Kuiper has not been official released yet, you have to download Docker composer file from [here](https://github.com/edgexfoundry/developer-scripts/blob/master/releases/nightly-build/compose-files/docker-compose-nexus-mongo-no-secty.yml), and then bring up EdgeX Docker instances. 
+Go to [EdgeX develop-scripts project](https://github.com/edgexfoundry/developer-scripts/tree/master/releases), and download related Docker compose file for Geneva release,  then bring up EdgeX Docker instances. 
 
 ```shell
-$ wget https://github.com/edgexfoundry/developer-scripts/raw/master/releases/nightly-build/compose-files/docker-compose-nexus-mongo-no-secty.yml
-
 $ docker-compose -f ./docker-compose-nexus-redis-no-secty.yml up -d --build
 ```
 
@@ -130,13 +128,16 @@ For more detailed information of configuration file, please refer to [this doc](
 
 Let's create a rule that send result data to an MQTT broker, for detailed information of MQTT sink, please refer to [this link](../rules/sinks/mqtt.md).  Similar to create a stream, you can also choose REST or CLI to manage rules. 
 
-So the below rule will get all of values from ``event`` topic. The sink result will be published to topic ``result`` of public MQTT broker ``broker.emqx.io``. 
+So the below rule will get all of values from ``event`` topic. The sink result will 
+
+- Published to topic ``result`` of public MQTT broker ``broker.emqx.io``. 
+- Print to log file.
 
 #### Option 1: Use Rest API
 
 ```shell
 curl -X POST \
-  http://$kuiper_docker:48075/rules \
+  http://$kuiper_server:48075/rules \
   -H 'Content-Type: application/json' \
   -d '{
   "id": "rule1",
@@ -148,16 +149,19 @@ curl -X POST \
         "topic": "result",
         "clientId": "demo_001"
       }
+    },
+    {
+      "log":{}
     }
   ]
-}'
+}
 ```
 
 #### Option 2: Use Kuiper CLI
 
 You can create a rule file with any text editor, and copy following contents into it. Let's say the file name is ``rule.txt``.  
 
-```
+```json
 {
   "sql": "SELECT * from demo",
   "actions": [
@@ -167,6 +171,9 @@ You can create a rule file with any text editor, and copy following contents int
         "topic": "result",
         "clientId": "demo_001"
       }
+    },
+    {
+      "log":{}
     }
   ]
 }
@@ -185,26 +192,20 @@ Rule rule1 was created, please use 'cli getstatus rule $rule_name' command to ge
 
 If you want to send analysis result to another sink, please refer to [other sinks](../rules/overview.md#actions) that supported in Kuiper.
 
-Now you can also take a look at the log file under ``log/stream.log``, see detailed info of rule. 
+Now you can also take a look at the log file under ``log/stream.log``, or through command ``docker logs edgex-kuiper `` to see detailed info of rule. 
 
 ```
-time="2020-04-07T03:33:28Z" level=info msg="db location is /kuiper/data/"
-time="2020-04-07T03:33:28Z" level=info msg="Starting rules"
-time="2020-04-07T03:33:28Z" level=info msg="Serving kuiper (version - 0.2.1) on port 20498, and restful api on port 48075. \n"
-time="2020-04-07T03:35:35Z" level=info msg="Rule rule1 is created."
-time="2020-04-07T03:35:35Z" level=info msg="Init rule with options {isEventTime: false, lateTolerance: 0, concurrency: 1, bufferLength: 1024"
-time="2020-04-07T03:35:35Z" level=info msg="Opening stream" rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="open source node demo with option map[FORMAT:JSON TYPE:edgex]" rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="open sink node 1 instances" rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="open source node 1 instances" rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="Opening mqtt sink for rule rule1." rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="Connect to value descriptor service at: http://edgex-core-data:48080/api/v1/valuedescriptor \n"
-time="2020-04-07T03:35:35Z" level=info msg="Use configuration for edgex messagebus {{ 0 } {edgex-core-data 5563 tcp} zero map[]}\n"
-time="2020-04-07T03:35:35Z" level=info msg="Start source demo instance 0 successfully" rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="The connection to edgex messagebus is established successfully." rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="Connect MQTT broker with username and password." rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="Successfully subscribed to edgex messagebus topic events." rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="The connection to server tcp://broker.emqx.io:1883 was established successfully" rule=rule1
+time="2020-04-17T06:32:24Z" level=info msg="Serving kuiper (version - 0.3.1-4-g9e63fe1) on port 20498, and restful api on port 9081. \n" file="server.go:101"
+time="2020-04-17T06:32:24Z" level=info msg="The connection to edgex messagebus is established successfully." file="edgex_source.go:95" rule=rule1
+time="2020-04-17T06:32:24Z" level=info msg="Successfully subscribed to edgex messagebus topic events." file="edgex_source.go:104" rule=rule1
+time="2020-04-17T06:32:24Z" level=info msg="The connection to server tcp://broker.emqx.io:1883 was established successfully" file="mqtt_sink.go:161" rule=rule1
+time="2020-04-17T06:32:25Z" level=info msg="Get 24 of value descriptors from service." file="edgex_source.go:223"
+time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int32\":-697766590}]" file="log_sink.go:16" rule=rule1
+time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int8\":-47}]" file="log_sink.go:16" rule=rule1
+time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int16\":-318}]" file="log_sink.go:16" rule=rule1
+time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int64\":-8680421421398846880}]" file="log_sink.go:16" rule=rule1
+time="2020-04-17T06:32:31Z" level=info msg="sink result for rule rule1: [{\"bool\":true}]" file="log_sink.go:16" rule=rule1
+...
 ```
 
 ### Monitor analysis result
@@ -240,31 +241,31 @@ Connecting to 127.0.0.1:20498...
   "source_demo_0_exceptions_total": 0,
   "source_demo_0_process_latency_ms": 0,
   "source_demo_0_buffer_length": 0,
-  "source_demo_0_last_invocation": "2020-03-19T10:30:09.294337",
+  "source_demo_0_last_invocation": "2020-04-17T10:30:09.294337",
   "op_preprocessor_demo_0_records_in_total": 29,
   "op_preprocessor_demo_0_records_out_total": 29,
   "op_preprocessor_demo_0_exceptions_total": 0,
   "op_preprocessor_demo_0_process_latency_ms": 0,
   "op_preprocessor_demo_0_buffer_length": 0,
-  "op_preprocessor_demo_0_last_invocation": "2020-03-19T10:30:09.294355",
+  "op_preprocessor_demo_0_last_invocation": "2020-04-17T10:30:09.294355",
   "op_filter_0_records_in_total": 29,
   "op_filter_0_records_out_total": 21,
   "op_filter_0_exceptions_total": 0,
   "op_filter_0_process_latency_ms": 0,
   "op_filter_0_buffer_length": 0,
-  "op_filter_0_last_invocation": "2020-03-19T10:30:09.294362",
+  "op_filter_0_last_invocation": "2020-04-17T10:30:09.294362",
   "op_project_0_records_in_total": 21,
   "op_project_0_records_out_total": 21,
   "op_project_0_exceptions_total": 0,
   "op_project_0_process_latency_ms": 0,
   "op_project_0_buffer_length": 0,
-  "op_project_0_last_invocation": "2020-03-19T10:30:09.294382",
+  "op_project_0_last_invocation": "2020-04-17T10:30:09.294382",
   "sink_sink_mqtt_0_records_in_total": 21,
   "sink_sink_mqtt_0_records_out_total": 21,
   "sink_sink_mqtt_0_exceptions_total": 0,
   "sink_sink_mqtt_0_process_latency_ms": 0,
   "sink_sink_mqtt_0_buffer_length": 1,
-  "sink_sink_mqtt_0_last_invocation": "2020-03-19T10:30:09.294423"
+  "sink_sink_mqtt_0_last_invocation": "2020-04-17T10:30:09.294423"
 }
 ```
 

+ 12 - 4
docs/en_US/restapi/rules.md

@@ -23,7 +23,7 @@ Request Sample
 
 ## show rules
 
-The API is used for displaying all of rules defined in the server.
+The API is used for displaying all of rules defined in the server with a brief status.
 
 ```shell
 GET http://localhost:9081/rules
@@ -32,7 +32,16 @@ GET http://localhost:9081/rules
 Response Sample:
 
 ```json
-["rule1","rule2"]
+[
+  {
+    "id": "rule1",
+    "status": "Running"
+  },
+  {
+     "id": "rule2",
+     "status": "Stopped: canceled by error."
+  }
+]
 ```
 
 ## describe a rule
@@ -101,7 +110,7 @@ POST http://localhost:8080/rules/{id}/restart
 ## get the status of a rule
 
 The command is used to get the status of the rule. If the rule is running, the metrics will be retrieved realtime. The status can be
-- running with metrics: $metrics
+- $metrics
 - stopped: $reason
 
 ```shell
@@ -111,7 +120,6 @@ GET http://localhost:8080/rules/{id}/status
 Response Sample:
 
 ```shell
-running with metrics:
 {
     "source_demo_0_records_in_total":5,
     "source_demo_0_records_out_total":5,

+ 11 - 4
docs/zh_CN/cli/rules.md

@@ -51,7 +51,7 @@ create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file
 
 ## 展示规则
 
-该命令用于显示服务器中定义的所有规则。
+该命令用于显示服务器中定义的所有规则,包括规则id和当前状态
 
 ```shell
 show rules
@@ -61,8 +61,16 @@ show rules
 
 ```shell
 # bin/cli show rules
-rule1
-rule2
+[
+  {
+    "id": "rule1",
+    "status": "Running"
+  },
+  {
+     "id": "rule2",
+     "status": "Stopped: canceled by error."
+  }
+]
 ```
 
 ## 描述规则
@@ -167,7 +175,6 @@ getstatus rule $rule_name
 
 ```shell
 # bin/cli getstatus rule rule1
-running with metrics:
 {
     "source_demo_0_records_in_total":5,
     "source_demo_0_records_out_total":5,

+ 30 - 31
docs/zh_CN/edgex/edgex_rule_engine_tutorial.md

@@ -40,11 +40,9 @@
 
 ## 运行 EdgeX Docker 实例
 
-在 EdgeX Geneva 版本正式发布后,你可以按照[这个文档](https://fuji-docs.edgexfoundry.org/Ch-QuickStart.html)来启动所有的服务。但是因为目前 EdgeX Geneva 还未正式发布,所以如果现在想试用 Kuiper 的话,不得不手工从[这里](https://github.com/edgexfoundry/developer-scripts/blob/master/releases/nightly-build/compose-files/docker-compose-nexus-mongo-no-secty.yml)下载 Docker Composer 文件,然后启动所有的 EdgeX 容器。
+打开 [EdgeX develop-scripts 项目](https://github.com/edgexfoundry/developer-scripts/tree/master/releases),并且下载 Geneva 版本的 Docker compose file,然后启动所有的 EdgeX 容器。
 
 ```shell
-# wget https://github.com/edgexfoundry/developer-scripts/raw/master/releases/nightly-build/compose-files/docker-compose-nexus-mongo-no-secty.yml
-
 # docker-compose -f ./docker-compose-nexus-redis-no-secty.yml up -d --build
 ```
 
@@ -126,17 +124,20 @@ default:
 
 让我们创建一条规则,将分析结果发送至 MQTT 服务器,关于 MQTT 目标的相关配置,请参考[这个链接](../rules/sinks/mqtt.md)。与创建流的过程类似,你可以选择使用 REST 或者命令行来管理规则。
 
-以下例子将选出所有 ``events`` 主题上所有的数据,分析结果将被发布到公共的 MQTT 服务器 ``broker.emqx.io`` 的主题``result`` 上。 
+以下例子将选出所有 ``events`` 主题上所有的数据,分析结果将被
+
+- 发布到公共的 MQTT 服务器 ``broker.emqx.io`` 的主题``result`` 上;
+- 打印至日志文件
 
 #### 选项1: 使用 Rest API
 
 ```shell
 curl -X POST \
-  http://$kuiper_server:48075/rules \
+  http://$kuiper_server:9081/rules \
   -H 'Content-Type: application/json' \
   -d '{
   "id": "rule1",
-  "sql": "SELECT * FROM demo WHERE randomnumber > 30",
+  "sql": "SELECT * FROM demo",
   "actions": [
     {
       "mqtt": {
@@ -144,16 +145,19 @@ curl -X POST \
         "topic": "result",
         "clientId": "demo_001"
       }
+    },
+    {
+      "log":{}
     }
   ]
-}'
+}
 ```
 
 #### 选项2: 使用 Kuiper 命令行
 
 你可以使用任意编辑器来创建一条规则,将下列内容拷贝到编辑器中,并命名为 ``rule.txt``。
 
-```
+```json
 {
   "sql": "SELECT * from demo",
   "actions": [
@@ -163,6 +167,9 @@ curl -X POST \
         "topic": "result",
         "clientId": "demo_001"
       }
+    },
+    {
+      "log":{}
     }
   ]
 }
@@ -182,23 +189,16 @@ Rule rule1 was created, please use 'cli getstatus rule $rule_name' command to ge
 如想将结果发送到别的目标,请参考 Kuiper 中支持的[其它目标](../rules/overview.md#actions)。你现在可以看一下在 ``log/stream.log``中的日志文件,查看规则的详细信息。
 
 ```
-time="2020-04-07T03:33:28Z" level=info msg="db location is /kuiper/data/"
-time="2020-04-07T03:33:28Z" level=info msg="Starting rules"
-time="2020-04-07T03:33:28Z" level=info msg="Serving kuiper (version - 0.2.1) on port 20498, and restful api on port 48075. \n"
-time="2020-04-07T03:35:35Z" level=info msg="Rule rule1 is created."
-time="2020-04-07T03:35:35Z" level=info msg="Init rule with options {isEventTime: false, lateTolerance: 0, concurrency: 1, bufferLength: 1024"
-time="2020-04-07T03:35:35Z" level=info msg="Opening stream" rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="open source node demo with option map[FORMAT:JSON TYPE:edgex]" rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="open sink node 1 instances" rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="open source node 1 instances" rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="Opening mqtt sink for rule rule1." rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="Connect to value descriptor service at: http://edgex-core-data:48080/api/v1/valuedescriptor \n"
-time="2020-04-07T03:35:35Z" level=info msg="Use configuration for edgex messagebus {{ 0 } {edgex-core-data 5563 tcp} zero map[]}\n"
-time="2020-04-07T03:35:35Z" level=info msg="Start source demo instance 0 successfully" rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="The connection to edgex messagebus is established successfully." rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="Connect MQTT broker with username and password." rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="Successfully subscribed to edgex messagebus topic events." rule=rule1
-time="2020-04-07T03:35:35Z" level=info msg="The connection to server tcp://broker.emqx.io:1883 was established successfully" rule=rule1
+time="2020-04-17T06:32:24Z" level=info msg="Serving kuiper (version - 0.3.1-4-g9e63fe1) on port 20498, and restful api on port 9081. \n" file="server.go:101"
+time="2020-04-17T06:32:24Z" level=info msg="The connection to edgex messagebus is established successfully." file="edgex_source.go:95" rule=rule1
+time="2020-04-17T06:32:24Z" level=info msg="Successfully subscribed to edgex messagebus topic events." file="edgex_source.go:104" rule=rule1
+time="2020-04-17T06:32:24Z" level=info msg="The connection to server tcp://broker.emqx.io:1883 was established successfully" file="mqtt_sink.go:161" rule=rule1
+time="2020-04-17T06:32:25Z" level=info msg="Get 24 of value descriptors from service." file="edgex_source.go:223"
+time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int32\":-697766590}]" file="log_sink.go:16" rule=rule1
+time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int8\":-47}]" file="log_sink.go:16" rule=rule1
+time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int16\":-318}]" file="log_sink.go:16" rule=rule1
+time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int64\":-8680421421398846880}]" file="log_sink.go:16" rule=rule1
+time="2020-04-17T06:32:31Z" level=info msg="sink result for rule rule1: [{\"bool\":true}]" file="log_sink.go:16" rule=rule1
 ```
 
 ### 监控分析结果
@@ -235,32 +235,31 @@ Connecting to 127.0.0.1:20498...
   "source_demo_0_exceptions_total": 0,
   "source_demo_0_process_latency_ms": 0,
   "source_demo_0_buffer_length": 0,
-  "source_demo_0_last_invocation": "2020-03-19T10:30:09.294337",
+  "source_demo_0_last_invocation": "2020-04-17T10:30:09.294337",
   "op_preprocessor_demo_0_records_in_total": 29,
   "op_preprocessor_demo_0_records_out_total": 29,
   "op_preprocessor_demo_0_exceptions_total": 0,
   "op_preprocessor_demo_0_process_latency_ms": 0,
   "op_preprocessor_demo_0_buffer_length": 0,
-  "op_preprocessor_demo_0_last_invocation": "2020-03-19T10:30:09.294355",
+  "op_preprocessor_demo_0_last_invocation": "2020-04-17T10:30:09.294355",
   "op_filter_0_records_in_total": 29,
   "op_filter_0_records_out_total": 21,
   "op_filter_0_exceptions_total": 0,
   "op_filter_0_process_latency_ms": 0,
   "op_filter_0_buffer_length": 0,
-  "op_filter_0_last_invocation": "2020-03-19T10:30:09.294362",
+  "op_filter_0_last_invocation": "2020-04-17T10:30:09.294362",
   "op_project_0_records_in_total": 21,
   "op_project_0_records_out_total": 21,
   "op_project_0_exceptions_total": 0,
   "op_project_0_process_latency_ms": 0,
   "op_project_0_buffer_length": 0,
-  "op_project_0_last_invocation": "2020-03-19T10:30:09.294382",
+  "op_project_0_last_invocation": "2020-04-17T10:30:09.294382",
   "sink_sink_mqtt_0_records_in_total": 21,
   "sink_sink_mqtt_0_records_out_total": 21,
   "sink_sink_mqtt_0_exceptions_total": 0,
   "sink_sink_mqtt_0_process_latency_ms": 0,
   "sink_sink_mqtt_0_buffer_length": 1,
-  "sink_sink_mqtt_0_last_invocation": "2020-03-19T10:30:09.294423"
-}
+  "sink_sink_mqtt_0_last_invocation": "2020-04-17T10:30:09.294423"
 ```
 
 ### 总结

+ 2 - 1
fvt_scripts/edgex/pub.go

@@ -48,8 +48,9 @@ func pubEventClientZeroMq() {
 
 				r4 := models.Reading{Name:"i1", Value:fmt.Sprintf("%d", i)}
 				r5 := models.Reading{Name:"f1", Value:fmt.Sprintf("%.2f", float64(i)/2.0)}
+				r6 := models.Reading{Name:"ui64", Value:"10796529505058023104"}
 
-				testEvent.Readings = append(testEvent.Readings, r1, r2, r3, r4, r5)
+				testEvent.Readings = append(testEvent.Readings, r1, r2, r3, r4, r5, r6)
 
 				data, err := client.MarshalEvent(testEvent)
 				if err != nil {

+ 6 - 1
fvt_scripts/edgex/valuedesc/vd_server.go

@@ -16,6 +16,7 @@ const (
 	desc4 = "Int descriptor"
 	desc5 = "Float descriptor"
 	desc6 = "String descriptor"
+	desc7 = "UInt64 descriptor"
 )
 
 var vd1 = models.ValueDescriptor{Id: "Temperature", Created: 123, Modified: 123, Origin: 123, Name: "Temperature",
@@ -30,6 +31,7 @@ var vd3 = models.ValueDescriptor{Id: "b1", Name: "b1", Formatting: "%t", Type:"B
 var vd4 = models.ValueDescriptor{Id: "i1", Name: "i1", Formatting: "%d", Type:"UINT8", MediaType: clients.ContentTypeJSON}
 var vd5 = models.ValueDescriptor{Id: "f1", Name: "f1", Formatting: "%f", Type:"FLOAT64", MediaType: clients.ContentTypeJSON}
 var vd6 = models.ValueDescriptor{Id: "s1", Name: "s1", Formatting: "%s", Type:"String", MediaType: clients.ContentTypeJSON}
+var vd7 = models.ValueDescriptor{Id: "ui64", Name: "ui64", Formatting: "%d", Type:"UINT64", MediaType: clients.ContentTypeJSON}
 
 func main() {
 	http.HandleFunc(clients.ApiValueDescriptorRoute, Hello)
@@ -57,7 +59,10 @@ func Hello(w http.ResponseWriter, req *http.Request) {
 	descriptor6 := vd6
 	descriptor6.Description = desc6
 
-	descriptors := []models.ValueDescriptor{descriptor1, descriptor2, descriptor3, descriptor4, descriptor5, descriptor6}
+	descriptor7 := vd7
+	descriptor7.Description = desc7
+
+	descriptors := []models.ValueDescriptor{descriptor1, descriptor2, descriptor3, descriptor4, descriptor5, descriptor6, descriptor7}
 
 	data, err := json.Marshal(descriptors)
 	if err != nil {

+ 1 - 1
fvt_scripts/edgex_sink_rule.jmx

@@ -438,7 +438,7 @@
           <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="temperature Assertion" enabled="true">
             <stringProp name="JSON_PATH">$.readings[1].value</stringProp>
             <stringProp name="EXPECTED_VALUE">72</stringProp>
-            <boolProp name="JSONVALIDATION">true</boolProp>
+            <boolProp name="JSONVALIDATION">false</boolProp>
             <boolProp name="EXPECT_NULL">false</boolProp>
             <boolProp name="INVERT">false</boolProp>
             <boolProp name="ISREGEX">false</boolProp>

+ 1 - 1
fvt_scripts/rule_test.jmx

@@ -174,7 +174,7 @@
           </HTTPSamplerProxy>
           <hashTree>
             <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
-              <stringProp name="JSON_PATH">$[0]</stringProp>
+              <stringProp name="JSON_PATH">$[0].id</stringProp>
               <stringProp name="EXPECTED_VALUE">rule1</stringProp>
               <boolProp name="JSONVALIDATION">false</boolProp>
               <boolProp name="EXPECT_NULL">false</boolProp>

+ 9 - 0
fvt_scripts/select_edgex_condition_rule.jmx

@@ -459,6 +459,15 @@
             <boolProp name="ISREGEX">false</boolProp>
           </JSONPathAssertion>
           <hashTree/>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="uint64 Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$[0].ui64</stringProp>
+            <stringProp name="EXPECTED_VALUE">10796529505058023104</stringProp>
+            <boolProp name="JSONVALIDATION">true</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
           <BeanShellAssertion guiclass="BeanShellAssertionGui" testclass="BeanShellAssertion" testname="temperature value assertion" enabled="true">
             <stringProp name="BeanShellAssertion.query">import net.sf.json.JSONArray;
 import net.sf.json.JSONObject;

+ 2 - 1
go.mod

@@ -4,7 +4,7 @@ require (
 	github.com/benbjohnson/clock v1.0.0
 	github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e
 	github.com/eclipse/paho.mqtt.golang v1.2.0
-	github.com/edgexfoundry/go-mod-core-contracts v0.1.53
+	github.com/edgexfoundry/go-mod-core-contracts v0.1.57
 	github.com/edgexfoundry/go-mod-messaging v0.1.18
 	github.com/go-yaml/yaml v2.1.0+incompatible
 	github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
@@ -15,6 +15,7 @@ require (
 	github.com/prometheus/client_golang v1.2.1
 	github.com/prometheus/common v0.7.0
 	github.com/sirupsen/logrus v1.4.2
+	github.com/ugorji/go v1.1.4 // indirect
 	github.com/urfave/cli v1.22.0
 )
 

+ 10 - 1
xsql/plans/preprocessor.go

@@ -6,6 +6,7 @@ import (
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
+	"math"
 	"reflect"
 	"strconv"
 	"strings"
@@ -112,13 +113,21 @@ func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{},
 				if jtype == reflect.Int {
 					r[n] = t.(int)
 				} else if jtype == reflect.Float64 {
-					r[n] = int(t.(float64))
+					if tt, ok1 := t.(float64); ok1 {
+						if tt > math.MaxInt64 {
+							r[n] = uint64(tt)
+						} else {
+							r[n] = int(tt)
+						}
+					}
 				} else if jtype == reflect.String {
 					if i, err := strconv.Atoi(t.(string)); err != nil {
 						return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
 					} else {
 						r[n] = i
 					}
+				} else if jtype == reflect.Uint64 {
+					r[n] = t.(uint64)
 				} else {
 					return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
 				}

+ 0 - 15
xsql/processors/xsql_processor.go

@@ -317,21 +317,6 @@ func (p *RuleProcessor) ExecDesc(name string) (string, error) {
 	return fmt.Sprintln(dst.String()), nil
 }
 
-func (p *RuleProcessor) ExecShow() (string, error) {
-	keys, err := p.GetAllRules()
-	if err != nil {
-		return "", err
-	}
-	if len(keys) == 0 {
-		keys = append(keys, "No rule definitions are found.")
-	}
-	var result string
-	for _, c := range keys {
-		result = result + fmt.Sprintln(c)
-	}
-	return result, nil
-}
-
 func (p *RuleProcessor) GetAllRules() ([]string, error) {
 	err := p.db.Open()
 	if err != nil {

+ 86 - 7
xstream/extensions/edgex_source.go

@@ -3,7 +3,10 @@
 package extensions
 
 import (
+	"bytes"
 	"context"
+	"encoding/base64"
+	"encoding/binary"
 	"fmt"
 	"github.com/edgexfoundry/go-mod-core-contracts/clients"
 	"github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
@@ -120,7 +123,7 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 						result := make(map[string]interface{})
 						meta := make(map[string]interface{})
 
-						log.Debugf("receive message from device %s", e.Device)
+						log.Debugf("receive message %s from device %s", env.Payload, e.Device)
 						for _, r := range e.Readings {
 							if r.Name != "" {
 								if v, err := es.getValue(r, log); err != nil {
@@ -156,7 +159,7 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 								return
 							}
 						} else {
-							log.Warnf("got an empty result, ignored")
+							log.Warnf("No readings are processed for the event, so ignore it.")
 						}
 					}
 				} else {
@@ -169,6 +172,7 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 
 func (es *EdgexSource) getValue(r models.Reading, logger api.Logger) (interface{}, error) {
 	t, err := es.getType(r.Name, logger)
+	var ot = t
 	if err != nil {
 		return nil, err
 	}
@@ -182,28 +186,103 @@ func (es *EdgexSource) getValue(r models.Reading, logger api.Logger) (interface{
 		} else {
 			return r, nil
 		}
-	case "INT8", "INT16", "INT32", "INT64", "UINT8", "UINT16", "UINT32", "UINT64":
+	case "INT8", "INT16", "INT32", "INT64", "UINT8", "UINT16", "UINT32":
 		if r, err := strconv.Atoi(v); err != nil {
 			return nil, err
 		} else {
 			return r, nil
 		}
-	case "FLOAT32", "FLOAT64":
-		if r, err := strconv.ParseFloat(v, 64); err != nil {
+	case "UINT64":
+		if u64, err := strconv.ParseUint(v, 10, 64); err != nil {
 			return nil, err
 		} else {
-			return r, nil
+			return u64, nil
+		}
+	case "FLOAT32", "FLOAT64":
+		if r.ValueType == "" {
+			r.ValueType = ot
 		}
+		return es.getFloatValue(r, logger)
 	case "STRING":
 		return v, nil
 	case "BINARY":
 		return nil, fmt.Errorf("Unsupport for binary type, the value will be ignored.")
 	default:
-		logger.Warnf("unknown type %s return the string value", t)
+		logger.Warnf("Not supported type %s, and processed as string value", t)
 		return v, nil
 	}
 }
 
+func (es *EdgexSource) getFloatValue(r models.Reading, logger api.Logger) (interface{}, error) {
+	if len(r.FloatEncoding) == 0 {
+		if strings.Contains(r.Value, "==") {
+			r.FloatEncoding = models.Base64Encoding
+		} else {
+			r.FloatEncoding = models.ENotation
+		}
+	}
+	switch r.ValueType {
+	case models.ValueTypeFloat32:
+		var value float64
+
+		switch r.FloatEncoding {
+		case models.Base64Encoding:
+			data, err := base64.StdEncoding.DecodeString(r.Value)
+			if err != nil {
+				return false, fmt.Errorf("unable to Base 64 decode float32 value ('%s'): %s", r.Value, err.Error())
+			}
+			var value1 float32
+			err = binary.Read(bytes.NewReader(data), binary.BigEndian, &value1)
+			if err != nil {
+				return false, fmt.Errorf("unable to decode float32 value bytes: %s", err.Error())
+			}
+			value = float64(value1)
+		case models.ENotation:
+			var err error
+			var temp float64
+			temp, err = strconv.ParseFloat(r.Value, 64)
+			if err != nil {
+				return false, fmt.Errorf("unable to parse Float64 eNotation value: %s", err.Error())
+			}
+
+			value = float64(temp)
+
+		default:
+			return false, fmt.Errorf("unkown FloatEncoding for float32 value: %s", r.FloatEncoding)
+
+		}
+		return value, nil
+
+	case models.ValueTypeFloat64:
+		var value float64
+		switch r.FloatEncoding {
+		case models.Base64Encoding:
+			data, err := base64.StdEncoding.DecodeString(r.Value)
+			if err != nil {
+				return false, fmt.Errorf("unable to Base 64 decode float64 value ('%s'): %s", r.Value, err.Error())
+			}
+
+			err = binary.Read(bytes.NewReader(data), binary.BigEndian, &value)
+			if err != nil {
+				return false, fmt.Errorf("unable to decode float64 value bytes: %s", err.Error())
+			}
+			return value, nil
+		case models.ENotation:
+			var err error
+			value, err = strconv.ParseFloat(r.Value, 64)
+			if err != nil {
+				return false, fmt.Errorf("unable to parse Float64 eNotation value: %s", err.Error())
+			}
+			return value, nil
+		default:
+			return false, fmt.Errorf("unkown FloatEncoding for float64 value: %s", r.FloatEncoding)
+		}
+	default:
+		return nil, fmt.Errorf("unkown value type: %s", r.ValueType)
+	}
+}
+
+
 func (es *EdgexSource) fetchAllDataDescriptors() error {
 	if vdArr, err := es.vdc.ValueDescriptors(context.Background()); err != nil {
 		return err

+ 28 - 6
xstream/extensions/edgex_source_test.go

@@ -18,16 +18,16 @@ var es = EdgexSource{valueDescs: map[string]string{
 	"i5" : "UINT8",
 	"i6" : "UINT16",
 	"i7" : "UINT32",
-	"i8" : "UINT64",
-	"f1" : "FLOAT32",
-	"f2" : "FLOAT64",
 	"s1" : "String",
+	"f1" : "Float32", //FLOAT32 will be handled by special case
+	"f2" : "Float64", //FLOAT64 will be handled by special case
+	"i8" : "UINT64",  //UINT64 will be handled by special case
 	},
 }
 
 func TestGetValue_Int(t *testing.T) {
 	var testEvent = models.Event{Device: "test"}
-	for i := 1; i < 9; i++{
+	for i := 1; i < 8; i++{
 		r1 := models.Reading{Name: fmt.Sprintf("i%d", i), Value: "1"}
 		testEvent.Readings = append(testEvent.Readings, r1)
 	}
@@ -39,6 +39,28 @@ func TestGetValue_Int(t *testing.T) {
 			expectOne(t, v)
 		}
 	}
+
+	rf_01 := models.Reading{Name:"f1", Value:"fwtOaw=="}
+	if v, e := es.getValue(rf_01, common.Log); e != nil {
+		t.Errorf("%s", e)
+	} else {
+		if v1, ok := v.(float32); ok {
+			if v1 != 1.8516986e+38 {
+				t.Errorf("expected 1.8516986e+38, but it's %f.", v1)
+			}
+		}
+	}
+
+	r1 := models.Reading{Name: "i8", Value: "10796529505058023104"}
+	if v, e := es.getValue(r1, common.Log); e != nil {
+		t.Errorf("%s", e)
+	} else {
+		if v1, ok := v.(uint64); ok {
+			if v1 != 10796529505058023104 {
+				t.Errorf("expected 10796529505058023104, but it's %d.", v1)
+			}
+		}
+	}
 }
 
 func expectOne(t *testing.T, expected interface{}) {
@@ -47,7 +69,7 @@ func expectOne(t *testing.T, expected interface{}) {
 			t.Errorf("expected 1, but it's %d.", v1)
 		}
 	} else {
-		t.Errorf("expected int type, but it's %t.", expected)
+		t.Errorf("expected int type, but it's %T.", expected)
 	}
 }
 
@@ -73,7 +95,7 @@ func expectPi(t *testing.T, expected interface{}) {
 			t.Errorf("expected 3.14, but it's %f.", v1)
 		}
 	} else {
-		t.Errorf("expected float type, but it's %t.", expected)
+		t.Errorf("expected float type, but it's %T.", expected)
 	}
 }
 

+ 1 - 1
xstream/server/server/rest.go

@@ -176,7 +176,7 @@ func rulesHandler(w http.ResponseWriter, r *http.Request) {
 		w.WriteHeader(http.StatusCreated)
 		w.Write([]byte(result))
 	case http.MethodGet:
-		content, err := ruleProcessor.GetAllRules()
+		content, err := getAllRulesWithStatus()
 		if err != nil {
 			handleError(w, fmt.Errorf("Show rules error: %s", err), http.StatusBadRequest, logger)
 			return

+ 13 - 2
xstream/server/server/rpc.go

@@ -137,11 +137,22 @@ func (t *Server) DescRule(name string, reply *string) error {
 }
 
 func (t *Server) ShowRules(_ int, reply *string) error {
-	r, err := ruleProcessor.ExecShow()
+	r, err := getAllRulesWithStatus()
 	if err != nil {
 		return fmt.Errorf("Show rule error : %s.", err)
+	}
+	if len(r) == 0 {
+		*reply = "No rule definitions are found."
 	} else {
-		*reply = r
+		result, err := json.Marshal(r)
+		if err != nil {
+			return fmt.Errorf("Show rule error : %s.", err)
+		}
+		dst := &bytes.Buffer{}
+		if err := json.Indent(dst, result, "", "  "); err != nil {
+			return fmt.Errorf("Show rule error : %s.", err)
+		}
+		*reply = dst.String()
 	}
 	return nil
 }

+ 72 - 34
xstream/server/server/ruleManager.go

@@ -69,45 +69,83 @@ func doStartRule(rs *RuleState) error {
 	return nil
 }
 
-func getRuleStatus(name string) (string, error) {
+func getAllRulesWithStatus() ([]map[string]interface{}, error) {
+	names, err := ruleProcessor.GetAllRules()
+	if err != nil {
+		return nil, err
+	}
+	result := make([]map[string]interface{}, len(names))
+	for i, name := range names {
+		s, err := getRuleState(name)
+		if err != nil {
+			return nil, err
+		}
+		result[i] = map[string]interface{}{
+			"id":     name,
+			"status": s,
+		}
+	}
+	return result, nil
+}
+
+func getRuleState(name string) (string, error) {
+	if rs, ok := registry.Load(name); ok {
+		return doGetRuleState(rs)
+	} else {
+		return "", fmt.Errorf("Rule %s is not found", name)
+	}
+}
+
+func doGetRuleState(rs *RuleState) (string, error) {
 	result := ""
+	if !rs.Triggered {
+		result = "Stopped: canceled manually."
+		return result, nil
+	}
+	c := (*rs.Topology).GetContext()
+	if c != nil {
+		err := c.Err()
+		switch err {
+		case nil:
+			result = "Running"
+		case context.Canceled:
+			result = "Stopped: canceled by error."
+		case context.DeadlineExceeded:
+			result = "Stopped: deadline exceed."
+		default:
+			result = fmt.Sprintf("Stopped: %v.", err)
+		}
+	} else {
+		result = "Stopped: no context found."
+	}
+	return result, nil
+}
+
+func getRuleStatus(name string) (string, error) {
 	if rs, ok := registry.Load(name); ok {
-		if !rs.Triggered {
-			result = "Stopped: canceled manually."
-			return result, nil
+		result, err := doGetRuleState(rs)
+		if err != nil {
+			return "", err
 		}
-		c := (*rs.Topology).GetContext()
-		if c != nil {
-			err := c.Err()
-			switch err {
-			case nil:
-				keys, values := (*rs.Topology).GetMetrics()
-				metrics := "{"
-				for i, key := range keys {
-					value := values[i]
-					switch value.(type) {
-					case string:
-						metrics += fmt.Sprintf("\"%s\":%q,", key, value)
-					default:
-						metrics += fmt.Sprintf("\"%s\":%v,", key, value)
-					}
-				}
-				metrics = metrics[:len(metrics)-1] + "}"
-				dst := &bytes.Buffer{}
-				if err = json.Indent(dst, []byte(metrics), "", "  "); err != nil {
-					result = metrics
-				} else {
-					result = dst.String()
+		if result == "Running" {
+			keys, values := (*rs.Topology).GetMetrics()
+			metrics := "{"
+			for i, key := range keys {
+				value := values[i]
+				switch value.(type) {
+				case string:
+					metrics += fmt.Sprintf("\"%s\":%q,", key, value)
+				default:
+					metrics += fmt.Sprintf("\"%s\":%v,", key, value)
 				}
-			case context.Canceled:
-				result = "Stopped: canceled by error."
-			case context.DeadlineExceeded:
-				result = "Stopped: deadline exceed."
-			default:
-				result = fmt.Sprintf("Stopped: %v.", err)
 			}
-		} else {
-			result = "Stopped: no context found."
+			metrics = metrics[:len(metrics)-1] + "}"
+			dst := &bytes.Buffer{}
+			if err = json.Indent(dst, []byte(metrics), "", "  "); err != nil {
+				result = metrics
+			} else {
+				result = dst.String()
+			}
 		}
 		return result, nil
 	} else {