EMQ X Kuiper - 基于 SQL 的轻量级流式数据处理软件提供了一套插件机制用于实现自定义源(source),目标(sink)以及 SQL 函数(function)以扩展流处理功能。本教程详细介绍了Kuiper插件的开发编译和部署过程。
Kuiper 插件机制基于 Go 语言的插件机制,使用户可以构建松散耦合的插件程序,在运行时动态加载和绑定。同时,由于 GO 语言插件系统的限制, Kuiper 插件的编译和使用也有相应的限制:
这些限制较为苛刻,几乎要求插件和 Kuiper 在同一台机器编译运行,经常导致开发环境编译出的插件无法在生产 Kuiper 上使用。本文详细介绍了一种切实可用的插件开发环境设置和流程,推荐给 Kuiper 插件开发者使用。插件的开发和使用一般有如下流程:
插件开发一般在开发环境中进行。在开发环境调试运行通过后再部署到生产环境中。
Kuiper 项目源代码的 plugins 目录下有一些插件范例。用户自定义的插件也可以在 Kuiper 项目中开发。但是为了便于代码管理,一般应当在 Kuiper 项目之外另建项目开发自定义插件。插件项目建议使用 Go module,典型的项目目录如下图所示:
plugin_project
sources //源(source)插件源代码目录
mysource.go
sinks //目标(sink)插件源代码目录
mysink.go
functions //函数(function)插件源代码目录
myfunction.go
target //编译结果目录
go.mod //go module文件
插件开发需要扩展 Kuiper 内的接口,因此必须依赖于 Kuiper 项目。最简单的 go.mod 也需要包含对 Kuiper 的依赖。典型的 go.mod 如下:
module samplePlugin
go 1.13
require (
github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b
)
Kuiper 插件有三种类型,源代码可放入对应的目录中。插件开发的详细方法请参看 EMQ X Kuiper 扩展。本文以目标(sink)为例,介绍插件的开发部署过程。我们将开发一个最基本的 MySql 目标,用于将流输出写入到 MySql 数据库中。
mysql.go 完整代码如下
package main
// 该例子为简化样例,仅建议测试时使用
import (
"database/sql"
"fmt"
"github.com/emqx/kuiper/common"
"github.com/emqx/kuiper/xstream/api"
_ "github.com/go-sql-driver/mysql"
)
type mysqlConfig struct {
Url string `json:"url"`
Table string `json:"table"`
}
type mysqlSink struct {
conf *mysqlConfig
//数据库连接实例
db *sql.DB
}
func (m *mysqlSink) Configure(props map[string]interface{}) error {
cfg := &mysqlConfig{}
err := common.MapToStruct(props, cfg)
if err != nil {
return fmt.Errorf("read properties %v fail with error: %v", props, err)
}
if cfg.Url == ""{
return fmt.Errorf("property Url is required")
}
if cfg.Table == ""{
return fmt.Errorf("property Table is required")
}
return nil
}
func (m *mysqlSink) Open(ctx api.StreamContext) (err error) {
logger := ctx.GetLogger()
logger.Debug("Opening mysql sink")
m.db, err = sql.Open("mysql", m.conf.Url)
return
}
// 该函数为数据处理简化函数。
func (m *mysqlSink) Collect(ctx api.StreamContext, item interface{}) error {
logger := ctx.GetLogger()
if v, ok := item.([]byte); ok {
//TODO 生产环境中需要处理item unmarshall后的各种类型。
// 默认的类型为 []map[string]interface{}
// 如果sink的`dataTemplate`属性有设置,则可能为各种其他的类型
logger.Debugf("mysql sink receive %s", item)
//TODO 此处列名写死。生产环境中一般可从item中的键值对获取列名
sql := fmt.Sprintf("INSERT INTO %s (`name`) VALUES ('%s')", m.conf.Table, v)
logger.Debugf(sql)
insert, err := m.db.Query(sql)
if err != nil {
return err
}
defer insert.Close()
} else {
logger.Debug("mysql sink receive non byte data")
}
return nil
}
func (m *mysqlSink) Close(ctx api.StreamContext) error {
if m.db != nil {
return m.db.Close()
}
return nil
}
// export the constructor function to be used to instantiates the plugin
func Mysql() api.Sink {
return &mysqlSink{}
}
go.mod 完整代码如下
module samplePlugin
go 1.13
require (
github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b
github.com/go-sql-driver/mysql v1.5.0
)
编译插件应当与编译 Kuiper 的环境一致。在开发环境中,典型的用法是在本地下载并编译 Kuiper 和插件,然后在本地 Kuiper 上调试插件功能;也可以在 Kuiper 的 docker 容器中编译插件,并用 Kuiper 容器运行调试。
开发者可以在本地自行编译 Kuiper 和插件进行调试。其步骤如下:
git clone https://github.com/emqx/kuiper.git
make
go mod edit -replace github.com/emqx/kuiper=$kuiperPath
,使得 Kuiper 依赖指向本地 Kuiper,请替换$kuiperPath 到步骤1下载目录,下同。go
go build --buildmode=plugin -o $kuiperPath/_build/$build/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go
从0.3.0版本开始,Kuiper 提供了开发版本 docker 镜像。其中, 0.4.0及之后版本的开发镜像为x.x.x,例如kuiper:0.4.0
;而0.3.x版本的开发镜像名为x.x.x-dev,例如kuiper:0.3.0-dev
。与运行版本相比,开发版提供了 go 开发环境,使得用户可以在编译出在 Kuiper 正式发布版本中完全兼容的插件。Docker 中编译步骤如下:
/var/git
目录。下面的命令中,我们把本地的/var/git
目录映射到docker内的/home
目录中。
go
docker run -d --name kuiper-dev --mount type=bind,source=/var/git,target=/home emqx/kuiper:0.3.0-dev
在 docker 环境中编译插件,其原理与本地编译一致。编译出的插件置于插件项目的 target 目录中
-- In host
# docker exec -it kuiper-dev /bin/sh
-- In docker instance
# cd /home/samplePlugin
# go mod edit -replace github.com/emqx/kuiper=/go/kuiper
# go build --buildmode=plugin -o /home/samplePlugin/target/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go
在本地或 Docker 中启动 Kuiper,创建流和规则,规则的 action 设置为 mysql 即可对自定义的 mysql sink 插件进行测试。创建流和规则的步骤请参考 Kuiper 文档。以下提供一个使用了 mysql 插件的规则供参考。
{
"id": "ruleTest",
"sql": "SELECT * from demo",
"actions": [
{
"log": {},
"mysql":{
"url": "user:test@tcp(localhost:3307)/user",
"table": "test"
}
}
]
}
需要注意的是,插件重新编译后需要重启 Kuiper 才能载入新的版本。
Kuiper 生产环境和开发环境如果不同,开发的插件需要重新编译并部署到生产环境。假设生产环境采用 Kuiper docker 进行部署,本节将描述如何部署插件到生产环境中。
插件原则上应该与生产环境 Kuiper 采用相同环境进行编译。假设生产环境为 Kuiper docker,则应当采用与生产环境相同版本的 dev docker 环境编译插件。例如,生产环境采用 emqx/kuiper:0.3.0的 docker 镜像,则插件需要在emqx/kuiper:0.3.0-dev 的环境中进行编译。
编译过程请参考 Docker 编译。
可以采用 REST API 或者 CLI 进行插件管理。下文以 REST API 为例,将上一节编译的插件部署到生产环境中。
.so
文件及默认配置文件(只有 source 需要) .yaml
文件一起打包到一个 .zip
文件中,假设为 mysqlSink.zip
。把该文件放置到生产环境也可访问的 http 服务器中。
{"name":"mysql","file":"http://{$http_server_ip}/plugins/sinks/mysqlSink.zip"} ```
GET http://{$production_kuiper_ip}:9081/plugins/sinks/mysql
返回
json
{
"name": "mysql",
"version": "1.0.0"
}
至此,插件部署成功。可以创建带有 mysql sink 的规则进行验证。