The eKuiper source code provides built-in sinks and extension sinks. Users can use the built-in sinks directly in the standard eKuiper instance. The built-in sinks are:
We have developed some official sink plugins. These plugins can be found in eKuiper's source code, and users need to build them manually. Refer to each sink's documentation for how to build and use it.
Additionally, these plugins have pre-built binaries for mainstream CPU architectures such as AMD64 and ARM. The pre-built plugins are hosted at `https://packages.emqx.net/kuiper-plugins/$version/$os/sinks/$type_$arch.zip`. For example, to get the TDengine sink for Debian AMD64, install it from `https://packages.emqx.net/kuiper-plugins/1.4.4/debian/sinks/tdengine_amd64.zip`.
The list of predefined sink plugins:
v1.x
v2.x
By default, sinks append data to the external system. Some external systems, such as SQL databases, are updatable and allow data to be updated or deleted. Similar to the lookup source, only a few sinks are naturally "updatable". An updatable sink must support insert, update, and delete. The shipped updatable sinks include:
To activate the update feature, the sink must set the `rowkindField` property to specify which field in the data represents the action to take. In the example below, `rowkindField` is set to `action`.
```json
{
  "redis": {
    "addr": "127.0.0.1:6379",
    "dataType": "string",
    "field": "id",
    "rowkindField": "action",
    "sendSingle": true
  }
}
```
The ingested data must have a field indicating the update action. In the example below, the `action` field holds the action to perform. The possible actions are `insert`, `update`, `upsert`, and `delete`. The action implementation varies between sinks; some sinks may perform the same operation for insert, upsert, and update.
```json
{"action":"update", "id":5, "name":"abc"}
```

This message will update the record with id 5 to the new name.
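As an illustration of the dispatch described above (a sketch only, not eKuiper's actual implementation), an updatable sink reads the rowkind field from each event and translates it into a store operation; `apply_event` and the in-memory `store` below are hypothetical names:

```python
# Illustrative sketch of how an updatable sink might dispatch on the
# rowkind field; not eKuiper's actual implementation.

def apply_event(store: dict, event: dict, rowkind_field: str = "action",
                key_field: str = "id") -> None:
    """Apply one ingested event to a keyed store based on its rowkind."""
    action = event.get(rowkind_field, "upsert")  # assume upsert when absent
    key = event[key_field]
    # The payload is the event without the rowkind bookkeeping field.
    payload = {k: v for k, v in event.items() if k != rowkind_field}
    if action in ("insert", "update", "upsert"):
        # Some sinks perform the same write for insert, upsert, and update.
        store[key] = payload
    elif action == "delete":
        store.pop(key, None)
    else:
        raise ValueError(f"unknown rowkind: {action}")

store = {5: {"id": 5, "name": "old"}}
apply_event(store, {"action": "update", "id": 5, "name": "abc"})
```

After the call, the record with id 5 carries the new name; a subsequent `delete` event with the same id would remove it.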
Sinks are used to send processing results to external systems. There are situations where the external system is unavailable, especially in edge-to-cloud scenarios. For example, in a weak network scenario, the edge-to-cloud connection may disconnect and reconnect from time to time. Therefore, sinks provide caching capabilities to temporarily store data on recoverable errors and automatically resend the cached data once the error is recovered.

A sink's cache has two levels of storage: memory and disk. The user can configure the number of in-memory cache entries; when that limit is exceeded, new cache entries are stored offline on disk. Keeping the cache in both memory and disk enlarges the overall cache capacity; the sink also continuously detects the failure state and resends without restarting the rule.
The storage location of the offline cache is determined by the storage configuration in `etc/kuiper.yaml`, which defaults to SQLite. With SQLite disk storage, all caches are saved to the `data/cache.db` file. Each sink has a unique SQLite table to hold its cache. The number of cached entries is added to the buffer length section of the sink's metrics.
Each sink can configure its own caching mechanism, but the caching process is the same for all sinks. If caching is enabled, every sink event goes through two phases: first, all content is saved to the cache; then, the cache entry is deleted after an ack is received.
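The two-level cache and the cache-then-ack flow can be sketched as follows. This is an illustrative model, not eKuiper's Go implementation: events are appended to a bounded memory cache, overflow spills to a disk-backed list, and each ack removes the oldest entry and promotes a disk entry back into memory.

```python
from collections import deque

class SinkCache:
    """Illustrative two-level sink cache: memory first, spill to disk."""

    def __init__(self, memory_threshold: int):
        self.memory_threshold = memory_threshold
        self.memory = deque()
        self.disk = deque()  # stands in for the SQLite-backed offline cache

    def add(self, event) -> None:
        # Phase 1: every event is cached before it is sent.
        if len(self.memory) < self.memory_threshold:
            self.memory.append(event)
        else:
            self.disk.append(event)  # overflow goes to disk

    def ack(self) -> None:
        # Phase 2: an ack deletes the oldest cached event; a disk entry
        # is promoted into memory as space frees up.
        self.memory.popleft()
        if self.disk:
            self.memory.append(self.disk.popleft())

c = SinkCache(memory_threshold=2)
for e in range(5):
    c.add(e)   # first two stay in memory, the rest spill to disk
```

After these five adds, the memory level holds two entries and the disk level holds three; each `ack()` shrinks the backlog by one while refilling memory from disk.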
There are two levels of configuration for the sink cache: a global configuration in `etc/kuiper.yaml` that defines the default behavior of all rules, and a rule-level sink definition that overrides the default behavior.

In the following rule configuration, the log sink has no cache-related options configured, so the global default configuration is used, whereas the mqtt sink configures its own caching policy.
```json
{
  "id": "rule1",
  "sql": "SELECT * FROM demo",
  "actions": [
    {
      "log": {},
      "mqtt": {
        "server": "tcp://127.0.0.1:1883",
        "topic": "result/cache",
        "qos": 0,
        "enableCache": true,
        "memoryCacheThreshold": 2048,
        "maxDiskCache": 204800,
        "bufferPageSize": 512,
        "resendInterval": 10
      }
    }
  ]
}
```
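With caching enabled, cached events are retried periodically; `resendInterval` controls how long the sink waits between attempts. A minimal resend loop might look like the sketch below, with a simulated time step instead of a real clock; `drain_cache` and `flaky_send` are hypothetical names used only for illustration:

```python
def drain_cache(cache: list, send, resend_interval_ms: int) -> int:
    """Resend cached events until the cache is empty.
    Returns the simulated time in ms spent waiting between attempts."""
    waited = 0
    while cache:
        try:
            send(cache[0])
            cache.pop(0)  # ack: drop the entry only after a successful send
        except ConnectionError:
            waited += resend_interval_ms  # wait, then retry the same entry
    return waited

# Simulated sink that fails twice before the connection recovers.
attempts = {"n": 0}
def flaky_send(event):
    attempts["n"] += 1
    if attempts["n"] <= 2:
        raise ConnectionError("broker unreachable")

cache = ["e1", "e2"]
waited = drain_cache(cache, flaky_send, resend_interval_ms=10)
```

The key point mirrored from the text is that an entry leaves the cache only after the send succeeds, so no event is lost across disconnects.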
Like sources, actions also support configuration reuse. Users only need to create a YAML file named after the target action in the sinks folder and write the configuration in the same form as for sources.

For example, for the MQTT action scenario, the user can create the `mqtt.yaml` file in the sinks directory and write the following content:
```yaml
test:
  qos: 1
  server: "tcp://broker.emqx.io:1883"
```
When users need MQTT actions, they can use the traditional configuration method, as shown below:
```json
{
  "mqtt": {
    "server": "tcp://broker.emqx.io:1883",
    "topic": "devices/demo_001/messages/events/",
    "protocolVersion": "3.1.1",
    "qos": 1,
    "clientId": "demo_001",
    "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30",
    "password": "SharedAccessSignature sr=*******************",
    "retained": false
  }
}
```
They can also use the `resourceId` reference form with the following configuration:
```json
{
  "mqtt": {
    "resourceId": "test",
    "topic": "devices/demo_001/messages/events/",
    "protocolVersion": "3.1.1",
    "clientId": "demo_001",
    "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30",
    "password": "SharedAccessSignature sr=*******************",
    "retained": false
  }
}
```
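The effective action configuration is the named resource's properties combined with the inline properties, with inline properties taking precedence on conflicts. That merge can be sketched as follows (an illustrative model, not eKuiper's code; `resolve_action` is a hypothetical name):

```python
# Sketch of resolving a resourceId reference against the sinks/mqtt.yaml
# resources; inline properties override the resource's defaults.
RESOURCES = {  # parsed content of the mqtt.yaml resource file shown earlier
    "test": {"qos": 1, "server": "tcp://broker.emqx.io:1883"},
}

def resolve_action(props: dict, resources: dict = RESOURCES) -> dict:
    """Merge the referenced resource with the inline action properties."""
    base = dict(resources.get(props.get("resourceId", ""), {}))
    inline = {k: v for k, v in props.items() if k != "resourceId"}
    base.update(inline)  # inline properties win on conflicts
    return base

resolved = resolve_action({
    "resourceId": "test",
    "topic": "devices/demo_001/messages/events/",
    "clientId": "demo_001",
})
```

Here `server` and `qos` come from the `test` resource, while `topic` and `clientId` come from the rule's inline configuration.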