规则由 JSON 定义,下面是一个示例。
{
"id": "rule1",
"sql": "SELECT demo.temperature, demo1.temp FROM demo left join demo1 on demo.timestamp = demo1.timestamp where demo.temperature > demo1.temp GROUP BY demo.temperature, HOPPINGWINDOW(ss, 20, 10)",
"actions": [
{
"log": {}
},
{
"mqtt": {
"server": "tcp://47.52.67.87:1883",
"topic": "demoSink"
}
}
]
}
创建规则需要以下参数。
参数名 | 是否可选 | 说明 |
---|---|---|
id | 否 | 规则 id, 规则 id 在同一 eKuiper 实例中必须唯一。 |
name | 是 | 规则显示的名字或者描述。 |
sql | 如果 graph 未定义,则该属性必须定义 | 为规则运行的 sql 查询 |
actions | 如果 graph 未定义,则该属性必须定义 | Sink 动作数组 |
graph | 如果 sql 未定义,则该属性必须定义 | 规则有向无环图的 JSON 表示 |
options | 是 | 选项列表 |
一个规则代表了一个流处理流程,定义了从将数据输入流的数据源到各种处理逻辑,再到将数据输入到外部系统的动作。
有两种方法来定义规则的业务逻辑。要么使用SQL/动作组合,要么使用新增加的图API。
通过指定 sql
和 actions
属性,我们可以以声明的方式定义规则的业务逻辑。其中,sql
定义了针对预定义流运行的 SQL 查询,这将转换数据。然后,输出的数据可以通过 action
路由到多个位置。
最简单的规则 SQL 如 SELECT * FROM demo
。它有类似于 ANSI SQL 的语法,并且可以利用 eKuiper 运行时提供的丰富的运算符和函数。参见SQL获取更多 eKuiper SQL 的信息。
大部分的 SQL 子句都是定义逻辑的,除了负责指定流的 FROM
子句。在这个例子中,demo
是一个流。通过使用连接子句,可以有多个流或流/表。作为一个流引擎,一个规则中必须至少有一个流。
因此,这里的 SQL 查询语句实际上定义了两个部分。
在使用SQL规则之前,必须事先定义流。请查看streams了解详情。
动作部分定义了一个规则的输出行为。每个规则可以有多个动作。一个动作是一个 sink 连接器的实例。当定义动作时,键是 sink 连接器的类型名称,而值是其属性。
eKuiper 已经内置了丰富的 sink connector 类型,如 mqtt、rest 和 file 。用户也可以扩展更多的 sink 类型来用于规则动作中。每种水槽类型都有自己的属性集。更多细节,请查看sink。
从 eKuiper 1.6.0 开始, eKuiper 在规则模型中提供了图规则 API 作为创建规则的另一种方式。该属性以 JSON 格式定义了一个规则的 DAG。它很容易直接映射到 GUI 编辑器中的图形,并适合作为拖放用户界面的后端。下面是一个图形规则定义的例子。
{
"id": "rule1",
"name": "Test Condition",
"graph": {
"nodes": {
"demo": {
"type": "source",
"nodeType": "mqtt",
"props": {
"datasource": "devices/+/messages"
}
},
"humidityFilter": {
"type": "operator",
"nodeType": "filter",
"props": {
"expr": "humidity > 30"
}
},
"logfunc": {
"type": "operator",
"nodeType": "function",
"props": {
"expr": "log(temperature) as log_temperature"
}
},
"tempFilter": {
"type": "operator",
"nodeType": "filter",
"props": {
"expr": "log_temperature < 1.6"
}
},
"pick": {
"type": "operator",
"nodeType": "pick",
"props": {
"fields": ["log_temperature as temp", "humidity"]
}
},
"mqttout": {
"type": "sink",
"nodeType": "mqtt",
"props": {
"server": "tcp://${mqtt_srv}:1883",
"topic": "devices/result"
}
}
},
"topo": {
"sources": ["demo"],
"edges": {
"demo": ["humidityFilter"],
"humidityFilter": ["logfunc"],
"logfunc": ["tempFilter"],
"tempFilter": ["pick"],
"pick": ["mqttout"]
}
}
}
}
graph
属性是一个 json 结构,其中 nodes
定义图形中呈现的节点,topo
定义节点之间的边缘。节点类型可以是内置的节点类型,如窗口节点和过滤器节点等。它也可以是来自插件的用户定义的节点。请参考graph rule了解更多细节。
当前的选项包括:
选项名 | 类型和默认值 | 说明 |
---|---|---|
isEventTime | bool:false | 使用事件时间还是将时间用作事件的时间戳。 如果使用事件时间,则将从有效负载中提取时间戳。 必须通过 stream 定义指定时间戳记。 |
lateTolerance | int64:0 | 在使用事件时间窗口时,可能会出现元素延迟到达的情况。 LateTolerance 可以指定在删除元素之前可以延迟多少时间(单位为 ms)。 默认情况下,该值为0,表示后期元素将被删除。 |
concurrency | int: 1 | 一条规则运行时会根据 sql 语句分解成多个 plan 运行。该参数设置每个 plan 运行的线程数。该参数值大于1时,消息处理顺序可能无法保证。 |
bufferLength | int: 1024 | 指定每个 plan 可缓存消息数。若缓存消息数超过此限制,plan 将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。此选项值越大,则消息吞吐能力越强,但是内存占用也会越多。 |
sendMetaToSink | bool:false | 指定是否将事件的元数据发送到目标。 如果为 true,则目标可以获取元数据信息。 |
sendError | bool: true | 指定是否将运行时错误发送到目标。如果为 true,则错误会在整个流中传递直到目标。否则,错误会被忽略,仅打印到日志中。 |
qos | int:0 | 指定流的 qos。 值为0对应最多一次; 1对应至少一次,2对应恰好一次。 如果 qos 大于0,将激活检查点机制以定期保存状态,以便可以从错误中恢复规则。 |
checkpointInterval | int:300000 | 指定触发检查点的时间间隔(单位为 ms)。 仅当 qos 大于0时才有效。 |
restartStrategy | 结构 | 指定规则运行失败后自动重新启动规则的策略。这可以帮助从可恢复的故障中回复,而无需手动操作。请查看规则重启策略了解详细的配置项目。 |
有关 qos
和 checkpointInterval
的详细信息,请查看状态和容错。
可以在 rules
下属的 etc/kuiper.yaml
中全局定义规则选项。 规则 json 中定义的选项将覆盖全局设置。
规则重启策略的配置项包括:
选项名 | 类型和默认值 | 说明 |
---|---|---|
attempts | int: 0 | 最大重试次数。如果设置为0,该规则将立即失败,不会进行重试。 |
delay | int: 1000 | 默认的重试间隔时间,以毫秒为单位。如果没有设置multiplier ,重试的时间间隔将固定为这个值。 |
maxDelay | int: 30000 | 重试的最大间隔时间,单位是毫秒。只有当multiplier 有设置时,从而使得每次重试的延迟都会增加时才会生效。 |
multiplier | float: 2 | 重试间隔时间的乘数。 |
jitterFactor | float: 0.1 | 添加或减去延迟的随机值系数,防止在同一时间重新启动多个规则。 |
这些选项的默认值定义于 etc/kuiper.yaml
配置文件,可通过修改该文件更改默认值。