|
@@ -4,7 +4,7 @@
|
|
|
|
|
|
## 内置动作
|
|
## 内置动作
|
|
|
|
|
|
-用户可以直接使用标准 eKuiper 实例中的内置动作。内建动作的列表如下。
|
|
|
|
|
|
+用户可以直接使用标准 eKuiper 实例中的内置动作。内建动作的列表如下:
|
|
|
|
|
|
- [Mqtt sink](./builtin/mqtt.md):输出到外部 mqtt 服务。
|
|
- [Mqtt sink](./builtin/mqtt.md):输出到外部 mqtt 服务。
|
|
- [Neuron sink](./builtin/neuron.md):输出到本地的 Neuron 实例。
|
|
- [Neuron sink](./builtin/neuron.md):输出到本地的 Neuron 实例。
|
|
@@ -22,22 +22,23 @@
|
|
|
|
|
|
这些插件有预编译的二进制文件,用于主流cpu架构,如AMD或ARM。预编译的插件托管在 `https://packages.emqx.net/kuiper-plugins/$version/$os/sinks/$type_$arch.zip` 中。例如,要获得用于 debian amd64 的 tdengine 插件,请从 `https://packages.emqx.net/kuiper-plugins/1.4.4/debian/sinks/tdengine_amd64.zip` 安装。
|
|
这些插件有预编译的二进制文件,用于主流cpu架构,如AMD或ARM。预编译的插件托管在 `https://packages.emqx.net/kuiper-plugins/$version/$os/sinks/$type_$arch.zip` 中。例如,要获得用于 debian amd64 的 tdengine 插件,请从 `https://packages.emqx.net/kuiper-plugins/1.4.4/debian/sinks/tdengine_amd64.zip` 安装。
|
|
|
|
|
|
-预定义的动作插件列表。
|
|
|
|
|
|
+预定义的动作插件列表:
|
|
|
|
|
|
|
|
+- [SQL](./plugin/sql.md):写入 SQL。
|
|
- [InfluxDB sink](./plugin/influx.md): 写入 Influx DB `v1.x`。
|
|
- [InfluxDB sink](./plugin/influx.md): 写入 Influx DB `v1.x`。
|
|
- [InfluxDBV2 sink](./plugin/influx2.md): 写入 Influx DB `v2.x`。
|
|
- [InfluxDBV2 sink](./plugin/influx2.md): 写入 Influx DB `v2.x`。
|
|
-- [Tdengine sink](./plugin/tdengine.md): 写入 Tdengine 。
|
|
|
|
-- [Image sink](./plugin/image.md): 写入一个图像文件。仅用于处理二进制结果。
|
|
|
|
-- [Zero MQ sink](./plugin/zmq.md):输出到 Zero MQ 。
|
|
|
|
-- [Kafka sink](./plugin/kafka.md):输出到 Kafka 。
|
|
|
|
|
|
+- [Tdengine sink](./plugin/tdengine.md): 写入 Tdengine。
|
|
|
|
+- [Image sink](./plugin/image.md):写入一个图像文件。仅用于处理二进制结果。
|
|
|
|
+- [ZeroMQ sink](./plugin/zmq.md):输出到 ZeroMQ。
|
|
|
|
+- [Kafka sink](./plugin/kafka.md):输出到 Kafka。
|
|
|
|
|
|
## 更新
|
|
## 更新
|
|
|
|
|
|
-默认情况下,Sink 将数据附加到外部系统中。一些外部系统,如 SQL DB 本身是可更新的,允许更新或删除数据。与查找源类似,只有少数 Sink 是天然 "可更新 "的。可更新的 Sink 必须支持插入、更新和删除。产品自带的 Sink 种,可更新的包括:
|
|
|
|
|
|
+默认情况下,Sink 将数据附加到外部系统中。一些外部系统,如 SQL DB 本身是可更新的,允许更新或删除数据。与查找源类似,只有少数 Sink 是天然 "可更新 "的。可更新的 Sink 必须支持插入、更新和删除。产品自带的 Sink 中,可更新的包括:
|
|
|
|
|
|
-- Memory Sink
|
|
|
|
-- Redis Sink
|
|
|
|
-- SQL Sink
|
|
|
|
|
|
+- [Memory Sink](./builtin/memory.md)
|
|
|
|
+- [Redis Sink](./builtin/redis.md)
|
|
|
|
+- [SQL Sink](./plugin/sql.md)
|
|
|
|
|
|
为了激活更新功能,Sink 必须设置 `rowkindField` 属性,以指定数据中的哪个字段代表要采取的动作。在下面的例子中,`rowkindField` 被设置为 `action`。
|
|
为了激活更新功能,Sink 必须设置 `rowkindField` 属性,以指定数据中的哪个字段代表要采取的动作。在下面的例子中,`rowkindField` 被设置为 `action`。
|
|
|
|
|
|
@@ -115,10 +116,10 @@
|
|
|
|
|
|
### 流程
|
|
### 流程
|
|
|
|
|
|
-每个sink都可以配置自己的缓存机制。每个 sink 的缓存流程是相同的。如果启用了缓存,所有 sink 的事件都会经过两个阶段:首先是将所有内容保存到缓存中;然后在收到ack后删除缓存。
|
|
|
|
|
|
+每个 sink 都可以配置自己的缓存机制。每个 sink 的缓存流程是相同的。如果启用了缓存,所有 sink 的事件都会经过两个阶段:首先是将所有内容保存到缓存中;然后在收到 ack 后删除缓存。
|
|
|
|
|
|
- 错误检测:发送失败后,sink应该通过返回特定的错误类型来识别可恢复的失败(网络等),这将返回一个失败的ack,这样缓存就可以被保留下来。对于成功的发送或不可恢复的错误,将发送一个成功的 ack 来删除缓存。
|
|
- 错误检测:发送失败后,sink应该通过返回特定的错误类型来识别可恢复的失败(网络等),这将返回一个失败的ack,这样缓存就可以被保留下来。对于成功的发送或不可恢复的错误,将发送一个成功的 ack 来删除缓存。
|
|
-- 缓存机制:缓存将首先被保存在内存中。如果超过了内存的阈值,后面的缓存将被保存到磁盘中。一旦磁盘缓存超过磁盘存储阈值,缓存将开始rotate,即内存中最早的缓存将被丢弃,并加载磁盘中最早的缓存来代替。
|
|
|
|
|
|
+- 缓存机制:缓存将首先被保存在内存中。如果超过了内存的阈值,后面的缓存将被保存到磁盘中。一旦磁盘缓存超过磁盘存储阈值,缓存将开始 rotate,即内存中最早的缓存将被丢弃,并加载磁盘中最早的缓存来代替。
|
|
- 重发策略:目前缓存机制仅可运行在默认的同步模式中,如果有一条消息正在发送中,则会等待发送的结果以继续发送下个缓存数据。否则,当有新的数据到来时,发送缓存中的第一个数据以检测网络状况。如果发送成功,将按顺序链式发送所有内存和磁盘中的所有缓存。链式发送可定义一个发送间隔,防止形成消息风暴。
|
|
- 重发策略:目前缓存机制仅可运行在默认的同步模式中,如果有一条消息正在发送中,则会等待发送的结果以继续发送下个缓存数据。否则,当有新的数据到来时,发送缓存中的第一个数据以检测网络状况。如果发送成功,将按顺序链式发送所有内存和磁盘中的所有缓存。链式发送可定义一个发送间隔,防止形成消息风暴。
|
|
|
|
|
|
### 配置
|
|
### 配置
|
|
@@ -127,9 +128,9 @@ Sink 缓存的配置有两个层次。`etc/kuiper.yaml` 中的全局配置,定
|
|
|
|
|
|
- enableCache:是否启用sink cache。缓存存储配置遵循 `etc/kuiper.yaml` 中定义的元数据存储的配置。
|
|
- enableCache:是否启用sink cache。缓存存储配置遵循 `etc/kuiper.yaml` 中定义的元数据存储的配置。
|
|
- memoryCacheThreshold:要缓存在内存中的消息数量。出于性能方面的考虑,最早的缓存信息被存储在内存中,以便在故障恢复时立即重新发送。这里的数据会因为断电等故障而丢失。
|
|
- memoryCacheThreshold:要缓存在内存中的消息数量。出于性能方面的考虑,最早的缓存信息被存储在内存中,以便在故障恢复时立即重新发送。这里的数据会因为断电等故障而丢失。
|
|
-- maxDiskCache: 缓存在磁盘中的信息的最大数量。磁盘缓存是先进先出的。如果磁盘缓存满了,最早的一页信息将被加载到内存缓存中,取代旧的内存缓存。
|
|
|
|
-- bufferPageSize。缓冲页是批量读/写到磁盘的单位,以防止频繁的IO。如果页面未满,eKuiper 因硬件或软件错误而崩溃,最后未写入磁盘的页面将被丢失。
|
|
|
|
-- resendInterval: 故障恢复后重新发送信息的时间间隔,防止信息风暴。
|
|
|
|
|
|
+- maxDiskCache:缓存在磁盘中的信息的最大数量。磁盘缓存是先进先出的。如果磁盘缓存满了,最早的一页信息将被加载到内存缓存中,取代旧的内存缓存。
|
|
|
|
+- bufferPageSize:缓冲页是批量读/写到磁盘的单位,以防止频繁的IO。如果页面未满,eKuiper 因硬件或软件错误而崩溃,最后未写入磁盘的页面将被丢失。
|
|
|
|
+- resendInterval:故障恢复后重新发送信息的时间间隔,防止信息风暴。
|
|
- cleanCacheAtStop:是否在规则停止时清理所有缓存,以防止规则重新启动时对过期消息进行大量重发。如果不设置为true,一旦规则停止,内存缓存将被存储到磁盘中。否则,内存和磁盘规则会被清理掉。
|
|
- cleanCacheAtStop:是否在规则停止时清理所有缓存,以防止规则重新启动时对过期消息进行大量重发。如果不设置为true,一旦规则停止,内存缓存将被存储到磁盘中。否则,内存和磁盘规则会被清理掉。
|
|
|
|
|
|
在以下规则的示例配置中,log sink 没有配置缓存相关选项,因此将会采用全局默认配置;而 mqtt sink 进行了自身缓存策略的配置。
|
|
在以下规则的示例配置中,log sink 没有配置缓存相关选项,因此将会采用全局默认配置;而 mqtt sink 进行了自身缓存策略的配置。
|
|
@@ -155,7 +156,7 @@ Sink 缓存的配置有两个层次。`etc/kuiper.yaml` 中的全局配置,定
|
|
```
|
|
```
|
|
|
|
|
|
## 资源引用
|
|
## 资源引用
|
|
-
|
|
|
|
|
|
+
|
|
像源一样,动作也支持配置复用,用户只需要在 sinks 文件夹中创建与目标动作同名的 yaml 文件并按照源一样的形式写入配置。
|
|
像源一样,动作也支持配置复用,用户只需要在 sinks 文件夹中创建与目标动作同名的 yaml 文件并按照源一样的形式写入配置。
|
|
|
|
|
|
例如,针对 MQTT 动作场景, 用户可以在 sinks 目录下创建 mqtt.yaml 文件,并写入如下内容
|
|
例如,针对 MQTT 动作场景, 用户可以在 sinks 目录下创建 mqtt.yaml 文件,并写入如下内容
|