plugins_tutorial.md 9.0 KB

Kuiper插件教程

EMQ X Kuiper - 基于 SQL 的轻量级流式数据处理软件提供了一套插件机制用于实现自定义源(source),目标(sink)以及SQL函数(function)以扩展流处理功能。本教程详细介绍了Kuiper插件的开发编译和部署过程。

概览

Kuiper插件机制基于Go语言的插件机制,使用户可以构建松散耦合的插件程序,在运行时动态加载和绑定。同时,由于GO语言插件系统的限制,Kuiper插件的编译和使用也有相应的限制:

  • 插件不支持windows系统
  • 插件编译环境要求跟Kuiper编译环境尽量一致,包括但不限于
    • 相同的GO版本
    • 插件与Kuiper自身依赖的相同包版本必须完全一致,包括Kuiper自身
    • 插件与Kuiper编译环境的GOPATH必须完全一致

这些限制较为苛刻,几乎要求插件和Kuiper在同一台机器编译运行,经常导致开发环境编译出的插件无法在生产Kuiper上使用。本文详细介绍了一种切实可用的插件开发环境设置和流程,推荐给Kuiper开发者使用。插件的开发和使用一般有如下流程:

  • 开发
    • 创建并开发插件项目
    • 编译调试插件
  • 部署
    • 编译生产环境可用插件
    • 部署插件到生产环境

插件开发

插件开发一般在开发环境中进行。在开发环境调试运行通过后再部署到生产环境中。

创建并开发插件项目

Kuiper项目源代码的plugins目录下有一些插件范例。用户自定义的插件也可以在Kuiper项目中开发。但是为了便于代码管理,一般应当在Kuiper项目之外另建项目开发自定义插件。插件项目建议使用Go module,典型的项目目录如下图所示:

plugin_project
  sources         //源(source)插件源代码目录
    mysource.go
  sinks           //目标(sink)插件源代码目录
    mysink.go
  functions       //函数(function)插件源代码目录
    myfunction.go
  target          //编译结果目录     
  go.mod          //go module文件

插件开发需要扩展Kuiper内的接口,因此必须依赖于Kuiper项目。最简单的go.mod也需要包含对Kuiper的依赖。典型的go.mod如下:

module samplePlugin

go 1.13

require (
	github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b
)

Kuiper插件有三种类型,源代码可放入对应的目录中。插件开发的详细方法请参看EMQ X Kuiper 扩展。本文以目标(sink)为例,介绍插件的开发部署过程。我们将开发一个最基本的MySql目标,用于将流输出写入到MySql数据库中。

  • 新建名为samplePlugin的插件项目,采用上文的目录结构
  • 在sinks目录下,新建mysql.go文件
  • 编辑mysql.go文件以实现插件
    • 实现api.Sink接口
    • 导出Symbol:Mysql
  • 编辑go.mod, 添加mysql驱动模块

mysql.go 完整代码如下

package main

import (
	"database/sql"
	"fmt"
	"github.com/emqx/kuiper/xstream/api"
	_ "github.com/go-sql-driver/mysql"
)

type mysqlSink struct {
	url       string
	table     string

	db        *sql.DB
}

func (m *mysqlSink) Configure(props map[string]interface{}) error {
	if i, ok := props["url"]; ok {
		if i, ok := i.(string); ok {
			m.url = i
		}
	}
	if i, ok := props["table"]; ok {
		if i, ok := i.(string); ok {
			m.table = i
		}
	}
	return nil
}

func (m *mysqlSink) Open(ctx api.StreamContext) (err error) {
	logger := ctx.GetLogger()
	logger.Debug("Opening mysql sink")
	m.db, err = sql.Open("mysql", m.url)
	return
}

func (m *mysqlSink) Collect(ctx api.StreamContext, item interface{}) error {
	logger := ctx.GetLogger()
	if v, ok := item.([]byte); ok {
		logger.Debugf("mysql sink receive %s", item)
		sql := fmt.Sprintf("INSERT INTO %s (`name`) VALUES ('%s')", m.table, v)
		logger.Debugf(sql)
		insert, err := m.db.Query(sql)
		if err != nil {
			return err
		}
		defer insert.Close()
	} else {
		logger.Debug("mysql sink receive non byte data")
	}
	return nil
}

func (m *mysqlSink) Close(ctx api.StreamContext) error {
	if m.db != nil{
		m.db.Close()
	}
	return nil
}

var Mysql mysqlSink

go.mod 完整代码如下

module samplePlugin

go 1.13

require (
	github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b
	github.com/go-sql-driver/mysql v1.5.0
)

编译调试插件

编译插件应当与编译Kuiper的环境一致。在开发环境中,典型的用法是在本地下载并编译Kuiper和插件,然后在本地Kuiper上调试插件功能;也可以在Kuiper的docker容器中编译插件,并用Kuiper容器运行调试。

本地编译

开发者可以在本地自行编译Kuiper和插件进行调试。其步骤如下:

  1. 下载Kuiper源代码 git clone https://github.com/emqx/kuiper.git
  2. 编译Kuiper:在Kuiper目录下,运行make
  3. 编译插件:
    1. 在插件项目下,运行go mod edit -replace github.com/emqx/kuiper=$kuiperPath,使得Kuiper依赖指向本地Kuiper,请替换$kuiperPath到步骤1下载目录,下同。
    2. 编译插件so到Kuiper插件目录下 go go build --buildmode=plugin -o $kuiperPath/_build/$build/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go

Docker编译

从0.3.0版本开始,Kuiper提供了开发版本docker image。与运行版本相比,开发版提供了go开发环境,使得用户可以在与运行版相同的环境中编译出可在运行版中使用的插件。Docker中编译步骤如下:

  1. 运行Kuiper开发版本docker。需要把本地插件目录mount到docker里的目录中,这样才能在docker中访问插件项目并编译。笔者的插件项目位于本地/var/git目录。下面的命令中,我们把本地的/var/git目录映射到docker内的/home目录中。 go docker run -d --name kuiper-dev --mount type=bind,source=/var/git,target=/home emqx/kuiper:0.3.0-dev
  2. 在docker环境中编译插件,其原理与本地编译一致。编译出的插件置于插件项目的target目录中

    -- In host
    # docker exec -it kuiper-dev /bin/sh
        
    -- In docker instance
    # cd /home/samplePlugin
    # go mod edit -replace github.com/emqx/kuiper=/go/kuiper
    # go build --buildmode=plugin -o /home/samplePlugin/target/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go
    

调试运行插件

在本地或Docker中启动Kuiper,创建流和规则,规则的action设置为mysql即可对自定义的mysql sink插件进行测试。创建流和规则的步骤请参考Kuiper文档。以下提供一个使用了mysql插件的规则供参考。

{
  "id": "ruleTest",
  "sql": "SELECT * from demo",
  "actions": [
    {
      "log": {},
      "mysql":{
        "url": "user:test@tcp(localhost:3307)/user",
        "table": "test"
      }
    }
  ]
}

需要注意的是,插件重新编译后需要重启Kuiper才能载入新的版本。

插件部署

Kuiper生产环境和开发环境如果不同,开发的插件需要重新编译并部署到生产环境。假设生产环境采用Kuiper docker进行部署,本节将描述如何部署插件到生产环境中。

插件编译

插件原则上应该与生产环境Kuiper采用相同环境进行编译。假设生产环境为Kuiper docker,则应当采用与生产环境相同版本的dev docker环境编译插件。例如,生产环境采用emqx/kuiper:0.3.0的docker image,则插件需要在emqx/kuiper:0.3.0-dev的环境中进行编译。

编译过程请参考Docker编译

插件部署

可以采用REST API或者CLI进行插件管理。下文以REST API为例,将上一节编译的插件部署到生产环境中。

  1. 插件打包并放到http服务器。将上一节编译好的插件.so文件及默认配置文件(只有source需要).yaml文件一起打包到一个.zip文件中,假设为mysqlSink.zip。把该文件放置到生产环境也可访问的http服务器中。
  2. 使用REST API创建插件: ``` POST http://{$production_kuiper_ip}:9081/plugins/sinks Content-Type: application/json

{"name":"mysql","file":"http://{$http_server_ip}/plugins/sinks/mysqlSink.zip"} ```

  1. 验证插件是否创建成功 GET http://{$production_kuiper_ip}:9081/plugins/sinks/mysql 返回 json { "name": "mysql", "version": "1.0.0" }

至此,插件部署成功。可以创建带有mysql sink的规则进行验证。