Originally, eKuiper leverage SQL to define the rule logic. Although it is handy for developers, it is still not easy to use for users with no development knowledge. During runtime, rules are a DAG of elements(source/operator/sink) even when defining by SQL. The graph can be easily mapping to a drag and drop UI to facilitate the users. Thus, an alternative graph
property is provided in the rule API.
The graph
property is a JSON presentation of the DAG. It is consisted by nodes
and topo
which defines the nodes in the graph and their edges respectively. Below is a simplest rule defined by graph. It defines 3 nodes demo
, humidityFilter
and mqttOut
. And the graph is linear as demo
-> humidityFilter
-> mqttOut
. The rule will read from mqtt(demo
), filter by humidity(humidityFilter
) and sink to mqtt(mqttOut
).
{
"id": "rule1",
"name": "Test Condition",
"graph": {
"nodes": {
"demo": {
"type": "source",
"nodeType": "mqtt",
"props": {
"datasource": "devices/+/messages"
}
},
"humidityFilter": {
"type": "operator",
"nodeType": "filter",
"props": {
"expr": "humidity > 30"
}
},
"mqttout": {
"type": "sink",
"nodeType": "mqtt",
"props": {
"server": "tcp://${mqtt_srv}:1883",
"topic": "devices/result"
}
}
},
"topo": {
"sources": ["demo"],
"edges": {
"demo": ["humidityFilter"],
"humidityFilter": ["mqttout"]
}
}
}
}
Each node in the graph JSON has at least 3 fields:
source
, operator
and sink
.For source node, the nodeType is the type of the source like mqtt
and edgex
. Please refer to source for all supported types. Notice that, all source node shared the same properties which is the same as the properties when defining a stream. The specific configuration are referred by CONF_KEY
. In the below example, the nodeType specifies the source node is a mqtt source. The datasource and format property has the same meaning as defining a stream.
{
"type": "source",
"nodeType": "mqtt",
"props": {
"datasource": "devices/+/messages",
"format":"json"
}
}
For sink node, the nodeType is the type of the sink like mqtt
and edgex
. Please refer to sink for all supported types. For all sink nodes, they share some common properties but each type will have some owned properties.
For operator node, the nodeType are newly defined. Each nodeType will have different properties.
The source node is the data source of the rule. It can be a stream or table. User needs to define the stream/table before using it in the rule. The sourceType
property defines the type of the source. It can be stream
or table
. The sourceName
property defines the name of the stream/table. The below example defines a source node which reads from a stream named demoStream
. Please make sure the nodeType is the same as the type of the stream/table.
{
"type": "source",
"nodeType": "mqtt",
"props": {
"sourceType": "stream",
"sourceName": "demoStream"
}
}
Currently, users can define the source node to refer to table as well. But only lookup table can be connected to Join node, scan table is not supported. The below example defines a source node which reads from a lookup table named demoTable
. Please make sure the nodeType is the same as the type of the stream/table.
{
"type": "source",
"nodeType": "redis",
"props": {
"sourceType": "table",
"sourceName": "demoTable"
}
}
Currently, we supported the below node types for operator type.
This node defines a function call expression. The node return a new field with the name of the function or the alias name define in the expr property. It has only one property:
Example:
{
"type": "operator",
"nodeType": "function",
"props": {
"expr": "log(temperature) as log_temperature"
}
}
This node defines an aggregate function call expression. The input for the node must be a collection of rows as the output of a window. The node will aggregate multiple rows into one aggregated row. For example, calculate the count of the window of 10 rows will produce only one row with field count
= 10. Calculate the count of the grouped rows will produce one row for each group. It has only one property:
Example:
{
"type": "operator",
"nodeType": "aggfunc",
"props": {
"expr": "count(*)"
}
}
This node filter the data stream with a condition expression. It has only one propety:
Example:
{
"type": "operator",
"nodeType": "filter",
"props": {
"expr": "temperature > 20"
}
}
This node selects the fields to be presented in the following workflow. It is usually used in the end of a workflow to define the data to be selected. It has only one property:
Example:
{
"type": "operator",
"nodeType": "pick",
"props": {
"fields": ["log_temperature", "humidity", "window_end()"]
}
}
This node defines a window in the workflow. It can accept multiple inputs but each input must be a single row. It will produce a collection of rows.
Example:
{
"type": "operator",
"nodeType": "window",
"props": {
"type": "hoppingwindow",
"unit": "ss",
"size": 10,
"interval": 5
}
}
This node can merge data from different sources like a SQL join operation. The input must be a collection of row produced by a window. The output is another row collection whose rows are joined tuples. The properties are:
Example:
{
"type": "operator",
"nodeType": "join",
"props": {
"from": "device1",
"joins": [
{
"name": "device2",
"type": "inner",
"on": "abs(device1.ts - device2.ts) < 200"
}
]
}
}
Join operator supports to connect stream/stream join and stream/lookup table join. Stream/scan table join is not supported. If using stream/stream join, the prior node must be a window node. If using stream/lookup table join, only one join condition is supported. Below is an example of stream/lookup table join.
{
"type": "operator",
"nodeType": "join",
"props": {
"from": "demoStream",
"joins": [
{
"name": "demoTable",
"type": "inner",
"on": "deviceStream.id = demoTable.id"
}
]
}
}
This node defines the dimension to group by. The input must be a collection of rows. The output is a collection of grouped tuples. The properties are:
Example:
{
"type": "operator",
"nodeType": "groupby",
"props": {
"dimensions": ["device1.humidity"]
}
}
This node will sort the input collection. So the input must be a collection of rows and the output will be the same type. The properties are:
Example:
{
"type": "operator",
"nodeType": "orderby",
"props": {
"sorts": [{
"field": "count",
"order": "desc"
}]
}
}
This node allows message to be routed to different branches of flows which is similar to switch statement in programming languages. Currently, this is the only node which have multiple output paths.
The switch node accepts multiple conditional expression as cases in order and evaluate events against the cases. The properties are:
In the edges definition, the output of the node has multiple paths, which is represented as a two-dimensional array. In
the following example, the switch node has two conditions defined in its cases
property. Correspondingly, in edges ->
switch, you need to define a two-dimensional array of length 2 to specify the paths after the corresponding conditions
are met.
{
"id": "ruleSwitch",
"name": "Demonstrate how to use switch node",
"graph": {
"nodes": {
"abc": {
"type": "source",
"nodeType": "mqtt",
"props": {
"datasource": "demo",
"confKey": "syno"
}
},
"switch": {
"type": "operator",
"nodeType": "switch",
"props": {
"cases": [
"temperature > 20",
"temperature <= 20"
],
"stopAtFirstMatch": true
}
},
"mqttpv": {
"type": "sink",
"nodeType": "mqtt",
"props": {
"server": "tcp://syno.home:1883",
"topic": "result/switch1",
"sendSingle": true
}
},
"mqttpv2": {
"type": "sink",
"nodeType": "mqtt",
"props": {
"server": "tcp://syno.home:1883",
"topic": "result/switch2",
"sendSingle": true
}
}
},
"topo": {
"sources": [
"abc"
],
"edges": {
"abc": [
"switch"
],
"switch": [
[
"mqttpv"
],
[
"mqttpv2"
]
]
}
}
}
}
This node allows JavaScript code to be run against the messages that are passed through it.
There must be a function named exec
defined in the script. If isAgg is false, the script node can accept a single message and must return a processed message. If isAgg is true, it will receive a message array (connected to window etc.) and must return an array.
{
"type": "operator",
"nodeType": "script",
"props": {
"script": "function exec(msg, meta) {msg.temperature = 1.8 * msg.temperature + 32; return msg;}"
}
}
{
"type": "operator",
"nodeType": "script",
"props": {
"script": "function exec(msgs) {agg = {value:0}\nfor (let i = 0; i < msgs.length; i++) {\nagg.value = agg.value + msgs[i].value;\n}\nreturn agg;\n}",
"isAgg": true
}
}