Quellcode durchsuchen

Merge pull request #250 from emqx/develop

Develop
jinfahua vor 5 Jahren
Ursprung
Commit
28f707934d
52 geänderte Dateien mit 2282 neuen und 470 gelöschten Zeilen
  1. 4 2
      .github/workflows/run_fvt_tests.yaml
  2. 10 3
      Makefile
  3. 28 0
      common/util.go
  4. 27 0
      common/util_test.go
  5. 2 2
      deploy/chart/kuiper/Chart.yaml
  6. 1 1
      deploy/chart/kuiper/templates/StatefulSet.yaml
  7. 7 13
      deploy/docker/Dockerfile
  8. 24 0
      deploy/docker/Dockerfile-alpine
  9. 0 18
      deploy/docker/Dockerfile-dev
  10. 24 0
      deploy/docker/Dockerfile-slim
  11. 11 4
      deploy/docker/README.md
  12. 1 1
      deploy/docker/docker-entrypoint.sh
  13. 3 0
      docs/.gitignore
  14. 68 0
      docs/_scripts/make.js
  15. 18 3
      docs/book.json
  16. BIN
      docs/cover.jpg
  17. 1 2
      docs/en_US/SUMMARY.md
  18. 0 48
      docs/en_US/book.json
  19. 1 1
      docs/en_US/edgex/edgex_rule_engine_tutorial.md
  20. 231 0
      docs/en_US/plugins/plugins_tutorial.md
  21. 47 1
      docs/en_US/rules/overview.md
  22. 1 47
      docs/en_US/rules/sinks/rest.md
  23. 14 5
      docs/package.json
  24. 15 15
      docs/zh_CN/SUMMARY.md
  25. 0 48
      docs/zh_CN/book.json
  26. 94 0
      docs/zh_CN/cli/plugins.md
  27. 1 1
      docs/zh_CN/edgex/edgex_rule_engine_tutorial.md
  28. 8 0
      docs/zh_CN/restapi/overview.md
  29. 80 0
      docs/zh_CN/restapi/plugins.md
  30. 139 0
      docs/zh_CN/restapi/rules.md
  31. 74 0
      docs/zh_CN/restapi/streams.md
  32. 47 0
      docs/zh_CN/rules/overview.md
  33. 1 47
      docs/zh_CN/rules/sinks/rest.md
  34. 15 0
      etc/sources/test.yaml
  35. 480 0
      fvt_scripts/plugin_end_2_end.jmx
  36. 99 0
      fvt_scripts/plugins/pub/zmq_pub.go
  37. 27 0
      fvt_scripts/plugins/service/server.go
  38. 3 0
      fvt_scripts/plugins/zmq.yaml
  39. 35 0
      fvt_scripts/prepare_plugins.sh
  40. 4 1
      fvt_scripts/run_jmeter.sh
  41. 13 1
      fvt_scripts/setup_env.sh
  42. 35 7
      plugins/manager.go
  43. 70 0
      xsql/processors/simple_processor_test.go
  44. 1 1
      xsql/processors/xsql_processor.go
  45. 249 5
      xsql/processors/xsql_processor_test.go
  46. 102 25
      xstream/nodes/sink_node.go
  47. 76 0
      xstream/nodes/sink_node_test.go
  48. 38 0
      xstream/nodes/source_node_test.go
  49. 3 1
      xstream/server/server/rest.go
  50. 2 2
      xstream/server/server/rpc.go
  51. 17 119
      xstream/sinks/rest_sink.go
  52. 31 46
      xstream/sinks/rest_sink_test.go

+ 4 - 2
.github/workflows/run_fvt_tests.yaml

@@ -43,6 +43,7 @@ jobs:
           run: |
           run: |
             sudo apt update && sudo apt install pkg-config libczmq-dev -y
             sudo apt update && sudo apt install pkg-config libczmq-dev -y
             make build_with_edgex
             make build_with_edgex
+            go build --buildmode=plugin -o plugins/sources/Zmq.so plugins/sources/zmq.go
         - name: run edgex && emqx && kuiper
         - name: run edgex && emqx && kuiper
           run: |
           run: |
             sudo ./fvt_scripts/setup_env.sh
             sudo ./fvt_scripts/setup_env.sh
@@ -105,6 +106,7 @@ jobs:
           run: |
           run: |
             sudo apt update && sudo apt install pkg-config libczmq-dev -y
             sudo apt update && sudo apt install pkg-config libczmq-dev -y
             make
             make
+            go build --buildmode=plugin -o plugins/sources/Zmq.so plugins/sources/zmq.go
         - name: run edgex && emqx && kuiper
         - name: run edgex && emqx && kuiper
           run: |
           run: |
             sudo ./fvt_scripts/setup_env.sh
             sudo ./fvt_scripts/setup_env.sh
@@ -187,7 +189,7 @@ jobs:
           sudo ./get_helm.sh
           sudo ./get_helm.sh
           helm version
           helm version
       - name: build kuiper for docker
       - name: build kuiper for docker
-        run: make docker
+        run: sudo docker build --no-cache -t emqx/kuiper:$(git describe --tags --alway)-alpine -f deploy/docker/Dockerfile-alpine .
       - name: run emqx on chart
       - name: run emqx on chart
         env:
         env:
           KUBECONFIG: "/etc/rancher/k3s/k3s.yaml"
           KUBECONFIG: "/etc/rancher/k3s/k3s.yaml"
@@ -209,7 +211,7 @@ jobs:
           version=$(git describe --tags --always)
           version=$(git describe --tags --always)
           emqx_address=$(kubectl get svc --namespace default emqx -o jsonpath="{.spec.clusterIP}")
           emqx_address=$(kubectl get svc --namespace default emqx -o jsonpath="{.spec.clusterIP}")
           
           
-          sudo docker save emqx/kuiper:$version -o kuier.tar.gz
+          sudo docker save emqx/kuiper:$version-alpine -o kuier.tar.gz
           sudo k3s ctr image import kuier.tar.gz
           sudo k3s ctr image import kuier.tar.gz
           
           
           sed -i -r "s/^appVersion: .*$/appVersion: ${version}/g" deploy/chart/kuiper/Chart.yaml
           sed -i -r "s/^appVersion: .*$/appVersion: ${version}/g" deploy/chart/kuiper/Chart.yaml

+ 10 - 3
Makefile

@@ -114,7 +114,8 @@ cross_build: cross_prepare
 .PHONY: docker
 .PHONY: docker
 docker:
 docker:
 	docker build --no-cache -t $(TARGET):$(VERSION) -f deploy/docker/Dockerfile .
 	docker build --no-cache -t $(TARGET):$(VERSION) -f deploy/docker/Dockerfile .
-	docker build --no-cache -t $(TARGET):$(VERSION)-dev -f deploy/docker/Dockerfile-dev .
+	docker build --no-cache -t $(TARGET):$(VERSION)-slim -f deploy/docker/Dockerfile-slim .
+	docker build --no-cache -t $(TARGET):$(VERSION)-alpine -f deploy/docker/Dockerfile-alpine .
 
 
 .PHONY:cross_docker
 .PHONY:cross_docker
 cross_docker: cross_prepare
 cross_docker: cross_prepare
@@ -126,8 +127,14 @@ cross_docker: cross_prepare
 
 
 	docker buildx build --no-cache \
 	docker buildx build --no-cache \
 	--platform=linux/amd64,linux/arm64,linux/arm/v7,linux/386,linux/ppc64le \
 	--platform=linux/amd64,linux/arm64,linux/arm/v7,linux/386,linux/ppc64le \
-	-t $(TARGET):$(VERSION)-dev \
-	-f deploy/docker/Dockerfile-dev . \
+	-t $(TARGET):$(VERSION)-slim \
+	-f deploy/docker/Dockerfile-slim . \
+	--push
+
+	docker buildx build --no-cache \
+	--platform=linux/amd64,linux/arm64,linux/arm/v7,linux/386,linux/ppc64le \
+	-t $(TARGET):$(VERSION)-alpine \
+	-f deploy/docker/Dockerfile-alpine . \
 	--push
 	--push
 
 
 .PHONY: clean
 .PHONY: clean

+ 28 - 0
common/util.go

@@ -246,3 +246,31 @@ func MapToStruct(input map[string]interface{}, output interface{}) error {
 	// convert json to struct
 	// convert json to struct
 	return json.Unmarshal(jsonString, output)
 	return json.Unmarshal(jsonString, output)
 }
 }
+
+func ConvertMap(s map[interface{}]interface{}) map[string]interface{} {
+	r := make(map[string]interface{})
+	for k, v := range s {
+		switch t := v.(type) {
+		case map[interface{}]interface{}:
+			v = ConvertMap(t)
+		case []interface{}:
+			v = ConvertArray(t)
+		}
+		r[fmt.Sprintf("%v", k)] = v
+	}
+	return r
+}
+
+func ConvertArray(s []interface{}) []interface{} {
+	r := make([]interface{}, len(s))
+	for i, e := range s {
+		switch t := e.(type) {
+		case map[interface{}]interface{}:
+			e = ConvertMap(t)
+		case []interface{}:
+			e = ConvertArray(t)
+		}
+		r[i] = e
+	}
+	return r
+}

+ 27 - 0
common/util_test.go

@@ -57,3 +57,30 @@ func TestSimpleKVStore_Funcs(t *testing.T) {
 
 
 	_ = os.Remove(abs)
 	_ = os.Remove(abs)
 }
 }
+
+func TestMapConvert_Funcs(t *testing.T) {
+	source := map[interface{}]interface{}{
+		"QUERY_TABLE": "VBAP",
+		"ROWCOUNT":    10,
+		"FIELDS": []interface{}{
+			map[interface{}]interface{}{"FIELDNAME": "MANDT"},
+			map[interface{}]interface{}{"FIELDNAME": "VBELN"},
+			map[interface{}]interface{}{"FIELDNAME": "POSNR"},
+		},
+	}
+
+	exp := map[string]interface{}{
+		"QUERY_TABLE": "VBAP",
+		"ROWCOUNT":    10,
+		"FIELDS": []interface{}{
+			map[string]interface{}{"FIELDNAME": "MANDT"},
+			map[string]interface{}{"FIELDNAME": "VBELN"},
+			map[string]interface{}{"FIELDNAME": "POSNR"},
+		},
+	}
+
+	got := ConvertMap(source)
+	if !reflect.DeepEqual(exp, got) {
+		t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", exp, got)
+	}
+}

+ 2 - 2
deploy/chart/kuiper/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 
 # This is the chart version. This version number should be incremented each time you make changes
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # to the chart and its templates, including the app version.
-version: 0.3.2
+version: 0.4.0
 
 
 # This is the version number of the application being deployed. This version number should be
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
 # incremented each time you make changes to the application.
-appVersion: 0.3.2
+appVersion: 0.4.0

+ 1 - 1
deploy/chart/kuiper/templates/StatefulSet.yaml

@@ -77,7 +77,7 @@ spec:
       {{- end }}
       {{- end }}
       containers:
       containers:
         - name: kuiper
         - name: kuiper
-          image: "{{ .Values.image.repository }}:{{ .Chart.AppVersion }}"
+          image: "{{ .Values.image.repository }}:{{ .Chart.AppVersion }}-alpine"
           imagePullPolicy: {{ .Values.image.pullPolicy }}
           imagePullPolicy: {{ .Values.image.pullPolicy }}
           command: ["./bin/server"]
           command: ["./bin/server"]
           ports:
           ports:

+ 7 - 13
deploy/docker/Dockerfile

@@ -1,24 +1,18 @@
-FROM golang:1.13.10-alpine AS builder
+FROM golang:1.13.10 AS builder
 
 
 COPY . /go/kuiper
 COPY . /go/kuiper
 
 
 WORKDIR /go/kuiper
 WORKDIR /go/kuiper
 
 
-RUN apk add upx gcc make git libc-dev binutils-gold pkgconfig zeromq-dev && make build_with_edgex
-
-FROM alpine:3.10
-
-COPY --from=builder /go/kuiper/_build/kuiper-* /kuiper/
-COPY ./deploy/docker/docker-entrypoint.sh /usr/bin/docker-entrypoint.sh
-
-RUN apk add sed libzmq
-
-WORKDIR /kuiper
+RUN apt update && apt install -y upx pkg-config libczmq-dev \
+    && make build_with_edgex \
+    && ln -s /go/kuiper/_build/kuiper-$(git describe --tags --always)-$(uname -s | tr "[A-Z]" "[a-z]")-$(uname -m) /usr/local/kuiper \
+    && ln -s /go/kuiper/deploy/docker/docker-entrypoint.sh /usr/bin/docker-entrypoint.sh
 
 
 EXPOSE 9081 20498
 EXPOSE 9081 20498
 
 
-ENV KUIPER_HOME /kuiper
+ENV KUIPER_HOME /usr/local/kuiper
 
 
 ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"]
 ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"]
 
 
-CMD ["./bin/server"]
+CMD ["sh", "-c", "cd /usr/local/kuiper && ./bin/server"]

+ 24 - 0
deploy/docker/Dockerfile-alpine

@@ -0,0 +1,24 @@
+FROM golang:1.13.10-alpine AS builder
+
+COPY . /go/kuiper
+
+WORKDIR /go/kuiper
+
+RUN apk add upx gcc make git libc-dev binutils-gold pkgconfig zeromq-dev && make build_with_edgex
+
+FROM alpine:3.10
+
+COPY --from=builder /go/kuiper/_build/kuiper-* /kuiper/
+COPY ./deploy/docker/docker-entrypoint.sh /usr/bin/docker-entrypoint.sh
+
+RUN apk add sed libzmq
+
+WORKDIR /kuiper
+
+EXPOSE 9081 20498
+
+ENV KUIPER_HOME /kuiper
+
+ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"]
+
+CMD ["./bin/server"]

+ 0 - 18
deploy/docker/Dockerfile-dev

@@ -1,18 +0,0 @@
-FROM golang:1.13.10-alpine AS builder
-
-COPY . /go/kuiper
-
-WORKDIR /go/kuiper
-
-RUN apk add vim upx gcc make git sed libc-dev binutils-gold pkgconfig zeromq-dev libzmq \
-    && make build_with_edgex \
-    && ln -s /go/kuiper/_build/kuiper-$(git describe --tags --always)-$(uname -s | tr "[A-Z]" "[a-z]")-$(uname -m) /usr/local/kuiper \
-    && ln -s /go/kuiper/deploy/docker/docker-entrypoint.sh /usr/bin/docker-entrypoint.sh
-
-EXPOSE 9081 20498
-
-ENV KUIPER_HOME /usr/local/kuiper
-
-ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"]
-
-CMD ["sh", "-c", "cd /usr/local/kuiper && ./bin/server"]

+ 24 - 0
deploy/docker/Dockerfile-slim

@@ -0,0 +1,24 @@
+FROM golang:1.13.10 AS builder
+
+COPY . /go/kuiper
+
+WORKDIR /go/kuiper
+
+RUN apt update && apt install -y upx pkg-config libczmq-dev && make build_with_edgex
+
+FROM debian:10
+
+COPY --from=builder /go/kuiper/_build/kuiper-* /kuiper/
+COPY ./deploy/docker/docker-entrypoint.sh /usr/bin/docker-entrypoint.sh
+
+RUN apt update && apt install -y pkg-config libczmq-dev
+
+WORKDIR /kuiper
+
+EXPOSE 9081 20498
+
+ENV KUIPER_HOME /kuiper
+
+ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"]
+
+CMD ["./bin/server"]

Datei-Diff unterdrückt, da er zu groß ist
+ 11 - 4
deploy/docker/README.md


+ 1 - 1
deploy/docker/docker-entrypoint.sh

@@ -1,5 +1,5 @@
 #!/bin/sh
 #!/bin/sh
-if [[ ! -z "$DEBUG" ]]; then
+if [ ! -z "$DEBUG" ]; then
     set -ex
     set -ex
 else
 else
     set -e
     set -e

+ 3 - 0
docs/.gitignore

@@ -1 +1,4 @@
 .lock
 .lock
+*.pdf
+*.lock
+yarn-error.log

+ 68 - 0
docs/_scripts/make.js

@@ -0,0 +1,68 @@
+#!/usr/bin/env node
+
+const fs = require('fs')
+const path = require('path')
+
+const ZH_CN = path.join(__dirname, '../zh_CN')
+const EN_US = path.join(__dirname, '../en_US')
+
+// [[match: regexp | fn | string, replace: regexp fn| string]]
+const replaceRule = [
+  [
+    '[English](README.md) | [简体中文](README-CN.md)',
+    ''
+  ],
+  [
+    '![arch](docs/resources/arch.png)',
+    '![arch](../resources/arch.png)'
+  ],
+  [
+    /]\(docs\/zh_CN\//gim,
+    '](./'
+  ],
+  [
+    /]\(docs\/en_US\//gim,
+    '](./'
+  ],
+  [
+    '(fvt_scripts/edgex/benchmark/pub.go)',
+    'https://github.com/emqx/kuiper/blob/master/fvt_scripts/edgex/pub.go'
+  ],
+  [
+    '[Apache 2.0](LICENSE)',
+    '[Apache 2.0](https://github.com/emqx/kuiper/blob/master/LICENSE)'
+  ]
+]
+
+const readmeMoveRule = [
+  {
+    from: path.join(__dirname, '../../README.md'),
+    to: path.join(EN_US, './README.md'),
+  },
+
+  {
+    from: path.join(__dirname, '../../README-CN.md'),
+    to: path.join(ZH_CN, './README.md'),
+  }
+]
+
+
+function generateReadme() {
+  readmeMoveRule.forEach(fileInfo => {
+    const { from, to } = fileInfo
+    // read
+    let content = fs.readFileSync(from).toString()
+
+    replaceRule.forEach(rule => {
+      content = content.replace(rule[0], rule[1])
+    })
+
+    fs.writeFileSync(
+      to,
+      content,
+    )
+    console.log(`move ${from} to ${to}`)
+  })
+}
+
+generateReadme()

+ 18 - 3
docs/book.json

@@ -1,6 +1,7 @@
 {
 {
   "plugins": [
   "plugins": [
     "prism",
     "prism",
+    "edit-link",
     "-highlight",
     "-highlight",
     "-sharing",
     "-sharing",
     "simple-page-toc",
     "simple-page-toc",
@@ -8,7 +9,8 @@
     "copy-code-button",
     "copy-code-button",
     "anchor-navigation-ex",
     "anchor-navigation-ex",
     "cuav-chapters",
     "cuav-chapters",
-    "local-pagefooter"
+    "local-pagefooter",
+    "github-buttons"
   ],
   ],
   "structure": {
   "structure": {
     "readme": "getting_started.md"
     "readme": "getting_started.md"
@@ -19,6 +21,19 @@
         "shell": "bash"
         "shell": "bash"
       }
       }
     },
     },
+    "edit-link": {
+      "base": "https://github.com/emqx/kuiper/tree/master/docs",
+      "label": "Edit"
+    },
+    "github-buttons": {
+      "buttons": [{
+        "user": "emqx",
+        "repo": "kuiper",
+        "type": "star",
+        "count": true,
+        "size": "small"
+      }]
+    },
     "anchor-navigation-ex": {
     "anchor-navigation-ex": {
       "showLevel": false
       "showLevel": false
     },
     },
@@ -37,8 +52,8 @@
     "epub": "styles/epub.css"
     "epub": "styles/epub.css"
   },
   },
   "title": "Kuiper",
   "title": "Kuiper",
-  "author": "EMQX Team",
-  "language": "zh",
+  "author": "EMQ X Team",
+  "language": "zh-hans",
   "links": {
   "links": {
     "sharing": {
     "sharing": {
       "facebook": false,
       "facebook": false,

BIN
docs/cover.jpg


+ 1 - 2
docs/en_US/SUMMARY.md

@@ -1,6 +1,5 @@
-<!-- - [Introduction](README.md) -->
+- [Introduction](./README.md)
 - [Getting started](getting_started.md)
 - [Getting started](getting_started.md)
-<!-- ## Reference Guide -->
 - [Reference Guide](reference.md)
 - [Reference Guide](reference.md)
 - [Install and operation](operation/overview.md)
 - [Install and operation](operation/overview.md)
   - [Install instruction](operation/install/overview.md)
   - [Install instruction](operation/install/overview.md)

+ 0 - 48
docs/en_US/book.json

@@ -1,48 +0,0 @@
-{
-  "plugins": [
-    "prism",
-    "-highlight",
-    "-sharing",
-    "simple-page-toc",
-    "anchors",
-    "copy-code-button",
-    "anchor-navigation-ex",
-    "cuav-chapters",
-    "local-pagefooter"
-  ],
-  "structure": {
-    "readme": "getting_started.md"
-  },
-  "pluginsConfig": {
-    "prism": {
-      "lang": {
-        "shell": "bash"
-      }
-    },
-    "anchor-navigation-ex": {
-      "showLevel": false
-    },
-    "local-pagefooter": {
-      "copyright": "© Copyright 2016-2019, EMQ Technologies Co., Ltd.",
-      "islocal": true,
-      "modify_label": "The document reversion time: ",
-      "modify_format": "YYYY-MM-DD HH:mm:ss"
-    }
-  },
-  "styles": {
-    "website": "styles/website.css",
-    "ebook": "styles/ebook.css",
-    "pdf": "styles/pdf.css",
-    "mobi": "styles/mobi.css",
-    "epub": "styles/epub.css"
-  },
-  "title": "Kuiper",
-  "author": "EMQX Team",
-  "language": "zh",
-  "links": {
-    "sharing": {
-      "facebook": false,
-      "twitter": false
-    }
-  }
-}

+ 1 - 1
docs/en_US/edgex/edgex_rule_engine_tutorial.md

@@ -185,7 +185,7 @@ In the running Kuiper instance, and execute following command.
 $ bin/cli create rule rule1 -f rule.txt
 $ bin/cli create rule rule1 -f rule.txt
 Connecting to 127.0.0.1:20498...
 Connecting to 127.0.0.1:20498...
 Creating a new rule from file rule.txt.
 Creating a new rule from file rule.txt.
-Rule rule1 was created, please use 'cli getstatus rule $rule_name' command to get rule status.
+Rule rule1 was created successfully, please use 'cli getstatus rule rule1' command to get rule status.
 ```
 ```
 
 
 ------
 ------

Datei-Diff unterdrückt, da er zu groß ist
+ 231 - 0
docs/en_US/plugins/plugins_tutorial.md


Datei-Diff unterdrückt, da er zu groß ist
+ 47 - 1
docs/en_US/rules/overview.md


Datei-Diff unterdrückt, da er zu groß ist
+ 1 - 47
docs/en_US/rules/sinks/rest.md


+ 14 - 5
docs/package.json

@@ -7,12 +7,21 @@
   "license": "Apache 2.0",
   "license": "Apache 2.0",
   "private": true,
   "private": true,
   "scripts": {
   "scripts": {
-    "serve": "gitbook serve",
-    "docs": "gitbook install && gitbook serve",
-    "html": "gitbook install && gitbook build . _book",
-    "pdf": "gitbook install && gitbook pdf . _book.pdf"
+    "serve": "node _scripts/make && gitbook serve",
+    "docs": "node _scripts/make && gitbook serve",
+    "html": "npm install && node _scripts/make && gitbook build . _book",
+    "pdf": "gitbook pdf . _book.pdf"
   },
   },
   "dependencies": {
   "dependencies": {
-    "gitbook-cli": "^2.3.2"
+    "gitbook-cli": "^2.3.2",
+    "gitbook-plugin-prism": "*",
+    "gitbook-plugin-simple-page-toc": "*",
+    "gitbook-plugin-anchors": "*",
+    "gitbook-plugin-copy-code-button": "*",
+    "gitbook-plugin-anchor-navigation-ex": "*",
+    "gitbook-plugin-cuav-chapters": "*",
+    "gitbook-plugin-local-pagefooter": "*",
+    "gitbook-plugin-edit-link": "*",
+    "gitbook-plugin-github-buttons": "*"
   }
   }
 }
 }

+ 15 - 15
docs/zh_CN/SUMMARY.md

@@ -1,23 +1,23 @@
-<!-- - [Introduction](README.md) -->
-- [快速入门](getting_started.md)
-<!-- ## 参考指南 -->
-- [参考指南](reference.md)
-- [安装与操作](operation/overview.md)
-  - [安装说明](operation/install/overview.md)
-  - [Cent-OS](operation/install/cent-os.md)
-  - [操作指南](operation/operations.md)
-  - [Kuiper 基本配置](operation/configuration_file.md)
-  - [MQTT 源配置](rules/sources/mqtt.md)
-- [命令行界面工具-CLI](cli/overview.md)
+- [概要](./README.md)
+- [安装试用](operation/overview.md)
+  - [下载安装快速入门](getting_started.md) 
+  - [Docker 容器安装教程](quick_start_docker.md) 
+  - [EdgeX Foundry 规则引擎教程](edgex/edgex_rule_engine_tutorial.md)
+- [规则](rules/overview.md) 
+- [命令行工具](cli/overview.md)
   - [流管理](cli/streams.md)
   - [流管理](cli/streams.md)
   - [规则管理](cli/rules.md)
   - [规则管理](cli/rules.md)
+  - [插件管理](cli/plugins.md)
+- [Rest API 接口](restapi/overview.md)
+  - [流管理](restapi/streams.md)
+  - [规则管理](restapi/rules.md)
+  - [插件管理](restapi/plugins.md)
 - [Kuiper SQL 参考](sqls/overview.md)
 - [Kuiper SQL 参考](sqls/overview.md)
   - [流规格](sqls/streams.md)
   - [流规格](sqls/streams.md)
   - [查询语言元素](sqls/query_language_elements.md)
   - [查询语言元素](sqls/query_language_elements.md)
   - [窗口](sqls/windows.md)
   - [窗口](sqls/windows.md)
   - [函数](sqls/built-in_functions.md)
   - [函数](sqls/built-in_functions.md)
-- [规则](rules/overview.md)
-  - [日志动作](rules/sinks/logs.md)
-  - [MQTT 动作](rules/sinks/mqtt.md)
 - [扩展 Kuiper](extension/overview.md)
 - [扩展 Kuiper](extension/overview.md)
-- [插件](plugins/overview.md)
+- [插件](plugins/overview.md)
+  - [插件开发教程](plugins/plugins_tutorial.md)
+

+ 0 - 48
docs/zh_CN/book.json

@@ -1,48 +0,0 @@
-{
-  "plugins": [
-    "prism",
-    "-highlight",
-    "-sharing",
-    "simple-page-toc",
-    "anchors",
-    "copy-code-button",
-    "anchor-navigation-ex",
-    "cuav-chapters",
-    "local-pagefooter"
-  ],
-  "structure": {
-    "readme": "getting_started.md"
-  },
-  "pluginsConfig": {
-    "prism": {
-      "lang": {
-        "shell": "bash"
-      }
-    },
-    "anchor-navigation-ex": {
-      "showLevel": false
-    },
-    "local-pagefooter": {
-      "copyright": "© Copyright 2016-2019, EMQ Technologies Co., Ltd.",
-      "islocal": true,
-      "modify_label": "The document reversion time: ",
-      "modify_format": "YYYY-MM-DD HH:mm:ss"
-    }
-  },
-  "styles": {
-    "website": "styles/website.css",
-    "ebook": "styles/ebook.css",
-    "pdf": "styles/pdf.css",
-    "mobi": "styles/mobi.css",
-    "epub": "styles/epub.css"
-  },
-  "title": "Kuiper",
-  "author": "EMQX Team",
-  "language": "zh",
-  "links": {
-    "sharing": {
-      "facebook": false,
-      "twitter": false
-    }
-  }
-}

+ 94 - 0
docs/zh_CN/cli/plugins.md

@@ -0,0 +1,94 @@
+# Plugins management
+
+The Kuiper plugin command line tools allows you to manage plugins, such as create, show and drop plugins. Notice that, drop a plugin will need to restart kuiper to take effect. To update a plugin, do the following:
+1. Drop the plugin.
+2. Restart Kuiper.
+3. Create the plugin with the new configuration.
+
+## create a plugin
+
+The command is used for creating a plugin.  The plugin's definition is specified with JSON format.
+
+```shell
+create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file
+```
+
+The plugin can be created with two ways. 
+
+- Specify the plugin definition in command line.
+
+Sample:
+
+```shell
+# bin/cli create plugin source random {"file":"http://127.0.0.1/plugins/sources/random.zip"}
+```
+
+The command create a source plugin named ``random``. 
+
+- Specify the plugin definition in a file. If the plugin is complex, or the plugin is already wrote in text files with well organized formats, you can just specify the plugin definition through ``-f`` option.
+
+Sample:
+
+```shell
+# bin/cli create plugin sink plugin1 -f /tmp/plugin1.txt
+```
+
+Below is the contents of ``plugin1.txt``.
+
+```json
+{
+  "file":"http://127.0.0.1/plugins/sources/random.zip"
+}
+```
+### parameters
+1. plugin_type: the type of the plugin. Available values are `["source", "sink", "functions"]`
+2. plugin_name: a unique name of the plugin. The name must be the same as the camel case version of the plugin with lowercase first letter. For example, if the exported plugin name is `Random`, then the name of this plugin is `random`.
+3. file: the url of the plugin files. It must be a zip file with: a compiled so file and the yaml file(only required for sources). The name of the files must match the name of the plugin. Please check [Extension](../extension/overview.md) for the naming rule.
+
+## show plugins
+
+The command is used for displaying all plugins defined in the server for a plugin type.
+
+```shell
+show plugins function
+```
+
+Sample:
+
+```shell
+# bin/cli show plugins function
+function1
+function2
+```
+
+## describe a plugin
+The command is used to print out the detailed definition of a plugin.
+
+```shell
+describe plugin $plugin_type $plugin_name
+```
+
+Sample: 
+
+```shell
+# bin/cli describe plugin source plugin1
+{
+  "name": "plugin1",
+  "version": "1.0.0"
+}
+```
+
+## drop a plugin
+
+The command is used for drop the plugin.
+
+```shell
+drop plugin $plugin_type $plugin_name -s $stop 
+```
+In which, `-s $stop` is an optional boolean parameter. If it is set to true, the Kuiper server will be stopped for the delete to take effect. The user will need to restart it manually.
+Sample:
+
+```shell
+# bin/cli drop plugin source random
+Plugin random is dropped.
+```

+ 1 - 1
docs/zh_CN/edgex/edgex_rule_engine_tutorial.md

@@ -181,7 +181,7 @@ curl -X POST \
 # bin/cli create rule rule1 -f rule.txt
 # bin/cli create rule rule1 -f rule.txt
 Connecting to 127.0.0.1:20498...
 Connecting to 127.0.0.1:20498...
 Creating a new rule from file rule.txt.
 Creating a new rule from file rule.txt.
-Rule rule1 was created, please use 'cli getstatus rule $rule_name' command to get rule status.
+Rule rule1 was created successfully, please use 'cli getstatus rule rule1' command to get rule status.
 ```
 ```
 
 
 ------
 ------

+ 8 - 0
docs/zh_CN/restapi/overview.md

@@ -0,0 +1,8 @@
+Kuiper provides a set of REST API for streams and rules management in addition to CLI. 
+
+By default, the REST API are running in port 9081. You can change the port in `/etc/kuiper.yaml` for the `restPort` property.
+
+- [Streams](streams.md)
+- [Rules](rules.md)
+- [Plugins](plugins.md)
+

+ 80 - 0
docs/zh_CN/restapi/plugins.md

@@ -0,0 +1,80 @@
+# Plugins management
+
+The Kuiper REST api for plugins allows you to manage plugins, such as create, drop and list plugins. Notice that, drop a plugin will need to restart kuiper to take effect. To update a plugin, do the following:
+1. Drop the plugin.
+2. Restart Kuiper.
+3. Create the plugin with the new configuration.
+
+## create a plugin
+
+The API accepts a JSON content to create a new plugin. Each plugin type has a standalone endpoint. The supported types are `["sources", "sinks", "functions"]`. The plugin is identified by the name. The name must be unique.
+```shell
+POST http://localhost:9081/plugins/sources
+POST http://localhost:9081/plugins/sinks
+POST http://localhost:9081/plugins/functions
+```
+Request Sample
+
+```json
+{
+  "name":"random",
+  "file":"http://127.0.0.1/plugins/sources/random.zip"
+}
+```
+
+### Parameters
+
+1. name: a unique name of the plugin. The name must be the same as the camel case version of the plugin with lowercase first letter. For example, if the exported plugin name is `Random`, then the name of this plugin is `random`.
+2. file: the url of the plugin files. It must be a zip file with: a compiled so file and the yaml file(only required for sources). The name of the files must match the name of the plugin. Please check [Extension](../extension/overview.md) for the naming rule.
+
+
+## show plugins
+
+The API is used for displaying all of plugins defined in the server for a plugin type.
+
+```shell
+GET http://localhost:9081/plugins/sources
+GET http://localhost:9081/plugins/sinks
+GET http://localhost:9081/plugins/functions
+```
+
+Response Sample:
+
+```json
+["plugin1","plugin2"]
+```
+
+## describe a plugin
+
+The API is used to print out the detailed definition of a plugin.
+
+```shell
+GET http://localhost:9081/plugins/sources/{name}
+GET http://localhost:9081/plugins/sinks/{name}
+GET http://localhost:9081/plugins/functions/{name}
+```
+
+Path parameter `name` is the name of the plugin.
+
+Response Sample: 
+
+```json
+{
+  "name": "plugin1",
+  "version": "1.0.0"
+}
+```
+
+## drop a plugin
+
+The API is used for drop the plugin. The kuiper server needs to be restarted to take effect.
+
+```shell
+DELETE http://localhost:8080/plugins/sources/{name}
+DELETE http://localhost:8080/plugins/sinks/{name}
+DELETE http://localhost:8080/plugins/functions/{name}
+```
+The user can pass a query parameter to decide if Kuiper should be stopped after a delete in order to make the deletion take effect. The parameter is `restart` and only when the value is `1` will the Kuiper be stopped. The user has to manually restart it.
+```shell
+DELETE http://localhost:8080/plugins/sources/{name}?restart=1
+```

+ 139 - 0
docs/zh_CN/restapi/rules.md

@@ -0,0 +1,139 @@
+# Rules management
+
+The Kuiper REST api for rules allows you to manage rules, such as create, show, drop, describe, start, stop and restart rules. 
+
+## create a rule
+
+The API accepts a JSON content and create and start a rule.
+```shell
+POST http://localhost:9081/rules
+```
+Request Sample
+
+```json
+{
+  "id": "rule1",
+  "sql": "SELECT * FROM demo",
+  "actions": [{
+    "log":  {}
+  }]
+}
+```
+
+
+## show rules
+
+The API is used for displaying all of rules defined in the server with a brief status.
+
+```shell
+GET http://localhost:9081/rules
+```
+
+Response Sample:
+
+```json
+[
+  {
+    "id": "rule1",
+    "status": "Running"
+  },
+  {
+     "id": "rule2",
+     "status": "Stopped: canceled by error."
+  }
+]
+```
+
+## describe a rule
+
+The API is used for print the detailed definition of rule.
+
+```shell
+GET http://localhost:9081/rules/{id}
+```
+
+Path parameter `id` is the id or name of the rule.
+
+Response Sample: 
+
+```json
+{
+  "sql": "SELECT * from demo",
+  "actions": [
+    {
+      "log": {}
+    },
+    {
+      "mqtt": {
+        "server": "tcp://127.0.0.1:1883",
+        "topic": "demoSink"
+      }
+    }
+  ]
+}
+```
+
+## drop a rule
+
+The API is used for drop the rule.
+
+```shell
+DELETE http://localhost:8080/rules/{id}
+```
+
+
+## start a rule
+
+The API is used to start running the rule.
+
+```shell
+POST http://localhost:8080/rules/{id}/start
+```
+
+
+## stop a rule
+
+The API is used to stop running the rule.
+
+```shell
+POST http://localhost:8080/rules/{id}/stop
+```
+
+## restart a rule
+
+The API is used to restart the rule.
+
+```shell
+POST http://localhost:8080/rules/{id}/restart
+```
+
+## get the status of a rule
+
+The command is used to get the status of the rule. If the rule is running, the metrics will be retrieved realtime. The status can be
+- $metrics
+- stopped: $reason
+
+```shell
+GET http://localhost:8080/rules/{id}/status
+```
+
+Response Sample:
+
+```shell
+{
+    "source_demo_0_records_in_total":5,
+    "source_demo_0_records_out_total":5,
+    "source_demo_0_exceptions_total":0,
+    "source_demo_0_process_latency_ms":0,
+    "source_demo_0_buffer_length":0,
+    "source_demo_0_last_invocation":"2020-01-02T11:28:33.054821",
+    ... 
+    "op_filter_0_records_in_total":5,
+    "op_filter_0_records_out_total":2,
+    "op_filter_0_exceptions_total":0,
+    "op_filter_0_process_latency_ms":0,
+    "op_filter_0_buffer_length":0,
+    "op_filter_0_last_invocation":"2020-01-02T11:28:33.054821",
+    ...
+}
+```

+ 74 - 0
docs/zh_CN/restapi/streams.md

@@ -0,0 +1,74 @@
+# Streams management
+
+The Kuiper REST api for streams allows you to manage the streams, such as create, describe, show and drop stream definitions.
+
+## create a stream
+
+The API is used for creating a stream. For more detailed information of stream definition, please refer to [streams](../sqls/streams.md).
+
+```shell
+POST http://localhost:9081/streams
+```
+Request sample, the request is a json string with `sql` field.
+
+```json
+{"sql":"create stream my_stream (id bigint, name string, score float) WITH ( datasource = \"topic/temperature\", FORMAT = \"json\", KEY = \"id\")"}
+```
+
+This API can run any stream sql statements, not only stream creation.
+
+## show streams
+
+The API is used for displaying all of streams defined in the server.
+
+```shell
+GET http://localhost:9081/streams
+```
+
+Response Sample:
+
+```json
+["mystream"]
+```
+
+## describe a stream
+
+The API is used for print the detailed definition of stream.
+
+```shell
+GET http://localhost:9081/streams/{id}}
+```
+
+Response Sample:
+
+```shell
+{
+  "Name": "demo",
+  "StreamFields": [
+    {
+      "Name": "temperature",
+      "FieldType": {
+        "Type": 2
+      }
+    },
+    {
+      "Name": "ts",
+      "FieldType": {
+        "Type": 1
+      }
+    }
+  ],
+  "Options": {
+    "DATASOURCE": "demo",
+    "FORMAT": "JSON"
+  }
+}
+```
+
+## drop a stream
+
+The API is used for drop the stream definition.
+
+```shell
+DELETE http://localhost:9081/streams/{id}
+```

Datei-Diff unterdrückt, da er zu groß ist
+ 47 - 0
docs/zh_CN/rules/overview.md


Datei-Diff unterdrückt, da er zu groß ist
+ 1 - 47
docs/zh_CN/rules/sinks/rest.md


+ 15 - 0
etc/sources/test.yaml

@@ -0,0 +1,15 @@
+default:
+  interval: 1000
+  ashost: 192.168.1.100
+  sysnr: "02"
+  client: "900"
+  user: SPERF
+  passwd: PASSPASS
+  params:
+    QUERY_TABLE: VBAP
+    ROWCOUNT: 10
+    FIELDS: [
+      FIELDNAME: MANDT,
+      FIELDNAME: VBELN,
+      FIELDNAME: POSNR,
+    ]

+ 480 - 0
fvt_scripts/plugin_end_2_end.jmx

@@ -0,0 +1,480 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<jmeterTestPlan version="1.2" properties="4.0" jmeter="4.0 r1823414">
+  <hashTree>
+    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Test Plan" enabled="true">
+      <stringProp name="TestPlan.comments"></stringProp>
+      <boolProp name="TestPlan.functional_mode">false</boolProp>
+      <boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp>
+      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
+      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments"/>
+      </elementProp>
+      <stringProp name="TestPlan.user_define_classpath"></stringProp>
+    </TestPlan>
+    <hashTree>
+      <Arguments guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments">
+          <elementProp name="srv" elementType="Argument">
+            <stringProp name="Argument.name">srv</stringProp>
+            <stringProp name="Argument.value">127.0.0.1</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="rest_port" elementType="Argument">
+            <stringProp name="Argument.name">rest_port</stringProp>
+            <stringProp name="Argument.value">9081</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+        </collectionProp>
+      </Arguments>
+      <hashTree/>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Rules" enabled="true">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <TransactionController guiclass="TransactionControllerGui" testclass="TransactionController" testname="API" enabled="true">
+          <boolProp name="TransactionController.includeTimers">false</boolProp>
+          <boolProp name="TransactionController.parent">false</boolProp>
+        </TransactionController>
+        <hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_AddPlugin" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+  &quot;name&quot;:&quot;zmq&quot;,&#xd;
+  &quot;file&quot;:&quot;http://127.0.0.1:9090/plugins/zmq.zip&quot;&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/plugins/sources</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="49587">201</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_code</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+            <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+              <stringProp name="ConstantTimer.delay">500</stringProp>
+            </ConstantTimer>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_DescPlugin" enabled="true">
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments" guiclass="HTTPArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments"/>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/plugins/sources/zmq</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="120734">zmq</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateStream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+&quot;sql&quot; : &quot;create stream demo (humidity bigint, temperature bigint) WITH (DATASOURCE=\&quot;events\&quot;, FORMAT=\&quot;JSON\&quot;, TYPE=\&quot;zmq\&quot;)&quot;&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-1754954177">Stream demo is created.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+  &quot;id&quot;: &quot;rule1&quot;,&#xd;
+  &quot;sql&quot;: &quot;SELECT * FROM demo WHERE humidity = 55&quot;,&#xd;
+  &quot;actions&quot;: [&#xd;
+    {&#xd;
+      &quot;rest&quot;: {&#xd;
+        &quot;url&quot;: &quot;http://127.0.0.1:9090/alert&quot;,&#xd;
+        &quot;method&quot;: &quot;post&quot;,&#xd;
+        &quot;dataTemplate&quot;: &quot;{\&quot;content\&quot;:{{json .}}}&quot;,&#xd;
+        &quot;sendSingle&quot;: true&#xd;
+      }&#xd;
+    }&#xd;
+  ]&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-2022196798">Rule rule1 was created</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <SystemSampler guiclass="SystemSamplerGui" testclass="SystemSampler" testname="Send_Data" enabled="true">
+            <boolProp name="SystemSampler.checkReturnCode">false</boolProp>
+            <stringProp name="SystemSampler.expectedReturnCode">0</stringProp>
+            <stringProp name="SystemSampler.command">fvt_scripts/plugins/pub/zmq_pub</stringProp>
+            <elementProp name="SystemSampler.arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments"/>
+            </elementProp>
+            <elementProp name="SystemSampler.environment" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments"/>
+            </elementProp>
+            <stringProp name="SystemSampler.directory">${__property(fvt,,)}</stringProp>
+          </SystemSampler>
+          <hashTree>
+            <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+              <stringProp name="ConstantTimer.delay">500</stringProp>
+            </ConstantTimer>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_GetRuleStatus" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.sink_sink_rest_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">2</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+            <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+              <stringProp name="ConstantTimer.delay">1000</stringProp>
+            </ConstantTimer>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_DropRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="717250485">Rule rule1 is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_Drop_Stream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams/demo</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="287881319">Stream demo is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_DropPlugin" enabled="true">
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments" guiclass="HTTPArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments"/>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/plugins/sources/zmq</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-1776925942">sources plugin zmq is deleted</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+            <stringProp name="ConstantTimer.delay">500</stringProp>
+          </ConstantTimer>
+          <hashTree/>
+        </hashTree>
+      </hashTree>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Result" enabled="false">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">0</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <SystemSampler guiclass="SystemSamplerGui" testclass="SystemSampler" testname="App_Service" enabled="true">
+          <boolProp name="SystemSampler.checkReturnCode">false</boolProp>
+          <stringProp name="SystemSampler.expectedReturnCode">0</stringProp>
+          <stringProp name="SystemSampler.command">./server</stringProp>
+          <elementProp name="SystemSampler.arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+            <collectionProp name="Arguments.arguments"/>
+          </elementProp>
+          <elementProp name="SystemSampler.environment" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+            <collectionProp name="Arguments.arguments"/>
+          </elementProp>
+          <stringProp name="SystemSampler.directory">${__property(fvt,,)}/fvt_scripts/plugins/service/</stringProp>
+        </SystemSampler>
+        <hashTree/>
+      </hashTree>
+    </hashTree>
+  </hashTree>
+</jmeterTestPlan>

+ 99 - 0
fvt_scripts/plugins/pub/zmq_pub.go

@@ -0,0 +1,99 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	zmq "github.com/pebbe/zmq4"
+	"os"
+	"time"
+)
+
+type zmqPub struct {
+	publisher *zmq.Socket
+	srv       string
+	topic     string
+}
+
+func (m *zmqPub) Open() (err error) {
+	m.publisher, err = zmq.NewSocket(zmq.PUB)
+	if err != nil {
+		return fmt.Errorf("zmq sink fails to create socket: %v", err)
+	}
+	err = m.publisher.Bind(m.srv)
+	if err != nil {
+		return fmt.Errorf("zmq sink fails to bind to %s: %v", m.srv, err)
+	}
+	fmt.Println("zmq sink open")
+	return nil
+}
+
+func (m *zmqPub) Send(item interface{}) (err error) {
+	if v, ok := item.([]byte); ok {
+		fmt.Printf("To pub: %s \n", item)
+		if m.topic == "" {
+			_, err = m.publisher.Send(string(v), 0)
+		} else {
+			msgs := []string{
+				m.topic,
+				string(v),
+			}
+			_, err = m.publisher.SendMessage(msgs)
+		}
+	} else {
+		fmt.Printf("zmq sink receive non byte data %v \n", item)
+	}
+	if err != nil {
+		fmt.Printf("send to zmq error %v \n", err)
+	}
+	return
+}
+
+func (m *zmqPub) Close() error {
+	if m.publisher != nil {
+		return m.publisher.Close()
+	}
+	return nil
+}
+
+type data struct {
+	Temperature int `json:"temperature"`
+	Humidity    int `json:"humidity"`
+}
+
+var mockup = [10]data{
+	{Temperature: 10, Humidity: 15},
+	{Temperature: 15, Humidity: 20},
+	{Temperature: 20, Humidity: 25},
+	{Temperature: 25, Humidity: 30},
+	{Temperature: 30, Humidity: 35},
+	{Temperature: 35, Humidity: 40},
+	{Temperature: 40, Humidity: 45},
+	{Temperature: 45, Humidity: 50},
+	{Temperature: 50, Humidity: 55},
+	{Temperature: 55, Humidity: 60},
+}
+
+func main() {
+	zmq := zmqPub{srv: "tcp://127.0.0.1:5563", topic: "events"}
+	if e := zmq.Open(); e != nil {
+		return
+	} else {
+		if len(os.Args) == 2 {
+			v := os.Args[1]
+			if v != "" {
+				zmq.topic = v
+				fmt.Printf("Use the topic %s\n", v)
+			} else {
+				fmt.Printf("Use the default zeromq topic %s\n", "events")
+			}
+		}
+
+		for i := 0; i < 20; i++ {
+			index := i % 10
+			b, _ := json.Marshal(mockup[index])
+			time.Sleep(1000 * time.Millisecond)
+			zmq.Send(b)
+		}
+	}
+}
+

+ 27 - 0
fvt_scripts/plugins/service/server.go

@@ -0,0 +1,27 @@
+package main
+
+import (
+	"bytes"
+	"io/ioutil"
+	"log"
+	"net/http"
+)
+
+func alert(w http.ResponseWriter, req *http.Request) {
+	buf, bodyErr := ioutil.ReadAll(req.Body)
+	if bodyErr != nil {
+		log.Print("bodyErr ", bodyErr.Error())
+		http.Error(w, bodyErr.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	rdr1 := ioutil.NopCloser(bytes.NewBuffer(buf))
+	log.Printf("BODY: %q", rdr1)
+}
+
+func main() {
+	http.Handle("/", http.FileServer(http.Dir("web")))
+	http.HandleFunc("/alert", alert)
+
+	http.ListenAndServe(":9090", nil)
+}

+ 3 - 0
fvt_scripts/plugins/zmq.yaml

@@ -0,0 +1,3 @@
+#Global Zmq configurations
+default:
+  server: tcp://127.0.0.1:5563

+ 35 - 0
fvt_scripts/prepare_plugins.sh

@@ -0,0 +1,35 @@
+#!/bin/bash
+set -e
+
+go build -o fvt_scripts/plugins/pub/zmq_pub fvt_scripts/plugins/pub/zmq_pub.go
+chmod +x fvt_scripts/plugins/pub/zmq_pub
+
+go build -o fvt_scripts/plugins/service/http_server fvt_scripts/plugins/service/server.go
+chmod +x fvt_scripts/plugins/service/http_server
+
+cd fvt_scripts
+
+rm -rf zmq.* Zmq.so
+
+FILE=../plugins/sources/Zmq.so
+if [ -f "$FILE" ]; then
+    echo "$FILE exists, not requried to build plugin."
+else
+    echo "$FILE does not exist, will build the plugin."
+    go build --buildmode=plugin -o ../plugins/sources/Zmq.so ../plugins/sources/zmq.go
+fi
+
+mv ../plugins/sources/Zmq.so .
+cp plugins/zmq.yaml .
+zip zmq.zip Zmq.so zmq.yaml
+rm -rf zmq.yaml Zmq.so
+
+rm -rf plugins/service/web/plugins/
+mkdir -p plugins/service/web/plugins/
+mv zmq.zip plugins/service/web/plugins/
+
+cd plugins/service/
+export BUILD_ID=dontKillMe
+
+echo "starting mock http server..."
+nohup ./http_server > http_server.out 2>&1 &

+ 4 - 1
fvt_scripts/run_jmeter.sh

@@ -83,4 +83,7 @@ if test $with_edgex = true; then
   /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/edgex_mqtt_sink_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_mqtt_sink_rule.jtl -j jmeter_logs/edgex_mqtt_sink_rule.log
   /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/edgex_mqtt_sink_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_mqtt_sink_rule.jtl -j jmeter_logs/edgex_mqtt_sink_rule.log
   echo -e "---------------------------------------------\n"
   echo -e "---------------------------------------------\n"
 
 
-fi
+fi
+
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/plugin_end_2_end.jmx -Dfvt="$fvt_dir" -l jmeter_logs/plugin_end_2_end.jtl -j jmeter_logs/plugin_end_2_end.log
+echo -e "---------------------------------------------\n"

+ 13 - 1
fvt_scripts/setup_env.sh

@@ -36,4 +36,16 @@ else
   done
   done
 fi
 fi
 
 
-fvt_scripts/start_vdmock.sh
+fvt_scripts/start_vdmock.sh
+
+pids=`ps aux | grep http_server | grep "./" | awk '{printf $2 " "}'`
+if [ "$pids" = "" ] ; then
+   echo "No http mockup server was started"
+else
+  for pid in $pids ; do
+    echo "kill http mockup server " $pid
+    kill -9 $pid
+  done
+fi
+
+fvt_scripts/prepare_plugins.sh

+ 35 - 7
plugins/manager.go

@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
 	"os"
 	"os"
+	"os/exec"
 	"path"
 	"path"
 	"path/filepath"
 	"path/filepath"
 	"plugin"
 	"plugin"
@@ -33,6 +34,8 @@ const (
 	FUNCTION
 	FUNCTION
 )
 )
 
 
+const DELETED = "$deleted"
+
 var (
 var (
 	PluginTypes = []string{"sources", "sinks", "functions"}
 	PluginTypes = []string{"sources", "sinks", "functions"}
 	once        sync.Once
 	once        sync.Once
@@ -191,11 +194,14 @@ func (m *Manager) Register(t PluginType, j *Plugin) error {
 		return fmt.Errorf("invalid uri %s", uri)
 		return fmt.Errorf("invalid uri %s", uri)
 	}
 	}
 
 
-	for _, n := range m.registry.List(t) {
-		if n == name {
+	if v, ok := m.registry.Get(t, name); ok {
+		if v == DELETED {
+			return fmt.Errorf("invalid name %s: the plugin is marked as deleted but Kuiper is not restarted for the change to take effect yet", name)
+		} else {
 			return fmt.Errorf("invalid name %s: duplicate", name)
 			return fmt.Errorf("invalid name %s: duplicate", name)
 		}
 		}
 	}
 	}
+
 	zipPath := path.Join(m.pluginDir, name+".zip")
 	zipPath := path.Join(m.pluginDir, name+".zip")
 	var unzipFiles []string
 	var unzipFiles []string
 	//clean up: delete zip file and unzip files in error
 	//clean up: delete zip file and unzip files in error
@@ -206,7 +212,7 @@ func (m *Manager) Register(t PluginType, j *Plugin) error {
 		return fmt.Errorf("fail to download file %s: %s", uri, err)
 		return fmt.Errorf("fail to download file %s: %s", uri, err)
 	}
 	}
 	//unzip and copy to destination
 	//unzip and copy to destination
-	unzipFiles, version, err := m.unzipAndCopy(t, name, zipPath)
+	unzipFiles, version, err := m.install(t, name, zipPath)
 	if err != nil {
 	if err != nil {
 		if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
 		if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
 			os.Remove(unzipFiles[0])
 			os.Remove(unzipFiles[0])
@@ -249,6 +255,7 @@ func (m *Manager) Delete(t PluginType, name string, stop bool) error {
 	if len(results) > 0 {
 	if len(results) > 0 {
 		return errors.New(strings.Join(results, "\n"))
 		return errors.New(strings.Join(results, "\n"))
 	} else {
 	} else {
+		m.registry.Store(t, name, DELETED)
 		if stop {
 		if stop {
 			go func() {
 			go func() {
 				time.Sleep(1 * time.Second)
 				time.Sleep(1 * time.Second)
@@ -283,8 +290,10 @@ func getSoFileName(m *Manager, t PluginType, name string) (string, error) {
 	return soFile, nil
 	return soFile, nil
 }
 }
 
 
-func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, string, error) {
+func (m *Manager) install(t PluginType, name string, src string) ([]string, string, error) {
 	var filenames []string
 	var filenames []string
+	var tempPath = path.Join(m.pluginDir, "temp", PluginTypes[t], name)
+	defer os.RemoveAll(tempPath)
 	r, err := zip.OpenReader(src)
 	r, err := zip.OpenReader(src)
 	if err != nil {
 	if err != nil {
 		return filenames, "", err
 		return filenames, "", err
@@ -299,6 +308,7 @@ func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string,
 		yamlPath = path.Join(m.etcDir, PluginTypes[t], yamlFile)
 		yamlPath = path.Join(m.etcDir, PluginTypes[t], yamlFile)
 		expFiles = 2
 		expFiles = 2
 	}
 	}
+	needInstall := false
 	for _, file := range r.File {
 	for _, file := range r.File {
 		fileName := file.Name
 		fileName := file.Name
 		if yamlFile == fileName {
 		if yamlFile == fileName {
@@ -307,8 +317,7 @@ func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string,
 				return filenames, "", err
 				return filenames, "", err
 			}
 			}
 			filenames = append(filenames, yamlPath)
 			filenames = append(filenames, yamlPath)
-		}
-		if soPrefix.Match([]byte(fileName)) {
+		} else if soPrefix.Match([]byte(fileName)) {
 			soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
 			soPath := path.Join(m.pluginDir, PluginTypes[t], fileName)
 			err = unzipTo(file, soPath)
 			err = unzipTo(file, soPath)
 			if err != nil {
 			if err != nil {
@@ -316,10 +325,27 @@ func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string,
 			}
 			}
 			filenames = append(filenames, soPath)
 			filenames = append(filenames, soPath)
 			_, version = parseName(fileName)
 			_, version = parseName(fileName)
+		} else { //unzip other files
+			err = unzipTo(file, path.Join(tempPath, fileName))
+			if err != nil {
+				return filenames, "", err
+			}
+			if fileName == "install.sh" {
+				needInstall = true
+			}
 		}
 		}
 	}
 	}
 	if len(filenames) != expFiles {
 	if len(filenames) != expFiles {
 		return filenames, version, fmt.Errorf("invalid zip file: so file or conf file is missing")
 		return filenames, version, fmt.Errorf("invalid zip file: so file or conf file is missing")
+	} else if needInstall {
+		//run install script if there is
+		spath := path.Join(tempPath, "install.sh")
+		out, err := exec.Command("/bin/sh", spath).Output()
+		if err != nil {
+			return filenames, "", err
+		} else {
+			common.Log.Infof("install %s plugin %s log: %s", PluginTypes[t], name, out)
+		}
 	}
 	}
 	return filenames, version, nil
 	return filenames, version, nil
 }
 }
@@ -337,7 +363,9 @@ func parseName(n string) (string, string) {
 func unzipTo(f *zip.File, fpath string) error {
 func unzipTo(f *zip.File, fpath string) error {
 	_, err := os.Stat(fpath)
 	_, err := os.Stat(fpath)
 	if err == nil || !os.IsNotExist(err) {
 	if err == nil || !os.IsNotExist(err) {
-		return fmt.Errorf("%s already exist", fpath)
+		if err = os.Remove(fpath); err != nil {
+			return fmt.Errorf("failed to delete file %s", fpath)
+		}
 	}
 	}
 
 
 	if f.FileInfo().IsDir() {
 	if f.FileInfo().IsDir() {

+ 70 - 0
xsql/processors/simple_processor_test.go

@@ -0,0 +1,70 @@
+package processors
+
+import (
+	"reflect"
+	"testing"
+)
+
+func TestRuleActionParse_Apply(t *testing.T) {
+	var tests = []struct {
+		ruleStr string
+		result  []map[string]interface{}
+	}{
+		{
+			ruleStr: `{
+			  "id": "ruleTest",
+			  "sql": "SELECT * from demo",
+			  "actions": [
+				{
+				  	"funcName": "RFC_READ_TABLE",
+					"ashost":   "192.168.1.100",
+					"sysnr":    "02",
+					"client":   "900",
+					"user":     "SPERF",
+					"passwd":   "PASSPASS",
+					"params": {
+						"QUERY_TABLE": "VBAP",
+						"ROWCOUNT":    10,
+						"FIELDS": [
+							{"FIELDNAME": "MANDT"},
+							{"FIELDNAME": "VBELN"},
+							{"FIELDNAME": "POSNR"}
+						]
+					}
+				}
+			  ]
+			}`,
+			result: []map[string]interface{}{
+				{
+					"funcName": "RFC_READ_TABLE",
+					"ashost":   "192.168.1.100",
+					"sysnr":    "02",
+					"client":   "900",
+					"user":     "SPERF",
+					"passwd":   "PASSPASS",
+					"params": map[string]interface{}{
+						"QUERY_TABLE": "VBAP",
+						"ROWCOUNT":    float64(10),
+						"FIELDS": []interface{}{
+							map[string]interface{}{"FIELDNAME": "MANDT"},
+							map[string]interface{}{"FIELDNAME": "VBELN"},
+							map[string]interface{}{"FIELDNAME": "POSNR"},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	p := NewRuleProcessor(DbDir)
+	for i, tt := range tests {
+		r, err := p.getRuleByJson("ruleTest", tt.ruleStr)
+		if err != nil {
+			t.Errorf("get rule error: %s", err)
+		}
+		if !reflect.DeepEqual(tt.result, r.Actions) {
+			t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, r.Actions)
+		}
+	}
+
+}

+ 1 - 1
xsql/processors/xsql_processor.go

@@ -231,7 +231,7 @@ func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
 		return nil, err
 		return nil, err
 	}
 	}
 	defer p.db.Close()
 	defer p.db.Close()
-	s, f := p.db.Get(string(name))
+	s, f := p.db.Get(name)
 	if !f {
 	if !f {
 		return nil, fmt.Errorf("Rule %s is not found.", name)
 		return nil, fmt.Errorf("Rule %s is not found.", name)
 	}
 	}

+ 249 - 5
xsql/processors/xsql_processor_test.go

@@ -800,7 +800,7 @@ func TestSingleSQL(t *testing.T) {
 			t.Error(err)
 			t.Error(err)
 		}
 		}
 		mockSink := test.NewMockSink()
 		mockSink := test.NewMockSink()
-		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
+		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
 		tp.AddSink(inputs, sink)
 		tp.AddSink(inputs, sink)
 		errCh := tp.Open()
 		errCh := tp.Open()
 		func() {
 		func() {
@@ -843,6 +843,250 @@ func TestSingleSQL(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestSingleSQLTemplate(t *testing.T) {
+	var tests = []struct {
+		name string
+		sql  string
+		r    []map[string]interface{}
+		s    string
+		m    map[string]interface{}
+	}{
+		{
+			name: `rule1`,
+			sql:  `SELECT * FROM demo`,
+			r: []map[string]interface{}{
+				{
+					"c":       "red",
+					"wrapper": "w1",
+				},
+				{
+					"c":       "blue",
+					"wrapper": "w1",
+				},
+				{
+					"c":       "blue",
+					"wrapper": "w1",
+				},
+				{
+					"c":       "yellow",
+					"wrapper": "w1",
+				},
+				{
+					"c":       "red",
+					"wrapper": "w1",
+				},
+			},
+			m: map[string]interface{}{
+				"op_preprocessor_demo_0_exceptions_total":   int64(0),
+				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_records_in_total":   int64(5),
+				"op_preprocessor_demo_0_records_out_total":  int64(5),
+
+				"op_project_0_exceptions_total":   int64(0),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(5),
+				"op_project_0_records_out_total":  int64(5),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(5),
+				"sink_mockSink_0_records_out_total": int64(5),
+
+				"source_demo_0_exceptions_total":  int64(0),
+				"source_demo_0_records_in_total":  int64(5),
+				"source_demo_0_records_out_total": int64(5),
+			},
+			s: "sink_mockSink_0_records_out_total",
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	createStreams(t)
+	defer dropStreams(t)
+	//defer close(done)
+	for i, tt := range tests {
+		test.ResetClock(1541152486000)
+		p := NewRuleProcessor(DbDir)
+		parser := xsql.NewParser(strings.NewReader(tt.sql))
+		var (
+			sources []*nodes.SourceNode
+			syncs   []chan int
+		)
+		if stmt, err := xsql.Language.Parse(parser); err != nil {
+			t.Errorf("parse sql %s error: %s", tt.sql, err)
+		} else {
+			if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
+				t.Errorf("sql %s is not a select statement", tt.sql)
+			} else {
+				streams := xsql.GetStreams(selectStmt)
+				for _, stream := range streams {
+					next := make(chan int)
+					syncs = append(syncs, next)
+					source := getMockSource(stream, next, 5)
+					sources = append(sources, source)
+				}
+			}
+		}
+		tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: map[string]interface{}{
+			"bufferLength": float64(100),
+		}}, sources)
+		if err != nil {
+			t.Error(err)
+		}
+		mockSink := test.NewMockSink()
+		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, map[string]interface{}{
+			"dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
+			"sendSingle":   true,
+		})
+		tp.AddSink(inputs, sink)
+		errCh := tp.Open()
+		func() {
+			for i := 0; i < 5; i++ {
+				syncs[i%len(syncs)] <- i
+				select {
+				case err = <-errCh:
+					t.Log(err)
+					tp.Cancel()
+					return
+				default:
+				}
+			}
+			for retry := 100; retry > 0; retry-- {
+				if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
+					break
+				}
+				time.Sleep(time.Duration(retry) * time.Millisecond)
+			}
+		}()
+		results := mockSink.GetResults()
+		var maps []map[string]interface{}
+		for _, v := range results {
+			var mapRes map[string]interface{}
+			err := json.Unmarshal(v, &mapRes)
+			if err != nil {
+				t.Errorf("Failed to parse the input into map")
+				continue
+			}
+			maps = append(maps, mapRes)
+		}
+		if !reflect.DeepEqual(tt.r, maps) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
+			continue
+		}
+		if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
+			t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
+		}
+		tp.Cancel()
+	}
+}
+
+func TestNoneSingleSQLTemplate(t *testing.T) {
+	var tests = []struct {
+		name string
+		sql  string
+		r    [][]byte
+		s    string
+		m    map[string]interface{}
+	}{
+		{
+			name: `rule1`,
+			sql:  `SELECT * FROM demo`,
+			r: [][]byte{
+				[]byte("<div>results</div><ul><li>red - 3</li></ul>"),
+				[]byte("<div>results</div><ul><li>blue - 6</li></ul>"),
+				[]byte("<div>results</div><ul><li>blue - 2</li></ul>"),
+				[]byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
+				[]byte("<div>results</div><ul><li>red - 1</li></ul>"),
+			},
+			m: map[string]interface{}{
+				"op_preprocessor_demo_0_exceptions_total":   int64(0),
+				"op_preprocessor_demo_0_process_latency_ms": int64(0),
+				"op_preprocessor_demo_0_records_in_total":   int64(5),
+				"op_preprocessor_demo_0_records_out_total":  int64(5),
+
+				"op_project_0_exceptions_total":   int64(0),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(5),
+				"op_project_0_records_out_total":  int64(5),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(5),
+				"sink_mockSink_0_records_out_total": int64(5),
+
+				"source_demo_0_exceptions_total":  int64(0),
+				"source_demo_0_records_in_total":  int64(5),
+				"source_demo_0_records_out_total": int64(5),
+			},
+			s: "sink_mockSink_0_records_out_total",
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	createStreams(t)
+	defer dropStreams(t)
+	//defer close(done)
+	for i, tt := range tests {
+		test.ResetClock(1541152486000)
+		p := NewRuleProcessor(DbDir)
+		parser := xsql.NewParser(strings.NewReader(tt.sql))
+		var (
+			sources []*nodes.SourceNode
+			syncs   []chan int
+		)
+		if stmt, err := xsql.Language.Parse(parser); err != nil {
+			t.Errorf("parse sql %s error: %s", tt.sql, err)
+		} else {
+			if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
+				t.Errorf("sql %s is not a select statement", tt.sql)
+			} else {
+				streams := xsql.GetStreams(selectStmt)
+				for _, stream := range streams {
+					next := make(chan int)
+					syncs = append(syncs, next)
+					source := getMockSource(stream, next, 5)
+					sources = append(sources, source)
+				}
+			}
+		}
+		tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: map[string]interface{}{
+			"bufferLength": float64(100),
+		}}, sources)
+		if err != nil {
+			t.Error(err)
+		}
+		mockSink := test.NewMockSink()
+		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, map[string]interface{}{
+			"dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
+		})
+		tp.AddSink(inputs, sink)
+		errCh := tp.Open()
+		func() {
+			for i := 0; i < 5; i++ {
+				syncs[i%len(syncs)] <- i
+				select {
+				case err = <-errCh:
+					t.Log(err)
+					tp.Cancel()
+					return
+				default:
+				}
+			}
+			for retry := 100; retry > 0; retry-- {
+				if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
+					break
+				}
+				time.Sleep(time.Duration(retry) * time.Millisecond)
+			}
+		}()
+		results := mockSink.GetResults()
+		if !reflect.DeepEqual(tt.r, results) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, results)
+			continue
+		}
+		if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
+			t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
+		}
+		tp.Cancel()
+	}
+}
+
 func getMockSourceL(name string, done <-chan int, size int) *nodes.SourceNode {
 func getMockSourceL(name string, done <-chan int, size int) *nodes.SourceNode {
 	var data []*xsql.Tuple
 	var data []*xsql.Tuple
 	switch name {
 	switch name {
@@ -1169,7 +1413,7 @@ func TestSingleSQLError(t *testing.T) {
 			t.Error(err)
 			t.Error(err)
 		}
 		}
 		mockSink := test.NewMockSink()
 		mockSink := test.NewMockSink()
-		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
+		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
 		tp.AddSink(inputs, sink)
 		tp.AddSink(inputs, sink)
 		errCh := tp.Open()
 		errCh := tp.Open()
 		func() {
 		func() {
@@ -1684,7 +1928,7 @@ func TestWindow(t *testing.T) {
 			t.Error(err)
 			t.Error(err)
 		}
 		}
 		mockSink := test.NewMockSink()
 		mockSink := test.NewMockSink()
-		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
+		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
 		tp.AddSink(inputs, sink)
 		tp.AddSink(inputs, sink)
 		errCh := tp.Open()
 		errCh := tp.Open()
 		func() {
 		func() {
@@ -2015,7 +2259,7 @@ func TestWindowError(t *testing.T) {
 			t.Error(err)
 			t.Error(err)
 		}
 		}
 		mockSink := test.NewMockSink()
 		mockSink := test.NewMockSink()
-		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
+		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
 		tp.AddSink(inputs, sink)
 		tp.AddSink(inputs, sink)
 		errCh := tp.Open()
 		errCh := tp.Open()
 		func() {
 		func() {
@@ -2879,7 +3123,7 @@ func TestEventWindow(t *testing.T) {
 			t.Error(err)
 			t.Error(err)
 		}
 		}
 		mockSink := test.NewMockSink()
 		mockSink := test.NewMockSink()
-		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
+		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
 		tp.AddSink(inputs, sink)
 		tp.AddSink(inputs, sink)
 		errCh := tp.Open()
 		errCh := tp.Open()
 		func() {
 		func() {

+ 102 - 25
xstream/nodes/sink_node.go

@@ -1,12 +1,16 @@
 package nodes
 package nodes
 
 
 import (
 import (
+	"bytes"
+	"encoding/json"
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/common/templates"
 	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/sinks"
 	"github.com/emqx/kuiper/xstream/sinks"
 	"sync"
 	"sync"
+	"text/template"
 	"time"
 	"time"
 )
 )
 
 
@@ -46,12 +50,12 @@ func NewSinkNode(name string, sinkType string, props map[string]interface{}) *Si
 }
 }
 
 
 //Only for mock source, do not use it in production
 //Only for mock source, do not use it in production
-func NewSinkNodeWithSink(name string, sink api.Sink) *SinkNode {
+func NewSinkNodeWithSink(name string, sink api.Sink, props map[string]interface{}) *SinkNode {
 	return &SinkNode{
 	return &SinkNode{
 		input:       make(chan interface{}, 1024),
 		input:       make(chan interface{}, 1024),
 		name:        name,
 		name:        name,
 		sinks:       []api.Sink{sink},
 		sinks:       []api.Sink{sink},
-		options:     nil,
+		options:     props,
 		concurrency: 1,
 		concurrency: 1,
 		ctx:         nil,
 		ctx:         nil,
 		isMock:      true,
 		isMock:      true,
@@ -110,6 +114,30 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 				omitIfEmpty = t
 				omitIfEmpty = t
 			}
 			}
 		}
 		}
+		sendSingle := false
+		if c, ok := m.options["sendSingle"]; ok {
+			if t, ok := c.(bool); !ok {
+				logger.Warnf("invalid type for sendSingle property, should be a bool value 'true/false'.", c)
+			} else {
+				sendSingle = t
+			}
+		}
+		var tp *template.Template = nil
+		if c, ok := m.options["dataTemplate"]; ok {
+			if t, ok := c.(string); !ok {
+				logger.Warnf("invalid type for dateTemplate property, should be a string value.", c)
+			} else {
+				funcMap := template.FuncMap{
+					"json": templates.JsonMarshal,
+				}
+				temp, err := template.New("sink").Funcs(funcMap).Parse(t)
+				if err != nil {
+					logger.Warnf("property dataTemplate %v is invalid: %v", t, err)
+				} else {
+					tp = temp
+				}
+			}
+		}
 
 
 		m.reset()
 		m.reset()
 		logger.Infof("open sink node %d instances", m.concurrency)
 		logger.Infof("open sink node %d instances", m.concurrency)
@@ -149,9 +177,9 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 					case data := <-cache.Out:
 					case data := <-cache.Out:
 						stats.SetBufferLength(int64(cache.Length()))
 						stats.SetBufferLength(int64(cache.Length()))
 						if runAsync {
 						if runAsync {
-							go doCollect(sink, data, stats, retryInterval, omitIfEmpty, cache.Complete, ctx)
+							go doCollect(sink, data, stats, retryInterval, omitIfEmpty, sendSingle, tp, cache.Complete, ctx)
 						} else {
 						} else {
-							doCollect(sink, data, stats, retryInterval, omitIfEmpty,cache.Complete, ctx)
+							doCollect(sink, data, stats, retryInterval, omitIfEmpty, sendSingle, tp, cache.Complete, ctx)
 						}
 						}
 					case <-ctx.Done():
 					case <-ctx.Done():
 						logger.Infof("sink node %s instance %d done", m.name, instance)
 						logger.Infof("sink node %s instance %d done", m.name, instance)
@@ -173,40 +201,89 @@ func (m *SinkNode) reset() {
 	m.statManagers = nil
 	m.statManagers = nil
 }
 }
 
 
-func doCollect(sink api.Sink, item *CacheTuple, stats StatManager, retryInterval int, omitIfEmpty bool, signalCh chan<- int, ctx api.StreamContext) {
+func extractInput(v []byte) ([]map[string]interface{}, error) {
+	var j []map[string]interface{}
+	if err := json.Unmarshal(v, &j); err != nil {
+		return nil, fmt.Errorf("fail to decode the input %s as json: %v", v, err)
+	}
+	return j, nil
+}
+
+func doCollect(sink api.Sink, item *CacheTuple, stats StatManager, retryInterval int, omitIfEmpty bool, sendSingle bool, tp *template.Template, signalCh chan<- int, ctx api.StreamContext) {
 	stats.IncTotalRecordsIn()
 	stats.IncTotalRecordsIn()
 	stats.ProcessTimeStart()
 	stats.ProcessTimeStart()
+	defer stats.ProcessTimeEnd()
 	logger := ctx.GetLogger()
 	logger := ctx.GetLogger()
-	var outdata []byte
+	var outdatas [][]byte
 	switch val := item.data.(type) {
 	switch val := item.data.(type) {
 	case []byte:
 	case []byte:
-		outdata = val
+		if omitIfEmpty && string(val) == "[{}]" {
+			return
+		}
+		var (
+			err error
+			j   []map[string]interface{}
+		)
+		if sendSingle || tp != nil {
+			j, err = extractInput(val)
+			if err != nil {
+				stats.IncTotalExceptions()
+				logger.Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), val, err)
+				return
+			}
+			logger.Debugf("receive %d records", len(j))
+		}
+		if !sendSingle {
+			if tp != nil {
+				var output bytes.Buffer
+				err := tp.Execute(&output, j)
+				if err != nil {
+					logger.Warnf("sink node %s instance %d publish %s decode template error: %v", ctx.GetOpId(), ctx.GetInstanceId(), val, err)
+					stats.IncTotalExceptions()
+					return
+				}
+				outdatas = append(outdatas, output.Bytes())
+			} else {
+				outdatas = [][]byte{val}
+			}
+		} else {
+			for _, r := range j {
+				var output bytes.Buffer
+				err := tp.Execute(&output, r)
+				if err != nil {
+					logger.Warnf("sink node %s instance %d publish %s decode template error: %v", ctx.GetOpId(), ctx.GetInstanceId(), val, err)
+					stats.IncTotalExceptions()
+					return
+				}
+				outdatas = append(outdatas, output.Bytes())
+			}
+		}
+
 	case error:
 	case error:
-		outdata = []byte(fmt.Sprintf(`[{"error":"%s"}]`, val.Error()))
+		outdatas = [][]byte{[]byte(fmt.Sprintf(`[{"error":"%s"}]`, val.Error()))}
 	default:
 	default:
-		outdata = []byte(fmt.Sprintf(`[{"error":"result is not a string but found %#v"}]`, val))
+		outdatas = [][]byte{[]byte(fmt.Sprintf(`[{"error":"result is not a string but found %#v"}]`, val))}
 	}
 	}
-	for {
-		if omitIfEmpty && string(outdata) == "[{}]" {
-			break
-		}
-		if err := sink.Collect(ctx, outdata); err != nil {
-			stats.IncTotalExceptions()
-			logger.Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outdata, err)
-			if retryInterval > 0 {
-				time.Sleep(time.Duration(retryInterval) * time.Millisecond)
-				logger.Debugf("try again")
+
+	for _, outdata := range outdatas {
+		for {
+			if err := sink.Collect(ctx, outdata); err != nil {
+				stats.IncTotalExceptions()
+				logger.Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outdata, err)
+				if retryInterval > 0 {
+					time.Sleep(time.Duration(retryInterval) * time.Millisecond)
+					logger.Debugf("try again")
+				} else {
+					break
+				}
 			} else {
 			} else {
+				logger.Debugf("success")
+				stats.IncTotalRecordsOut()
+				signalCh <- item.index
 				break
 				break
 			}
 			}
-		} else {
-			logger.Debugf("success")
-			stats.IncTotalRecordsOut()
-			signalCh <- item.index
-			break
 		}
 		}
 	}
 	}
-	stats.ProcessTimeEnd()
 }
 }
 
 
 func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
 func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {

+ 76 - 0
xstream/nodes/sink_node_test.go

@@ -0,0 +1,76 @@
+package nodes
+
+import (
+	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xstream/contexts"
+	"github.com/emqx/kuiper/xstream/test"
+	"reflect"
+	"testing"
+	"time"
+)
+
+func TestSinkTemplate_Apply(t *testing.T) {
+	var tests = []struct {
+		config map[string]interface{}
+		data   []byte
+		result [][]byte
+	}{
+		{
+			config: map[string]interface{}{
+				"sendSingle":   true,
+				"dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
+			},
+			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
+			result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
+		}, {
+			config: map[string]interface{}{
+				"dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
+			},
+			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
+			result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
+		}, {
+			config: map[string]interface{}{
+				"dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
+			},
+			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
+			result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
+		}, {
+			config: map[string]interface{}{
+				"dataTemplate": `{"content":{{json .}}}`,
+			},
+			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
+			result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
+		}, {
+			config: map[string]interface{}{
+				"sendSingle":   true,
+				"dataTemplate": `{"newab":"{{.ab}}"}`,
+			},
+			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
+			result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
+		}, {
+			config: map[string]interface{}{
+				"sendSingle":   true,
+				"dataTemplate": `{"newab":"{{.ab}}"}`,
+			},
+			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
+			result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestSinkTemplate_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+
+	for i, tt := range tests {
+		mockSink := test.NewMockSink()
+		s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
+		s.Open(ctx, make(chan error))
+		s.input <- tt.data
+		time.Sleep(1 * time.Second)
+		s.close(ctx, contextLogger)
+		results := mockSink.GetResults()
+		if !reflect.DeepEqual(tt.result, results) {
+			t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
+		}
+	}
+}

+ 38 - 0
xstream/nodes/source_node_test.go

@@ -0,0 +1,38 @@
+package nodes
+
+import (
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xstream/contexts"
+	"reflect"
+	"testing"
+)
+
+func TestGetConf_Apply(t *testing.T) {
+	result := map[string]interface{}{
+		"interval": 1000,
+		"ashost":   "192.168.1.100",
+		"sysnr":    "02",
+		"client":   "900",
+		"user":     "SPERF",
+		"passwd":   "PASSPASS",
+		"params": map[interface{}]interface{}{
+			"QUERY_TABLE": "VBAP",
+			"ROWCOUNT":    10,
+			"FIELDS": []interface{}{
+				map[interface{}]interface{}{"FIELDNAME": "MANDT"},
+				map[interface{}]interface{}{"FIELDNAME": "VBELN"},
+				map[interface{}]interface{}{"FIELDNAME": "POSNR"},
+			},
+		},
+	}
+	n := NewSourceNode("test", map[string]string{
+		"DATASOURCE": "RFC_READ_TABLE",
+		"TYPE":       "test",
+	})
+	contextLogger := common.Log.WithField("rule", "test")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	conf := n.getConf(ctx)
+	if !reflect.DeepEqual(result, conf) {
+		t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
+	}
+}

+ 3 - 1
xstream/server/server/rest.go

@@ -160,7 +160,7 @@ func rulesHandler(w http.ResponseWriter, r *http.Request) {
 			handleError(w, fmt.Errorf("Create rule error : %s.", err), http.StatusBadRequest, logger)
 			handleError(w, fmt.Errorf("Create rule error : %s.", err), http.StatusBadRequest, logger)
 			return
 			return
 		} else {
 		} else {
-			result = fmt.Sprintf("Rule %s was created, please use 'cli getstatus rule $rule_name' command to get rule status.", r.Id)
+			result = fmt.Sprintf("Rule %s was created successfully.", r.Id)
 		}
 		}
 		//Start the rule
 		//Start the rule
 		rs, err := createRuleState(r)
 		rs, err := createRuleState(r)
@@ -313,6 +313,8 @@ func pluginHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType)
 		result := fmt.Sprintf("%s plugin %s is deleted", plugins.PluginTypes[t], name)
 		result := fmt.Sprintf("%s plugin %s is deleted", plugins.PluginTypes[t], name)
 		if r {
 		if r {
 			result = fmt.Sprintf("%s and Kuiper will be stopped", result)
 			result = fmt.Sprintf("%s and Kuiper will be stopped", result)
+		} else {
+			result = fmt.Sprintf("%s and Kuiper must restart for the change to take effect.", result)
 		}
 		}
 		w.Write([]byte(result))
 		w.Write([]byte(result))
 	case http.MethodGet:
 	case http.MethodGet:

+ 2 - 2
xstream/server/server/rpc.go

@@ -80,7 +80,7 @@ func (t *Server) CreateRule(rule *common.RuleDesc, reply *string) error {
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("Create rule error : %s.", err)
 		return fmt.Errorf("Create rule error : %s.", err)
 	} else {
 	} else {
-		*reply = fmt.Sprintf("Rule %s was created, please use 'cli getstatus rule $rule_name' command to get rule status.", rule.Name)
+		*reply = fmt.Sprintf("Rule %s was created successfully, please use 'bin/cli getstatus rule %s' command to get rule status.", rule.Name, rule.Name)
 	}
 	}
 	//Start the rule
 	//Start the rule
 	rs, err := createRuleState(r)
 	rs, err := createRuleState(r)
@@ -203,7 +203,7 @@ func (t *Server) DropPlugin(arg *common.PluginDesc, reply *string) error {
 		if arg.Stop {
 		if arg.Stop {
 			*reply = fmt.Sprintf("Plugin %s is dropped and Kuiper will be stopped.", p.Name)
 			*reply = fmt.Sprintf("Plugin %s is dropped and Kuiper will be stopped.", p.Name)
 		} else {
 		} else {
-			*reply = fmt.Sprintf("Plugin %s is dropped.", p.Name)
+			*reply = fmt.Sprintf("Plugin %s is dropped and Kuiper must restart for the change to take effect.", p.Name)
 		}
 		}
 
 
 	}
 	}

+ 17 - 119
xstream/sinks/rest_sink.go

@@ -4,28 +4,24 @@ import (
 	"bytes"
 	"bytes"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
-	"github.com/emqx/kuiper/common/templates"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
 	"io/ioutil"
 	"io/ioutil"
 	"net"
 	"net"
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
 	"strings"
 	"strings"
-	"text/template"
 	"time"
 	"time"
 )
 )
 
 
 type RestSink struct {
 type RestSink struct {
-	method       string
-	url          string
-	headers      map[string]string
-	bodyType     string
-	timeout      int64
-	sendSingle   bool
-	dataTemplate string
+	method     string
+	url        string
+	headers    map[string]string
+	bodyType   string
+	timeout    int64
+	sendSingle bool
 
 
 	client *http.Client
 	client *http.Client
-	tp     *template.Template
 }
 }
 
 
 var methodsMap = map[string]bool{"GET": true, "HEAD": true, "POST": true, "PUT": true, "DELETE": true, "PATCH": true}
 var methodsMap = map[string]bool{"GET": true, "HEAD": true, "POST": true, "PUT": true, "DELETE": true, "PATCH": true}
@@ -103,34 +99,15 @@ func (ms *RestSink) Configure(ps map[string]interface{}) error {
 		}
 		}
 	}
 	}
 
 
-	temp, ok = ps["dataTemplate"]
-	if ok {
-		ms.dataTemplate, ok = temp.(string)
-		if !ok {
-			return fmt.Errorf("rest sink property dataTemplate %v is not a string", temp)
-		}
-	}
-
-	if ms.dataTemplate != "" {
-		funcMap := template.FuncMap{
-			"json": templates.JsonMarshal,
-		}
-		temp, err := template.New("restSink").Funcs(funcMap).Parse(ms.dataTemplate)
-		if err != nil {
-			return fmt.Errorf("rest sink property dataTemplate %v is invalid: %v", ms.dataTemplate, err)
-		} else {
-			ms.tp = temp
-		}
-	}
 	return nil
 	return nil
 }
 }
 
 
 func (ms *RestSink) Open(ctx api.StreamContext) error {
 func (ms *RestSink) Open(ctx api.StreamContext) error {
 	logger := ctx.GetLogger()
 	logger := ctx.GetLogger()
 	ms.client = &http.Client{Timeout: time.Duration(ms.timeout) * time.Millisecond}
 	ms.client = &http.Client{Timeout: time.Duration(ms.timeout) * time.Millisecond}
-	logger.Infof("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v, dataTemplate: %s", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle, ms.dataTemplate)
+	logger.Infof("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle)
 
 
-	timeout := time.Duration(1 * time.Second)
+	timeout := 1 * time.Second
 	if u, err := url.Parse(ms.url); err != nil {
 	if u, err := url.Parse(ms.url); err != nil {
 		return err
 		return err
 	} else {
 	} else {
@@ -167,33 +144,7 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 		logger.Warnf("rest sink receive non []byte data: %v", item)
 		logger.Warnf("rest sink receive non []byte data: %v", item)
 	}
 	}
 	logger.Debugf("rest sink receive %s", item)
 	logger.Debugf("rest sink receive %s", item)
-	if !ms.sendSingle {
-		return ms.send(v, logger)
-	} else {
-		j, err := extractInput(v)
-		if err != nil {
-			return err
-		}
-		logger.Debugf("receive %d records", len(j))
-		var errs MultiErrors
-		for _, r := range j {
-			if e := ms.send(r, logger); e != nil {
-				errs = errs.AddError(e)
-			}
-		}
-		if len(errs) != 0 {
-			return errs
-		}
-	}
-	return nil
-}
-
-func extractInput(v []byte) ([]map[string]interface{}, error) {
-	var j []map[string]interface{}
-	if err := json.Unmarshal(v, &j); err != nil {
-		return nil, fmt.Errorf("fail to decode the input %s as json: %v", v, err)
-	}
-	return j, nil
+	return ms.send(v, logger)
 }
 }
 
 
 func (ms *RestSink) send(v interface{}, logger api.Logger) error {
 func (ms *RestSink) send(v interface{}, logger api.Logger) error {
@@ -209,35 +160,10 @@ func (ms *RestSink) send(v interface{}, logger api.Logger) error {
 		var body = &(bytes.Buffer{})
 		var body = &(bytes.Buffer{})
 		switch t := v.(type) {
 		switch t := v.(type) {
 		case []byte:
 		case []byte:
-			if ms.tp != nil {
-				j, err := extractInput(t)
-				if err != nil {
-					return err
-				}
-				err = ms.tp.Execute(body, j)
-				if err != nil {
-					return fmt.Errorf("fail to decode content: %v", err)
-				}
-			} else {
-				body = bytes.NewBuffer(t)
-			}
-		case map[string]interface{}:
-			if ms.tp != nil {
-				err = ms.tp.Execute(body, t)
-				if err != nil {
-					return fmt.Errorf("fail to decode content: %v", err)
-				}
-			} else {
-				content, err := json.Marshal(t)
-				if err != nil {
-					return fmt.Errorf("fail to decode content: %v", err)
-				}
-				body = bytes.NewBuffer(content)
-			}
+			body = bytes.NewBuffer(t)
 		default:
 		default:
 			return fmt.Errorf("invalid content: %v", v)
 			return fmt.Errorf("invalid content: %v", v)
 		}
 		}
-
 		req, err = http.NewRequest(ms.method, ms.url, body)
 		req, err = http.NewRequest(ms.method, ms.url, body)
 		if err != nil {
 		if err != nil {
 			return fmt.Errorf("fail to create request: %v", err)
 			return fmt.Errorf("fail to create request: %v", err)
@@ -245,7 +171,7 @@ func (ms *RestSink) send(v interface{}, logger api.Logger) error {
 		req.Header.Set("Content-Type", bodyTypeMap[ms.bodyType])
 		req.Header.Set("Content-Type", bodyTypeMap[ms.bodyType])
 	case "form":
 	case "form":
 		form := url.Values{}
 		form := url.Values{}
-		im, err := convertToMap(v, ms.tp)
+		im, err := convertToMap(v, ms.sendSingle)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
@@ -291,46 +217,18 @@ func (ms *RestSink) send(v interface{}, logger api.Logger) error {
 	return nil
 	return nil
 }
 }
 
 
-func convertToMap(v interface{}, tp *template.Template) (map[string]interface{}, error) {
+func convertToMap(v interface{}, sendSingle bool) (map[string]interface{}, error) {
 	switch t := v.(type) {
 	switch t := v.(type) {
 	case []byte:
 	case []byte:
-		if tp != nil {
-			j, err := extractInput(t)
-			if err != nil {
-				return nil, err
-			}
-			var output bytes.Buffer
-			err = tp.Execute(&output, j)
-			if err != nil {
-				return nil, fmt.Errorf("fail to decode content: %v", err)
-			}
-			r := make(map[string]interface{})
-			if err := json.Unmarshal(output.Bytes(), &r); err != nil {
-				return nil, fmt.Errorf("fail to decode content: %v", err)
-			} else {
-				return r, nil
-			}
-		} else {
-			r := make(map[string]interface{})
-			r["result"] = string(t)
-			return r, nil
-		}
-	case map[string]interface{}:
-		if tp != nil {
-			var output bytes.Buffer
-			err := tp.Execute(&output, t)
-			if err != nil {
-				return nil, fmt.Errorf("fail to decode content: %v", err)
-			}
-			r := make(map[string]interface{})
-			if err := json.Unmarshal(output.Bytes(), &r); err != nil {
+		r := make(map[string]interface{})
+		if err := json.Unmarshal(t, &r); err != nil {
+			if sendSingle {
 				return nil, fmt.Errorf("fail to decode content: %v", err)
 				return nil, fmt.Errorf("fail to decode content: %v", err)
 			} else {
 			} else {
-				return r, nil
+				r["result"] = string(t)
 			}
 			}
-		} else {
-			return t, nil
 		}
 		}
+		return r, nil
 	default:
 	default:
 		return nil, fmt.Errorf("invalid content: %v", v)
 		return nil, fmt.Errorf("invalid content: %v", v)
 	}
 	}

+ 31 - 46
xstream/sinks/rest_sink_test.go

@@ -174,16 +174,32 @@ func TestRestSink_Apply(t *testing.T) {
 	defer ts.Close()
 	defer ts.Close()
 	for i, tt := range tests {
 	for i, tt := range tests {
 		requests = nil
 		requests = nil
+		ss, ok := tt.config["sendSingle"]
+		if !ok {
+			ss = false
+		}
 		s := &RestSink{}
 		s := &RestSink{}
 		tt.config["url"] = ts.URL
 		tt.config["url"] = ts.URL
 		s.Configure(tt.config)
 		s.Configure(tt.config)
 		s.Open(ctx)
 		s.Open(ctx)
-		input, err := json.Marshal(tt.data)
-		if err != nil {
-			t.Errorf("Failed to parse the input into []byte]")
-			continue
+		if ss.(bool) {
+			for _, d := range tt.data {
+				input, err := json.Marshal(d)
+				if err != nil {
+					t.Errorf("Failed to parse the input into []byte]")
+					continue
+				}
+				s.Collect(ctx, input)
+			}
+		} else {
+			input, err := json.Marshal(tt.data)
+			if err != nil {
+				t.Errorf("Failed to parse the input into []byte]")
+				continue
+			}
+			s.Collect(ctx, input)
 		}
 		}
-		s.Collect(ctx, input)
+
 		s.Close(ctx)
 		s.Close(ctx)
 		if !reflect.DeepEqual(tt.result, requests) {
 		if !reflect.DeepEqual(tt.result, requests) {
 			t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
 			t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
@@ -194,7 +210,7 @@ func TestRestSink_Apply(t *testing.T) {
 func TestRestSinkTemplate_Apply(t *testing.T) {
 func TestRestSinkTemplate_Apply(t *testing.T) {
 	var tests = []struct {
 	var tests = []struct {
 		config map[string]interface{}
 		config map[string]interface{}
-		data   []map[string]interface{}
+		data   [][]byte
 		result []request
 		result []request
 	}{
 	}{
 		{
 		{
@@ -204,11 +220,7 @@ func TestRestSinkTemplate_Apply(t *testing.T) {
 				"sendSingle":   true,
 				"sendSingle":   true,
 				"dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
 				"dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
 			},
 			},
-			data: []map[string]interface{}{{
-				"ab": "hello1",
-			}, {
-				"ab": "hello2",
-			}},
+			data: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
 			result: []request{{
 			result: []request{{
 				Method:      "POST",
 				Method:      "POST",
 				Body:        `{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`,
 				Body:        `{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`,
@@ -224,11 +236,7 @@ func TestRestSinkTemplate_Apply(t *testing.T) {
 				//"url": "http://localhost/test",  //set dynamically to the test server
 				//"url": "http://localhost/test",  //set dynamically to the test server
 				"dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
 				"dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
 			},
 			},
-			data: []map[string]interface{}{{
-				"ab": "hello1",
-			}, {
-				"ab": "hello2",
-			}},
+			data: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
 			result: []request{{
 			result: []request{{
 				Method:      "POST",
 				Method:      "POST",
 				Body:        `{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`,
 				Body:        `{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`,
@@ -240,11 +248,7 @@ func TestRestSinkTemplate_Apply(t *testing.T) {
 				//"url": "http://localhost/test",  //set dynamically to the test server
 				//"url": "http://localhost/test",  //set dynamically to the test server
 				"dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
 				"dataTemplate": `{"wrapper":"w1","content":{{json .}},"ab":"{{.ab}}"}`,
 			},
 			},
-			data: []map[string]interface{}{{
-				"ab": "hello1",
-			}, {
-				"ab": "hello2",
-			}},
+			data: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`)},
 			result: []request{{
 			result: []request{{
 				Method:      "GET",
 				Method:      "GET",
 				ContentType: "",
 				ContentType: "",
@@ -256,11 +260,7 @@ func TestRestSinkTemplate_Apply(t *testing.T) {
 				"bodyType":     "html",
 				"bodyType":     "html",
 				"dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
 				"dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
 			},
 			},
-			data: []map[string]interface{}{{
-				"ab": "hello1",
-			}, {
-				"ab": "hello2",
-			}},
+			data: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
 			result: []request{{
 			result: []request{{
 				Method:      "PUT",
 				Method:      "PUT",
 				ContentType: "text/html",
 				ContentType: "text/html",
@@ -273,11 +273,7 @@ func TestRestSinkTemplate_Apply(t *testing.T) {
 				"bodyType":     "form",
 				"bodyType":     "form",
 				"dataTemplate": `{"content":{{json .}}}`,
 				"dataTemplate": `{"content":{{json .}}}`,
 			},
 			},
-			data: []map[string]interface{}{{
-				"ab": "hello1",
-			}, {
-				"ab": "hello2",
-			}},
+			data: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
 			result: []request{{
 			result: []request{{
 				Method:      "POST",
 				Method:      "POST",
 				ContentType: "application/x-www-form-urlencoded;param=value",
 				ContentType: "application/x-www-form-urlencoded;param=value",
@@ -291,11 +287,7 @@ func TestRestSinkTemplate_Apply(t *testing.T) {
 				"sendSingle":   true,
 				"sendSingle":   true,
 				"dataTemplate": `{"newab":"{{.ab}}"}`,
 				"dataTemplate": `{"newab":"{{.ab}}"}`,
 			},
 			},
-			data: []map[string]interface{}{{
-				"ab": "hello1",
-			}, {
-				"ab": "hello2",
-			}},
+			data: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
 			result: []request{{
 			result: []request{{
 				Method:      "POST",
 				Method:      "POST",
 				ContentType: "application/x-www-form-urlencoded;param=value",
 				ContentType: "application/x-www-form-urlencoded;param=value",
@@ -314,11 +306,7 @@ func TestRestSinkTemplate_Apply(t *testing.T) {
 				"timeout":      float64(1000),
 				"timeout":      float64(1000),
 				"dataTemplate": `{"newab":"{{.ab}}"}`,
 				"dataTemplate": `{"newab":"{{.ab}}"}`,
 			},
 			},
-			data: []map[string]interface{}{{
-				"ab": "hello1",
-			}, {
-				"ab": "hello2",
-			}},
+			data: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
 			result: []request{{
 			result: []request{{
 				Method:      "POST",
 				Method:      "POST",
 				Body:        `{"newab":"hello1"}`,
 				Body:        `{"newab":"hello1"}`,
@@ -358,12 +346,9 @@ func TestRestSinkTemplate_Apply(t *testing.T) {
 		tt.config["url"] = ts.URL
 		tt.config["url"] = ts.URL
 		s.Configure(tt.config)
 		s.Configure(tt.config)
 		s.Open(ctx)
 		s.Open(ctx)
-		input, err := json.Marshal(tt.data)
-		if err != nil {
-			t.Errorf("Failed to parse the input into []byte]")
-			continue
+		for _, d := range tt.data {
+			s.Collect(ctx, d)
 		}
 		}
-		s.Collect(ctx, input)
 		s.Close(ctx)
 		s.Close(ctx)
 		if !reflect.DeepEqual(tt.result, requests) {
 		if !reflect.DeepEqual(tt.result, requests) {
 			t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
 			t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)