ソースを参照

Merge pull request #218 from emqx/develop

0.3.1 release
jinfahua 5 年 前
コミット
6ba25b376f
40 ファイル変更1643 行追加176 行削除
  1. 5 0
      .github/workflows/build_packages.yaml
  2. 26 48
      README-CN.md
  3. 26 48
      README.md
  4. 30 11
      common/util.go
  5. 2 2
      deploy/chart/kuiper/Chart.yaml
  6. 1 0
      deploy/chart/kuiper/templates/StatefulSet.yaml
  7. 2 0
      deploy/chart/kuiper/values.yaml
  8. 19 0
      deploy/docker/README.md
  9. 73 15
      deploy/docker/docker-entrypoint.sh
  10. 49 0
      docs/en_US/quick_start_docker.md
  11. 1 1
      docs/en_US/restapi/plugins.md
  12. 1 0
      docs/en_US/rules/overview.md
  13. 50 1
      docs/en_US/rules/sinks/edgex.md
  14. 9 0
      docs/en_US/rules/sinks/nop.md
  15. 25 0
      docs/en_US/rules/sources/edgex.md
  16. 229 0
      docs/zh_CN/plugins/plugins_tutorial.md
  17. 49 0
      docs/zh_CN/quick_start_docker.md
  18. 1 1
      docs/zh_CN/rules/overview.md
  19. 50 1
      docs/zh_CN/rules/sinks/edgex.md
  20. 9 0
      docs/zh_CN/rules/sinks/nop.md
  21. 25 0
      docs/zh_CN/rules/sources/edgex.md
  22. 4 0
      etc/kuiper.yaml
  23. 25 4
      etc/sources/edgex.yaml
  24. 124 0
      fvt_scripts/edgex/benchmark/README.md
  25. 111 0
      fvt_scripts/edgex/benchmark/pub.go
  26. BIN
      fvt_scripts/edgex/benchmark/system_usage.png
  27. 49 0
      fvt_scripts/edgex/pub.go
  28. 55 4
      fvt_scripts/edgex/sub/sub.go
  29. 494 0
      fvt_scripts/edgex_mqtt_sink_rule.jmx
  30. 4 0
      fvt_scripts/run_jmeter.sh
  31. 1 1
      go.mod
  32. 11 0
      xsql/ast.go
  33. 4 2
      xsql/processors/xsql_processor.go
  34. 11 12
      xsql/util.go
  35. 8 6
      xstream/extensions/edgex_source.go
  36. 2 0
      xstream/nodes/sink_node.go
  37. 0 1
      xstream/server/server/rest.go
  38. 3 1
      xstream/server/server/server.go
  39. 20 17
      xstream/sinks/edgex_sink.go
  40. 35 0
      xstream/sinks/nop_sink.go

+ 5 - 0
.github/workflows/build_packages.yaml

@@ -132,6 +132,11 @@ jobs:
             aws s3 rm --quiet --recursive s3://packages.emqx.io/kuiper/$version
             aws s3 rm --quiet --recursive s3://packages.emqx.io/kuiper/$version
             aws s3 cp --quiet --recursive ./_packages s3://packages.emqx.io/kuiper/$version
             aws s3 cp --quiet --recursive ./_packages s3://packages.emqx.io/kuiper/$version
             aws cloudfront create-invalidation --distribution-id E3TYD0WSP4S14P --paths "/kuiper/$version/*"
             aws cloudfront create-invalidation --distribution-id E3TYD0WSP4S14P --paths "/kuiper/$version/*"
+        - name: update emqx.io
+          if: github.event_name == 'release'
+          run: |
+            version=$(echo ${{ github.ref }} | sed -r  "s .*/.*/(.*) \1 g")
+            curl -w %{http_code} --insecure -H ${{ secrets.EmqxHeader }} https://admin.emqx.io/api/v1/kuiper_github_release_callback?tag=$version
         - name: update helm packages
         - name: update helm packages
           if: github.event_name == 'release'
           if: github.event_name == 'release'
           run: |
           run: |

ファイルの差分が大きいため隠しています
+ 26 - 48
README-CN.md


ファイルの差分が大きいため隠しています
+ 26 - 48
README.md


+ 30 - 11
common/util.go

@@ -7,10 +7,12 @@ import (
 	"github.com/benbjohnson/clock"
 	"github.com/benbjohnson/clock"
 	"github.com/go-yaml/yaml"
 	"github.com/go-yaml/yaml"
 	"github.com/sirupsen/logrus"
 	"github.com/sirupsen/logrus"
+	"io"
 	"io/ioutil"
 	"io/ioutil"
 	"os"
 	"os"
 	"path"
 	"path"
 	"path/filepath"
 	"path/filepath"
+	"runtime"
 	"sort"
 	"sort"
 	"strings"
 	"strings"
 )
 )
@@ -48,6 +50,8 @@ func LoadConf(confName string) ([]byte, error) {
 
 
 type XStreamConf struct {
 type XStreamConf struct {
 	Debug          bool `yaml:"debug"`
 	Debug          bool `yaml:"debug"`
+	ConsoleLog     bool `yaml:"consoleLog"`
+	FileLog        bool `yaml:"fileLog"`
 	Port           int  `yaml:"port"`
 	Port           int  `yaml:"port"`
 	RestPort       int  `yaml:"restPort"`
 	RestPort       int  `yaml:"restPort"`
 	Prometheus     bool `yaml:"prometheus"`
 	Prometheus     bool `yaml:"prometheus"`
@@ -56,7 +60,12 @@ type XStreamConf struct {
 
 
 func init() {
 func init() {
 	Log = logrus.New()
 	Log = logrus.New()
+	Log.SetReportCaller(true)
 	Log.SetFormatter(&logrus.TextFormatter{
 	Log.SetFormatter(&logrus.TextFormatter{
+		CallerPrettyfier: func(f *runtime.Frame) (string, string) {
+			filename := path.Base(f.File)
+			return fmt.Sprintf("%s()", f.Function), fmt.Sprintf("%s:%d", filename, f.Line)
+		},
 		DisableColors: true,
 		DisableColors: true,
 		FullTimestamp: true,
 		FullTimestamp: true,
 	})
 	})
@@ -91,20 +100,30 @@ func InitConf() {
 		Config = &c
 		Config = &c
 	}
 	}
 
 
-	if !Config.Debug {
-		logDir, err := GetLoc(log_dir)
-		if err != nil {
-			Log.Fatal(err)
-		}
-		file := logDir + logFileName
-		logFile, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
-		if err == nil {
-			Log.Out = logFile
+	if Config.Debug {
+		Log.SetLevel(logrus.DebugLevel)
+	}
+
+	logDir, err := GetLoc(log_dir)
+	if err != nil {
+		Log.Fatal(err)
+	}
+	file := logDir + logFileName
+	logFile, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+	if err == nil {
+		if Config.ConsoleLog {
+			if Config.FileLog {
+				mw := io.MultiWriter(os.Stdout, logFile)
+				Log.SetOutput(mw)
+			}
 		} else {
 		} else {
-			Log.Infof("Failed to log to file, using default stderr")
+			if Config.FileLog {
+				Log.SetOutput(logFile)
+			}
 		}
 		}
 	} else {
 	} else {
-		Log.SetLevel(logrus.DebugLevel)
+		fmt.Println("Failed to init log file settings...")
+		Log.Infof("Failed to log to file, using default stderr.")
 	}
 	}
 }
 }
 
 

+ 2 - 2
deploy/chart/kuiper/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 
 # This is the chart version. This version number should be incremented each time you make changes
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # to the chart and its templates, including the app version.
-version: 0.3.0
+version: 0.3.1
 
 
 # This is the version number of the application being deployed. This version number should be
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
 # incremented each time you make changes to the application.
-appVersion: 0.3.0
+appVersion: 0.3.1

+ 1 - 0
deploy/chart/kuiper/templates/StatefulSet.yaml

@@ -79,6 +79,7 @@ spec:
         - name: kuiper
         - name: kuiper
           image: "{{ .Values.image.repository }}:{{ .Chart.AppVersion }}"
           image: "{{ .Values.image.repository }}:{{ .Chart.AppVersion }}"
           imagePullPolicy: {{ .Values.image.pullPolicy }}
           imagePullPolicy: {{ .Values.image.pullPolicy }}
+          command: ["./bin/server"]
           ports:
           ports:
           {{ $restPort := index .Values "kuiperConfig" "kuiper.yaml" "basic" "restPort" }}
           {{ $restPort := index .Values "kuiperConfig" "kuiper.yaml" "basic" "restPort" }}
           - containerPort: {{ $restPort | default 9081 }}
           - containerPort: {{ $restPort | default 9081 }}

+ 2 - 0
deploy/chart/kuiper/values.yaml

@@ -50,6 +50,8 @@ kuiperConfig:
     basic:
     basic:
       # true|false, with debug level, it prints more debug info
       # true|false, with debug level, it prints more debug info
       debug: false
       debug: false
+      consoleLog: false
+      fileLog: true
       port: 20498
       port: 20498
       restPort: 9081
       restPort: 9081
       prometheus: false
       prometheus: false

+ 19 - 0
deploy/docker/README.md

@@ -144,6 +144,25 @@ docker run -d --name kuiper -e MQTT_BROKER_ADDRESS=$MQTT_BROKER_ADDRESS emqx/kui
 
 
 ### Configuration
 ### Configuration
 
 
+Use the environment variable to configure `etc/client.yaml`  on the Kuiper container.
+
+| Options                         | Default               | Mapped                      |
+| ------------------------------- | --------------------- | --------------------------- |
+| CLIENT_HOST                     | 127.0.0.1             | client.basic.debug          |
+| CLIENT_PORT                     | 20498                 | client.basic.port           |
+
+Use the environment variable to configure `etc/kuiper.yaml`  on the Kuiper container.
+
+| Options                         | Default               | Mapped                      |
+| ------------------------------- | --------------------- | --------------------------- |
+| KUIPER_DEBUG                    | false                 | kuiper.basic.debug          |
+| KUIPER_CONSOLE_LOG              | false                 | kuiper.basic.consoleLog     |
+| KUIPER_FILE_LOG                 | true                  | kuiper.basic.fileLog        |
+| KUIPER_PORT                     | 20498                 | kuiper.basic.port           |
+| KUIPER_REST_PORT                | 9081                  | kuiper.basic.restPort       |
+| KUIPER_PROMETHEUS               | false                 | kuiper.basic.prometheus     |
+| KUIPER_PROMETHEUS_PORT          | 20499                 | kuiper.basic.prometheusPort |
+
 Use the environment variable to configure `etc/mqtt_sources.yaml`  on the Kuiper container.
 Use the environment variable to configure `etc/mqtt_sources.yaml`  on the Kuiper container.
 
 
 | Options                         | Default               | Mapped                      |
 | Options                         | Default               | Mapped                      |

+ 73 - 15
deploy/docker/docker-entrypoint.sh

@@ -7,47 +7,105 @@ fi
 
 
 KUIPER_HOME=${KUIPER_HOME:-"/kuiper"}
 KUIPER_HOME=${KUIPER_HOME:-"/kuiper"}
 
 
-CONFIG="$KUIPER_HOME/etc/mqtt_source.yaml"
+CLIENT_CONFIG="$KUIPER_HOME/etc/client.yaml"
+
+if [ ! -z "$CLIENT_HOST" ]; then
+    sed -i '/basic:/ ,/host/{/host/d}' $CLIENT_CONFIG
+    sed -i "/basic:/a\  host: $CLIENT_HOST" $CLIENT_CONFIG
+    echo "kuiper.basic.host = $CLIENT_HOST"
+fi
+
+if [ ! -z "$CLIENT_PORT" ]; then
+    sed -i '/basic:/ ,/port/{/port/d}' $CLIENT_CONFIG
+    sed -i "/basic:/a\  port: $CLIENT_PORT" $CLIENT_CONFIG
+    echo "kuiper.basic.port = $CLIENT_PORT"
+fi
+
+KUIPER_CONFIG="$KUIPER_HOME/etc/kuiper.yaml"
+
+if [ ! -z "$KUIPER_DEBUG" ]; then
+    sed -i '/basic:/ ,/debug/{/debug/d}' $KUIPER_CONFIG
+    sed -i "/basic:/a\  debug: $KUIPER_DEBUG" $KUIPER_CONFIG
+    echo "kuiper.basic.debug = $KUIPER_DEBUG"
+fi
+
+if [ ! -z "$KUIPER_CONSOLE_LOG" ]; then
+    sed -i '/basic:/ ,/consoleLog/{/consoleLog/d}' $KUIPER_CONFIG
+    sed -i "/basic:/a\  consoleLog: $KUIPER_CONSOLE_LOG" $KUIPER_CONFIG
+    echo "kuiper.basic.consoleLog = $KUIPER_CONSOLE_LOG"
+fi
+
+if [ ! -z "$KUIPER_FILE_LOG" ]; then
+    sed -i '/basic:/ ,/fileLog/{/fileLog/d}' $KUIPER_CONFIG
+    sed -i "/basic:/a\  fileLog: $KUIPER_FILE_LOG" $KUIPER_CONFIG
+    echo "kuiper.basic.fileLog = $KUIPER_FILE_LOG"
+fi
+
+if [ ! -z "$KUIPER_PORT" ]; then
+    sed -i '/basic:/ ,/port/{/port/d}' $KUIPER_CONFIG
+    sed -i "/basic:/a\  port: $KUIPER_PORT" $KUIPER_CONFIG
+    echo "kuiper.basic.port = $KUIPER_PORT"
+fi
+
+if [ ! -z "$KUIPER_REST_PORT" ]; then
+    sed -i '/basic:/ ,/restPort/{/restPort/d}' $KUIPER_CONFIG
+    sed -i "/basic:/a\  restPort: $KUIPER_REST_PORT" $KUIPER_CONFIG
+    echo "kuiper.basic.restPort = $KUIPER_REST_PORT"
+fi
+
+if [ ! -z "$KUIPER_PROMETHEUS" ]; then
+    sed -i '/basic:/ ,/prometheus/{/prometheus/d}' $KUIPER_CONFIG
+    sed -i "/basic:/a\  prometheus: $KUIPER_PROMETHEUS" $KUIPER_CONFIG
+    echo "kuiper.basic.prometheus = $KUIPER_PROMETHEUS"
+fi
+
+if [ ! -z "$KUIPER_PROMETHEUS_PORT" ]; then
+    sed -i '/basic:/ ,/prometheusPort/{/prometheusPort/d}' $KUIPER_CONFIG
+    sed -i "/basic:/a\  prometheusPort: $KUIPER_PROMETHEUS_PORT" $KUIPER_CONFIG
+    echo "kuiper.basic.prometheusPort = $KUIPER_PROMETHEUS_PORT"
+fi
+
+MQTT_CONFIG="$KUIPER_HOME/etc/mqtt_source.yaml"
 
 
 if [ ! -z "$MQTT_BROKER_ADDRESS" ]; then
 if [ ! -z "$MQTT_BROKER_ADDRESS" ]; then
-    sed -i '/default:/ ,/servers/{/servers/d}' $CONFIG
-    sed -i "/default:/a\  servers: [$MQTT_BROKER_ADDRESS]" $CONFIG
+    sed -i '/default:/ ,/servers/{/servers/d}' $MQTT_CONFIG
+    sed -i "/default:/a\  servers: [$MQTT_BROKER_ADDRESS]" $MQTT_CONFIG
     echo "mqtt.default.servers = $MQTT_BROKER_ADDRESS"
     echo "mqtt.default.servers = $MQTT_BROKER_ADDRESS"
 fi
 fi
 
 
 if [ ! -z "$MQTT_BROKER_SHARED_SUBSCRIPTION" ]; then
 if [ ! -z "$MQTT_BROKER_SHARED_SUBSCRIPTION" ]; then
-    sed -i '/default:/ ,/sharedSubscription/{/sharedSubscription/d}' $CONFIG
-    sed -i "/default:/a\  sharedSubscription: $MQTT_BROKER_SHARED_SUBSCRIPTION" $CONFIG
+    sed -i '/default:/ ,/sharedSubscription/{/sharedSubscription/d}' $MQTT_CONFIG
+    sed -i "/default:/a\  sharedSubscription: $MQTT_BROKER_SHARED_SUBSCRIPTION" $MQTT_CONFIG
     echo "mqtt.default.sharedSubscription = $MQTT_BROKER_SHARED_SUBSCRIPTION"
     echo "mqtt.default.sharedSubscription = $MQTT_BROKER_SHARED_SUBSCRIPTION"
 fi
 fi
 
 
 if [ ! -z "$MQTT_BROKER_QOS" ]; then
 if [ ! -z "$MQTT_BROKER_QOS" ]; then
-    sed -i '/default:/ ,/qos/{/qos/d}' $CONFIG
-    sed -i "/default:/a\  qos: $MQTT_BROKER_QOS" $CONFIG
+    sed -i '/default:/ ,/qos/{/qos/d}' $MQTT_CONFIG
+    sed -i "/default:/a\  qos: $MQTT_BROKER_QOS" $MQTT_CONFIG
     echo "mqtt.default.qos = $MQTT_BROKER_QOS"
     echo "mqtt.default.qos = $MQTT_BROKER_QOS"
 fi
 fi
 
 
 if [ ! -z "$MQTT_BROKER_USERNAME" ]; then
 if [ ! -z "$MQTT_BROKER_USERNAME" ]; then
-    sed -i '/default:/ ,/username/{/username/d}' $CONFIG
-    sed -i "/default:/a\  username: $MQTT_BROKER_USERNAME" $CONFIG
+    sed -i '/default:/ ,/username/{/username/d}' $MQTT_CONFIG
+    sed -i "/default:/a\  username: $MQTT_BROKER_USERNAME" $MQTT_CONFIG
     echo "mqtt.default.username = $MQTT_BROKER_USERNAME"
     echo "mqtt.default.username = $MQTT_BROKER_USERNAME"
 fi
 fi
 
 
 if [ ! -z "$MQTT_BROKER_PASSWORD" ]; then
 if [ ! -z "$MQTT_BROKER_PASSWORD" ]; then
-    sed -i '/default:/ ,/password/{/password/d}' $CONFIG
-    sed -i "/default:/a\  password: $MQTT_BROKER_PASSWORD" $CONFIG
+    sed -i '/default:/ ,/password/{/password/d}' $MQTT_CONFIG
+    sed -i "/default:/a\  password: $MQTT_BROKER_PASSWORD" $MQTT_CONFIG
     echo "mqtt.default.password = $MQTT_BROKER_PASSWORD"
     echo "mqtt.default.password = $MQTT_BROKER_PASSWORD"
 fi
 fi
 
 
 if [ ! -z "$MQTT_BROKER_CER_PATH" ]; then
 if [ ! -z "$MQTT_BROKER_CER_PATH" ]; then
-    sed -i '/default:/ ,/certificationPath/{/certificationPath/d}' $CONFIG
-    sed -i "/default:/a\  certificationPath: $MQTT_BROKER_CER_PATH" $CONFIG
+    sed -i '/default:/ ,/certificationPath/{/certificationPath/d}' $MQTT_CONFIG
+    sed -i "/default:/a\  certificationPath: $MQTT_BROKER_CER_PATH" $MQTT_CONFIG
     echo "mqtt.default.certificationPath = $MQTT_BROKER_CER_PATH"
     echo "mqtt.default.certificationPath = $MQTT_BROKER_CER_PATH"
 fi
 fi
 
 
 if [ ! -z "$MQTT_BROKER_KEY_PATH" ]; then
 if [ ! -z "$MQTT_BROKER_KEY_PATH" ]; then
-    sed -i '/default:/ ,/privateKeyPath/{/privateKeyPath/d}' $CONFIG
-    sed -i "/default:/a\  privateKeyPath: $MQTT_BROKER_KEY_PATH" $CONFIG
+    sed -i '/default:/ ,/privateKeyPath/{/privateKeyPath/d}' $MQTT_CONFIG
+    sed -i "/default:/a\  privateKeyPath: $MQTT_BROKER_KEY_PATH" $MQTT_CONFIG
     echo "mqtt.default.privateKeyPath = $MQTT_BROKER_KEY_PATH"
     echo "mqtt.default.privateKeyPath = $MQTT_BROKER_KEY_PATH"
 fi
 fi
 
 

ファイルの差分が大きいため隠しています
+ 49 - 0
docs/en_US/quick_start_docker.md


+ 1 - 1
docs/en_US/restapi/plugins.md

@@ -7,7 +7,7 @@ The Kuiper REST api for plugins allows you to manage plugins, such as create, dr
 
 
 ## create a plugin
 ## create a plugin
 
 
-The API accepts a JSON content to create a new plugin. Each plugin type has a standalone endpoint. The supported types are `["sources", "sinks", "functions"`. The plugin is identified by the name. The name must be unique.
+The API accepts a JSON content to create a new plugin. Each plugin type has a standalone endpoint. The supported types are `["sources", "sinks", "functions"]`. The plugin is identified by the name. The name must be unique.
 ```shell
 ```shell
 POST http://localhost:9081/plugins/sources
 POST http://localhost:9081/plugins/sources
 POST http://localhost:9081/plugins/sinks
 POST http://localhost:9081/plugins/sinks

+ 1 - 0
docs/en_US/rules/overview.md

@@ -53,6 +53,7 @@ Currently, 3 kinds of sinks/actions are supported:
 - [mqtt](sinks/mqtt.md): Send the result to an MQTT broker. 
 - [mqtt](sinks/mqtt.md): Send the result to an MQTT broker. 
 - [edgex](sinks/edgex.md): Send the result to EdgeX message bus.
 - [edgex](sinks/edgex.md): Send the result to EdgeX message bus.
 - [rest](sinks/rest.md): Send the result to a Rest HTTP server.
 - [rest](sinks/rest.md): Send the result to a Rest HTTP server.
+- [nop](sinks/nop.md): Send the result to a nop operation.
 
 
 Each action can define its own properties. There are 3 common properties:
 Each action can define its own properties. There are 3 common properties:
 
 

+ 50 - 1
docs/en_US/rules/sinks/edgex.md

@@ -11,6 +11,24 @@ The action is used for publish output message into EdgeX message bus.
 | contentType   | true     | The content type of message to be published. If not specified, then use the default value ``application/json``. |
 | contentType   | true     | The content type of message to be published. If not specified, then use the default value ``application/json``. |
 | metadata      | true     | The property is a field name that allows user to specify a field name of SQL  select clause,  the field name should use ``meta(*) AS xxx``  to select all of EdgeX metadata from message. |
 | metadata      | true     | The property is a field name that allows user to specify a field name of SQL  select clause,  the field name should use ``meta(*) AS xxx``  to select all of EdgeX metadata from message. |
 | deviceName    | true     | Allows user to specify the device name in the event structure that are sent from Kuiper. |
 | deviceName    | true     | Allows user to specify the device name in the event structure that are sent from Kuiper. |
+| type          | true     | The message bus type, two types of message buses are supported, ``zero`` or ``mqtt``, and ``zero`` is the default value. |
+| optional      | true     | If ``mqtt`` message bus type is specified, then some optional values can be specified. Please refer to below for supported optional supported configurations. |
+
+Please notice that all of values in optional are **<u>string type</u>**, so values for these configurations should be string - such as ``KeepAlive: "5000"``. Below optional configurations are supported, please check MQTT specification for the detailed information.
+
+- optional
+  - ClientId
+  - Username
+  - Password
+  - Qos
+  - KeepAlive
+  - Retained
+  - ConnectionPayload
+  - CertFile
+  - KeyFile
+  - CertPEMBlock
+  - KeyPEMBlock
+  - SkipCertVerify
 
 
 ## Examples
 ## Examples
 
 
@@ -120,4 +138,35 @@ Please notice that,
 - For the reading that can NOT be found in original message,  the metadata will not be set.  Such as metadata of ``t1`` in the sample will fill with default value that generated by Kuiper. 
 - For the reading that can NOT be found in original message,  the metadata will not be set.  Such as metadata of ``t1`` in the sample will fill with default value that generated by Kuiper. 
 - If your SQL has aggregated function, then it does not make sense to keep these metadata, but Kuiper will still fill with metadata from a particular message in the time window. For example, with following SQL, 
 - If your SQL has aggregated function, then it does not make sense to keep these metadata, but Kuiper will still fill with metadata from a particular message in the time window. For example, with following SQL, 
 ```SELECT avg(temperature) AS temperature, meta(*) AS edgex_meta FROM ... GROUP BY TUMBLINGWINDOW(ss, 10)```. 
 ```SELECT avg(temperature) AS temperature, meta(*) AS edgex_meta FROM ... GROUP BY TUMBLINGWINDOW(ss, 10)```. 
-In this case, there are possibly several messages in the window, the metadata value for ``temperature`` will be filled with value from 1st message that received from bus.
+In this case, there are possibly several messages in the window, the metadata value for ``temperature`` will be filled with value from 1st message that received from bus.
+
+## Send result to MQTT message bus
+
+Below is a rule that send analysis result to MQTT message bus, please notice how to specify ``ClientId`` in ``optional`` configuration.
+
+```json
+{
+  "id": "rule1",
+  "sql": "SELECT meta(*) AS edgex_meta, temperature, humidity, humidity*2 as h1 FROM demo WHERE temperature = 20",
+  "actions": [
+    {
+      "edgex": {
+        "protocol": "tcp",
+        "host": "127.0.0.1",
+        "port": 1883,
+        "topic": "result",
+        "type": "mqtt",
+        "metadata": "edgex_meta",
+        "contentType": "application/json",
+        "optional": {
+        	"ClientId": "edgex_message_bus_001"
+        }
+      }
+    },
+    {
+      "log":{}
+    }
+  ]
+}
+```
+

+ 9 - 0
docs/en_US/rules/sinks/nop.md

@@ -0,0 +1,9 @@
+# Nop action
+
+The action is an Nop sink, the result sent to this sink will be ignored. If specify the ``log`` property to ``true``, then the result will be saved into log file, the log file is at  `` $kuiper_install/log/stream.log`` by default.
+
+| Property name      | Optional | Description                                                  |
+| ------------------ | -------- | ------------------------------------------------------------ |
+| log             | true | true/false - print the sink result to log or not. By default is ``false``, that will not print the result to log file. |
+
+

+ 25 - 0
docs/en_US/rules/sources/edgex.md

@@ -83,6 +83,31 @@ The topic name of EdgeX message bus,  default value is ``events``.
 
 
 The base service address for getting value descriptors, the value of ``serviceServer`` will be concatenated to ``/api/v1/valuedescriptor`` to get all of value descriptors of EdgeX server.
 The base service address for getting value descriptors, the value of ``serviceServer`` will be concatenated to ``/api/v1/valuedescriptor`` to get all of value descriptors of EdgeX server.
 
 
+## type
+
+The EdgeX message bus type, currently two types of message buses are supported. If specified other values, then will use the default ``zero`` value.
+
+- ``zero``: Use ZeroMQ as EdgeX message bus. 
+- ``mqtt``: Use the MQTT broker as EdgeX message bus.
+
+## optional
+
+If MQTT message bus is used, some other optional configurations can be specified. Please notice that all of values in optional are **<u>string type</u>**, so values for these configurations should be string - such as ``KeepAlive: "5000"``. Below optional configurations are supported, please check MQTT specification for the detailed information.
+
+- ClientId
+
+- Username
+- Password
+- Qos
+- KeepAlive
+- Retained
+- ConnectionPayload
+- CertFile
+- KeyFile
+- CertPEMBlock
+- KeyPEMBlock
+- SkipCertVerify
+
 ## Override the default settings
 ## Override the default settings
 
 
 In some cases, maybe you want to consume message from multiple topics from message bus.  Kuiper supports to specify another configuration, and use the ``CONF_KEY`` to specify the newly created key when you create a stream.
 In some cases, maybe you want to consume message from multiple topics from message bus.  Kuiper supports to specify another configuration, and use the ``CONF_KEY`` to specify the newly created key when you create a stream.

ファイルの差分が大きいため隠しています
+ 229 - 0
docs/zh_CN/plugins/plugins_tutorial.md


ファイルの差分が大きいため隠しています
+ 49 - 0
docs/zh_CN/quick_start_docker.md


+ 1 - 1
docs/zh_CN/rules/overview.md

@@ -45,7 +45,7 @@
 
 
 ### 动作
 ### 动作
 
 
-当前,支持两种操作: [log](sinks/logs.md) 、[mqtt](sinks/mqtt.md) 和 [rest](sinks/rest.md)。 每个动作可以定义自己的属性。当前有三个公共属性:
+当前,支持以下操作: [log](sinks/logs.md) 、[mqtt](sinks/mqtt.md) 、[edgex](sinks/edgex.md)、[rest](sinks/rest.md) 和 [nop](sinks/nop.md)。 每个动作可以定义自己的属性。当前有以下的公共属性:
 
 
 | 属性名 | 类型和默认值 | 描述                                                  |
 | 属性名 | 类型和默认值 | 描述                                                  |
 | ------------- | -------- | ------------------------------------------------------------ |
 | ------------- | -------- | ------------------------------------------------------------ |

+ 50 - 1
docs/zh_CN/rules/sinks/edgex.md

@@ -11,6 +11,24 @@
 | contentType | true     | 发布消息的内容类型,如未指定,使用缺省值 ``application/json``. |
 | contentType | true     | 发布消息的内容类型,如未指定,使用缺省值 ``application/json``. |
 | metadata    | true     | 该属性为一个字段名称,该字段是 SQL SELECT 子句的一个字段名称,这个字段应该类似于 ``meta(*) AS xxx`` ,用于选出消息中所有的 EdgeX 元数据. |
 | metadata    | true     | 该属性为一个字段名称,该字段是 SQL SELECT 子句的一个字段名称,这个字段应该类似于 ``meta(*) AS xxx`` ,用于选出消息中所有的 EdgeX 元数据. |
 | deviceName  | true     | 允许用户指定设备名称,该名称将作为从 Kuiper 中发送出来的 Event 结构体的设备名称. |
 | deviceName  | true     | 允许用户指定设备名称,该名称将作为从 Kuiper 中发送出来的 Event 结构体的设备名称. |
+| type          | true     | 消息总线类型,目前支持两种类型的消息总线, ``zero`` 或者 ``mqtt``,其中 ``zero`` 为缺省类型。 |
+| optional      | true     | 如果指定了 ``mqtt`` 消息总线,那么还可以指定一下可选的值。请参考以下可选的支持的配置类型。 |
+
+请注意,所有在可选的配置项里指定的值都必须为**<u>字符类型</u>**,因此这里出现的所有的配置应该是字符类型的 - 例如 ``KeepAlive: "5000"``。以下为支持的可选的配置列表,您可以参考 MQTT 协议规范来获取更详尽的信息。
+
+- optional
+  - ClientId
+  - Username
+  - Password
+  - Qos
+  - KeepAlive
+  - Retained
+  - ConnectionPayload
+  - CertFile
+  - KeyFile
+  - CertPEMBlock
+  - KeyPEMBlock
+  - SkipCertVerify
 
 
 ## 例子
 ## 例子
 
 
@@ -124,4 +142,35 @@
 - 对于在原有消息中无法找到的 reading,元数据将不会被设置。如例子中的``t1`` 的元数据被设置为 Kuiper 产生的缺省值。
 - 对于在原有消息中无法找到的 reading,元数据将不会被设置。如例子中的``t1`` 的元数据被设置为 Kuiper 产生的缺省值。
 - 如果你的 SQL 包含了聚合函数,那保留原有的元数据就没有意义,但是 Kuiper 还是会使用时间窗口中的某一条记录的元数据。例如,在下面的 SQL 里,
 - 如果你的 SQL 包含了聚合函数,那保留原有的元数据就没有意义,但是 Kuiper 还是会使用时间窗口中的某一条记录的元数据。例如,在下面的 SQL 里,
 ```SELECT avg(temperature) AS temperature, meta(*) AS edgex_meta FROM ... GROUP BY TUMBLINGWINDOW(ss, 10)```. 
 ```SELECT avg(temperature) AS temperature, meta(*) AS edgex_meta FROM ... GROUP BY TUMBLINGWINDOW(ss, 10)```. 
-这种情况下,在时间窗口中可能有几条数据,Kuiper 会使用窗口中的第一条数据的元数据来填充 ``temperature`` 的元数据。
+这种情况下,在时间窗口中可能有几条数据,Kuiper 会使用窗口中的第一条数据的元数据来填充 ``temperature`` 的元数据。
+
+## 结果发布到 MQTT 消息总线
+
+以下是将分析结果发送到 MQTT 消息总线的规则,请注意在``optional`` 中是如何指定 ``ClientId`` 的。
+
+```json
+{
+  "id": "rule1",
+  "sql": "SELECT meta(*) AS edgex_meta, temperature, humidity, humidity*2 as h1 FROM demo WHERE temperature = 20",
+  "actions": [
+    {
+      "edgex": {
+        "protocol": "tcp",
+        "host": "127.0.0.1",
+        "port": 1883,
+        "topic": "result",
+        "type": "mqtt",
+        "metadata": "edgex_meta",
+        "contentType": "application/json",
+        "optional": {
+        	"ClientId": "edgex_message_bus_001"
+        }
+      }
+    },
+    {
+      "log":{}
+    }
+  ]
+}
+```
+

+ 9 - 0
docs/zh_CN/rules/sinks/nop.md

@@ -0,0 +1,9 @@
+# Nop action
+
+该 action 是一个空操作目标,所有发送到此的结果将被忽略。如果指定 ``log`` 属性为 ``true``,那么结果将会保存到日志文件,日志文件缺省保存在  `` $kuiper_install/log/stream.log``。
+
+| Property name      | Optional | Description                                                  |
+| ------------------ | -------- | ------------------------------------------------------------ |
+| log             | true | true/false - 是否将结果打印到日志。缺省为 ``false``,这种情况下将不会打印到日志文件。 |
+
+

+ 25 - 0
docs/zh_CN/rules/sources/edgex.md

@@ -83,6 +83,31 @@ EdgeX 消息总线上监听的主题名称,缺省为 ``events``.
 
 
 访问 value descriptors 的基础服务地址,配置项 ``serviceServer`` 的值与 ``/api/v1/valuedescriptor`` 拼接后,用于获取 EdgeX 服务器上定义的所有 value descriptors。
 访问 value descriptors 的基础服务地址,配置项 ``serviceServer`` 的值与 ``/api/v1/valuedescriptor`` 拼接后,用于获取 EdgeX 服务器上定义的所有 value descriptors。
 
 
+## type
+
+EdgeX 消息总线类型,目前支持两种消息总线。如果指定了错误的消息总线类型,那么会使用缺省 ``zero`` 类型。
+
+- ``zero``:使用 ZeroMQ 类型的消息总线 
+- ``mqtt``:使用 MQTT 服务器作为消息总线
+
+## optional
+
+如果使用了 MQTT 消息总线,还可以指定别的一些可选配置项。请注意,所有在可选的配置项里指定的值都必须为**<u>字符类型</u>**,因此这里出现的所有的配置应该是字符类型的 - 例如 ``KeepAlive: "5000"``。以下为支持的可选的配置列表,您可以参考 MQTT 协议规范来获取更详尽的信息。
+
+- ClientId
+
+- Username
+- Password
+- Qos
+- KeepAlive
+- Retained
+- ConnectionPayload
+- CertFile
+- KeyFile
+- CertPEMBlock
+- KeyPEMBlock
+- SkipCertVerify
+
 ## 重载缺省设置
 ## 重载缺省设置
 
 
 在某些情况下,你可能想消费来自于多个主题的数据。Kuiper 支持指定别的配置,并且在创建流定义的时候使用 ``CONF_KEY`` 来指定新的配置。
 在某些情况下,你可能想消费来自于多个主题的数据。Kuiper 支持指定别的配置,并且在创建流定义的时候使用 ``CONF_KEY`` 来指定新的配置。

+ 4 - 0
etc/kuiper.yaml

@@ -1,6 +1,10 @@
 basic:
 basic:
   # true|false, with debug level, it prints more debug info
   # true|false, with debug level, it prints more debug info
   debug: false
   debug: false
+  # true|false, if it's set to true, then the log will be print to console
+  consoleLog: false
+  # true|false, if it's set to true, then the log will be print to log file
+  fileLog: true
   port: 20498
   port: 20498
   restPort: 9081
   restPort: 9081
   prometheus: false
   prometheus: false

+ 25 - 4
etc/sources/edgex.yaml

@@ -5,13 +5,34 @@ default:
   port: 5563
   port: 5563
   topic: events
   topic: events
   serviceServer: http://localhost:48080
   serviceServer: http://localhost:48080
+#  Below is optional configurations settings for mqtt
+#  type: mqtt
 #  optional:
 #  optional:
-#    ClientId: client1
-#    Username: user1
-#    Password: password
+#    ClientId: "client1"
+#    Username: "user1"
+#    Password: "password"
+#    Qos: "1"
+#    KeepAlive: "5000"
+#    Retained: "true/false"
+#    ConnectionPayload: ""
+#    CertFile: ""
+#    KeyFile: ""
+#    CertPEMBlock: ""
+#    KeyPEMBlock: ""
+#    SkipCertVerify: "true/false"
+
 #Override the global configurations
 #Override the global configurations
 application_conf: #Conf_key
 application_conf: #Conf_key
   protocol: tcp
   protocol: tcp
   server: localhost
   server: localhost
   port: 5571
   port: 5571
-  topic: application
+  topic: application
+
+mqtt_conf: #Conf_key
+  protocol: tcp
+  server: 127.0.0.1
+  port: 1883
+  topic: events
+  type: mqtt
+  optional:
+    ClientId: "client1"

+ 124 - 0
fvt_scripts/edgex/benchmark/README.md

@@ -0,0 +1,124 @@
+
+
+- Build the ``pub.go``, this is used for simulating the data.
+    ```shell
+    # go build -o fvt_scripts/edgex/benchmark/pub fvt_scripts/edgex/benchmark/pub.go
+    ```
+
+- A mockup EdgeX value descriptor service should be compiled and run before test.
+
+    ```shell
+    # go build -o fvt_scripts/edgex/valuedesc/vdmocker fvt_scripts/edgex/valuedesc/vd_server.go
+    
+    # fvt_scripts/edgex/valuedesc/vdmocker > vdmocker.out 2>&1 &
+    ```
+
+- Update edgex configuration. Update ``Server`` configuration to the address where you run ``pub`` in the 1st step, update ``serviceServer`` to the address where you run ``vdmocker`` of last step.  
+  ```yaml
+  default:
+    protocol: tcp
+    server: 172.31.1.144
+    port: 5563
+    topic: events
+    serviceServer: http://localhost:48080 
+  ```
+
+- The rule is listed as following, save the rule as ``rule.txt``.
+
+   ```json
+    {
+      "sql": "SELECT * from demo where temperature>50",
+      "actions": [
+        {
+          "nop": {
+            "log": false
+          }
+        }
+      ]
+    }
+   ```
+  
+- Create stream ``bin/cli create stream demo'() WITH (FORMAT="JSON", TYPE="edgex")'``
+
+- Create rule ``bin/cli create rule rule1 -f rule.txt``. To add another rule, just change the rule name of command, then deploy another rule, e,g, ``bin/cli create rule rule2 -f rule.txt``
+
+- Run ``pub`` application ``./pub 1000000``, below is an example.
+  ```shell script
+  ubuntu@ip-172-31-1-144:~$ ./pub 1000000
+  elapsed 174.924363s
+  ```
+  
+  Check the status of rule,
+  ```shell script
+  ubuntu@ip-172-31-5-85:/tmp/kuiper-master/_build/kuiper--linux-x86_64$ bin/cli getstatus rule rule1
+    Connecting to 127.0.0.1:20498...
+    {
+      "source_demo_0_records_in_total": 1000000,
+      "source_demo_0_records_out_total": 1000000,
+      "source_demo_0_exceptions_total": 0,
+      "source_demo_0_process_latency_ms": 0,
+      "source_demo_0_buffer_length": 0,
+      "source_demo_0_last_invocation": "2020-04-10T04:26:15.51329",
+      "op_preprocessor_demo_0_records_in_total": 1000000,
+      "op_preprocessor_demo_0_records_out_total": 1000000,
+      "op_preprocessor_demo_0_exceptions_total": 0,
+      "op_preprocessor_demo_0_process_latency_ms": 0,
+      "op_preprocessor_demo_0_buffer_length": 0,
+      "op_preprocessor_demo_0_last_invocation": "2020-04-10T04:26:15.513371",
+      "op_filter_0_records_in_total": 1000000,
+      "op_filter_0_records_out_total": 100000,
+      "op_filter_0_exceptions_total": 0,
+      "op_filter_0_process_latency_ms": 0,
+      "op_filter_0_buffer_length": 0,
+      "op_filter_0_last_invocation": "2020-04-10T04:26:15.513391",
+      "op_project_0_records_in_total": 100000,
+      "op_project_0_records_out_total": 100000,
+      "op_project_0_exceptions_total": 0,
+      "op_project_0_process_latency_ms": 0,
+      "op_project_0_buffer_length": 0,
+      "op_project_0_last_invocation": "2020-04-10T04:26:15.513468",
+      "sink_sink_nop_0_records_in_total": 100000,
+      "sink_sink_nop_0_records_out_total": 100000,
+      "sink_sink_nop_0_exceptions_total": 0,
+      "sink_sink_nop_0_process_latency_ms": 0,
+      "sink_sink_nop_0_buffer_length": 1,
+      "sink_sink_nop_0_last_invocation": "2020-04-10T04:26:15.513501"
+    }
+  ubuntu@ip-172-31-5-85:/tmp/kuiper-master/_build/kuiper--linux-x86_64$ bin/cli getstatus rule rule2
+    Connecting to 127.0.0.1:20498...
+    {
+      "source_demo_0_records_in_total": 1000000,
+      "source_demo_0_records_out_total": 1000000,
+      "source_demo_0_exceptions_total": 0,
+      "source_demo_0_process_latency_ms": 0,
+      "source_demo_0_buffer_length": 0,
+      "source_demo_0_last_invocation": "2020-04-10T04:26:15.514621",
+      "op_preprocessor_demo_0_records_in_total": 1000000,
+      "op_preprocessor_demo_0_records_out_total": 1000000,
+      "op_preprocessor_demo_0_exceptions_total": 0,
+      "op_preprocessor_demo_0_process_latency_ms": 0,
+      "op_preprocessor_demo_0_buffer_length": 0,
+      "op_preprocessor_demo_0_last_invocation": "2020-04-10T04:26:15.514631",
+      "op_filter_0_records_in_total": 1000000,
+      "op_filter_0_records_out_total": 100000,
+      "op_filter_0_exceptions_total": 0,
+      "op_filter_0_process_latency_ms": 0,
+      "op_filter_0_buffer_length": 0,
+      "op_filter_0_last_invocation": "2020-04-10T04:26:15.514635",
+      "op_project_0_records_in_total": 100000,
+      "op_project_0_records_out_total": 100000,
+      "op_project_0_exceptions_total": 0,
+      "op_project_0_process_latency_ms": 0,
+      "op_project_0_buffer_length": 0,
+      "op_project_0_last_invocation": "2020-04-10T04:26:15.514639",
+      "sink_sink_nop_0_records_in_total": 100000,
+      "sink_sink_nop_0_records_out_total": 100000,
+      "sink_sink_nop_0_exceptions_total": 0,
+      "sink_sink_nop_0_process_latency_ms": 0,
+      "sink_sink_nop_0_buffer_length": 1,
+      "sink_sink_nop_0_last_invocation": "2020-04-10T04:26:15.514652"
+    }
+  ```
+Below is the system usage screenshot,
+
+  ![](system_usage.png)

+ 111 - 0
fvt_scripts/edgex/benchmark/pub.go

@@ -0,0 +1,111 @@
+// +build benchmark
+
+//Not necessary to build the file, until for the edgex benchmark test
+package main
+
+import (
+	"context"
+	"fmt"
+	"github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
+	"github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
+	"github.com/edgexfoundry/go-mod-core-contracts/models"
+	"github.com/edgexfoundry/go-mod-messaging/messaging"
+	"github.com/edgexfoundry/go-mod-messaging/pkg/types"
+	"log"
+	"os"
+	"strconv"
+	"sync"
+	"time"
+)
+
+var msgConfig1 = types.MessageBusConfig{
+	PublishHost: types.HostInfo{
+		Host:     "172.31.1.144",
+		Port:     5563,
+		Protocol: "tcp",
+	},
+	Type:messaging.ZeroMQ,
+}
+
+type data struct {
+	temperature int
+	humidity    int
+}
+
+var mockup = []data{
+	{temperature: 10, humidity: 15},
+	{temperature: 15, humidity: 20},
+	{temperature: 20, humidity: 25},
+	{temperature: 25, humidity: 30},
+	{temperature: 30, humidity: 35},
+	{temperature: 35, humidity: 40},
+	{temperature: 40, humidity: 45},
+	{temperature: 45, humidity: 50},
+	{temperature: 50, humidity: 55},
+	{temperature: 55, humidity: 60},
+}
+
+func pubEventClientZeroMq(count int, wg *sync.WaitGroup) {
+	defer wg.Done()
+	if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
+		log.Fatal(err)
+	} else {
+		if ec := msgClient.Connect(); ec != nil {
+			log.Fatal(ec)
+		} else {
+			client := coredata.NewEventClient(local.New("test"))
+			index := 0;
+			for i := 0; i < count; i++ {
+				if i % 10 == 0 {
+					index = 0
+				}
+
+				var testEvent = models.Event{Device: "demo"}
+				var r1 = models.Reading{Device: "Temperature device", Name: "Temperature", Value: fmt.Sprintf("%d", mockup[index].temperature)}
+				var r2 = models.Reading{Device: "Humidity device", Name: "Humidity", Value: fmt.Sprintf("%d",  mockup[index].humidity)}
+				index++
+
+				testEvent.Readings = append(testEvent.Readings, r1, r2)
+
+				data, err := client.MarshalEvent(testEvent)
+				if err != nil {
+					fmt.Errorf("unexpected error MarshalEvent %v", err)
+				}
+
+				env := types.NewMessageEnvelope([]byte(data), context.Background())
+				env.ContentType = "application/json"
+
+				if e := msgClient.Publish(env, "events"); e != nil {
+					log.Fatal(e)
+				} else {
+					//fmt.Printf("%d - %s\n", index, string(data))
+				}
+				time.Sleep(100 * time.Nanosecond)
+			}
+		}
+	}
+}
+
+func main() {
+	start := time.Now()
+	count := 1000
+	if len(os.Args) == 2 {
+		v := os.Args[1]
+		if c, err := strconv.Atoi(v); err != nil {
+			fmt.Errorf("%s\n", err)
+		} else {
+			count = c
+		}
+	}
+
+	var wg sync.WaitGroup
+	for i := 0; i < 1; i++ {
+		wg.Add(1)
+		go pubEventClientZeroMq(count ,&wg)
+	}
+	wg.Wait()
+	t := time.Now()
+	elapsed := t.Sub(start)
+
+	fmt.Printf("elapsed %2fs\n", elapsed.Seconds())
+}

BIN
fvt_scripts/edgex/benchmark/system_usage.png


+ 49 - 0
fvt_scripts/edgex/pub.go

@@ -113,6 +113,50 @@ func pubToAnother() {
 	}
 	}
 }
 }
 
 
+func pubToMQTT(host string) {
+	var msgConfig2 = types.MessageBusConfig{
+		PublishHost: types.HostInfo{
+			Host:     host,
+			Port:     1883,
+			Protocol: "tcp",
+		},
+		Optional:map[string]string{
+			"ClientId": "0001_client_id",
+		},
+		Type:messaging.MQTT,
+	}
+	if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
+		log.Fatal(err)
+	} else {
+		if ec := msgClient.Connect(); ec != nil {
+			log.Fatal(ec)
+		}
+		client := coredata.NewEventClient(local.New("test1"))
+		var testEvent = models.Event{Device: "demo1", Created: 123, Modified: 123, Origin: 123}
+		var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: "20"}
+		var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: "30"}
+
+		testEvent.Readings = append(testEvent.Readings, r1, r2)
+
+		data, err := client.MarshalEvent(testEvent)
+		if err != nil {
+			fmt.Errorf("unexpected error MarshalEvent %v", err)
+		} else {
+			fmt.Println(string(data))
+		}
+
+		env := types.NewMessageEnvelope([]byte(data), context.Background())
+		env.ContentType = "application/json"
+
+		if e := msgClient.Publish(env, "events"); e != nil {
+			log.Fatal(e)
+		} else {
+			fmt.Printf("pubToAnother successful: %s\n", data)
+		}
+		time.Sleep(1500 * time.Millisecond)
+	}
+}
+
 func pubMetaSource() {
 func pubMetaSource() {
 	if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
 	if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
 		log.Fatal(err)
 		log.Fatal(err)
@@ -161,6 +205,11 @@ func main() {
 		} else if v == "meta" {
 		} else if v == "meta" {
 			pubMetaSource()
 			pubMetaSource()
 		}
 		}
+	} else if len(os.Args) == 3 {
+		if v := os.Args[1]; v == "mqtt" {
+			//The 2nd parameter is MQTT broker server address
+			pubToMQTT(os.Args[2])
+		}
 	}
 	}
 }
 }
 
 

+ 55 - 4
fvt_scripts/edgex/sub/sub.go

@@ -5,9 +5,10 @@ import (
 	"github.com/edgexfoundry/go-mod-messaging/messaging"
 	"github.com/edgexfoundry/go-mod-messaging/messaging"
 	"github.com/edgexfoundry/go-mod-messaging/pkg/types"
 	"github.com/edgexfoundry/go-mod-messaging/pkg/types"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
+	"os"
 )
 )
 
 
-func main() {
+func subEventsFromZMQ() {
 	var msgConfig1 = types.MessageBusConfig{
 	var msgConfig1 = types.MessageBusConfig{
 		SubscribeHost: types.HostInfo{
 		SubscribeHost: types.HostInfo{
 			Host:     "localhost",
 			Host:     "localhost",
@@ -23,9 +24,6 @@ func main() {
 		if ec := msgClient.Connect(); ec != nil {
 		if ec := msgClient.Connect(); ec != nil {
 			common.Log.Fatal(ec)
 			common.Log.Fatal(ec)
 		} else {
 		} else {
-			if err := msgClient.Connect(); err != nil {
-				common.Log.Fatal(err)
-			}
 			//log.Infof("The connection to edgex messagebus is established successfully.")
 			//log.Infof("The connection to edgex messagebus is established successfully.")
 			messages := make(chan types.MessageEnvelope)
 			messages := make(chan types.MessageEnvelope)
 			topics := []types.TopicChannel{{Topic: "", Messages: messages}}
 			topics := []types.TopicChannel{{Topic: "", Messages: messages}}
@@ -52,3 +50,56 @@ func main() {
 		}
 		}
 	}
 	}
 }
 }
+
+func subEventsFromMQTT(host string) {
+	var msgConfig1 = types.MessageBusConfig{
+		SubscribeHost: types.HostInfo{
+			Host:     host,
+			Port:     1883,
+			Protocol: "tcp",
+		},
+		Type: messaging.MQTT,
+	}
+
+	if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
+		common.Log.Fatal(err)
+	} else {
+		if ec := msgClient.Connect(); ec != nil {
+			common.Log.Fatal(ec)
+		} else {
+			//log.Infof("The connection to edgex messagebus is established successfully.")
+			messages := make(chan types.MessageEnvelope)
+			topics := []types.TopicChannel{{Topic: "result", Messages: messages}}
+			err := make(chan error)
+			if e := msgClient.Subscribe(topics, err); e != nil {
+				//log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
+				common.Log.Fatal(e)
+			} else {
+				var count int = 0
+				for {
+					select {
+					case e1 := <-err:
+						common.Log.Errorf("%s\n", e1)
+						return
+					case env := <-messages:
+						count++
+						fmt.Printf("%s\n", env.Payload)
+						if count == 1 {
+							return
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+func main() {
+	if len(os.Args) == 1 {
+		subEventsFromZMQ()
+	} else if len(os.Args) == 3 {
+		if v := os.Args[1]; v == "mqtt" {
+			subEventsFromMQTT(os.Args[2])
+		}
+	}
+}

+ 494 - 0
fvt_scripts/edgex_mqtt_sink_rule.jmx

@@ -0,0 +1,494 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<jmeterTestPlan version="1.2" properties="4.0" jmeter="4.0 r1823414">
+  <hashTree>
+    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Test Plan" enabled="true">
+      <stringProp name="TestPlan.comments"></stringProp>
+      <boolProp name="TestPlan.functional_mode">false</boolProp>
+      <boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp>
+      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
+      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments"/>
+      </elementProp>
+      <stringProp name="TestPlan.user_define_classpath"></stringProp>
+    </TestPlan>
+    <hashTree>
+      <Arguments guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments">
+          <elementProp name="srv" elementType="Argument">
+            <stringProp name="Argument.name">srv</stringProp>
+            <stringProp name="Argument.value">127.0.0.1</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="rest_port" elementType="Argument">
+            <stringProp name="Argument.name">rest_port</stringProp>
+            <stringProp name="Argument.value">9081</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="fvt" elementType="Argument">
+            <stringProp name="Argument.name">fvt</stringProp>
+            <stringProp name="Argument.value">${__property(fvt,,)}</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="mqtt_srv" elementType="Argument">
+            <stringProp name="Argument.name">mqtt_srv</stringProp>
+            <stringProp name="Argument.value">127.0.0.1</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+        </collectionProp>
+      </Arguments>
+      <hashTree/>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Rules" enabled="true">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <TransactionController guiclass="TransactionControllerGui" testclass="TransactionController" testname="Rule" enabled="true">
+          <boolProp name="TransactionController.includeTimers">false</boolProp>
+          <boolProp name="TransactionController.parent">false</boolProp>
+        </TransactionController>
+        <hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateStream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+&quot;sql&quot; : &quot;create stream demo () WITH (FORMAT=\&quot;JSON\&quot;, TYPE=\&quot;edgex\&quot; Conf_key=\&quot;mqtt_conf\&quot;)&quot;&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-1754954177">Stream demo is created.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+  &quot;id&quot;: &quot;rule1&quot;,&#xd;
+  &quot;sql&quot;: &quot;SELECT meta(*) AS edgex_meta, temperature, humidity, humidity*2 as h1 FROM demo WHERE temperature = 20&quot;,&#xd;
+  &quot;actions&quot;: [&#xd;
+    {&#xd;
+      &quot;edgex&quot;: {&#xd;
+        &quot;protocol&quot;: &quot;tcp&quot;,&#xd;
+        &quot;host&quot;: &quot;${mqtt_srv}&quot;,&#xd;
+        &quot;port&quot;: 1883,&#xd;
+        &quot;topic&quot;: &quot;result&quot;,&#xd;
+        &quot;type&quot;: &quot;mqtt&quot;,&#xd;
+        &quot;metadata&quot;: &quot;edgex_meta&quot;,&#xd;
+        &quot;contentType&quot;: &quot;application/json&quot;,&#xd;
+        &quot;optional&quot;: {&#xd;
+        	&quot;ClientId&quot;: &quot;edgex_message_bus_001&quot;&#xd;
+        }&#xd;
+      }&#xd;
+    },&#xd;
+    {&#xd;
+      &quot;log&quot;:{}&#xd;
+    }&#xd;
+  ]&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-2022196798">Rule rule1 was created</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_GetRuleStatus" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.source_demo_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">0</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+          </hashTree>
+          <SystemSampler guiclass="SystemSamplerGui" testclass="SystemSampler" testname="OS Process Sampler" enabled="true">
+            <boolProp name="SystemSampler.checkReturnCode">false</boolProp>
+            <stringProp name="SystemSampler.expectedReturnCode">0</stringProp>
+            <stringProp name="SystemSampler.command">fvt_scripts/edgex/pub</stringProp>
+            <elementProp name="SystemSampler.arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="Argument">
+                  <stringProp name="Argument.name"></stringProp>
+                  <stringProp name="Argument.value">mqtt</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+                <elementProp name="" elementType="Argument">
+                  <stringProp name="Argument.name"></stringProp>
+                  <stringProp name="Argument.value">${mqtt_srv}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <elementProp name="SystemSampler.environment" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments"/>
+            </elementProp>
+            <stringProp name="SystemSampler.directory">${__property(fvt,,)}</stringProp>
+          </SystemSampler>
+          <hashTree/>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_GetRuleStatus" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.source_demo_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">1</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="false">
+              <stringProp name="JSON_PATH">$.sink_sink_mqtt_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">6</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+            <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+              <stringProp name="ConstantTimer.delay">5000</stringProp>
+            </ConstantTimer>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_DropRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="717250485">Rule rule1 is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_Drop_Stream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams/demo</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="287881319">Stream demo is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+            <stringProp name="ConstantTimer.delay">500</stringProp>
+          </ConstantTimer>
+          <hashTree/>
+        </hashTree>
+      </hashTree>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Result" enabled="true">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <SystemSampler guiclass="SystemSamplerGui" testclass="SystemSampler" testname="OS Process Sampler" enabled="true">
+          <boolProp name="SystemSampler.checkReturnCode">true</boolProp>
+          <stringProp name="SystemSampler.expectedReturnCode">0</stringProp>
+          <stringProp name="SystemSampler.command">fvt_scripts/edgex/sub/sub</stringProp>
+          <elementProp name="SystemSampler.arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+            <collectionProp name="Arguments.arguments">
+              <elementProp name="" elementType="Argument">
+                <stringProp name="Argument.name"></stringProp>
+                <stringProp name="Argument.value">mqtt</stringProp>
+                <stringProp name="Argument.metadata">=</stringProp>
+              </elementProp>
+              <elementProp name="" elementType="Argument">
+                <stringProp name="Argument.name"></stringProp>
+                <stringProp name="Argument.value">${mqtt_srv}</stringProp>
+                <stringProp name="Argument.metadata">=</stringProp>
+              </elementProp>
+            </collectionProp>
+          </elementProp>
+          <elementProp name="SystemSampler.environment" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+            <collectionProp name="Arguments.arguments"/>
+          </elementProp>
+          <stringProp name="SystemSampler.directory">${__property(fvt,,)}</stringProp>
+        </SystemSampler>
+        <hashTree>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="device Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$.device</stringProp>
+            <stringProp name="EXPECTED_VALUE">demo1</stringProp>
+            <boolProp name="JSONVALIDATION">true</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="reading_0 Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$.readings[0].value</stringProp>
+            <stringProp name="EXPECTED_VALUE">30</stringProp>
+            <boolProp name="JSONVALIDATION">false</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="reading_1 Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$.readings[1].value</stringProp>
+            <stringProp name="EXPECTED_VALUE">20</stringProp>
+            <boolProp name="JSONVALIDATION">false</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="reading_2 Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$.readings[2].value</stringProp>
+            <stringProp name="EXPECTED_VALUE">20</stringProp>
+            <boolProp name="JSONVALIDATION">false</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
+        </hashTree>
+      </hashTree>
+    </hashTree>
+  </hashTree>
+</jmeterTestPlan>

+ 4 - 0
fvt_scripts/run_jmeter.sh

@@ -79,4 +79,8 @@ if test $with_edgex = true; then
   
   
   /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_edgex_meta_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/select_edgex_meta_rule.jtl -j jmeter_logs/select_edgex_meta_rule.log
   /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_edgex_meta_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/select_edgex_meta_rule.jtl -j jmeter_logs/select_edgex_meta_rule.log
   echo -e "---------------------------------------------\n"
   echo -e "---------------------------------------------\n"
+
+  /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/edgex_mqtt_sink_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_mqtt_sink_rule.jtl -j jmeter_logs/edgex_mqtt_sink_rule.log
+  echo -e "---------------------------------------------\n"
+
 fi
 fi

+ 1 - 1
go.mod

@@ -5,7 +5,7 @@ require (
 	github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e
 	github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e
 	github.com/eclipse/paho.mqtt.golang v1.2.0
 	github.com/eclipse/paho.mqtt.golang v1.2.0
 	github.com/edgexfoundry/go-mod-core-contracts v0.1.53
 	github.com/edgexfoundry/go-mod-core-contracts v0.1.53
-	github.com/edgexfoundry/go-mod-messaging v0.1.16
+	github.com/edgexfoundry/go-mod-messaging v0.1.18
 	github.com/go-yaml/yaml v2.1.0+incompatible
 	github.com/go-yaml/yaml v2.1.0+incompatible
 	github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
 	github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
 	github.com/google/uuid v1.1.1
 	github.com/google/uuid v1.1.1

+ 11 - 0
xsql/ast.go

@@ -1,6 +1,7 @@
 package xsql
 package xsql
 
 
 import (
 import (
+	"encoding/json"
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/plugins"
@@ -310,6 +311,16 @@ type StreamField struct {
 	FieldType
 	FieldType
 }
 }
 
 
+func (u *StreamField) MarshalJSON() ([]byte, error) {
+	return json.Marshal(&struct {
+		FieldType string
+		Name      string
+	}{
+		FieldType: PrintFieldType(u.FieldType),
+		Name:      u.Name,
+	})
+}
+
 type StreamFields []StreamField
 type StreamFields []StreamField
 
 
 func (sf StreamFields) node() {}
 func (sf StreamFields) node() {}

+ 4 - 2
xsql/processors/xsql_processor.go

@@ -71,7 +71,9 @@ func (p *StreamProcessor) execCreateStream(stmt *xsql.StreamStmt, statement stri
 	if err != nil {
 	if err != nil {
 		return "", fmt.Errorf("Create stream fails: %v.", err)
 		return "", fmt.Errorf("Create stream fails: %v.", err)
 	} else {
 	} else {
-		return fmt.Sprintf("Stream %s is created.", stmt.Name), nil
+		info := fmt.Sprintf("Stream %s is created.", stmt.Name)
+		log.Printf("%s", info)
+		return info, nil
 	}
 	}
 }
 }
 
 
@@ -110,7 +112,7 @@ func (p *StreamProcessor) execDescribeStream(stmt *xsql.DescribeStreamStatement)
 	buff.WriteString("Fields\n--------------------------------------------------------------------------------\n")
 	buff.WriteString("Fields\n--------------------------------------------------------------------------------\n")
 	for _, f := range streamStmt.StreamFields {
 	for _, f := range streamStmt.StreamFields {
 		buff.WriteString(f.Name + "\t")
 		buff.WriteString(f.Name + "\t")
-		xsql.PrintFieldType(f.FieldType, &buff)
+		buff.WriteString(xsql.PrintFieldType(f.FieldType))
 		buff.WriteString("\n")
 		buff.WriteString("\n")
 	}
 	}
 	buff.WriteString("\n")
 	buff.WriteString("\n")

+ 11 - 12
xsql/util.go

@@ -1,36 +1,35 @@
 package xsql
 package xsql
 
 
 import (
 import (
-	"bytes"
 	"strings"
 	"strings"
 )
 )
 
 
-func PrintFieldType(ft FieldType, buff *bytes.Buffer) {
+func PrintFieldType(ft FieldType) (result string) {
 	switch t := ft.(type) {
 	switch t := ft.(type) {
 	case *BasicType:
 	case *BasicType:
-		buff.WriteString(t.Type.String())
+		result = t.Type.String()
 	case *ArrayType:
 	case *ArrayType:
-		buff.WriteString("array(")
+		result = "array("
 		if t.FieldType != nil {
 		if t.FieldType != nil {
-			PrintFieldType(t.FieldType, buff)
+			result += PrintFieldType(t.FieldType)
 		} else {
 		} else {
-			buff.WriteString(t.Type.String())
+			result += t.Type.String()
 		}
 		}
-		buff.WriteString(")")
+		result += ")"
 	case *RecType:
 	case *RecType:
-		buff.WriteString("struct(")
+		result = "struct("
 		isFirst := true
 		isFirst := true
 		for _, f := range t.StreamFields {
 		for _, f := range t.StreamFields {
 			if isFirst {
 			if isFirst {
 				isFirst = false
 				isFirst = false
 			} else {
 			} else {
-				buff.WriteString(", ")
+				result += ", "
 			}
 			}
-			buff.WriteString(f.Name + " ")
-			PrintFieldType(f.FieldType, buff)
+			result = result + f.Name + " " + PrintFieldType(f.FieldType)
 		}
 		}
-		buff.WriteString(")")
+		result += ")"
 	}
 	}
+	return
 }
 }
 
 
 func GetStreams(stmt *SelectStatement) (result []string) {
 func GetStreams(stmt *SelectStatement) (result []string) {

+ 8 - 6
xstream/extensions/edgex_source.go

@@ -46,10 +46,9 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 	var mbusType = messaging.ZeroMQ
 	var mbusType = messaging.ZeroMQ
 	if t, ok := props["type"]; ok {
 	if t, ok := props["type"]; ok {
 		mbusType = t.(string)
 		mbusType = t.(string)
-	}
-
-	if messaging.ZeroMQ != strings.ToLower(mbusType) {
-		mbusType = messaging.MQTT
+		if mbusType != messaging.ZeroMQ && mbusType != messaging.MQTT {
+			return fmt.Errorf("Specified wrong message type value %s, will use zeromq messagebus.\n", mbusType)
+		}
 	}
 	}
 
 
 	if serviceServer, ok := props["serviceServer"]; ok {
 	if serviceServer, ok := props["serviceServer"]; ok {
@@ -61,7 +60,7 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 		return fmt.Errorf("The service server cannot be empty.")
 		return fmt.Errorf("The service server cannot be empty.")
 	}
 	}
 
 
-	mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: messaging.ZeroMQ}
+	mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: mbusType}
 	common.Log.Infof("Use configuration for edgex messagebus %v\n", mbconf)
 	common.Log.Infof("Use configuration for edgex messagebus %v\n", mbconf)
 
 
 	var optional = make(map[string]string)
 	var optional = make(map[string]string)
@@ -88,7 +87,10 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
 func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
 	log := ctx.GetLogger()
 	log := ctx.GetLogger()
 	if err := es.client.Connect(); err != nil {
 	if err := es.client.Connect(); err != nil {
-		errCh <- fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
+		info := fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
+		log.Errorf(info.Error())
+		errCh <- info
+		return
 	}
 	}
 	log.Infof("The connection to edgex messagebus is established successfully.")
 	log.Infof("The connection to edgex messagebus is established successfully.")
 	messages := make(chan types.MessageEnvelope)
 	messages := make(chan types.MessageEnvelope)

+ 2 - 0
xstream/nodes/sink_node.go

@@ -220,6 +220,8 @@ func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
 		s = &sinks.MQTTSink{}
 		s = &sinks.MQTTSink{}
 	case "rest":
 	case "rest":
 		s = &sinks.RestSink{}
 		s = &sinks.RestSink{}
+	case "nop":
+		s = &sinks.NopSink{}
 	default:
 	default:
 		nf, err := plugins.GetPlugin(name, plugins.SINK)
 		nf, err := plugins.GetPlugin(name, plugins.SINK)
 		if err != nil {
 		if err != nil {

+ 0 - 1
xstream/server/server/rest.go

@@ -131,7 +131,6 @@ func streamHandler(w http.ResponseWriter, r *http.Request) {
 			handleError(w, fmt.Errorf("describe stream error: %s", err), http.StatusBadRequest, logger)
 			handleError(w, fmt.Errorf("describe stream error: %s", err), http.StatusBadRequest, logger)
 			return
 			return
 		}
 		}
-		//TODO format data type
 		jsonResponse(content, w, logger)
 		jsonResponse(content, w, logger)
 	case http.MethodDelete:
 	case http.MethodDelete:
 		content, err := streamProcessor.DropStream(name)
 		content, err := streamProcessor.DropStream(name)

+ 3 - 1
xstream/server/server/server.go

@@ -67,7 +67,9 @@ func StartUp(Version string) {
 	// Listen to TPC connections on port 1234
 	// Listen to TPC connections on port 1234
 	listener, e := net.Listen("tcp", fmt.Sprintf(":%d", common.Config.Port))
 	listener, e := net.Listen("tcp", fmt.Sprintf(":%d", common.Config.Port))
 	if e != nil {
 	if e != nil {
-		logger.Fatal("Listen error: ", e)
+		m := fmt.Sprintf("Listen error: %s", e)
+		fmt.Printf(m)
+		logger.Fatal(m)
 	}
 	}
 
 
 	if common.Config.Prometheus {
 	if common.Config.Prometheus {

+ 20 - 17
xstream/sinks/edgex_sink.go

@@ -26,16 +26,10 @@ type EdgexMsgBusSink struct {
 	deviceName string
 	deviceName string
 	metadata   string
 	metadata   string
 
 
-	optional *OptionalConf
+	optional map[string]string
 	client   messaging.MessageClient
 	client   messaging.MessageClient
 }
 }
 
 
-type OptionalConf struct {
-	clientid string
-	username string
-	password string
-}
-
 func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
 func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
 	ems.host = "*"
 	ems.host = "*"
 	ems.protocol = "tcp"
 	ems.protocol = "tcp"
@@ -81,6 +75,14 @@ func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
 		common.Log.Infof("Not find contentType conf, will use default value 'application/json'.")
 		common.Log.Infof("Not find contentType conf, will use default value 'application/json'.")
 	}
 	}
 
 
+	if ptype, ok := ps["type"]; ok {
+		ems.ptype = ptype.(string)
+		if ems.ptype != messaging.ZeroMQ && ems.ptype != messaging.MQTT {
+			common.Log.Infof("Specified wrong message type value %s, will use zeromq messagebus.\n", ems.ptype)
+			ems.ptype = messaging.ZeroMQ
+		}
+	}
+
 	if dname, ok := ps["deviceName"]; ok {
 	if dname, ok := ps["deviceName"]; ok {
 		ems.deviceName = dname.(string)
 		ems.deviceName = dname.(string)
 	}
 	}
@@ -91,17 +93,17 @@ func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
 
 
 	if optIntf, ok := ps["optional"]; ok {
 	if optIntf, ok := ps["optional"]; ok {
 		if opt, ok1 := optIntf.(map[string]interface{}); ok1 {
 		if opt, ok1 := optIntf.(map[string]interface{}); ok1 {
-			optional := &OptionalConf{}
-			ems.optional = optional
-			if cid, ok2 := opt["clientid"]; ok2 {
-				optional.clientid = cid.(string)
-			}
-			if uname, ok2 := opt["username"]; ok2 {
-				optional.username = uname.(string)
-			}
-			if password, ok2 := opt["password"]; ok2 {
-				optional.password = password.(string)
+			optional := make(map[string]string)
+			for k, v := range opt {
+				if sv, ok2 := v.(string); ok2 {
+					optional[k] = sv
+				} else {
+					info := fmt.Sprintf("Only string value is allowed for optional value, the value for key %s is not a string.", k)
+					common.Log.Infof(info)
+					return fmt.Errorf(info)
+				}
 			}
 			}
+			ems.optional = optional
 		}
 		}
 	}
 	}
 	return nil
 	return nil
@@ -116,6 +118,7 @@ func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
 			Protocol: ems.protocol,
 			Protocol: ems.protocol,
 		},
 		},
 		Type: ems.ptype,
 		Type: ems.ptype,
+		Optional: ems.optional,
 	}
 	}
 	log.Infof("Using configuration for EdgeX message bus sink: %+v", conf)
 	log.Infof("Using configuration for EdgeX message bus sink: %+v", conf)
 	if msgClient, err := messaging.NewMessageClient(conf); err != nil {
 	if msgClient, err := messaging.NewMessageClient(conf); err != nil {

+ 35 - 0
xstream/sinks/nop_sink.go

@@ -0,0 +1,35 @@
+package sinks
+
+import (
+	"github.com/emqx/kuiper/xstream/api"
+)
+
+type NopSink struct {
+	log  bool
+}
+
+func (ns *NopSink) Configure(ps map[string]interface{}) error {
+	var log = false
+	l, ok := ps["log"]
+	if ok {
+		log = l.(bool)
+	}
+	ns.log = log
+	return nil
+}
+
+func (ns *NopSink) Open(ctx api.StreamContext) error {
+	return nil
+}
+
+func (ns *NopSink) Collect(ctx api.StreamContext, item interface{}) error {
+	logger := ctx.GetLogger()
+	if ns.log {
+		logger.Infof("%s", item)
+	}
+	return nil
+}
+
+func (ns *NopSink) Close(ctx api.StreamContext) error {
+	return nil
+}