EMQ X Kuiper is a lightweight IoT data analytics and streaming software for the edge, implemented in Golang, and it can run on all kinds of resource-constrained edge devices. One goal of Kuiper is to migrate cloud streaming software frameworks (such as Apache Spark, Apache Storm and Apache Flink) to the edge side. Kuiper references these cloud streaming frameworks, takes the special requirements of edge analytics into account, and introduces a rule engine based on Source, SQL (business logic) and Sink. The rule engine is used for developing streaming applications at the edge side.
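The Source, SQL and Sink model can be pictured as a three-stage pipeline. The following is a conceptual sketch in Python only; it is not Kuiper's implementation or API, and the names `where` and `run_rule` are hypothetical, chosen for illustration.

```python
from typing import Callable, Iterable, Iterator, List

Event = dict


def where(predicate: Callable[[Event], bool], stream: Iterable[Event]) -> Iterator[Event]:
    """Business logic stage: keep only events that satisfy the predicate."""
    return (event for event in stream if predicate(event))


def run_rule(
    source: Iterable[Event],
    predicate: Callable[[Event], bool],
    sink: Callable[[Event], None],
) -> None:
    """Wire source -> business logic -> sink and drain the stream."""
    for event in where(predicate, source):
        sink(event)


if __name__ == "__main__":
    # A list stands in for the source; a real source would be a message subscription.
    readings = [{"humidity": 90}, {"humidity": 10}]
    collected: List[Event] = []
    # The sink is any callable; here it appends to a list.
    run_rule(readings, lambda e: e["humidity"] > 50, collected.append)
    print(collected)
```

Because the source is any iterable and the sink is any callable, either end can be swapped without touching the filtering step, which is the separation the rule model aims for.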
User scenarios
Kuiper can run in various IoT edge scenarios, such as real-time processing of production line data in the IIoT, real-time analysis of data from the data bus by a Connected Vehicle gateway, and real-time analysis of urban facility data in smart city scenarios. Processing at the edge with Kuiper can reduce system response latency, save network bandwidth and storage costs, and improve system security.
Lightweight
Cross-platform
Data analysis support
Highly extensible
A plugin system is provided, which supports extensions at Source, SQL functions and Sink.
UDF functions: embedded support for 60+ functions, with extension points provided for SQL functions
Management
Integration with EMQ X Edge
Seamless integration with EMQ X Edge provides an end-to-end solution from messaging to analytics.
Pull a Kuiper Docker image from https://hub.docker.com/r/emqx/kuiper/tags.
Set the Kuiper source to an MQTT server. This sample uses the server located at tcp://broker.emqx.io:1883. broker.emqx.io is a public MQTT test server hosted by EMQ.
docker run -d --name kuiper -e MQTT_BROKER_ADDRESS=tcp://broker.emqx.io:1883 emqx/kuiper:$tag
The data is sent to broker.emqx.io and processed in your locally running Kuiper Docker instance. The steps below create a stream named demo. Data is sent to the devices/device_001/messages topic, where device_001 could be any other device, such as device_002; all of that data is subscribed to and handled by the demo stream.
-- In host
# docker exec -it kuiper /bin/sh
-- In docker instance
# bin/cli create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'
Connecting to 127.0.0.1:20498...
Stream demo is created.
# bin/cli query
Connecting to 127.0.0.1:20498...
kuiper > select * from demo where temperature > 30;
Query was submit successfully.
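The DATASOURCE value devices/+/messages uses the MQTT single-level wildcard `+`, which is why messages from device_001, device_002 and so on all reach the demo stream. The sketch below illustrates MQTT topic filter matching in Python. It is a simplified illustration, not Kuiper or broker code: the function name `topic_matches` is hypothetical, and the special handling of topics starting with `$` is left out.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a topic filter.

    '+' matches exactly one topic level; '#' matches all remaining
    levels and is only valid as the last level of the filter.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only in the last position.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without '#', both sides must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    for name in ("devices/device_001/messages",
                 "devices/device_002/messages",
                 "devices/device_001/status"):
        print(name, topic_matches("devices/+/messages", name))
```

Only the first two topics in the usage loop match the filter; the third differs in its last level and would not be delivered to the stream.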
Publish data to the devices/device_001/messages topic of the server tcp://broker.emqx.io:1883 with any MQTT client tool. The sample below uses mosquitto_pub.
# mosquitto_pub -h broker.emqx.io -m '{"temperature": 40, "humidity" : 20}' -t devices/device_001/messages
If everything works, the message is printed in the bin/cli query window. Try publishing another message with a temperature less than 30, and it will be filtered out by the WHERE condition of the SQL.
kuiper > select * from demo WHERE temperature > 30;
[{"temperature": 40, "humidity" : 20}]
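To make the effect of the WHERE condition concrete, the following Python sketch applies the same predicate to JSON payloads. It only mimics the query's filtering outside of Kuiper; the function name `select_where_temperature_above` is hypothetical, and how Kuiper itself evaluates the query is not shown here.

```python
import json
from typing import Iterable, List


def select_where_temperature_above(payloads: Iterable[str], threshold: float = 30) -> List[dict]:
    """Mimic: SELECT * FROM demo WHERE temperature > threshold."""
    results = []
    for payload in payloads:
        row = json.loads(payload)
        # Strict comparison: a temperature equal to the threshold is filtered out.
        if row["temperature"] > threshold:
            results.append(row)
    return results


if __name__ == "__main__":
    messages = [
        '{"temperature": 40, "humidity" : 20}',
        '{"temperature": 20, "humidity" : 20}',
    ]
    print(select_where_temperature_above(messages))
```

Only the first message passes the filter, which mirrors the single result shown in the query window.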
If you run into any problems, take a look at log/stream.log.
To stop the test, press ctrl + c in the bin/cli query command console, or input exit and press enter.
Want to explore more powerful features of EMQ X Kuiper? Refer to the following for how to apply EMQ X Kuiper at the edge and integrate it with AWS / Azure IoT cloud.
Test messages have the form {"temperature": 10, "humidity" : 90}, where the values of temperature and humidity are random integers between 0 and 100.
Query: SELECT * FROM demo WHERE temperature > 50
Devices | Messages per second | CPU usage | Memory usage |
---|---|---|---|
Raspberry Pi 3B+ | 12k | sys+user: 70% | 20M |
AWS t2.micro (1 Core * 1 GB) Ubuntu 18.04 | 10k | sys+user: 25% | 20M |
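Test payloads of the shape described above can be generated with a few lines of Python. This is a sketch for reproducing similar input, not the tool used to obtain the figures in the table; the function name `make_payload` is hypothetical, and treating the range 0 to 100 as inclusive on both ends is an assumption.

```python
import json
import random


def make_payload(rng: random.Random) -> str:
    """Build one JSON test message with random integer readings.

    Assumption: both values are drawn uniformly from 0..100 inclusive.
    """
    return json.dumps({
        "temperature": rng.randint(0, 100),
        "humidity": rng.randint(0, 100),
    })


if __name__ == "__main__":
    rng = random.Random()
    for _ in range(3):
        print(make_payload(rng))
```

Under the inclusive-range assumption, 50 of the 101 possible temperature values (51 through 100) satisfy temperature > 50, so slightly less than half of the generated messages would pass the query.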
Binary: $ make
Binary files that support EdgeX: $ make build_with_edgex
Packages: $ make pkg
Package files that support EdgeX: $ make pkg_with_edgex
Docker images: $ make docker
Docker images support EdgeX by default
To use cross-compilation, refer to this doc.