在 eKuiper 源代码中,有内置的动作和扩展的动作。
用户可以直接使用标准 eKuiper 实例中的内置动作。内建动作的列表如下。
我们已经开发了一些官方的动作插件。这些插件可以在 eKuiper 的源代码中找到,用户需要手动构建它们。详细信息请查看每个动作的构建和使用方法。
这些插件有预编译的二进制文件,用于主流cpu架构,如AMD或ARM。预编译的插件托管在 https://packages.emqx.net/kuiper-plugins/$version/$os/sinks/$type_$arch.zip
中。例如,要获得用于 debian amd64 的 tdengine 插件,请从 https://packages.emqx.net/kuiper-plugins/1.4.4/debian/sinks/tdengine_amd64.zip
安装。
预定义的动作插件列表。
v1.x
。v2.x
。默认情况下,Sink 将数据附加到外部系统中。一些外部系统,如 SQL DB 本身是可更新的,允许更新或删除数据。与查找源类似,只有少数 Sink 是天然 "可更新 "的。可更新的 Sink 必须支持插入、更新和删除。产品自带的 Sink 种,可更新的包括:
为了激活更新功能,Sink 必须设置 rowkindField
属性,以指定数据中的哪个字段代表要采取的动作。在下面的例子中,rowkindField
被设置为 action
。
{"redis": {
"addr": "127.0.0.1:6379",
"dataType": "string",
"field": "id",
"rowkindField": "action",
"sendSingle": true
}}
流入的数据必须有一个字段来表示更新的动作。在下面的例子中,action
字段是要执行的动作。动作可以是插入、更新、 upsert 和删除。不同的 sink 的动作实现是不同的。有些 sink 可能对插入、upsert 和更新执行相同的动作。
{"action":"update", "id":5, "name":"abc"}
这条信息将把id 为 5的数据更新为新的名字。
动作用于将处理结果发送到外部系统中,存在外部系统不可用的情况,特别是在从边到云的场景中。例如,在弱网情况下,边到云的网络连接可能会不时断开和重连。因此,动作提供了缓存功能,用于在发送错误的情况下暂存数据,并在错误恢复之后自动重发缓存数据。动作的缓存可分为内存和磁盘的两级存储。用户可配置内存缓存条数,超过上限后,新的缓存将离线存储到磁盘中。缓存将同时保存在内存和磁盘中,这样缓存的容量就变得更大了;它还将持续检测故障恢复状态,并在不重新启动规则的情况下重新发送。
离线缓存的保存位置根据 etc/kuiper.yaml
里的 store 配置决定,默认为 sqlite 。如果磁盘存储是sqlite,所有的缓存将被保存到data/cache.db
文件。每个 sink 将有一个唯一的 sqlite 表来保存缓存。缓存的计数添加到 sink 的 指标中的 buffer length 部分。
每个sink都可以配置自己的缓存机制。每个 sink 的缓存流程是相同的。如果启用了缓存,所有 sink 的事件都会经过两个阶段:首先是将所有内容保存到缓存中;然后在收到ack后删除缓存。
Sink 缓存的配置有两个层次。etc/kuiper.yaml
中的全局配置,定义所有规则的默认行为。还有一个规则 sink 层的定义,用来覆盖默认行为。
etc/kuiper.yaml
中定义的元数据存储的配置。在以下规则的示例配置中,log sink 没有配置缓存相关选项,因此将会采用全局默认配置;而 mqtt sink 进行了自身缓存策略的配置。
{
"id": "rule1",
"sql": "SELECT * FROM demo",
"actions": [{
"log": {},
"mqtt": {
"server": "tcp://127.0.0.1:1883",
"topic": "result/cache",
"qos": 0,
"enableCache": true,
"memoryCacheThreshold": 2048,
"maxDiskCache": 204800,
"bufferPageSize": 512,
"resendInterval": 10
}
}]
}
像源一样,动作也支持配置复用,用户只需要在 sinks 文件夹中创建与目标动作同名的 yaml 文件并按照源一样的形式写入配置。
例如,针对 MQTT 动作场景, 用户可以在 sinks 目录下创建 mqtt.yaml 文件,并写入如下内容
test:
qos: 1
server: "tcp://broker.emqx.io:1883"
当用户需要 MQTT 动作时,除了采用传统的配置方式,如下所示
{
"mqtt": {
"server": "tcp://broker.emqx.io:1883",
"topic": "devices/demo_001/messages/events/",
"protocolVersion": "3.1.1",
"qos": 1,
"clientId": "demo_001",
"username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30",
"password": "SharedAccessSignature sr=*******************",
"retained": false
}
}
还可以通过 resourceId
引用形式,采用如下的配置
{
"mqtt": {
"resourceId": "test",
"topic": "devices/demo_001/messages/events/",
"protocolVersion": "3.1.1",
"clientId": "demo_001",
"username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30",
"password": "SharedAccessSignature sr=*******************",
"retained": false
}
}