|
@@ -1,67 +1,47 @@
|
|
-# Extensions
|
|
|
|
|
|
+# Extension
|
|
|
|
|
|
-Kuiper allows user to customize the different kinds of extensions.
|
|
|
|
-
|
|
|
|
-- The source extension is used for extending different stream source, such as consuming data from other message brokers. Kuiper has built-in source support for [MQTT broker](../rules/sources/mqtt.md).
|
|
|
|
-- Sink/Action extension is used for extending pub/push data to different targets, such as database, other message system, web interfaces or file systems. Built-in action support in Kuiper, see [MQTT](../rules/sinks/mqtt.md) & [log files](../rules/sinks/logs.md).
|
|
|
|
-- Functions extension allows user to extend different functions that used in SQL. Built-in functions supported in Kuiper, see [functions](../sqls/built-in_functions.md).
|
|
|
|
-
|
|
|
|
-Kuiper extensions are based on golang plugin system. The general steps to make extensions are:
|
|
|
|
-1. Create the plugin package that implements required source, sink or function interface.
|
|
|
|
-2. Compile the plugin into a _.so_ file, and put it into sources or sinks or functions folder under _plugins_ folder.
|
|
|
|
-
|
|
|
|
-Currently golang plugins are only supported on Linux and macOS which poses the same limitation for Kuiper extensions.
|
|
|
|
-
|
|
|
|
-## Naming
|
|
|
|
-
|
|
|
|
-We recommend plugin name to be camel case. Notice that, there are some restrictions for the names:
|
|
|
|
-1. The name of the export symbol of the plugin should be camel case with an **upper case first letter**. It must be the same as the plugin name except the first letter. For example, plugin name _file_ must export a symbol _File_.
|
|
|
|
-2. The name of _.so_ file must be the same as the export symbol name or the plugin name. For example, _MySource.so_ or _mySink.so_.
|
|
|
|
|
|
+Kuiper allows users to customize extension to support more functions. Users can write plugins for extension. They can also extend functions in SQL through configuration to call existing external REST or RPC services.
|
|
|
|
|
|
-### Version
|
|
|
|
|
|
+Extension by the use of plugin is more complex and requires users to write code and compile by themselves, which has a certain development cost. The scenarios used include:
|
|
|
|
|
|
-The user can **optionally** add a version string to the name of _.so_ to help identify the version of the plugin. The version can be then retrieved through describe CLI command or REST API. The naming convention is to add a version string to the name after _@_. The version can be any string. If the version string starts with "v", the "v" will be ignored in the return result. Below are some typical examples.
|
|
|
|
|
|
+- Need to extend the source or sink
|
|
|
|
+- With high performance requirements
|
|
|
|
|
|
-- _MySource@v1.0.0.so_ : version is 1.0.0
|
|
|
|
-- _MySource@20200331.so_: version is 20200331
|
|
|
|
|
|
+Extension by the use of external function requires only configuration, but it needs to be called through the network, which has a certain performance loss. The scenarios used include:
|
|
|
|
|
|
-If multiple versions of plugins with the same name in place, only the latest version(ordered by the version string) will be taken effect.
|
|
|
|
|
|
+- Call existing services, such as AI services provided by REST or grpc
|
|
|
|
+- Services that require flexible deployment
|
|
|
|
|
|
-## Setup the plugin developing environment
|
|
|
|
-It is required to build the plugin with exactly the same version of dependencies. And the plugin must implement interfaces exported by Kuiper, so the Kuiper project must be in the gopath.
|
|
|
|
|
|
+## Plugin extension
|
|
|
|
|
|
-A typical environment for developing plugins is to put the plugin and Kuiper in the same project. To set it up:
|
|
|
|
-1. Clone Kuiper project.
|
|
|
|
-2. Create the plugin implementation file inside plugins/sources or plugin/sinks or plugin/functions according to what extension type is developing.
|
|
|
|
-3. Build the file as plugin into the same folder. The build command is typically like:
|
|
|
|
-```bash
|
|
|
|
-go build -trimpath --buildmode=plugin -o plugins/sources/MySource.so plugins/sources/my_source.go
|
|
|
|
-```
|
|
|
|
|
|
+Kuiper allows user to customize the different kinds of extensions.
|
|
|
|
|
|
-Notice that, the `-trimpath` build flag is required if using the prebuilte kuiper or kuiper docker image because the kuiperd is also built with the flag to improve build reproducibility.
|
|
|
|
|
|
+- The source extension is used for extending different stream source, such as consuming data from other message brokers. Kuiper has built-in source support for [MQTT broker](../rules/sources/mqtt.md).
|
|
|
|
+- Sink/Action extension is used for extending pub/push data to different targets, such as database, other message system, web interfaces or file systems. Built-in action is supported in Kuiper, see [MQTT](../rules/sinks/mqtt.md) & [log files](../rules/sinks/logs.md).
|
|
|
|
+- Functions extension allows user to extend different functions that used in SQL. Built-in functions is supported in Kuiper, see [functions](../sqls/built-in_functions.md).
|
|
|
|
|
|
-### Plugin development
|
|
|
|
-The development of plugins is to implement a specific interface according to the plugin type and export the implementation with a specific name. There are two types of exported symbol supported:
|
|
|
|
|
|
+Please read the following to learn how to implement different extensions.
|
|
|
|
|
|
-1. Export a constructor function: Kuiper will use the constructor function to create a new instance of the plugin implementation for each load. So each rule will have one instance of the plugin and each instance will be isolated from others. This is the recommended way.
|
|
|
|
|
|
+- [Source extension](./source.md)
|
|
|
|
+- [Sink/Action extension](./sink.md)
|
|
|
|
+- [Function extension](./function.md)
|
|
|
|
|
|
-2. Export an instance: Kuiper will use the instance as singleton for all plugin load. So all rules will share the same instance. For such implementation, the developer will need to handle the shared states to avoid any potential multi-thread problems. This mode is recommended where there are no shared states and the performance is critical. Especially, function extension is usually functional without internal state which is suitable for this mode.
|
|
|
|
|
|
+## Naming
|
|
|
|
|
|
-Please read below for how to realize the different extensions.
|
|
|
|
|
|
+We recommend plugin name to be camel case. Notice that, there are some restrictions for the names:
|
|
|
|
|
|
-- [Source extension](source.md)
|
|
|
|
-- [Sink/Action extension](sink.md)
|
|
|
|
-- [Function extension](function.md)
|
|
|
|
|
|
+1. The name of the export symbol of the plugin should be camel case with an **upper case first letter**. It must be the same as the plugin name except the first letter. For example, plugin name _file_ must export a export symbol name _File_ .
|
|
|
|
+2. The name of _.so_ file must be the same as the export symbol name or the plugin name. For example, _MySource.so_ or _mySink.so_.
|
|
|
|
|
|
-### State Storage
|
|
|
|
|
|
+### State storage
|
|
|
|
|
|
-Kuiper extensions export a key value state storage interface for Source/Sink/Function through the context.
|
|
|
|
|
|
+Kuiper extension exposes a key value state storage interface through the context parameter, which can be used for all types of extensions, including Source/Sink/Function extensions.
|
|
|
|
|
|
-States are key-value pairs, where the key is a string and the value is arbitrary data. Keys are scoped to an individual extension.
|
|
|
|
|
|
+States are key-value pairs, where the key is a string and the value is arbitrary data. Keys are scoped the to current extended instance.
|
|
|
|
|
|
-You can access states within extensions using the putState, getState, incrCounter, getCounter and deleteState calls on the context object.
|
|
|
|
|
|
+Users can access the state storage through the context object. State-related methods include putState, getState, incrCounter, getCounter and deleteState.
|
|
|
|
|
|
-Below is an example of a function extension to access states. It will record the accumulate word count across a range of function calls.
|
|
|
|
|
|
+Below is an example of a function extension to access states. This function will count the number of words passed in and save the cumulative number in the state.
|
|
|
|
|
|
```go
|
|
```go
|
|
func (f *accumulateWordCountFunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
|
|
func (f *accumulateWordCountFunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
|
|
@@ -78,58 +58,21 @@ func (f *accumulateWordCountFunc) Exec(args []interface{}, ctx api.FunctionConte
|
|
}
|
|
}
|
|
```
|
|
```
|
|
|
|
|
|
-The state storage API includes
|
|
|
|
|
|
+### Runtime dependencies
|
|
|
|
+
|
|
|
|
+Some plugin may need to access dependencies in the file system. Those files is put under {{kuiperPath}}/etc/{{pluginType}}/{{pluginName}} directory. When packaging the plugin, put those files in [etc directory](../restapi/plugins.md#plugin-file-format). After installation, they will be moved to the recommended place.
|
|
|
|
+
|
|
|
|
+In the plugin source code, developers can access the dependencies of file system by getting the Kuiper root path from the context:
|
|
|
|
|
|
```go
|
|
```go
|
|
-/**
|
|
|
|
- * Increase the builtin distributed counter referred by key
|
|
|
|
- * @param key The name of the key
|
|
|
|
- * @param amount The amount to be incremented
|
|
|
|
- * @return error if any
|
|
|
|
- */
|
|
|
|
-IncrCounter(key string, amount int) error
|
|
|
|
-/**
|
|
|
|
- * Retrieve the counter value by key
|
|
|
|
- * @param key The name of the key
|
|
|
|
- * @return the counter value
|
|
|
|
- * @return error if any
|
|
|
|
- */
|
|
|
|
-GetCounter(key string) (int, error)
|
|
|
|
-/**
|
|
|
|
- * Set or update the state value for the key.
|
|
|
|
- *
|
|
|
|
- * @param key name of the key
|
|
|
|
- * @param value state value of the key
|
|
|
|
- * @return error if any
|
|
|
|
- */
|
|
|
|
-PutState(key string, value interface{}) error
|
|
|
|
-/**
|
|
|
|
- * Retrieve the state value for the key.
|
|
|
|
- *
|
|
|
|
- * @param key name of the key
|
|
|
|
- * @return the state value
|
|
|
|
- * @return error if any
|
|
|
|
- */
|
|
|
|
-GetState(key string) (interface{}, error)
|
|
|
|
-/**
|
|
|
|
- * Delete the state value for the key.
|
|
|
|
- *
|
|
|
|
- * @param key name of the key
|
|
|
|
- * @return error if any
|
|
|
|
- */
|
|
|
|
-DeleteState(key string) error
|
|
|
|
|
|
+ctx.GetRootPath()
|
|
```
|
|
```
|
|
|
|
|
|
-#### State data type
|
|
|
|
-
|
|
|
|
-The state can be any type. If the rule [checkpoint mechanism](../rules/state_and_fault_tolerance.md) is enabled, the state will be serialized by [golang gob](https://golang.org/pkg/encoding/gob/). So it is required to be gob compatibile. For custom data type, register the type by ``gob.Register(value interface{})`` .
|
|
|
|
|
|
+## External function extension
|
|
|
|
|
|
-### Runtime dependencies
|
|
|
|
|
|
+A configuration method is provided that Kuiper can use SQL to directly call external services in a functional manner, including various rpc services, http services, and so on. This method will greatly improve the ease of Kuiper extensions. External functions will be used as a supplement to the plugin system, and plugins are only recommended for high performance requirements.
|
|
|
|
|
|
-Some plugin may need to access dependencies in the file system. It is recommended to put those files under {{kuiperPath}}/etc/{{pluginType}}/{{pluginName}} directory. When packaging the plugin, put those files in [etc directory](../restapi/plugins.md#plugin-file-format). After installation, they will be moved to the recommended place.
|
|
|
|
|
|
+Take the getFeature function as an example, and suppose an AI service provides getFeature service based on grpc. After Kuiper is configured, you can use the method of `SELECT getFeature(self) from demo` to call the AI service without customizing the plugin.
|
|
|
|
|
|
-In the plugin source code, developers can access the file system by getting the Kuiper root path from the context:
|
|
|
|
|
|
+For detailed configuration method, please refer to [External function](external_func.md).
|
|
|
|
|
|
-```go
|
|
|
|
-ctx.GetRootPath()
|
|
|
|
-```
|
|
|