在 EdgeX Geneva 版本中, EMQ X Kuiper - 基于 SQL 的轻量级流式数据处理软件与 EdgeX 进行了集成。在进入这篇教程之前,让我们先花一些时间来了解一些 Kuiper 的基本知识。EMQ X Kuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。Kuiper 基于源 (Source)
,SQL (业务逻辑处理)
, 目标 (Sink)
的方式来支持流式数据处理。
使用 Kuiper,一般需要完成以下三个步骤。
该教程描述如何使用 Kuiper 处理来自于 EdgeX 消息总线的数据。
在不同的微服务之间,EdgeX 使用消息总线进行数据交换。它包含了一个抽象的消息总线接口,并且实现了 ZeroMQ 与 MQTT (注:Kuiper 目前只支持 ZeroMQ 消息总线 ,MQTT 将在后续版本中加入)在不同的微服务之间信息交互的支持。Kuiper 和 EdgeX 的集成工作包含了以下三部分,
扩展了一个 EdgeX 消息总线源,支持从 EdgeX 消息总线中接收数据
为了可以分析数据,Kuiper 需知道传入的数据流的格式。一般来说,用户最好在创建流的时候指定被分析的流数据的格式。如下所示,一个 demo
流包含了一个名为 temperature
的字段。这与在关系型数据库中创建表格定义的时候非常像。在创建了流定义以后,Kuiper 可以在编译或者运行时对进入的数据进行类型检查,相应错误也会报告给用户。
CREATE STREAM demo (temperature bigint) WITH (FORMAT="JSON"...)
然而在 EdgeX 中,数据类型定义在 EdgeX Core contract Service
中已经指定,为了提升使用体验,用户可以在创建流的时候不指定数据类型。Kuiper 源会在初始化规则的时候,从 Core contract Service
中获取所有的 value descriptors
定义(所以如果有任何数据类型定义的变化,你需要重启规则)。当接收到来自于消息总线的数据的时候,会根规则转换为相应的数据类型。
在 EdgeX Geneva 版本正式发布后,你可以按照这个文档来启动所有的服务。但是因为目前 EdgeX Geneva 还未正式发布,所以如果现在想试用 Kuiper 的话,不得不手工从这里下载 Docker Composer 文件,然后启动所有的 EdgeX 容器。
# wget https://github.com/edgexfoundry/developer-scripts/raw/master/releases/nightly-build/compose-files/docker-compose-nexus-mongo-no-secty.yml
# docker-compose -f ./docker-compose-nexus-redis-no-secty.yml up -d --build
所有的容器启动完毕之后,请使用 docker ps
命令确定所有的容器已经正常启动。
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
5618c93027a9 nexus3.edgexfoundry.org:10004/docker-device-virtual-go:master "/device-virtual --p…" 37 minutes ago Up 37 minutes 0.0.0.0:49990->49990/tcp edgex-device-virtual
fabe6b9052f5 nexus3.edgexfoundry.org:10004/docker-edgex-ui-go:master "./edgex-ui-server" 37 minutes ago Up 37 minutes 0.0.0.0:4000->4000/tcp edgex-ui-go
950135a7041d emqx/kuiper:0.3.1 "/usr/bin/docker-ent…" 37 minutes ago Up 37 minutes 0.0.0.0:20498->20498/tcp, 9081/tcp, 0.0.0.0:48075->48075/tcp edgex-kuiper
c49b0d6f9347 nexus3.edgexfoundry.org:10004/docker-support-scheduler-go:master "/support-scheduler …" 37 minutes ago Up 37 minutes 0.0.0.0:48085->48085/tcp edgex-support-scheduler
4265dcc2bb48 nexus3.edgexfoundry.org:10004/docker-core-command-go:master "/core-command -cp=c…" 37 minutes ago Up 37 minutes 0.0.0.0:48082->48082/tcp edgex-core-command
4667160e2f41 nexus3.edgexfoundry.org:10004/docker-app-service-configurable:master "/app-service-config…" 37 minutes ago Up 37 minutes 48095/tcp, 0.0.0.0:48100->48100/tcp edgex-app-service-configurable-rules
9bbfe95993f5 nexus3.edgexfoundry.org:10004/docker-core-metadata-go:master "/core-metadata -cp=…" 37 minutes ago Up 37 minutes 0.0.0.0:48081->48081/tcp, 48082/tcp edgex-core-metadata
2e342a3aae81 nexus3.edgexfoundry.org:10004/docker-support-notifications-go:master "/support-notificati…" 37 minutes ago Up 37 minutes 0.0.0.0:48060->48060/tcp edgex-support-notifications
3cfc628e013a nexus3.edgexfoundry.org:10004/docker-sys-mgmt-agent-go:master "/sys-mgmt-agent -cp…" 37 minutes ago Up 37 minutes 0.0.0.0:48090->48090/tcp edgex-sys-mgmt-agent
f69e9c4d6cc8 nexus3.edgexfoundry.org:10004/docker-core-data-go:master "/core-data -cp=cons…" 37 minutes ago Up 37 minutes 0.0.0.0:5563->5563/tcp, 0.0.0.0:48080->48080/tcp edgex-core-data
9e5091928409 nexus3.edgexfoundry.org:10004/docker-support-logging-go:master "/support-logging -c…" 37 minutes ago Up 37 minutes 0.0.0.0:48061->48061/tcp edgex-support-logging
74e8668f892c redis:5.0.7-alpine "docker-entrypoint.s…" 37 minutes ago Up 37 minutes 0.0.0.0:6379->6379/tcp edgex-redis
9b341bb217f9 consul:1.3.1 "docker-entrypoint.s…" 37 minutes ago Up 37 minutes 0.0.0.0:8400->8400/tcp, 8300-8302/tcp, 8301-8302/udp, 8600/tcp, 8600/udp, 0.0.0.0:8500->8500/tcp edgex-core-consul
ed7ad5ae08b2 nexus3.edgexfoundry.org:10004/docker-edgex-volume:master "/bin/sh -c '/usr/bi…" 37 minutes ago Up 37 minutes edgex-files
该步骤是创建一个可以从 EdgeX 消息总线进行数据消费的流。有两种方法来支持管理流,你可以选择喜欢的方式。
请注意: EdgeX 中的 Kuiper Rest 接口使用48075
端口,而不是缺省的9081
端口。所以在 EdgeX 调用 Kuiper Rest 的时候,请将文档中所有的 9081 替换为 48075。
请将 $kuiper_server
替换为本地运行的 Kuiper 实例的地址。
curl -X POST \
http://$kuiper_server:48075/streams \
-H 'Content-Type: application/json' \
-d '{
"sql": "create stream demo() WITH (FORMAT=\"JSON\", TYPE=\"edgex\")"
}'
关于其它 API,请参考该文档.
使用以下命令,进入运行中的 Kuiper docker 实例。
docker exec -it kuiper /bin/sh
使用以下命令,创建一个名为 demo
的流定义.
bin/cli create stream demo'() WITH (FORMAT="JSON", TYPE="edgex")'
其它命令行,请参考该文档。
现在流已经创建好了,但是你可能好奇 Kuiper 是如何知道消息总线的地址和端口,因为此类信息在 CREATE STREAM
并未指定。实际上这些信息是在配置文件 etc/sources/edgex.yaml
中指定的,你可以在命令行窗口中输入 cat etc/sources/edgex.yaml
来查看文件的内容。如果你有不同的服务器、端口和服务的地址,请更新相应的配置。正如之前提到的,这些配置选项可以在容器启动的时候进行重写。
#Global Edgex configurations
default:
protocol: tcp
server: localhost
port: 5566
topic: events
serviceServer: http://localhost:48080
.....
更多关于配置文件的信息,请参考该文档.
让我们创建一条规则,将分析结果发送至 MQTT 服务器,关于 MQTT 目标的相关配置,请参考这个链接。与创建流的过程类似,你可以选择使用 REST 或者命令行来管理规则。
以下例子将选出所有 events
主题上所有的数据,分析结果将被发布到公共的 MQTT 服务器 broker.emqx.io
的主题result
上。
curl -X POST \
http://$kuiper_server:48075/rules \
-H 'Content-Type: application/json' \
-d '{
"id": "rule1",
"sql": "SELECT * FROM demo WHERE randomnumber > 30",
"actions": [
{
"mqtt": {
"server": "tcp://broker.emqx.io:1883",
"topic": "result",
"clientId": "demo_001"
}
}
]
}'
你可以使用任意编辑器来创建一条规则,将下列内容拷贝到编辑器中,并命名为 rule.txt
。
{
"sql": "SELECT * from demo",
"actions": [
{
"mqtt": {
"server": "tcp://broker.emqx.io:1883",
"topic": "result",
"clientId": "demo_001"
}
}
]
}
在运行的容器中,执行以下命令。
# bin/cli create rule rule1 -f rule.txt
Connecting to 127.0.0.1:20498...
Creating a new rule from file rule.txt.
Rule rule1 was created, please use 'cli getstatus rule $rule_name' command to get rule status.
如想将结果发送到别的目标,请参考 Kuiper 中支持的其它目标。你现在可以看一下在 log/stream.log
中的日志文件,查看规则的详细信息。
time="2020-04-07T03:33:28Z" level=info msg="db location is /kuiper/data/"
time="2020-04-07T03:33:28Z" level=info msg="Starting rules"
time="2020-04-07T03:33:28Z" level=info msg="Serving kuiper (version - 0.2.1) on port 20498, and restful api on port 48075. \n"
time="2020-04-07T03:35:35Z" level=info msg="Rule rule1 is created."
time="2020-04-07T03:35:35Z" level=info msg="Init rule with options {isEventTime: false, lateTolerance: 0, concurrency: 1, bufferLength: 1024"
time="2020-04-07T03:35:35Z" level=info msg="Opening stream" rule=rule1
time="2020-04-07T03:35:35Z" level=info msg="open source node demo with option map[FORMAT:JSON TYPE:edgex]" rule=rule1
time="2020-04-07T03:35:35Z" level=info msg="open sink node 1 instances" rule=rule1
time="2020-04-07T03:35:35Z" level=info msg="open source node 1 instances" rule=rule1
time="2020-04-07T03:35:35Z" level=info msg="Opening mqtt sink for rule rule1." rule=rule1
time="2020-04-07T03:35:35Z" level=info msg="Connect to value descriptor service at: http://edgex-core-data:48080/api/v1/valuedescriptor \n"
time="2020-04-07T03:35:35Z" level=info msg="Use configuration for edgex messagebus {{ 0 } {edgex-core-data 5563 tcp} zero map[]}\n"
time="2020-04-07T03:35:35Z" level=info msg="Start source demo instance 0 successfully" rule=rule1
time="2020-04-07T03:35:35Z" level=info msg="The connection to edgex messagebus is established successfully." rule=rule1
time="2020-04-07T03:35:35Z" level=info msg="Connect MQTT broker with username and password." rule=rule1
time="2020-04-07T03:35:35Z" level=info msg="Successfully subscribed to edgex messagebus topic events." rule=rule1
time="2020-04-07T03:35:35Z" level=info msg="The connection to server tcp://broker.emqx.io:1883 was established successfully" rule=rule1
因为所有的分析结果都被发布到tcp://broker.emqx.io:1883
,你可以直接使用以下的 mosquitto_sub
命令来监听结果,你也可以参考别的 MQTT 客户端工具.
# mosquitto_sub -h broker.emqx.io -t result
[{"bool":true}]
[{"bool":false}]
[{"bool":true}]
[{"randomvalue_int16":3287}]
[{"float64":8.41326e+306}]
[{"randomvalue_int32":-1872949486}]
[{"randomvalue_int8":-53}]
[{"int64":-1829499332806053678}]
[{"int32":-1560624981}]
[{"int16":8991}]
[{"int8":-4}]
[{"bool":true}]
[{"bool":false}]
[{"float64":1.737076e+306}]
...
你也可以敲入以下的命令来查看规则执行的状态。相关的查看规则状态的 REST API 也有提供,请检查相关文档.
# bin/cli getstatus rule rule1
Connecting to 127.0.0.1:20498...
{
"source_demo_0_records_in_total": 29,
"source_demo_0_records_out_total": 29,
"source_demo_0_exceptions_total": 0,
"source_demo_0_process_latency_ms": 0,
"source_demo_0_buffer_length": 0,
"source_demo_0_last_invocation": "2020-03-19T10:30:09.294337",
"op_preprocessor_demo_0_records_in_total": 29,
"op_preprocessor_demo_0_records_out_total": 29,
"op_preprocessor_demo_0_exceptions_total": 0,
"op_preprocessor_demo_0_process_latency_ms": 0,
"op_preprocessor_demo_0_buffer_length": 0,
"op_preprocessor_demo_0_last_invocation": "2020-03-19T10:30:09.294355",
"op_filter_0_records_in_total": 29,
"op_filter_0_records_out_total": 21,
"op_filter_0_exceptions_total": 0,
"op_filter_0_process_latency_ms": 0,
"op_filter_0_buffer_length": 0,
"op_filter_0_last_invocation": "2020-03-19T10:30:09.294362",
"op_project_0_records_in_total": 21,
"op_project_0_records_out_total": 21,
"op_project_0_exceptions_total": 0,
"op_project_0_process_latency_ms": 0,
"op_project_0_buffer_length": 0,
"op_project_0_last_invocation": "2020-03-19T10:30:09.294382",
"sink_sink_mqtt_0_records_in_total": 21,
"sink_sink_mqtt_0_records_out_total": 21,
"sink_sink_mqtt_0_exceptions_total": 0,
"sink_sink_mqtt_0_process_latency_ms": 0,
"sink_sink_mqtt_0_buffer_length": 1,
"sink_sink_mqtt_0_last_invocation": "2020-03-19T10:30:09.294423"
}
在本教程中,我们介绍了使用 EdgeX Kuiper 规则引擎的非常简单的例子,如果使用过程中发现任何问题,请到 EdgeX,或者 Kuiper Github 中报问题。
目前的规则没有过滤发送给 Kuiper 的任何数据,那么如何过滤数据呢?请使用删除规则,然后试着更改一下 SQL 语句,完成更改后,重新部署规则。这时候如果监听 MQTT 服务的结果主题,检查一下相关的规则是否起作用?
如想了解更多的 EMQ X Kuiper 的信息,请参考以下资源。