|
@@ -1,41 +1,42 @@
|
|
|
-# Sink Extension
|
|
|
+# Sink (目标) 扩展
|
|
|
|
|
|
-Sink feed data from Kuiper into external systems. Kuiper has built-in sink support for [MQTT broker](../rules/sinks/mqtt.md) and [log sink](../rules/sinks/logs.md). There are still needs to publish data to various external systems include messaging systems and database etc. Sink extension is presented to meet this requirement.
|
|
|
+Kuiper 可以将数据接收到外部系统。 Kuiper具有对 [MQTT 消息服务器](../rules/sinks/mqtt.md) 和 [日志目标](../rules/sinks/logs.md)的内置接收器支持。然而, 仍然需要将数据发布到各种外部系统,包括消息传递系统和数据库等。Sink (目标)扩展正是为了满足这一要求。
|
|
|
|
|
|
-## Developing
|
|
|
+## 开发
|
|
|
|
|
|
-### Develop a sink
|
|
|
+### 开发 Sink (目标)
|
|
|
|
|
|
-To develop a sink for Kuiper is to implement [api.Sink](../../../xstream/api/stream.go) interface and export it as a golang plugin.
|
|
|
+为 Kuiper 开发 Sink (目标),是实现 [api.Sink](../../../xstream/api/stream.go) 接口并将其导出为 golang 插件。
|
|
|
|
|
|
-Before starting the development, you must [setup the environment for golang plugin](overview.md#setup-the-plugin-developing-environment).
|
|
|
+在开始开发之前,您必须为 [golang 插件设置环境](overview.md#setup-the-plugin-developing-environment)。
|
|
|
|
|
|
-To develop a sink, the _Configure_ method must be implemented. This method will be called once the sink is initialized. In this method, a map that contains the configuration in the [rule actions definition](../rules/overview.md#actions) is passed in. Typically, there will be information such as host, port, user and password of the external system. You can use this map to initialize this sink.
|
|
|
+要开发 Sink (目标),必须实现 _Configure_ 方法。 接收器初始化后,将调用此方法。 在此方法中,将传入包含 [规则操作定义](../rules/overview.md#actions)中的配置映射,通常,将包含诸如外部系统的主机、端口、用户和密码之类的信息。您可以使用此映射来初始化此 Sink (目标)。
|
|
|
|
|
|
```go
|
|
|
//Called during initialization. Configure the sink with the properties from action definition
|
|
|
Configure(props map[string]interface{}) error
|
|
|
```
|
|
|
-The next task is to implement _open_ method. The implementation should be synchronized to create a connection to the external system. A context parameter is provided to retrieve the context information, logger and rule meta information.
|
|
|
+下一个任务是实现 _open_ 方法。 该实现应和创建到外部系统的连接同步。 提供了上下文参数以检索上下文信息、日志和规则元信息。
|
|
|
+
|
|
|
```go
|
|
|
//Should be sync function for normal case. The container will run it in go func
|
|
|
Open(ctx StreamContext) error
|
|
|
-```
|
|
|
+```
|
|
|
|
|
|
-The main task for a Sink is to implement _collect_ method. The function will be invoked when Kuiper feed any data into the sink. As an infinite stream, this function will be invoked continuously. The task of this function is to publish data to the external system. The first parameter is the context, and the second parameter is the data received from Kuiper.
|
|
|
+Sink (目标)的主要任务是实现 _collect_ 方法。 当 Kuiper 将任何数据输入 Sink (目标)时,将调用该函数。 作为无限流,此函数将被连续调用。 此功能的任务是将数据发布到外部系统。 第一个参数是上下文,第二个参数是从 Kuiper 接收的数据。
|
|
|
|
|
|
```go
|
|
|
//Called when each row of data has transferred to this sink
|
|
|
Collect(ctx StreamContext, data interface{}) error
|
|
|
-```
|
|
|
+```
|
|
|
|
|
|
-The last method to implement is _Close_ which literally close the connection. It is called when the stream is about to terminate. You could also do any clean up work in this function.
|
|
|
+最后要实现的方法是 _Close_ ,它实际上关闭了连接。 当流即将终止时调用它。 您也可以在此函数中执行任何清理工作。
|
|
|
|
|
|
```go
|
|
|
Close(ctx StreamContext) error
|
|
|
```
|
|
|
|
|
|
-As the sink itself is a plugin, it must be in the main package. Given the sink struct name is mySink. At last of the file, the sink must be exported as a symbol as below. There are [2 types of exported symbol supported](overview.md#plugin-development). For sink extension, states are usually needed, so it is recommended to export a constructor function.
|
|
|
+由于 Sink (目标)本身是一个插件,因此它必须位于主程序包中。 给定 Sink (目标)结构名称为 mySink。 在文件的最后,必须将 Sink (目标)导出为以下符号。 共有 [2种类型的导出符号](overview.md#plugin-development)。 对于 Sink (目标)扩展,通常需要状态,因此建议导出构造函数。
|
|
|
|
|
|
```go
|
|
|
func MySink() api.Sink {
|
|
@@ -43,24 +44,25 @@ func MySink() api.Sink {
|
|
|
}
|
|
|
```
|
|
|
|
|
|
-The [Memory Sink](../../../plugins/sinks/memory.go) is a good example.
|
|
|
+[Memory Sink](../../../plugins/sinks/memory.go) 是一个很好的示例。
|
|
|
|
|
|
-### Package the sink
|
|
|
-Build the implemented sink as a go plugin and make sure the output so file resides in the plugins/sinks folder.
|
|
|
+### 将 Sink (目标)打包
|
|
|
+将实现的 Sink (目标)构建为 go 插件,并确保输出的 so 文件位于 plugins/sinks 文件夹中。
|
|
|
|
|
|
```bash
|
|
|
go build --buildmode=plugin -o plugins/sinks/MySink.so plugins/sinks/my_sink.go
|
|
|
```
|
|
|
|
|
|
-### Usage
|
|
|
+### 使用
|
|
|
+
|
|
|
+自定义 Sink (目标)在 [动作定义](../rules/overview.md#actions)规定。 它的名称用作操作的键, 配置就是值。
|
|
|
|
|
|
-The customized sink is specified in a [actions definition](../rules/overview.md#actions). Its name is used as the key of the action. The configuration is the value.
|
|
|
+如果您开发了 Sink (目标)实现 MySink,则应该具有:
|
|
|
+1. 在插件文件中,将导出符号 MySink。
|
|
|
+2. 编译的 MySink.so 文件位于 _plugins/sinks_ 内部
|
|
|
|
|
|
-If you have developed a sink implementation MySink, you should have:
|
|
|
-1. In the plugin file, symbol MySink is exported.
|
|
|
-2. The compiled MySink.so file is located inside _plugins/sinks_
|
|
|
+要使用它,请在规则定义内定义动作 mySink:
|
|
|
|
|
|
-To use it, define the action mySink inside a rule definition:
|
|
|
```json
|
|
|
{
|
|
|
"id": "rule1",
|
|
@@ -75,4 +77,4 @@ To use it, define the action mySink inside a rule definition:
|
|
|
]
|
|
|
}
|
|
|
```
|
|
|
-Whereas, _mySink_ is a key of the actions. The value of mySink is the properties for that sink.
|
|
|
+而 _mySink_ 是动作的键。 mySink 的值是该 Sink (目标)的属性。
|