|
@@ -130,13 +130,16 @@ For more detailed information of configuration file, please refer to [this doc](
|
|
|
|
|
|
Let's create a rule that send result data to an MQTT broker, for detailed information of MQTT sink, please refer to [this link](../rules/sinks/mqtt.md). Similar to create a stream, you can also choose REST or CLI to manage rules.
|
|
|
|
|
|
-So the below rule will get all of values from ``event`` topic. The sink result will be published to topic ``result`` of public MQTT broker ``broker.emqx.io``.
|
|
|
+So the below rule will get all of values from ``event`` topic. The sink result will
|
|
|
+
|
|
|
+- Published to topic ``result`` of public MQTT broker ``broker.emqx.io``.
|
|
|
+- Print to log file.
|
|
|
|
|
|
#### Option 1: Use Rest API
|
|
|
|
|
|
```shell
|
|
|
curl -X POST \
|
|
|
- http://$kuiper_docker:48075/rules \
|
|
|
+ http://localhost:48075/rules \
|
|
|
-H 'Content-Type: application/json' \
|
|
|
-d '{
|
|
|
"id": "rule1",
|
|
@@ -148,16 +151,19 @@ curl -X POST \
|
|
|
"topic": "result",
|
|
|
"clientId": "demo_001"
|
|
|
}
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "log":{}
|
|
|
}
|
|
|
]
|
|
|
-}'
|
|
|
+}
|
|
|
```
|
|
|
|
|
|
#### Option 2: Use Kuiper CLI
|
|
|
|
|
|
You can create a rule file with any text editor, and copy following contents into it. Let's say the file name is ``rule.txt``.
|
|
|
|
|
|
-```
|
|
|
+```json
|
|
|
{
|
|
|
"sql": "SELECT * from demo",
|
|
|
"actions": [
|
|
@@ -167,6 +173,9 @@ You can create a rule file with any text editor, and copy following contents int
|
|
|
"topic": "result",
|
|
|
"clientId": "demo_001"
|
|
|
}
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "log":{}
|
|
|
}
|
|
|
]
|
|
|
}
|
|
@@ -185,26 +194,20 @@ Rule rule1 was created, please use 'cli getstatus rule $rule_name' command to ge
|
|
|
|
|
|
If you want to send analysis result to another sink, please refer to [other sinks](../rules/overview.md#actions) that supported in Kuiper.
|
|
|
|
|
|
-Now you can also take a look at the log file under ``log/stream.log``, see detailed info of rule.
|
|
|
+Now you can also take a look at the log file under ``log/stream.log``, or through command ``docker logs edgex-kuiper `` to see detailed info of rule.
|
|
|
|
|
|
```
|
|
|
-time="2020-04-07T03:33:28Z" level=info msg="db location is /kuiper/data/"
|
|
|
-time="2020-04-07T03:33:28Z" level=info msg="Starting rules"
|
|
|
-time="2020-04-07T03:33:28Z" level=info msg="Serving kuiper (version - 0.2.1) on port 20498, and restful api on port 48075. \n"
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="Rule rule1 is created."
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="Init rule with options {isEventTime: false, lateTolerance: 0, concurrency: 1, bufferLength: 1024"
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="Opening stream" rule=rule1
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="open source node demo with option map[FORMAT:JSON TYPE:edgex]" rule=rule1
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="open sink node 1 instances" rule=rule1
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="open source node 1 instances" rule=rule1
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="Opening mqtt sink for rule rule1." rule=rule1
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="Connect to value descriptor service at: http://edgex-core-data:48080/api/v1/valuedescriptor \n"
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="Use configuration for edgex messagebus {{ 0 } {edgex-core-data 5563 tcp} zero map[]}\n"
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="Start source demo instance 0 successfully" rule=rule1
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="The connection to edgex messagebus is established successfully." rule=rule1
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="Connect MQTT broker with username and password." rule=rule1
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="Successfully subscribed to edgex messagebus topic events." rule=rule1
|
|
|
-time="2020-04-07T03:35:35Z" level=info msg="The connection to server tcp://broker.emqx.io:1883 was established successfully" rule=rule1
|
|
|
+time="2020-04-17T06:32:24Z" level=info msg="Serving kuiper (version - 0.3.1-4-g9e63fe1) on port 20498, and restful api on port 9081. \n" file="server.go:101"
|
|
|
+time="2020-04-17T06:32:24Z" level=info msg="The connection to edgex messagebus is established successfully." file="edgex_source.go:95" rule=rule1
|
|
|
+time="2020-04-17T06:32:24Z" level=info msg="Successfully subscribed to edgex messagebus topic events." file="edgex_source.go:104" rule=rule1
|
|
|
+time="2020-04-17T06:32:24Z" level=info msg="The connection to server tcp://broker.emqx.io:1883 was established successfully" file="mqtt_sink.go:161" rule=rule1
|
|
|
+time="2020-04-17T06:32:25Z" level=info msg="Get 24 of value descriptors from service." file="edgex_source.go:223"
|
|
|
+time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int32\":-697766590}]" file="log_sink.go:16" rule=rule1
|
|
|
+time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int8\":-47}]" file="log_sink.go:16" rule=rule1
|
|
|
+time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int16\":-318}]" file="log_sink.go:16" rule=rule1
|
|
|
+time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int64\":-8680421421398846880}]" file="log_sink.go:16" rule=rule1
|
|
|
+time="2020-04-17T06:32:31Z" level=info msg="sink result for rule rule1: [{\"bool\":true}]" file="log_sink.go:16" rule=rule1
|
|
|
+...
|
|
|
```
|
|
|
|
|
|
### Monitor analysis result
|
|
@@ -240,31 +243,31 @@ Connecting to 127.0.0.1:20498...
|
|
|
"source_demo_0_exceptions_total": 0,
|
|
|
"source_demo_0_process_latency_ms": 0,
|
|
|
"source_demo_0_buffer_length": 0,
|
|
|
- "source_demo_0_last_invocation": "2020-03-19T10:30:09.294337",
|
|
|
+ "source_demo_0_last_invocation": "2020-04-17T10:30:09.294337",
|
|
|
"op_preprocessor_demo_0_records_in_total": 29,
|
|
|
"op_preprocessor_demo_0_records_out_total": 29,
|
|
|
"op_preprocessor_demo_0_exceptions_total": 0,
|
|
|
"op_preprocessor_demo_0_process_latency_ms": 0,
|
|
|
"op_preprocessor_demo_0_buffer_length": 0,
|
|
|
- "op_preprocessor_demo_0_last_invocation": "2020-03-19T10:30:09.294355",
|
|
|
+ "op_preprocessor_demo_0_last_invocation": "2020-04-17T10:30:09.294355",
|
|
|
"op_filter_0_records_in_total": 29,
|
|
|
"op_filter_0_records_out_total": 21,
|
|
|
"op_filter_0_exceptions_total": 0,
|
|
|
"op_filter_0_process_latency_ms": 0,
|
|
|
"op_filter_0_buffer_length": 0,
|
|
|
- "op_filter_0_last_invocation": "2020-03-19T10:30:09.294362",
|
|
|
+ "op_filter_0_last_invocation": "2020-04-17T10:30:09.294362",
|
|
|
"op_project_0_records_in_total": 21,
|
|
|
"op_project_0_records_out_total": 21,
|
|
|
"op_project_0_exceptions_total": 0,
|
|
|
"op_project_0_process_latency_ms": 0,
|
|
|
"op_project_0_buffer_length": 0,
|
|
|
- "op_project_0_last_invocation": "2020-03-19T10:30:09.294382",
|
|
|
+ "op_project_0_last_invocation": "2020-04-17T10:30:09.294382",
|
|
|
"sink_sink_mqtt_0_records_in_total": 21,
|
|
|
"sink_sink_mqtt_0_records_out_total": 21,
|
|
|
"sink_sink_mqtt_0_exceptions_total": 0,
|
|
|
"sink_sink_mqtt_0_process_latency_ms": 0,
|
|
|
"sink_sink_mqtt_0_buffer_length": 1,
|
|
|
- "sink_sink_mqtt_0_last_invocation": "2020-03-19T10:30:09.294423"
|
|
|
+ "sink_sink_mqtt_0_last_invocation": "2020-04-17T10:30:09.294423"
|
|
|
}
|
|
|
```
|
|
|
|