Quellcode durchsuchen

Merge pull request #56 from emqx/master

Sync develop branch
jinfahua vor 5 Jahren
Ursprung
Commit
0a39c5fca1

+ 2 - 2
Makefile

@@ -87,14 +87,14 @@ cross_build: cross_prepare
 
 
 .PHONY: docker
 .PHONY: docker
 docker:
 docker:
-	docker build --no-cache -t $(TARGET):$(VERSION) -f .
+	docker build --no-cache -t $(TARGET):$(VERSION) -f deploy/docker/Dockerfile .
 
 
 .PHONY:cross_docker
 .PHONY:cross_docker
 cross_docker: cross_prepare
 cross_docker: cross_prepare
 	docker buildx build --no-cache \
 	docker buildx build --no-cache \
 	--platform=linux/amd64,linux/arm64,linux/arm/v7,linux/386,linux/ppc64le \
 	--platform=linux/amd64,linux/arm64,linux/arm/v7,linux/386,linux/ppc64le \
 	-t $(TARGET):$(VERSION) \
 	-t $(TARGET):$(VERSION) \
-	-f docker/Dockerfile . \
+	-f deploy/docker/Dockerfile . \
 	--push
 	--push
 
 
 .PHONY: clean
 .PHONY: clean

Datei-Diff unterdrückt, da er zu groß ist
+ 86 - 0
README-CN.md


Datei-Diff unterdrückt, da er zu groß ist
+ 62 - 45
README.md


+ 14 - 0
common/templates/funcs.go

@@ -0,0 +1,14 @@
+package templates
+
+import (
+	"encoding/json"
+)
+
+//Use the name json in func map
+func JsonMarshal(v interface {}) (string, error) {
+	if a, err := json.Marshal(v); err != nil{
+		return "", err
+	}else{
+		return string(a), nil
+	}
+}

+ 11 - 10
common/util.go

@@ -3,7 +3,6 @@ package common
 import (
 import (
 	"bytes"
 	"bytes"
 	"encoding/json"
 	"encoding/json"
-	"flag"
 	"fmt"
 	"fmt"
 	"github.com/go-yaml/yaml"
 	"github.com/go-yaml/yaml"
 	"github.com/patrickmn/go-cache"
 	"github.com/patrickmn/go-cache"
@@ -72,14 +71,16 @@ type XStreamConf struct {
 }
 }
 
 
 var StreamConf = "kuiper.yaml"
 var StreamConf = "kuiper.yaml"
-var kpbase = flag.String("kuiper_base", "", "Specify Kuiper base directory")
-
+const KuiperBaseKey = "KuiperBaseKey"
 func init(){
 func init(){
 	Log = logrus.New()
 	Log = logrus.New()
 	Log.SetFormatter(&logrus.TextFormatter{
 	Log.SetFormatter(&logrus.TextFormatter{
 		DisableColors: true,
 		DisableColors: true,
 		FullTimestamp: true,
 		FullTimestamp: true,
 	})
 	})
+}
+
+func InitConf() {
 	b, err := LoadConf(StreamConf)
 	b, err := LoadConf(StreamConf)
 	if err != nil {
 	if err != nil {
 		Log.Fatal(err)
 		Log.Fatal(err)
@@ -90,8 +91,8 @@ func init(){
 	}
 	}
 
 
 	if c, ok := cfg["basic"]; !ok{
 	if c, ok := cfg["basic"]; !ok{
-		Log.Fatal("no basic config in kuiper.yaml")
-	}else{
+		Log.Fatal("No basic config in kuiper.yaml")
+	} else {
 		Config = &c
 		Config = &c
 	}
 	}
 
 
@@ -107,7 +108,7 @@ func init(){
 		} else {
 		} else {
 			Log.Infof("Failed to log to file, using default stderr")
 			Log.Infof("Failed to log to file, using default stderr")
 		}
 		}
-	}else{
+	} else {
 		Log.SetLevel(logrus.DebugLevel)
 		Log.SetLevel(logrus.DebugLevel)
 	}
 	}
 }
 }
@@ -231,10 +232,10 @@ func GetLoc(subdir string)(string, error) {
 		return "", err
 		return "", err
 	}
 	}
 
 
-	//flag.Parse()
-	//if loc := *kpbase; loc != "" {
-	//	dir = loc
-	//}
+	if base := os.Getenv(KuiperBaseKey); base != "" {
+		Log.Infof("Specified Kuiper base folder at location %s.\n", base)
+		dir = base
+	}
 
 
 	confDir := dir + subdir
 	confDir := dir + subdir
 	if _, err := os.Stat(confDir); os.IsNotExist(err) {
 	if _, err := os.Stat(confDir); os.IsNotExist(err) {

+ 22 - 0
deploy/chart/kuiper/.helmignore

@@ -0,0 +1,22 @@
+# Patterns to ignore when building packages.
+# This supports shell glob matching, relative path matching, and
+# negation (prefixed with !). Only one pattern per line.
+.DS_Store
+# Common VCS dirs
+.git/
+.gitignore
+.bzr/
+.bzrignore
+.hg/
+.hgignore
+.svn/
+# Common backup files
+*.swp
+*.bak
+*.tmp
+*~
+# Various IDEs
+.project
+.idea/
+*.tmproj
+.vscode/

+ 21 - 0
deploy/chart/kuiper/Chart.yaml

@@ -0,0 +1,21 @@
+apiVersion: v2
+name: kuiper
+description: A lightweight IoT edge analytic software
+
+# A chart can be either an 'application' or a 'library' chart.
+#
+# Application charts are a collection of templates that can be packaged into versioned archives
+# to be deployed.
+#
+# Library charts provide useful utilities or functions for the chart developer. They're included as
+# a dependency of application charts to inject those utilities and functions into the rendering
+# pipeline. Library charts do not define any templates and therefore cannot be deployed.
+type: application
+
+# This is the chart version. This version number should be incremented each time you make changes
+# to the chart and its templates, including the app version.
+version: 0.0.4
+
+# This is the version number of the application being deployed. This version number should be
+# incremented each time you make changes to the application.
+appVersion: 0.0.4

+ 206 - 0
deploy/chart/kuiper/README.md

@@ -0,0 +1,206 @@
+Kuiper can be deployed at k3s/k8s cluster through Helm chart. Below takes k3s as an example for demonstrating how to deploy at k3s.
+
+## Prepare
+
++ Install K3S
+  ```shell
+  $ curl -sfL https://get.k3s.io | sh -
+  $ sudo chmod 755 /etc/rancher/k3s/k3s.yaml
+  $ kubectl get nodes
+  NAME               STATUS   ROLES    AGE     VERSION
+  ip-172-31-16-120   Ready    master   4m31s   v1.16.3-k3s.2
+  ```
+
++ Install helm3
+  ```shell
+  $ curl -sfL https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 | bash -
+  Downloading https://get.helm.sh/helm-v3.0.1-linux-amd64.tar.gz
+  Preparing to install helm into /usr/local/bin
+  helm installed into /usr/local/bin/helm
+  
+  ## K8S can skip this step
+  $ export KUBECONFIG=/etc/rancher/k3s/k3s.yaml
+  ```
+
+## Get Kuiper Helm Chart
+
++ With any approaches in below to get Kuiper Helm Chart, here uses `git clone` to get Helm chart.
+
+  + Git clone
+
+    ```shell
+    $ git clone https://github.com/emqx/kuiper
+    $ cd kuiper/deploy/chart/kuiper
+    ```
+
+  + Helm repo (TODO)
+
+    + Add Helm repo
+
+      ```shell
+      $ helm repo add emqx https://repos.emqx.io/charts
+      ```
+
+    + Search Kuiper
+
+      ```shell
+      helm search kuiper
+      NAME     		CHART VERSION	APP VERSION  	DESCRIPTION
+      emqx/kuiper	0.0.3	        0.0.3	        A lightweight IoT edge analytic software
+      ```
+
++ By edit  `values.yaml` file or use command `helm install --set` to edit ``Kuiper Helm Chart`` configurations.
+
+  ##### Kuiper Helm Chart Configurations 
+
+  | Parameters                     | Descriptions                                         | Default Value            |
+  | ------------------------------ | ---------------------------------------------------- | ------------------------ |
+  | `replicaCount`                 | Deployed Kuiper instance number                      | 1                        |
+  | `image.repository`             | Docker image name                                    | emqx/kuiper              |
+  | `image.pullPolicy`             | Pull policy                                          | IfNotPresent             |
+  | `persistence.enabled`          | Enable PVC                                           | false                    |
+  | `persistence.storageClass`     | Storage class name                                   | `nil`                    |
+  | `persistence.existingClaim`    | PV name                                              | ""                       |
+  | `persistence.accessMode`       | PVC access mode                                      | ReadWriteOnce            |
+  | `persistence.size`             | PVC size                                             | 20Mi                     |
+  | `resources`                    | CPU/Memory                                           | {}                       |
+  | `nodeSelector`                 | Node selector                                        | {}                       |
+  | `tolerations`                  | Tolerations                                          | []                       |
+  | `affinity`                     | Affinity                                             | {}                       |
+  | `mqtt.servers`                 | MQTT broker address                                  | `[tcp://127.0.0.1:1883]` |
+  | `mqtt.qos`                     | QoS of message subscription                          | 1                        |
+  | `mqtt.sharedSubscription`      | Use shared subscription or not                       | true                     |
+  | `mqtt.username`                | MQTT connection user name                            |                          |
+  | `mqtt.password`                | MQTT connection password                             |                          |
+  | `mqtt.certificationSecretName` | Secret resource name created for certification file. |                          |
+  | `mqtt.privateKeySecretName`    | Secret resource name created fro private key file    |                          |
+  | `mqtt.certificationPath`       | Certification path for MQTT connection               |                          |
+  | `mqtt.privateKeyPath`          | Private key path for MQTT connection                 |                          |
+
+## Deploy Kuiper through Helm
+
+#### Deploy Kuiper quickly
+
++ Deploy Kuiper through Helm
+
+  ```shell
+  $ helm install my-kuiper .
+  NAME: my-kuiper
+  LAST DEPLOYED: Mon Dec  9 09:56:32 2019
+  NAMESPACE: default
+  STATUS: deployed
+  REVISION: 1
+  TEST SUITE: None
+  ```
+
++ Deployment is successful
+
+  ```shell
+  $ kubectl get pods
+  NAME       READY   STATUS    RESTARTS   AGE
+  my-kuiper-0   1/1     Running   0          19s
+  
+  $ kubectl exec -it  my-kuiper-0 sh
+  /kuiper # ./bin/cli
+  Connecting to 127.0.0.1:20498...
+  ```
+
+#### Deploy persisted Kuiper
+
++ Kuiper realized persisted  `pods` through creating PVC resources and mount `/kuiper/data` directory. **Before deploying Kuiper, user need to create PVC or Storage Classes resource in Kubernetes.**
+
++ Open and edit `values.yaml` file, set  `persistence.enabled=true`
+
+  + If user deploys PVC resource, , then set`persistence.existingClaim=your_pv_name`
+  + If user deploys Storage Classes resource, then set `persistence.storageClass=your_storageClass_name`
+
++ Deploy Kuiper through Helm 
+
+  ```
+  $ helm install my-kuiper .
+  NAME: my-kuiper
+  LAST DEPLOYED: Mon Dec  9 09:56:32 2019
+  NAMESPACE: default
+  STATUS: deployed
+  REVISION: 1
+  TEST SUITE: None
+  ```
+
++ Deployment is successful
+
+  ```shell
+  $ kubectl get pods
+  NAME       READY   STATUS    RESTARTS   AGE
+  my-kuiper-0   1/1     Running   0          19s
+  
+  $ kubectl exec -it  my-kuiper-0 sh
+  /kuiper # ./bin/cli
+  Connecting to 127.0.0.1:20498...
+  ```
+
+#### Deploy Kuiper and using SSL certification and key
+
++ Use command `kubectl create secret` , create certification & private keys to ``Secret resources``, the usage of command `kubectl create secret`  is listed as in below:
+
+  ```shell
+  $ kubectl create secret generic your-secret-name --from-file=/path/to/file
+  ```
+
+  Create Secret resource for certification file: 
+
+  ```shell
+  $ kubectl create secret generic client-cert --from-file=certs/client-cert.pem
+  ```
+
+  Create Secret for private key file: 
+
+  ```shell
+  $ kubectl create secret generic client-key --from-file=certs/client-key.pem
+  ```
+
+  Review Secret resources
+
+  ```shell
+  $ kubectl get secret
+  NAME                                         TYPE                                  DATA   AGE
+  client-cert                                  Opaque                                1      25m
+  client-key                                   Opaque                                1      24m
+  ```
+
++ Open and edit `values.yaml` file
+
+  + Set `mqtt.certificationSecretName` certification Secret resource: `mqtt.certificationSecretName: client-cert`
+  + Set `mqtt.privateKeySecretName` private key Secret resource:`mqtt.privateKeySecretName: client-key`
+  + Set certification file path: `mqtt.certificationPath: /var/kuiper/certificate.pem`
+  + Set private key file path: `mqtt.privateKeyPath: /var/kuiper/private.pem.key`
+
++ Deploy Kuiper through Helm 
+
+  ```shell
+  $ helm install my-kuiper .
+  NAME: my-kuiper
+  LAST DEPLOYED: Mon Dec  9 09:56:32 2019
+  NAMESPACE: default
+  STATUS: deployed
+  REVISION: 1
+  TEST SUITE: None
+  ```
+
++ Deployment is successful
+
+  ```shell
+  $ kubectl get pods
+  NAME       READY   STATUS    RESTARTS   AGE
+  my-kuiper-0   1/1     Running   0          19s
+  
+  $ kubectl exec -it my-kuiper-0 -- ls -al /var/kuiper
+  total 8
+  drwxr-xr-x    4 root     root          4096 Dec 10 02:23 .
+  drwxr-xr-x    1 root     root          4096 Dec 10 02:23 ..
+  drwxrwxrwt    3 root     root           100 Dec 10 02:23 certificate.pem
+  drwxrwxrwt    3 root     root           100 Dec 10 02:23 private.pem.key
+  
+  $ kubectl exec -it  my-kuiper-0 sh
+  /kuiper # ./bin/cli
+  Connecting to 127.0.0.1:20498...
+  ```

+ 206 - 0
deploy/chart/kuiper/README_zh.md

@@ -0,0 +1,206 @@
+Kuiper 可以通过 Helm chart 部署在 k3s / k8s 集群上。下面以 k3s 为例演示如何部署 Kuiper:
+
+## Prepare:
+
++ 安装 K3S: 
+  ```shell
+  $ curl -sfL https://get.k3s.io | sh -
+  $ sudo chmod 644 /etc/rancher/k3s/k3s.yaml
+  $ kubectl get nodes
+  NAME               STATUS   ROLES    AGE     VERSION
+  ip-172-31-16-120   Ready    master   4m31s   v1.16.3-k3s.2
+  ```
+
++ 安装 helm3
+  ```shell
+  $ curl -sfL https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 | bash -
+  Downloading https://get.helm.sh/helm-v3.0.1-linux-amd64.tar.gz
+  Preparing to install helm into /usr/local/bin
+  helm installed into /usr/local/bin/helm
+  
+  ## K8S 可以跳过这一步
+  $ export KUBECONFIG=/etc/rancher/k3s/k3s.yaml
+  ```
+
+## 获取 Kuiper Helm Chart
+
++ 可以通过一下两种方法的任意一种获取 Kuiper Helm Chart,本文以使用 `git clone` 拉取代码的方式为例讲解。
+
+  + Git clone
+
+    ```shell
+    $ git clone https://github.com/emqx/kuiper
+    $ cd kuiper/deploy/chart/kuiper
+    ```
+
+  + Helm repo
+
+    + 添加Helm repo
+
+      ```shell
+      $ helm repo add emqx https://repos.emqx.io/charts
+      ```
+
+    + 查询 Kuiper
+
+      ```shell
+      $ helm search repo kuiper
+      NAME       	CHART VERSION	APP VERSION	DESCRIPTION
+      emqx/kuiper	0.0.4        	0.0.4      	A lightweight IoT edge analytic software
+      ```
+
++ 可以通过编辑 `values.yaml` 文件或使用 `helm install --set` 命令编辑 Kuiper Helm Chart 的配置
+
+  ##### Kuiper Helm Chart 配置项
+
+  | 参数                           | 描述                                | Default Value            |
+  | ------------------------------ | ----------------------------------- | ------------------------ |
+  | `replicaCount`                 | 部署kuiper数量                      | 1                        |
+  | `image.repository`             | 拉取镜像名称                        | emqx/kuiper              |
+  | `image.pullPolicy`             | 拉取镜像策略                        | IfNotPresent             |
+  | `persistence.enabled`          | 是否启用 PVC                        | false                    |
+  | `persistence.storageClass`     | Storage class 名称                  | `nil`                    |
+  | `persistence.existingClaim`    | PV 名称                             | ""                       |
+  | `persistence.accessMode`       | PVC 访问模式                        | ReadWriteOnce            |
+  | `persistence.size`             | PVC 容量                            | 20Mi                     |
+  | `resources`                    | CPU/内存资源                        | {}                       |
+  | `nodeSelector`                 | 节点选择                            | {}                       |
+  | `tolerations`                  | 污点容忍                            | []                       |
+  | `affinity`                     | 节点亲和性                          | {}                       |
+  | `mqtt.servers`                 | mqtt服务器的代理地址                | `[tcp://127.0.0.1:1883]` |
+  | `mqtt.qos`                     | 消息转发的服务质量                  | 1                        |
+  | `mqtt.sharedSubscription`      | 是否使用共享订阅                    | true                     |
+  | `mqtt.username`                | 连接用户名                          |                          |
+  | `mqtt.password`                | 连接密码                            |                          |
+  | `mqtt.certificationSecretName` | 通过证书文件创建的 Secre 资源的名字 |                          |
+  | `mqtt.privateKeySecretName`    | 通过私钥文件创建的 Secre 资源的名字 |                          |
+  | `mqtt.certificationPath`       | 证书路径。必须是绝对路径。          |                          |
+  | `mqtt.privateKeyPath`          | 私钥路径。必须绝对路径。            |                          |
+
+## 通过 Helm 部署 Kuiper
+
+#### 快速部署Kuiper
+
++ 使用 Helm 部署 Kuiper
+
+  ```shell
+  $ helm install my-kuiper .
+  NAME: my-kuiper
+  LAST DEPLOYED: Mon Dec  9 09:56:32 2019
+  NAMESPACE: default
+  STATUS: deployed
+  REVISION: 1
+  TEST SUITE: None
+  ```
+
++ 部署成功
+
+  ```shell
+  $ kubectl get pods
+  NAME       READY   STATUS    RESTARTS   AGE
+  my-kuiper-0   1/1     Running   0          19s
+
+  $ kubectl exec -it  my-kuiper-0 sh
+  /kuiper # ./bin/cli
+  Connecting to 127.0.0.1:20498...
+  ```
+
+#### 部署持久化的 Kuiper
+
++ Kuiper 通过 创建 PVC 资源挂载 `/kuiper/data` 目录实现持久化 `pods`,**在部署 Kuiper 之前,用户需要自行在 Kubernetes 中创建 PVC 资源或 Storage Classes 资源**
+
++ 编辑 `values.yaml` 文件,设置 `persistence.enabled=true`
+
+  + 如果用户部署了 PVC 资源,那么设置 `persistence.existingClaim=your_pv_name`
+  + 如果用户部署了 Storage Classes 资源,那么设置`persistence.storageClass=your_storageClass_name`
+
++ 使用 Helm 部署 Kuiper
+
+  ```shell
+  $ helm install my-kuiper .
+  NAME: my-kuiper
+  LAST DEPLOYED: Mon Dec  9 09:56:32 2019
+  NAMESPACE: default
+  STATUS: deployed
+  REVISION: 1
+  TEST SUITE: None
+  ```
+
++ 部署成功
+
+  ```shell
+  $ kubectl get pods
+  NAME       READY   STATUS    RESTARTS   AGE
+  my-kuiper-0   1/1     Running   0          19s
+  
+  $ kubectl exec -it  my-kuiper-0 sh
+  /kuiper # ./bin/cli
+  Connecting to 127.0.0.1:20498...
+  ```
+
+#### 部署Kuiper并使用证书
+
++ 使用 `kubectl create secret` 将证书文件和私钥创建成 Secret 资源,`kubectl create secret` 命令的语法如下:
+
+  ```shell
+  $ kubectl create secret generic your-secret-name --from-file=/path/to/file
+  ```
+
+  创建证书文件 Secret 资源:
+
+  ```shell
+  $ kubectl create secret generic client-cert --from-file=certs/client-cert.pem
+  ```
+
+  创建私钥文件 Secret 资源:
+
+  ```shell
+  $ kubectl create secret generic client-key --from-file=certs/client-key.pem
+  ```
+
+  查看 Secret 资源:
+
+  ```shell
+  $ kubectl get secret
+  NAME                                         TYPE                                  DATA   AGE
+  client-cert                                  Opaque                                1      25m
+  client-key                                   Opaque                                1      24m
+  ```
+
++ 编辑 `values.yaml` 文件
+
+  + 设置 `mqtt.certificationSecretName` 为证书文件 Secret 资源: `mqtt.certificationSecretName: client-cert`
+  + 设置 `mqtt.privateKeySecretName` 为私钥文件 Secret 资源:`mqtt.privateKeySecretName: client-key`
+  + 设置证书文件部署路径:`mqtt.certificationPath: /var/kuiper/certificate.pem`
+  + 设置私钥文件部署路径:`mqtt.privateKeyPath: /var/kuiper/private.pem.key`
+
++ 使用 Helm 部署 Kuiper
+
+  ```shell
+  $ helm install my-kuiper .
+  NAME: my-kuiper
+  LAST DEPLOYED: Mon Dec  9 09:56:32 2019
+  NAMESPACE: default
+  STATUS: deployed
+  REVISION: 1
+  TEST SUITE: None
+  ```
+
++ 部署成功
+
+  ```shell
+  $ kubectl get pods
+  NAME       READY   STATUS    RESTARTS   AGE
+  my-kuiper-0   1/1     Running   0          19s
+
+  $ kubectl exec -it my-kuiper-0 -- ls -al /var/kuiper
+  total 8
+  drwxr-xr-x    4 root     root          4096 Dec 10 02:23 .
+  drwxr-xr-x    1 root     root          4096 Dec 10 02:23 ..
+  drwxrwxrwt    3 root     root           100 Dec 10 02:23 certificate.pem
+  drwxrwxrwt    3 root     root           100 Dec 10 02:23 private.pem.key
+
+  $ kubectl exec -it  my-kuiper-0 sh
+  /kuiper # ./bin/cli
+  Connecting to 127.0.0.1:20498...
+  ```

+ 101 - 0
deploy/chart/kuiper/templates/StatefulSet.yaml

@@ -0,0 +1,101 @@
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: {{ include "kuiper.fullname" . }}
+  namespace: {{ .Release.Namespace }}
+  labels:
+    {{- include "kuiper.labels" . | nindent 4 }}
+spec:
+  replicas: {{ .Values.replicaCount }}
+  serviceName: {{ include "kuiper.fullname" . }}-headless
+  {{- if and .Values.persistence.enabled (not .Values.persistence.existingClaim) }}
+  volumeClaimTemplates:
+    - metadata:
+        name: kuiper-data
+        namespace: {{ .Release.Namespace }}
+        labels:
+          {{- include "kuiper.labels" . | nindent 4 }}
+        annotations:
+        {{- if .Values.persistence.storageClass }}
+          volume.beta.kubernetes.io/storage-class: {{ .Values.persistence.storageClass | quote }}
+        {{- else }}
+          volume.alpha.kubernetes.io/storage-class: default
+        {{- end }}
+      spec:
+        accessModes:
+          - {{ .Values.persistence.accessMode | quote }}
+        resources:
+         requests:
+           storage: {{ .Values.persistence.size | quote }}
+  {{- end }}
+  selector:
+    matchLabels:
+      {{- include "kuiper.selectorLabels" . | nindent 6 }}
+  template:
+    metadata:
+      labels:
+        {{- include "kuiper.selectorLabels" . | nindent 8 }}
+    spec:
+      # securityContext:
+      #   fsGroup: 1000
+      volumes:
+      {{- if not .Values.persistence.enabled }}
+      - name: kuiper-data
+        emptyDir: {}
+      {{- else if .Values.persistence.existingClaim }}
+      - name: kuiper-data
+        persistentVolumeClaim:
+        {{- with .Values.persistence.existingClaim }}
+          claimName: {{ tpl . $ }}
+        {{- end }}
+      {{- end }}
+      - name: mqtt
+        configMap:
+          name: {{ include "kuiper.fullname" . }}
+          items:
+          - key: mqtt.yaml
+            path: mqtt.yaml
+      {{- if .Values.mqtt.certificationSecretName }}
+      - name: kuiper-certification
+        secret:
+          secretName: {{ .Values.mqtt.certificationSecretName }}
+      {{- end }}
+      {{- if .Values.mqtt.privateKeySecretName }}
+      - name: kuiper-private-key
+        secret:
+          secretName: {{ .Values.mqtt.privateKeySecretName }}
+      {{- end }}
+      containers:
+        - name: kuiper
+          image: "{{ .Values.image.repository }}:{{ .Chart.AppVersion }}"
+          imagePullPolicy: {{ .Values.image.pullPolicy }}
+          volumeMounts:
+          - name: kuiper-data
+            mountPath: "/kuiper/data"
+          - name: mqtt
+            mountPath: "/kuiper/etc/sources/mqtt.yaml"
+            subPath: "mqtt.yaml"
+          {{ if .Values.mqtt.certificationSecretName  }}
+          - name: kuiper-certification
+            mountPath: {{ .Values.mqtt.certificationPath | default "/var/kuiper/certificate.pem" }}
+            readOnly: true
+          {{ end }}
+          {{ if .Values.mqtt.privateKeySecretName  }}
+          - name: kuiper-private-key
+            mountPath: {{ .Values.mqtt.privateKeyPath | default "/var/kuiper/private.pem.key" }}
+            readOnly: true
+          {{ end }}
+          resources:
+          {{- toYaml .Values.resources | nindent 12 }}
+    {{- with .Values.nodeSelector }}
+      nodeSelector:
+        {{- toYaml . | nindent 8 }}
+      {{- end }}
+    {{- with .Values.affinity }}
+      affinity:
+        {{- toYaml . | nindent 8 }}
+    {{- end }}
+    {{- with .Values.tolerations }}
+      tolerations:
+        {{- toYaml . | nindent 8 }}
+    {{- end }}

+ 63 - 0
deploy/chart/kuiper/templates/_helpers.tpl

@@ -0,0 +1,63 @@
+{{/* vim: set filetype=mustache: */}}
+{{/*
+Expand the name of the chart.
+*/}}
+{{- define "kuiper.name" -}}
+{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" -}}
+{{- end -}}
+
+{{/*
+Create a default fully qualified app name.
+We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
+If release name contains chart name it will be used as a full name.
+*/}}
+{{- define "kuiper.fullname" -}}
+{{- if .Values.fullnameOverride -}}
+{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" -}}
+{{- else -}}
+{{- $name := default .Chart.Name .Values.nameOverride -}}
+{{- if contains $name .Release.Name -}}
+{{- .Release.Name | trunc 63 | trimSuffix "-" -}}
+{{- else -}}
+{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" -}}
+{{- end -}}
+{{- end -}}
+{{- end -}}
+
+{{/*
+Create chart name and version as used by the chart label.
+*/}}
+{{- define "kuiper.chart" -}}
+{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" -}}
+{{- end -}}
+
+{{/*
+Common labels
+*/}}
+{{- define "kuiper.labels" -}}
+helm.sh/chart: {{ include "kuiper.chart" . }}
+{{ include "kuiper.selectorLabels" . }}
+{{- if .Chart.AppVersion }}
+app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
+{{- end }}
+app.kubernetes.io/managed-by: {{ .Release.Service }}
+{{- end -}}
+
+{{/*
+Selector labels
+*/}}
+{{- define "kuiper.selectorLabels" -}}
+app.kubernetes.io/name: {{ include "kuiper.name" . }}
+app.kubernetes.io/instance: {{ .Release.Name }}
+{{- end -}}
+
+{{/*
+Create the name of the service account to use
+*/}}
+{{- define "kuiper.serviceAccountName" -}}
+{{- if .Values.serviceAccount.create -}}
+    {{ default (include "kuiper.fullname" .) .Values.serviceAccount.name }}
+{{- else -}}
+    {{ default "default" .Values.serviceAccount.name }}
+{{- end -}}
+{{- end -}}

+ 39 - 0
deploy/chart/kuiper/templates/configmap.yaml

@@ -0,0 +1,39 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: {{ include "kuiper.fullname" . }}
+  namespace: {{ .Release.Namespace }}
+  labels:
+    {{- include "kuiper.labels" . | nindent 4 }}
+data:
+    "random.yaml": |
+      default:
+        interval: 1000
+        pattern:
+          count: 50
+      ext:
+        interval: 300
+        seed: 1
+        pattern:
+          count: 50
+    "zmq.yaml": |
+      #Global Zmq configurations
+      default:
+        server: tcp://127.0.0.1:5563
+    "mqtt.yaml": |
+      #Global MQTT configurations
+      default:
+        {{- toYaml .Values.mqtt | nindent 8 }}
+      #Override the global configurations
+      demo_conf: #Conf_key
+        qos: 0
+        servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]
+    "client.yaml": |
+      basic:
+        host: 127.0.0.1
+        port: 20498
+    "kuiper.yaml": |
+      basic:
+        # true|false, with debug level, it prints more debug info
+        debug: false
+        port: 20498

+ 13 - 0
deploy/chart/kuiper/templates/sevice.yaml

@@ -0,0 +1,13 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: {{ include "kuiper.fullname" . }}-headless
+  namespace: {{ .Release.Namespace }}
+  labels:
+    {{- include "kuiper.labels" . | nindent 4 }}
+spec:
+  type: ClusterIP
+  sessionAffinity: None
+  clusterIP: None
+  selector:
+    {{- include "kuiper.selectorLabels" . | nindent 4 }}

+ 51 - 0
deploy/chart/kuiper/values.yaml

@@ -0,0 +1,51 @@
+# Default values for kuiper.
+# This is a YAML-formatted file.
+# Declare variables to be passed into your templates.
+
+replicaCount: 1
+
+image:
+  repository: emqx/kuiper
+  pullPolicy: IfNotPresent
+
+persistence:
+  enabled: false
+  size: 20Mi
+  ## If defined, volume.beta.kubernetes.io/storage-class: <storageClass>
+  ## Default: volume.alpha.kubernetes.io/storage-class: default
+  # storageClass: "-"
+  accessMode: ReadWriteOnce
+  ## Existing PersistentVolumeClaims
+  ## The value is evaluated as a template
+  ## So, for example, the name can depend on .Release or .Chart
+  # existingClaim: ""
+
+resources: {}
+  # We usually recommend not to specify default resources and to leave this as a conscious
+  # choice for the user. This also increases chances charts run on environments with little
+  # resources, such as Minikube. If you do want to specify resources, uncomment the following
+  # lines, adjust them as necessary, and remove the curly braces after 'resources:'.
+  # limits:
+  #   cpu: 100m
+  #   memory: 128Mi
+  # requests:
+  #   cpu: 100m
+  #   memory: 128Mi
+
+nodeSelector: {}
+
+tolerations: []
+
+affinity: {}
+
+mqtt:
+  #Global MQTT configurations
+  qos: 1
+  sharedSubscription: true
+  servers: [tcp://127.0.0.1:1883]
+  #username: user1
+  #password: password
+  certificationSecretName: client-cert
+  privateKeySecretName: client-key
+  #certificationPath: /var/kuiper/certificate.pem
+  #privateKeyPath: /var/kuiper/private.pem.key

+ 1 - 1
docker/Dockerfile

@@ -9,7 +9,7 @@ RUN apk add upx gcc make git libc-dev binutils-gold && make
 FROM alpine:3.10
 FROM alpine:3.10
 
 
 COPY --from=builder /go/kuiper/_build/kuiper-* /kuiper/
 COPY --from=builder /go/kuiper/_build/kuiper-* /kuiper/
-COPY ./docker/docker-entrypoint.sh /usr/bin/docker-entrypoint.sh
+COPY ./deploy/docker/docker-entrypoint.sh /usr/bin/docker-entrypoint.sh
 
 
 RUN apk add sed
 RUN apk add sed
 
 

Datei-Diff unterdrückt, da er zu groß ist
+ 164 - 0
deploy/docker/README.md


+ 2 - 2
docker/docker-entrypoint.sh

@@ -7,11 +7,11 @@ fi
 
 
 KUIPER_HOME="/kuiper"
 KUIPER_HOME="/kuiper"
 
 
-CONFIG="$KUIPER_HOME/etc/sources/mqtt.yaml"
+CONFIG="$KUIPER_HOME/etc/mqtt_source.yaml"
 
 
 if [ ! -z "$MQTT_BROKER_ADDRESS" ]; then
 if [ ! -z "$MQTT_BROKER_ADDRESS" ]; then
     sed -i '/default:/ ,/servers/{/servers/d}' $CONFIG
     sed -i '/default:/ ,/servers/{/servers/d}' $CONFIG
-    sed -i "/default:/a\  servers: [tcp://$MQTT_BROKER_ADDRESS]" $CONFIG
+    sed -i "/default:/a\  servers: [$MQTT_BROKER_ADDRESS]" $CONFIG
     echo "default.servers = $MQTT_BROKER_ADDRESS"
     echo "default.servers = $MQTT_BROKER_ADDRESS"
 fi
 fi
 
 

+ 0 - 83
docker/README.md

@@ -1,83 +0,0 @@
-# `Dockerfile` links
-
-- [emqx/kuiper](https://github.com/emqx/kuiper/blob/master/docker/Dockerfile)
-
-# Quick reference
-
-- **Where to get help**:
-
-  Web: https://github.com/emqx/kuiper
-  
-  Documents: https://docs.emqx.io/kuiper/v0.0.2/en/
-
-- **Where to file issues:**
-
-  https://github.com/emqx/kuiper/issues
-
-- **Supported architectures**
-
-  `amd64`, `arm64v8`,  `arm32v7`, `i386`, `ppc64le`
-
-- **Supported Docker versions**:
-
-  [the latest release](https://github.com/docker/docker-ce/releases/latest)
-
-# Image Variants
-
-The `emqx/kuiper` images come in many flavors, each designed for a specific operate systems.
-
-## `emqx/kuiper:<tag>`
-
-This is a stable release image that you can use with confidence.
-
-## `emqx/kuiper:<tag>-<number>-<commit>`
-
-This is an unstable version. It is an image built according to the commit number. You can use it to experience the latest features.
-
-
-# What is Kuiper
-
-A SQL based lightweight IoT analytics/streaming software running at resource constrained edge devices.
-
-- Native run with small overhead ( ~7MB package), support Linux/Mac OS
-- SQL based, easy to use
-- Built-in support for MQTT source
-- Extension - user can customize the rule engine
-- RESTful APIs for rules management
-
-# How to use this image
-
-### Run kuiper
-
-Execute some command under this docker image
-
-```
-docker run -d -v `pwd`:$somewhere emqx/kuiper:$tag $somecommand
-```
-
-For example
-
-```
-docker run -d --name kuiper -e MQTT_BROKER_ADDRESS=$MQTT_BROKER_ADDRESS emqx/kuiper:latest
-```
-
-### Configuration
-
-Use the environment variable to configure `etc/sources/mqtt.yaml`  on the Kuiper container.
-
-| Options                    | Default            | Mapped                    |
-| ---------------------------| ------------------ | ------------------------- |
-| MQTT_BROKER_ADDRESS         | 127.0.0.1:1883 | default.servers |
-| MQTT_BROKER_SHARED_SUBSCRIPTION | true   | default.sharedSubscription |
-| MQTT_BROKER_QOS | 1                 | default.qos    |
-| MQTT_BROKER_USERNAME |   | default.username |
-| MQTT_BROKER_PASSWORD |                | default.password |
-| MQTT_BROKER_CER_PATH |                | default.certificationPath |
-| MQTT_BROKER_KEY_PATH |     | default.privateKeyPath |
-
-If you want to configure more options, you can mount the configuration file into Kuiper container.
-
-# More
-
-If you'd like to know more about the project, please refer to [Github project](https://github.com/emqx/kuiper/blob/master/docs/en_US/README.md).
-

+ 10 - 0
docs/en_US/cross-compile.md

@@ -0,0 +1,10 @@
+## Cross-compile binaries
+
+**Notice: Kuiper plugins bases on Golang, and due to Golang restrictions, ``CGO_ENABLED``  flag must be set to 0 to use the Golang cross-compile. But with this flag mode, the Golang plugins will not work. So if you want to use plugins in Kuiper, you can NOT use cross-compile to produce the binary packages.**
+
+- Preparation
+  - docker version >= 19.03
+  - Enable Docker CLI  experimental mode
+- Cross-compile binary files: ``$ make cross_build``
+- Cross-compile images for all platforms and push to registry:``$ make cross_docker``
+

+ 2 - 2
docs/en_US/plugins/overview.md

@@ -4,8 +4,8 @@ Kuiper implemented several plugins.
 
 
 | Name                  | Description                                                  |
 | Name                  | Description                                                  |
 | --------------------- | ------------------------------------------------------------ |
 | --------------------- | ------------------------------------------------------------ |
-| [zmq](sources/zmq.md)   | The source will subscribe to a Zero Mq topic to import the messages into kuiper
-| [random](sources/random.md)   | The source will generate random inputs with a specified pattern
+| [zmq](sources/zmq.md)   | The source will subscribe to a Zero Mq topic to import the messages into kuiper |
+| [random](sources/random.md)   | The source will generate random inputs with a specified pattern |
 
 
 ## Sinks
 ## Sinks
 
 

+ 1 - 1
docs/en_US/rules/overview.md

@@ -45,7 +45,7 @@ The sql query to run for the rule.
 
 
 ### actions
 ### actions
 
 
-Currently, 2 kinds of actions are supported: [log](sinks/logs.md) and [mqtt](sinks/mqtt.md). Each action can define its own properties.
+Currently, 3 kinds of actions are supported: [log](sinks/logs.md), [mqtt](sinks/mqtt.md) and [rest](sinks/rest.md). Each action can define its own properties.
 
 
 Actions could be customized to support different kinds of outputs, see [extension](../extension/overview.md) for more detailed info.
 Actions could be customized to support different kinds of outputs, see [extension](../extension/overview.md) for more detailed info.
 
 

Datei-Diff unterdrückt, da er zu groß ist
+ 69 - 0
docs/en_US/rules/sinks/rest.md


BIN
docs/resources/arch.png


+ 12 - 0
docs/zh_CN/cross-compile.md

@@ -0,0 +1,12 @@
+## 交叉编译二进制文件
+
+**注:Kuiper 插件基于 Golang 的方式实现,由于 Golang 本身的限制,使用了交叉编译的方式必须将编译参数 ``CGO_ENABLED`` 设置为0,而在该模式下,<u>插件将不可工作</u>。所以如果使用了 Kuiper 的插件的话,<u>不能以交叉编译的方式来生成二进制包。</u>**
+
+- 准备
+  - docker version >= 19.03
+  - 启用 Docker CLI 的 experimental 模式(experimental mode)
+- 交叉编译二进制文件:``$ make cross_build``
+- 交叉编译跨平台镜像,并推到库中:``$ make cross_docker``
+
+
+

+ 1 - 1
docs/zh_CN/getting_started.md

@@ -149,4 +149,4 @@ $ bin/cli stop rule ruleDemo
 
 
 
 
 
 
-如果您想了解更多有关该项目的信息,请参考[doc home]()。
+如果您想了解更多有关该项目的信息,请参考[文档中心](reference.md)。

+ 1 - 0
docs/zh_CN/reference.md

@@ -10,3 +10,4 @@
 - [规则](rules/overview.md)
 - [规则](rules/overview.md)
 - [扩展Kuiper](extension/overview.md)
 - [扩展Kuiper](extension/overview.md)
 - [插件](plugins/overview.md)
 - [插件](plugins/overview.md)
+

+ 1 - 1
docs/zh_CN/rules/overview.md

@@ -45,7 +45,7 @@
 
 
 ### 动作
 ### 动作
 
 
-当前,支持两种操作: [log](sinks/logs.md) 和 [mqtt](sinks/mqtt.md).。 每个动作可以定义自己的属性。
+当前,支持两种操作: [log](sinks/logs.md) 、[mqtt](sinks/mqtt.md) 和 [rest](sinks/rest.md)。 每个动作可以定义自己的属性。
 
 
 可以自定义动作以支持不同种类的输出,有关更多详细信息,请参见 [extension](../extension/overview.md) 。
 可以自定义动作以支持不同种类的输出,有关更多详细信息,请参见 [extension](../extension/overview.md) 。
 
 

Datei-Diff unterdrückt, da er zu groß ist
+ 69 - 0
docs/zh_CN/rules/sinks/rest.md


+ 1 - 1
xsql/plans/preprocessor.go

@@ -65,7 +65,7 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{
 		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) {
 		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) {
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{})}
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{})}
 			if v := ve.Eval(f.Expr); v != nil {
 			if v := ve.Eval(f.Expr); v != nil {
-				result[f.AName] = v
+				result[strings.ToLower(f.AName)] = v
 			}
 			}
 		}
 		}
 	}
 	}

+ 2 - 0
xsql/processors/xsql_processor.go

@@ -477,6 +477,8 @@ func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 		s = sinks.NewLogSink()
 		s = sinks.NewLogSink()
 	case "mqtt":
 	case "mqtt":
 		s = &sinks.MQTTSink{}
 		s = &sinks.MQTTSink{}
+	case "rest":
+		s = &sinks.RestSink{}
 	default:
 	default:
 		nf, err := plugin_manager.GetPlugin(name, "sinks")
 		nf, err := plugin_manager.GetPlugin(name, "sinks")
 		if err != nil {
 		if err != nil {

+ 13 - 0
xsql/processors/xsql_processor_test.go

@@ -404,6 +404,19 @@ func TestSingleSQL(t *testing.T) {
 					"ts":    float64(1541152488442),
 					"ts":    float64(1541152488442),
 				}},
 				}},
 			},
 			},
+		}, {
+			name: `rule3`,
+			sql:  `SELECT size as Int8, ts FROM demo where size > 3`,
+			r: [][]map[string]interface{}{
+				{{
+					"Int8":  float64(6),
+					"ts":    float64(1541152486822),
+				}},
+				{{
+					"Int8":  float64(4),
+					"ts":    float64(1541152488442),
+				}},
+			},
 		},
 		},
 	}
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 6 - 4
xstream/nodes/sink_node.go

@@ -37,10 +37,12 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 		for {
 		for {
 			select {
 			select {
 			case item := <-m.input:
 			case item := <-m.input:
-				if err := m.sink.Collect(ctx, item); err != nil{
-					//TODO deal with publish error
-					logger.Errorf("sink node %s publish %v error: %v", ctx.GetOpId(), item, err)
-				}
+				go func() {
+					if err := m.sink.Collect(ctx, item); err != nil{
+						//TODO deal with publish error
+						logger.Errorf("sink node %s publish %v error: %v", ctx.GetOpId(), item, err)
+					}
+				}()
 			case <-ctx.Done():
 			case <-ctx.Done():
 				logger.Infof("sink node %s done", m.name)
 				logger.Infof("sink node %s done", m.name)
 				if err := m.sink.Close(ctx); err != nil{
 				if err := m.sink.Close(ctx); err != nil{

+ 13 - 12
xstream/server/server/server.go

@@ -239,17 +239,6 @@ func (t *Server) DropRule(name string, reply *string) error{
 }
 }
 
 
 func init(){
 func init(){
-	var err error
-	dataDir, err = common.GetDataLoc()
-	if err != nil {
-		log.Panic(err)
-	}else{
-		log.Infof("db location is %s", dataDir)
-	}
-
-	processor = processors.NewRuleProcessor(path.Dir(dataDir))
-	registry = make(RuleRegistry)
-
 	ticker := time.NewTicker(time.Second * 5)
 	ticker := time.NewTicker(time.Second * 5)
 	go func() {
 	go func() {
 		for {
 		for {
@@ -271,6 +260,18 @@ func init(){
 
 
 
 
 func StartUp(Version string) {
 func StartUp(Version string) {
+	common.InitConf()
+
+	dr, err := common.GetDataLoc()
+	if err != nil {
+		log.Panic(err)
+	}else{
+		log.Infof("db location is %s", dr)
+		dataDir = dr
+	}
+	processor = processors.NewRuleProcessor(path.Dir(dataDir))
+	registry = make(RuleRegistry)
+
 	server := new(Server)
 	server := new(Server)
 	//Start rules
 	//Start rules
 	if rules, err := processor.GetAllRules(); err != nil{
 	if rules, err := processor.GetAllRules(); err != nil{
@@ -289,7 +290,7 @@ func StartUp(Version string) {
 	}
 	}
 
 
 	//Start server
 	//Start server
-	err := rpc.Register(server)
+	err = rpc.Register(server)
 	if err != nil {
 	if err != nil {
 		log.Fatal("Format of service Server isn't correct. ", err)
 		log.Fatal("Format of service Server isn't correct. ", err)
 	}
 	}

+ 6 - 8
xstream/sinks/mqtt_sink.go

@@ -2,10 +2,10 @@ package sinks
 
 
 import (
 import (
 	"crypto/tls"
 	"crypto/tls"
-	"github.com/emqx/kuiper/common"
-	"github.com/emqx/kuiper/xstream/api"
 	"fmt"
 	"fmt"
 	MQTT "github.com/eclipse/paho.mqtt.golang"
 	MQTT "github.com/eclipse/paho.mqtt.golang"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xstream/api"
 	"github.com/google/uuid"
 	"github.com/google/uuid"
 	"strings"
 	"strings"
 )
 )
@@ -41,7 +41,7 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 		}
 		}
 	}
 	}
 	var pVersion uint = 3
 	var pVersion uint = 3
-	pVersionStr, ok := ps["protocolVersion"];
+	pVersionStr, ok := ps["protocolVersion"]
 	if ok {
 	if ok {
 		v, _ := pVersionStr.(string)
 		v, _ := pVersionStr.(string)
 		if v == "3.1" {
 		if v == "3.1" {
@@ -54,7 +54,7 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 	}
 	}
 
 
 	uName := ""
 	uName := ""
-	un, ok := ps["username"];
+	un, ok := ps["username"]
 	if ok {
 	if ok {
 		v, _ := un.(string)
 		v, _ := un.(string)
 		if strings.Trim(v, " ") != "" {
 		if strings.Trim(v, " ") != "" {
@@ -63,7 +63,7 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 	}
 	}
 
 
 	password := ""
 	password := ""
-	pwd, ok := ps["password"];
+	pwd, ok := ps["password"]
 	if ok {
 	if ok {
 		v, _ := pwd.(string)
 		v, _ := pwd.(string)
 		if strings.Trim(v, " ") != "" {
 		if strings.Trim(v, " ") != "" {
@@ -154,6 +154,4 @@ func (ms *MQTTSink) Close(ctx api.StreamContext) error {
 		ms.conn.Disconnect(5000)
 		ms.conn.Disconnect(5000)
 	}
 	}
 	return nil
 	return nil
-}
-
-
+}

+ 306 - 0
xstream/sinks/rest_sink.go

@@ -0,0 +1,306 @@
+package sinks
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"github.com/emqx/kuiper/common/templates"
+	"github.com/emqx/kuiper/xstream/api"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"strings"
+	"text/template"
+	"time"
+)
+
+type RestSink struct {
+	method      string
+	url         string
+	headers     map[string]string
+	bodyType    string
+	timeout		int64
+	sendSingle  bool
+	dataTemplate string
+
+	client      *http.Client
+	tp          *template.Template
+}
+
+var methodsMap = map[string]bool{"GET": true, "HEAD": true, "POST": true, "PUT": true, "DELETE": true, "PATCH": true}
+var bodyTypeMap = map[string]string{"none":"", "text": "text/plain", "json":"application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""}
+
+func (ms *RestSink) Configure(ps map[string]interface{}) error {
+	temp, ok := ps["method"]
+	if ok {
+		ms.method, ok = temp.(string)
+		if !ok {
+			return fmt.Errorf("rest sink property method %v is not a string", temp)
+		}
+		ms.method = strings.ToUpper(strings.Trim(ms.method, ""))
+	}else{
+		ms.method = "GET"
+	}
+	if _, ok = methodsMap[ms.method]; !ok {
+		return fmt.Errorf("invalid property method: %s", ms.method)
+	}
+	switch ms.method{
+	case "GET", "HEAD":
+		ms.bodyType = "none"
+	default:
+		ms.bodyType = "json"
+	}
+
+	temp, ok = ps["url"]
+	if !ok {
+		return fmt.Errorf("rest sink is missing property url")
+	}
+	ms.url, ok = temp.(string)
+	if !ok {
+		return fmt.Errorf("rest sink property url %v is not a string", temp)
+	}
+	ms.url = strings.ToLower(strings.Trim(ms.url, ""))
+
+	temp, ok = ps["headers"]
+	if ok{
+		ms.headers, ok = temp.(map[string]string)
+		if !ok {
+			return fmt.Errorf("rest sink property headers %v is not a map[string][]string", temp)
+		}
+	}
+
+	temp, ok = ps["bodyType"]
+	if ok{
+		ms.bodyType, ok = temp.(string)
+		if !ok {
+			return fmt.Errorf("rest sink property bodyType %v is not a string", temp)
+		}
+		ms.bodyType = strings.ToLower(strings.Trim(ms.bodyType, ""))
+	}
+	if _, ok = bodyTypeMap[ms.bodyType]; !ok {
+		return fmt.Errorf("invalid property bodyType: %s, should be \"none\" or \"form\"", ms.bodyType)
+	}
+
+	temp, ok = ps["timeout"]
+	if !ok {
+		ms.timeout = 5000
+	}else{
+		to, ok := temp.(float64)
+		if !ok {
+			return fmt.Errorf("rest sink property timeout %v is not a number", temp)
+		}
+		ms.timeout = int64(to)
+	}
+
+	temp, ok = ps["sendSingle"]
+	if !ok{
+		ms.sendSingle = false
+	}else{
+		ms.sendSingle, ok = temp.(bool)
+		if !ok {
+			return fmt.Errorf("rest sink property sendSingle %v is not a bool", temp)
+		}
+	}
+
+	temp, ok = ps["dataTemplate"]
+	if ok{
+		ms.dataTemplate, ok = temp.(string)
+		if !ok {
+			return fmt.Errorf("rest sink property dataTemplate %v is not a string", temp)
+		}
+	}
+
+	if ms.dataTemplate != ""{
+		funcMap := template.FuncMap{
+			"json": templates.JsonMarshal,
+		}
+		temp, err := template.New("restSink").Funcs(funcMap).Parse(ms.dataTemplate)
+		if err != nil{
+			return fmt.Errorf("rest sink property dataTemplate %v is invalid: %v", ms.dataTemplate, err)
+		}else{
+			ms.tp = temp
+		}
+	}
+	return nil
+}
+
+func (ms *RestSink) Open(ctx api.StreamContext) error {
+	logger := ctx.GetLogger()
+	ms.client = &http.Client{Timeout: time.Duration(ms.timeout) * time.Millisecond}
+	logger.Debugf("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v, dataTemplate: %s", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle, ms.dataTemplate)
+	return nil
+}
+
+func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
+	logger := ctx.GetLogger()
+	v, ok := item.([]byte)
+	if !ok {
+		logger.Warnf("rest sink receive non []byte data: %v", item)
+	}
+	logger.Debugf("rest sink receive %s", item)
+	if !ms.sendSingle{
+		return ms.send(v, logger)
+	}else{
+		j, err := extractInput(v)
+		if err != nil {
+			return err
+		}
+		logger.Debugf("receive %d records", len(j))
+		for _, r := range j {
+			ms.send(r, logger)
+		}
+	}
+	return nil
+}
+
+func extractInput(v []byte) ([]map[string]interface{}, error) {
+	var j []map[string]interface{}
+	if err := json.Unmarshal(v, &j); err != nil {
+		return nil, fmt.Errorf("fail to decode the input %s as json: %v", v, err)
+	}
+	return j, nil
+}
+
+func (ms *RestSink) send(v interface{}, logger api.Logger) error {
+	var req *http.Request
+	var err error
+	switch ms.bodyType {
+	case "none":
+		req, err = http.NewRequest(ms.method, ms.url, nil)
+		if err != nil {
+			return fmt.Errorf("fail to create request: %v", err)
+		}
+	case "json", "text", "javascript", "html", "xml":
+		var body = &(bytes.Buffer{})
+		switch t := v.(type) {
+		case []byte:
+			if ms.tp != nil {
+				j, err := extractInput(t)
+				if err != nil {
+					return err
+				}
+				err = ms.tp.Execute(body, j)
+				if err != nil{
+					return fmt.Errorf("fail to decode content: %v", err)
+				}
+			}else{
+				body = bytes.NewBuffer(t)
+			}
+		case map[string]interface{}:
+			if ms.tp != nil{
+				err = ms.tp.Execute(body, t)
+				if err != nil{
+					return fmt.Errorf("fail to decode content: %v", err)
+				}
+			}else{
+				content, err := json.Marshal(t)
+				if err != nil{
+					return fmt.Errorf("fail to decode content: %v", err)
+				}
+				body = bytes.NewBuffer(content)
+			}
+		default:
+			return fmt.Errorf("invalid content: %v", v)
+		}
+
+		req, err = http.NewRequest(ms.method, ms.url, body)
+		if err != nil {
+			return fmt.Errorf("fail to create request: %v", err)
+		}
+		req.Header.Set("Content-Type", bodyTypeMap[ms.bodyType])
+	case "form":
+		form := url.Values{}
+		im, err := convertToMap(v, ms.tp)
+		if err != nil {
+			return err
+		}
+		for key, value := range im {
+			var vstr string
+			switch value.(type) {
+			case []interface{}, map[string]interface{}:
+				if temp, err := json.Marshal(value); err != nil {
+					return fmt.Errorf("fail to parse fomr value: %v", err)
+				}else{
+					vstr = string(temp)
+				}
+			default:
+				vstr = fmt.Sprintf("%v", value)
+			}
+			form.Set(key, vstr)
+		}
+		body := ioutil.NopCloser(strings.NewReader(form.Encode()))
+		req, err = http.NewRequest(ms.method, ms.url, body)
+		if err != nil {
+			return fmt.Errorf("fail to create request: %v", err)
+		}
+		req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value")
+	default:
+		return fmt.Errorf("unsupported body type %s", ms.bodyType)
+	}
+
+	if len(ms.headers) > 0 {
+		for k, v := range ms.headers {
+			req.Header.Set(k, v)
+		}
+	}
+	logger.Debugf("do request: %s %s with %s", ms.method, ms.url, req.Body)
+	resp, err := ms.client.Do(req)
+	if err != nil {
+		return fmt.Errorf("rest sink fails to send out the data")
+	} else {
+		logger.Debugf("rest sink got response %v", resp)
+	}
+	return nil
+}
+
+func convertToMap(v interface{}, tp *template.Template) (map[string]interface{}, error) {
+	switch t := v.(type) {
+	case []byte:
+		if tp != nil{
+			j, err := extractInput(t)
+			if err != nil {
+				return nil, err
+			}
+			var output bytes.Buffer
+			err = tp.Execute(&output, j)
+			if err != nil{
+				return nil, fmt.Errorf("fail to decode content: %v", err)
+			}
+			r := make(map[string]interface{})
+			if err := json.Unmarshal(output.Bytes(), &r); err != nil{
+				return nil, fmt.Errorf("fail to decode content: %v", err)
+			}else{
+				return r, nil
+			}
+		}else{
+			r := make(map[string]interface{})
+			r["result"] = string(t)
+			return r, nil
+		}
+	case map[string]interface{}:
+		if tp != nil{
+			var output bytes.Buffer
+			err := tp.Execute(&output, t)
+			if err != nil{
+				return nil, fmt.Errorf("fail to decode content: %v", err)
+			}
+			r := make(map[string]interface{})
+			if err := json.Unmarshal(output.Bytes(), &r); err != nil{
+				return nil, fmt.Errorf("fail to decode content: %v", err)
+			}else{
+				return r, nil
+			}
+		}else{
+			return t, nil
+		}
+	default:
+		return nil, fmt.Errorf("invalid content: %v", v)
+	}
+	return nil, fmt.Errorf("invalid content: %v", v)
+}
+
+func (ms *RestSink) Close(ctx api.StreamContext) error {
+	logger := ctx.GetLogger()
+	logger.Infof("Closing rest sink")
+	return nil
+}

+ 372 - 0
xstream/sinks/rest_sink_test.go

@@ -0,0 +1,372 @@
+package sinks
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xstream/contexts"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"reflect"
+	"testing"
+)
+
+type request struct{
+	Method string
+	Body   string
+	ContentType string
+}
+
+func TestRestSink_Apply(t *testing.T) {
+	var tests = []struct {
+		config  map[string]interface{}
+		data 	[]map[string]interface{}
+		result  []request
+	}{
+		{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"sendSingle": true,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				Body: `{"ab":"hello1"}`,
+				ContentType: "application/json",
+			},{
+				Method: "POST",
+				Body: `{"ab":"hello2"}`,
+				ContentType: "application/json",
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				Body: `[{"ab":"hello1"},{"ab":"hello2"}]`,
+				ContentType: "application/json",
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "get",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "GET",
+				ContentType: "",
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "put",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "text",
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "PUT",
+				ContentType: "text/plain",
+				Body: `[{"ab":"hello1"},{"ab":"hello2"}]`,
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "form",
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				ContentType: "application/x-www-form-urlencoded;param=value",
+				Body: `result=%5B%7B%22ab%22%3A%22hello1%22%7D%2C%7B%22ab%22%3A%22hello2%22%7D%5D`,
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "form",
+				"sendSingle": true,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				ContentType: "application/x-www-form-urlencoded;param=value",
+				Body: `ab=hello1`,
+			},{
+				Method: "POST",
+				ContentType: "application/x-www-form-urlencoded;param=value",
+				Body: `ab=hello2`,
+			}},
+		}, {
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "json",
+				"sendSingle": true,
+				"timeout": float64(1000),
+			},
+			data: []map[string]interface{}{{
+				"ab": "hello1",
+			}, {
+				"ab": "hello2",
+			}},
+			result: []request{{
+				Method:      "POST",
+				Body:        `{"ab":"hello1"}`,
+				ContentType: "application/json",
+			}, {
+				Method:      "POST",
+				Body:        `{"ab":"hello2"}`,
+				ContentType: "application/json",
+			}},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestRestSink_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+
+	var requests []request
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request){
+		body, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			fmt.Printf("Error reading body: %v", err)
+			http.Error(w, "can't read body", http.StatusBadRequest)
+			return
+		}
+
+		requests = append(requests, request{
+			Method: r.Method,
+			Body: string(body),
+			ContentType: r.Header.Get("Content-Type"),
+		})
+		contextLogger.Debugf(string(body))
+		fmt.Fprintf(w, string(body))
+	}))
+	defer ts.Close()
+	for i, tt := range tests {
+		requests = nil
+		s := &RestSink{}
+		tt.config["url"] = ts.URL
+		s.Configure(tt.config)
+		s.Open(ctx)
+		input, err := json.Marshal(tt.data)
+		if err != nil{
+			t.Errorf("Failed to parse the input into []byte]")
+			continue
+		}
+		s.Collect(ctx, input)
+		s.Close(ctx)
+		if !reflect.DeepEqual(tt.result, requests) {
+			t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
+		}
+	}
+}
+
+func TestRestSinkTemplate_Apply(t *testing.T) {
+	var tests = []struct {
+		config  map[string]interface{}
+		data 	[]map[string]interface{}
+		result  []request
+	}{
+		{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"sendSingle": true,
+				"dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				Body: `{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`,
+				ContentType: "application/json",
+			},{
+				Method: "POST",
+				Body: `{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`,
+				ContentType: "application/json",
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				Body: `{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`,
+				ContentType: "application/json",
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "get",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "GET",
+				ContentType: "",
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "put",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "html",
+				"dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "PUT",
+				ContentType: "text/html",
+				Body: `<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`,
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "form",
+				"dataTemplate": `{"content":{{json .}}}`,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				ContentType: "application/x-www-form-urlencoded;param=value",
+				Body: `content=%5B%7B%22ab%22%3A%22hello1%22%7D%2C%7B%22ab%22%3A%22hello2%22%7D%5D`,
+			}},
+		},{
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "form",
+				"sendSingle": true,
+				"dataTemplate": `{"newab":"{{.ab}}"}`,
+			},
+			data:[]map[string]interface{}{{
+				"ab" : "hello1",
+			},{
+				"ab" : "hello2",
+			}},
+			result: []request{{
+				Method: "POST",
+				ContentType: "application/x-www-form-urlencoded;param=value",
+				Body: `newab=hello1`,
+			},{
+				Method: "POST",
+				ContentType: "application/x-www-form-urlencoded;param=value",
+				Body: `newab=hello2`,
+			}},
+		}, {
+			config: map[string]interface{}{
+				"method": "post",
+				//"url": "http://localhost/test",  //set dynamically to the test server
+				"bodyType": "json",
+				"sendSingle": true,
+				"timeout": float64(1000),
+				"dataTemplate": `{"newab":"{{.ab}}"}`,
+			},
+			data: []map[string]interface{}{{
+				"ab": "hello1",
+			}, {
+				"ab": "hello2",
+			}},
+			result: []request{{
+				Method:      "POST",
+				Body:        `{"newab":"hello1"}`,
+				ContentType: "application/json",
+			}, {
+				Method:      "POST",
+				Body:        `{"newab":"hello2"}`,
+				ContentType: "application/json",
+			}},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestRestSink_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+
+	var requests []request
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request){
+		body, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			fmt.Printf("Error reading body: %v", err)
+			http.Error(w, "can't read body", http.StatusBadRequest)
+			return
+		}
+
+		requests = append(requests, request{
+			Method: r.Method,
+			Body: string(body),
+			ContentType: r.Header.Get("Content-Type"),
+		})
+		contextLogger.Debugf(string(body))
+		fmt.Fprintf(w, string(body))
+	}))
+	defer ts.Close()
+	for i, tt := range tests {
+		requests = nil
+		s := &RestSink{}
+		tt.config["url"] = ts.URL
+		s.Configure(tt.config)
+		s.Open(ctx)
+		input, err := json.Marshal(tt.data)
+		if err != nil{
+			t.Errorf("Failed to parse the input into []byte]")
+			continue
+		}
+		s.Collect(ctx, input)
+		s.Close(ctx)
+		if !reflect.DeepEqual(tt.result, requests) {
+			t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
+		}
+	}
+}