Bladeren bron

feat(io): Support source/sink share connection

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran 3 jaren geleden
bovenliggende
commit
69151c92c4

+ 1 - 0
docs/en_US/rules/sinks/edgex.md

@@ -12,6 +12,7 @@ The action is used for publishing output message into EdgeX message bus.
 | protocol      | true     | The protocol. If it's not specified, then use default value ``redis``. |
 | host          | true     | The host of message bus. If not specified, then use default value ``localhost``. |
 | port          | true     | The port of message bus. If not specified, then use default value ``6379``. |
+| connectionSelector | true     | reuse the connection to EdgeX message bus. [more info](../sources/edgex.md#connectionselector)
 | topic         | true     | The topic to be published. The topic is static across all messages. To use dynamic topic, leave this empty and specify the topicPrefix property. Only one of the topic and topicPrefix properties can be specified. If both are not specified, then use default topic value ``application``. |
 | topicPrefix         | true     | The prefix of a dynamic topic to be published. The topic will become a concatenation of `$topicPrefix/$profileName/$deviceName/$sourceName`. |
 | contentType   | true     | The content type of message to be published. If not specified, then use the default value ``application/json``. |

+ 1 - 0
docs/en_US/rules/sinks/mqtt.md

@@ -15,6 +15,7 @@ The action is used for publish output message into an MQTT server.
 | privateKeyPath     | true     | The private key path. It can be either absolute path, or relative path, which is similar to use of certificationPath. |
 | insecureSkipVerify | true     | If InsecureSkipVerify is `true`, TLS accepts any certificate presented by the server and any host name in that certificate.  In this mode, TLS is susceptible to man-in-the-middle attacks. The default value is `false`. The configuration item can only be used with TLS connections. |
 | retained           | true     | If retained is `true`,The broker stores the last retained message and the corresponding QoS for that topic.The default value is `false`.
+| connectionSelector | true     | reuse the connection to mqtt broker. [more info](../sources/mqtt.md#connectionselector)
 
 Below is sample configuration for connecting to Azure IoT Hub by using SAS authentication.
 ```json

+ 43 - 0
docs/en_US/rules/sources/edgex.md

@@ -87,6 +87,49 @@ The server address of  EdgeX message bus, default value is ``localhost``.
 
 The port of EdgeX message bus, default value is ``5573``.
 
+### connectionSelector
+
+specify the stream to reuse the connection to EdgeX message bus. The connection profile located in ``connection.yaml``.
+```yaml
+mqtt:
+  mqtt_conf1: #connection key
+    servers: [tcp://127.0.0.1:1883]
+    username: ekuiper
+    password: password
+    #certificationPath: /var/kuiper/xyz-certificate.pem
+    #privateKeyPath: /var/kuiper/xyz-private.pem.key
+    #insecureSkipVerify: false
+    #protocolVersion: 3
+    clientid: ekuiper
+  mqtt_conf2: #connection key
+    servers: ["tcp://127.0.0.1:1883"]
+
+edgex:
+  edgex_conf1: #connection key
+    protocol: redis
+    server: 127.0.0.1
+    port: 6379
+    type: redis
+```
+There is one configuration group for EdgeX message bus in the example, user need use ``edgex.edgex_conf1`` as the selector.
+For example
+```yaml
+#Global Edgex configurations
+default:
+  protocol: tcp
+  server: localhost
+  port: 5573
+  connectionSelector: edgex.edgex_conf1
+  topic: events
+  messageType: event
+  #  optional:
+  #    ClientId: client1
+  #    Username: user1
+  #    Password: password
+```
+*Note*: once specify the connectionSelector in specific configuration group , all connection related parameters will be ignored , in this case ``protocol: tcp | server: localhost | port: 5573``
+
+
 ### topic
 
 The topic name of EdgeX message bus, default value is ``events``. Users can subscribe to the topics of message bus

+ 40 - 0
docs/en_US/rules/sources/mqtt.md

@@ -11,6 +11,7 @@ default:
   #password: password
   #certificationPath: /var/kuiper/xyz-certificate.pem
   #privateKeyPath: /var/kuiper/xyz-private.pem.key
+  #connectionSelector: mqtt.mqtt_conf1
 
 
 #Override the global configurations
@@ -48,6 +49,45 @@ The location of certification path. It can be an absolute path, or a relative pa
 
 The location of private key path. It can be an absolute path, or a relative path.  For more detailed information, please refer to ``certificationPath``. Such as ``d3807d9fa5-private.pem.key``.
 
+### connectionSelector
+
+specify the stream to reuse the connection to mqtt broker. The connection profile located in ``connection.yaml``.
+```yaml
+mqtt:
+  mqtt_conf1: #connection key
+    servers: [tcp://127.0.0.1:1883]
+    username: ekuiper
+    password: password
+    #certificationPath: /var/kuiper/xyz-certificate.pem
+    #privateKeyPath: /var/kuiper/xyz-private.pem.ke
+    #insecureSkipVerify: false
+    #protocolVersion: 3
+    clientid: ekuiper
+  mqtt_conf2: #connection key
+    servers: ["tcp://127.0.0.1:1883"]
+
+edgex:
+  edgex_conf1: #connection key
+    protocol: redis
+    server: 127.0.0.1
+    port: 6379
+    type: redis
+```
+There are two configuration groups for mqtt in the example, user need use ``mqtt.mqtt_conf1`` or ``mqtt.mqtt_conf2`` as the selector.
+For example
+```yaml
+#Global MQTT configurations
+default:
+  qos: 1
+  servers: [tcp://127.0.0.1:1883]
+  #username: user1
+  #password: password
+  #certificationPath: /var/kuiper/xyz-certificate.pem
+  #privateKeyPath: /var/kuiper/xyz-private.pem.key
+  connectionSelector: mqtt.mqtt_conf1
+```
+*Note*: once specify the connectionSelector in specific configuration group , all connection related parameters will be ignored , in this case ``servers: [tcp://127.0.0.1:1883]``
+
 ### bufferLength
 
 specify the maximum number of messages to be buffered in the memory. This is used to avoid the extra large memory usage that would cause out of memory error. Notice that the memory usage will be varied to the actual buffer. Increase the length here won't increase the initial memory allocation so it is safe to set a large buffer length. The default value is 102400, that is if each payload size is about 100 bytes, the maximum buffer size will be about 102400 * 100B ~= 10MB.

+ 1 - 0
docs/zh_CN/rules/sinks/edgex.md

@@ -12,6 +12,7 @@
 | protocol    | 是     | 协议,如未指定,使用缺省值 `tcp` 。  |
 | host        | 是    | 消息总线主机地址,使用缺省值 `*` 。                    |
 | port        | 是    | 消息总线端口号。 如未指定,使用缺省值 `5563` 。              |
+| connectionSelector | 是     | 复用到 EdgeX 消息总线的连接,详细信息,[请参考](../sources/edgex.md#connectionselector)
 | topic       | 是    | 发布的主题名称。该主题为固定值。若不同的消息需要动态指定主题,则将该属性置空,并设置 topicPrefix 属性。这两个属性只能设置一个。若两者都未设置,则使用缺省主题 `application` 。          |
 | topicPrefix | 是     | 发布的主题的前缀。发送的主题将采用动态拼接,格式为`$topicPrefix/$profileName/$deviceName/$sourceName` 。|
 | contentType | 是    | 发布消息的内容类型,如未指定,使用缺省值 `application/json` 。|

+ 3 - 3
docs/zh_CN/rules/sinks/mqtt.md

@@ -13,9 +13,9 @@
 | password          | 是    | 连接密码                             |
 | certificationPath | 是    | 证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 `kuiperd` 命令的路径。比如,如果你在 `/var/kuiper` 中运行 `bin/kuiperd` ,那么父目录为 `/var/kuiper`; 如果运行从 `/var/kuiper/bin` 中运行`./kuiperd`,那么父目录为 `/var/kuiper/bin`。 |
 | privateKeyPath    | 是    | 私钥路径。可以为绝对路径,也可以为相对路径,相对路径的用法与 `certificationPath` 类似。 |
-| insecureSkipVerify | true     | 如果 InsecureSkipVerify 设置为 `true`, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为`false`。配置项只能用于TLS连接。|
-| retained           | true     | 如果 retained 设置为 `true`,Broker会存储每个Topic的最后一条保留消息及其Qos。默认值是 `false`   
-
+| insecureSkipVerify |      | 如果 InsecureSkipVerify 设置为 `true`, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为`false`。配置项只能用于TLS连接。|
+| retained           |      | 如果 retained 设置为 `true`,Broker会存储每个Topic的最后一条保留消息及其Qos。默认值是 `false`   
+| connectionSelector | 是     | 复用到 MQTT Broker 的连接,详细信息,[请参考](../sources/mqtt.md#connectionselector)
 以下为使用 SAS 连接到 Azure IoT Hub 的样例。
 ```json
     {

+ 38 - 0
docs/zh_CN/rules/sources/edgex.md

@@ -87,6 +87,44 @@ EdgeX 消息总线的地址,缺省为 `localhost`
 
 EdgeX 消息总线的端口,缺省为 `5573`
 
+## connectionSelector
+
+复用 EdgeX 源连接。连接配置信息位于 ``connection.yaml``.
+```yaml
+mqtt:
+  mqtt_conf1: #connection key
+    servers: [tcp://127.0.0.1:1883]
+    username: ekuiper
+    password: password
+    #certificationPath: /var/kuiper/xyz-certificate.pem
+    #privateKeyPath: /var/kuiper/xyz-private.pem.key
+    #insecureSkipVerify: false
+    #protocolVersion: 3
+    clientid: ekuiper
+  mqtt_conf2: #connection key
+    servers: ["tcp://127.0.0.1:1883"]
+
+edgex:
+  edgex_conf1: #connection key
+    protocol: redis
+    server: 127.0.0.1
+    port: 6379
+    type: redis
+```
+对于 EdgeX 连接,这里有一个配置组。用户应该使用 ``edgex.edgex_conf1`` 来作为参数。举例如下:
+```yaml
+#Global Edgex configurations
+default:
+  protocol: tcp
+  server: localhost
+  port: 5573
+  connectionSelector: edgex.edgex_conf1
+  topic: events
+  messageType: event
+```
+*注意*: 相应配置组一旦指定 connectionSelector 参数,所有关于连接的参数都会被忽略. 上面例子中,`` protocol: tcp | server: localhost | port: 5573`` 会被忽略。
+
+
 ## topic
 
 EdgeX 消息总线上监听的主题名称,缺省为 `events`。用户可以直接连接到 EdgeX 消息总线上的主题也可以连接到 application service 暴露的主题。需要注意的是,两种主题的消息数据类型不同,需要设置正确的

+ 38 - 0
docs/zh_CN/rules/sources/mqtt.md

@@ -49,6 +49,44 @@ MQTT 连接密码。如果指定了 `certificationPath` 或者 `privateKeyPath`
 
 私钥路径。可以为绝对路径,也可以为相对路径。更详细的信息,请参考 `certificationPath`,比如 `d3807d9fa5-private.pem.key`。
 
+### connectionSelector
+
+复用 MQTT 源连接。连接配置信息位于 ``connection.yaml``.
+```yaml
+mqtt:
+  mqtt_conf1: #connection key
+    servers: [tcp://127.0.0.1:1883]
+    username: ekuiper
+    password: password
+    #certificationPath: /var/kuiper/xyz-certificate.pem
+    #privateKeyPath: /var/kuiper/xyz-private.pem.ke
+    #insecureSkipVerify: false
+    #protocolVersion: 3
+    clientid: ekuiper
+  mqtt_conf2: #connection key
+    servers: ["tcp://127.0.0.1:1883"]
+
+edgex:
+  edgex_conf1: #connection key
+    protocol: redis
+    server: 127.0.0.1
+    port: 6379
+    type: redis
+```
+对于 MQTT 连接,这里有两个配置组。用户应该使用 ``mqtt.mqtt_conf1`` 或者 ``mqtt.mqtt_conf2`` 来作为参数。举例如下:
+```yaml
+#Global MQTT configurations
+default:
+  qos: 1
+  servers: [tcp://127.0.0.1:1883]
+  #username: user1
+  #password: password
+  #certificationPath: /var/kuiper/xyz-certificate.pem
+  #privateKeyPath: /var/kuiper/xyz-private.pem.key
+  connectionSelector: mqtt.mqtt_conf
+```
+*注意*: 相应配置组一旦指定 connectionSelector 参数,所有关于连接的参数都会被忽略. 上面例子中,`` servers: [tcp://127.0.0.1:1883]`` 会被忽略。
+
 ### bufferLength
 
 指定最大缓存消息数目。该参数主要用于防止内存溢出。实际内存用量会根据当前缓存消息数目动态变化。增大该参数不会增加初始内存分配量,因此设置较大的数值是安全的。该参数默认值为102400;如果每条消息为100字节,则默认情况下,缓存最大占用内存量为102400 * 100B ~= 10MB. 

+ 34 - 0
etc/connection.yaml

@@ -0,0 +1,34 @@
+mqtt:
+  mqtt_conf1: #connection key
+    servers: [tcp://127.0.0.1:1883]
+    username: ekuiper
+    password: password
+    #certificationPath: /var/kuiper/xyz-certificate.pem
+    #privateKeyPath: /var/kuiper/xyz-private.pem.ke
+    #insecureSkipVerify: false
+    #protocolVersion: 3
+    clientid: ekuiper
+  mqtt_conf2: #connection key
+    servers: ["tcp://127.0.0.1:1883"]
+
+edgex:
+  edgex_conf1: #connection key
+    protocol: redis
+    server: 127.0.0.1
+    port: 6379
+    type: redis
+    #  Below is optional configurations settings for mqtt
+    #  type: mqtt
+    #  optional:
+    #    ClientId: client1
+    #    Username: user1
+    #    Password: password
+    #    Qos: 1
+    #    KeepAlive: 5000
+    #    Retained: true/false
+    #    ConnectionPayload:
+    #    CertFile:
+    #    KeyFile:
+    #    CertPEMBlock:
+    #    KeyPEMBlock:
+    #    SkipCertVerify: true/false

+ 1 - 0
etc/mqtt_source.yaml

@@ -6,6 +6,7 @@ default:
   #password: password
   #certificationPath: /var/kuiper/xyz-certificate.pem
   #privateKeyPath: /var/kuiper/xyz-private.pem.key
+  #connectionSelector: mqtt.mqtt_conf1
   #kubeedgeVersion: 
   #kubeedgeModelFile: ""
 

+ 1 - 0
go.mod

@@ -32,6 +32,7 @@ require (
 	github.com/msgpack-rpc/msgpack-rpc-go v0.0.0-20131026060856-c76397e1782b
 	github.com/msgpack/msgpack-go v0.0.0-20130625150338-8224460e6fa3 // indirect
 	github.com/pebbe/zmq4 v1.2.7
+	github.com/pkg/errors v0.8.1 // indirect
 	github.com/prometheus/client_golang v1.2.1
 	github.com/sirupsen/logrus v1.4.2
 	github.com/smartystreets/goconvey v1.6.4 // indirect

+ 108 - 0
internal/topo/connection/client_edgex.go

@@ -0,0 +1,108 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build edgex
+
+package connection
+
+import (
+	"fmt"
+	"github.com/edgexfoundry/go-mod-messaging/v2/messaging"
+	"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+)
+
+func init() {
+	registerClientFactory("edgex", func(s *ConSelector) Client {
+		return &EdgexClient{selector: s}
+	})
+}
+
+type EdgexClient struct {
+	selector *ConSelector
+	mbconf   types.MessageBusConfig
+	client   messaging.MessageClient
+}
+
+type EdgexConf struct {
+	Protocol string            `json:"protocol"`
+	Server   string            `json:"server"`
+	Port     int               `json:"port"`
+	Type     string            `json:"type"`
+	Optional map[string]string `json:"optional"`
+}
+
+func (es *EdgexClient) CfgValidate(props map[string]interface{}) error {
+
+	c := &EdgexConf{}
+	err := cast.MapToStructStrict(props, c)
+	if err != nil {
+		return fmt.Errorf("read properties %v fail for connection selector %s with error: %v", props, es.selector.ConnSelectorCfg, err)
+	}
+
+	if c.Server == "" {
+		return fmt.Errorf("missing server property for connection selector %s", es.selector.ConnSelectorCfg)
+	}
+
+	if c.Port == 0 {
+		return fmt.Errorf("missing port property for connection selector %s", es.selector.ConnSelectorCfg)
+	}
+
+	if c.Type != messaging.ZeroMQ && c.Type != messaging.MQTT && c.Type != messaging.Redis {
+		return fmt.Errorf("specified wrong type value %s for connection selector %s", c.Type, es.selector.ConnSelectorCfg)
+	}
+
+	mbconf := types.MessageBusConfig{
+		SubscribeHost: types.HostInfo{
+			Protocol: c.Protocol,
+			Host:     c.Server,
+			Port:     c.Port,
+		},
+		PublishHost: types.HostInfo{
+			Host:     c.Server,
+			Port:     c.Port,
+			Protocol: c.Protocol,
+		},
+		Type: c.Type}
+	mbconf.Optional = c.Optional
+	es.mbconf = mbconf
+
+	return nil
+}
+
+func (es *EdgexClient) GetClient() (interface{}, error) {
+
+	client, err := messaging.NewMessageClient(es.mbconf)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := client.Connect(); err != nil {
+		conf.Log.Errorf("The connection to edgex messagebus failed for connection selector : %s.", es.selector.ConnSelectorCfg)
+		return nil, fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
+	}
+	conf.Log.Infof("The connection to edgex messagebus is established successfully for connection selector : %s.", es.selector.ConnSelectorCfg)
+
+	es.client = client
+	return client, nil
+}
+
+func (es *EdgexClient) CloseClient() error {
+	conf.Log.Infof("Closing the connection to edgex messagebus for connection selector : %s.", es.selector.ConnSelectorCfg)
+	if e := es.client.Disconnect(); e != nil {
+		return e
+	}
+	return nil
+}

+ 129 - 0
internal/topo/connection/client_edgex_test.go

@@ -0,0 +1,129 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build edgex
+
+package connection
+
+import (
+	"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
+	"testing"
+)
+
+func TestEdgex_CfgValidate(t *testing.T) {
+	type fields struct {
+		mbconf types.MessageBusConfig
+	}
+	type args struct {
+		props map[string]interface{}
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		args    args
+		wantErr bool
+	}{
+		{
+			name:   "config pass",
+			fields: fields{},
+			args: args{props: map[string]interface{}{
+				"protocol": "tcp",
+				"server":   "127.0.0.1",
+				"port":     int64(1883),
+				"type":     "mqtt",
+				"optional": map[string]string{
+					"ClientId": "client1",
+					"Username": "user1",
+				},
+			}},
+			wantErr: false,
+		},
+		{
+			name:   "config not case sensitive",
+			fields: fields{},
+			args: args{props: map[string]interface{}{
+				"Protocol": "tcp",
+				"server":   "127.0.0.1",
+				"Port":     1883,
+				"type":     "mqtt",
+				"optional": map[string]string{
+					"ClientId": "client1",
+					"Username": "user1",
+				},
+			}},
+
+			wantErr: false,
+		},
+		{
+			name:   "have unwanted config items topic",
+			fields: fields{},
+			args: args{props: map[string]interface{}{
+				"protocol": "tcp",
+				"server":   "127.0.0.1",
+				"port":     1883,
+				"type":     "mqtt",
+				"optional": map[string]string{
+					"ClientId": "client1",
+					"Username": "user1",
+				},
+				"topic": "demo",
+			}},
+
+			wantErr: true,
+		},
+		{
+			name:   "config type not in zero/mqtt/redis ",
+			fields: fields{},
+			args: args{props: map[string]interface{}{
+				"protocol": "tcp",
+				"server":   "127.0.0.1",
+				"port":     1883,
+				"type":     "kafka",
+				"optional": map[string]string{
+					"ClientId": "client1",
+					"Username": "user1",
+				},
+			}},
+
+			wantErr: true,
+		},
+		{
+			name:   "do not have enough config items ",
+			fields: fields{},
+			args: args{props: map[string]interface{}{
+				"protocol": "tcp",
+				"type":     "mqtt",
+				"optional": map[string]string{
+					"ClientId": "client1",
+					"Username": "user1",
+				},
+			}},
+
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			es := &EdgexClient{
+				selector: &ConSelector{
+					ConnSelectorCfg: "testSelector",
+				},
+				mbconf: tt.fields.mbconf,
+			}
+			if err := es.CfgValidate(tt.args.props); (err != nil) != tt.wantErr {
+				t.Errorf("CfgValidate() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}

+ 131 - 0
internal/topo/connection/client_mqtt.go

@@ -0,0 +1,131 @@
+package connection
+
+import (
+	"crypto/tls"
+	"fmt"
+	MQTT "github.com/eclipse/paho.mqtt.golang"
+	"github.com/google/uuid"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+	"strings"
+)
+
+func init() {
+	registerClientFactory("mqtt", func(s *ConSelector) Client {
+		return &MQTTClient{selector: s}
+	})
+}
+
+type MQTTConnectionConfig struct {
+	Servers            []string `json:"servers"`
+	PVersion           string   `json:"protocolVersion"`
+	ClientId           string   `json:"clientid"`
+	Uname              string   `json:"username"`
+	Password           string   `json:"password"`
+	Certification      string   `json:"certificationPath"`
+	PrivateKPath       string   `json:"privateKeyPath"`
+	InsecureSkipVerify bool     `json:"insecureSkipVerify"`
+}
+
+type MQTTClient struct {
+	srv      string
+	clientid string
+	pVersion uint
+	uName    string
+	password string
+	certPath string
+	pkeyPath string
+	Insecure bool
+
+	selector *ConSelector
+	conn     MQTT.Client
+}
+
+func (ms *MQTTClient) CfgValidate(props map[string]interface{}) error {
+
+	cfg := MQTTConnectionConfig{}
+
+	err := cast.MapToStructStrict(props, &cfg)
+	if err != nil {
+		return fmt.Errorf("failed to get config for %s, the error is %s", ms.selector.ConnSelectorCfg, err)
+	}
+
+	if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
+		ms.srv = srvs[0]
+	} else {
+		return fmt.Errorf("missing server property for %s", ms.selector.ConnSelectorCfg)
+	}
+
+	if cfg.ClientId == "" {
+		if newUUID, err := uuid.NewUUID(); err != nil {
+			return fmt.Errorf("failed to get uuid for %s, the error is %s", ms.selector.ConnSelectorCfg, err)
+		} else {
+			ms.clientid = newUUID.String()
+		}
+	} else {
+		ms.clientid = cfg.ClientId
+	}
+
+	ms.pVersion = 3
+	if cfg.PVersion == "3.1.1" {
+		ms.pVersion = 4
+	}
+
+	if cfg.Certification != "" || cfg.PrivateKPath != "" {
+		ms.certPath, err = conf.ProcessPath(cfg.Certification)
+		if err != nil {
+			return fmt.Errorf("failed to get certPath for %s, the error is %s", ms.selector.ConnSelectorCfg, err)
+		}
+
+		ms.pkeyPath, err = conf.ProcessPath(cfg.PrivateKPath)
+		if err != nil {
+			return fmt.Errorf("failed to get keyPath for %s, the error is %s", ms.selector.ConnSelectorCfg, err)
+		}
+	}
+
+	ms.uName = cfg.Uname
+	ms.password = strings.Trim(cfg.Password, " ")
+	ms.Insecure = cfg.InsecureSkipVerify
+
+	return nil
+}
+
+func (ms *MQTTClient) GetClient() (interface{}, error) {
+
+	opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion).SetCleanSession(false)
+
+	if ms.certPath != "" && ms.pkeyPath != "" {
+		if cer, err := tls.LoadX509KeyPair(ms.certPath, ms.pkeyPath); err != nil {
+			return nil, fmt.Errorf("error when load cert/key for %s, the error is: %s", ms.selector.ConnSelectorCfg, err)
+		} else {
+			opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.Insecure})
+		}
+	} else {
+		if ms.uName != "" {
+			opts = opts.SetUsername(ms.uName)
+		}
+		if ms.password != "" {
+			opts = opts.SetPassword(ms.password)
+		}
+	}
+	opts = opts.SetClientID(ms.clientid)
+	opts = opts.SetAutoReconnect(true)
+
+	c := MQTT.NewClient(opts)
+	if token := c.Connect(); token.Wait() && token.Error() != nil {
+		conf.Log.Errorf("The connection to mqtt broker failed for connection selector: %s ", ms.selector.ConnSelectorCfg)
+		return nil, fmt.Errorf("found error when connecting for connection selector %s: %s", ms.selector.ConnSelectorCfg, token.Error())
+	}
+	conf.Log.Infof("The connection to mqtt broker is established successfully for connection selector: %s.", ms.selector.ConnSelectorCfg)
+
+	ms.conn = c
+	return c, nil
+}
+
+func (ms *MQTTClient) CloseClient() error {
+	conf.Log.Infof("Closing the connection to mqtt broker for connection selector: %s", ms.selector.ConnSelectorCfg)
+	if ms.conn != nil && ms.conn.IsConnected() {
+		ms.conn.Disconnect(5000)
+	}
+	return nil
+}

+ 132 - 0
internal/topo/connection/client_mqtt_test.go

@@ -0,0 +1,132 @@
+package connection
+
+import (
+	"reflect"
+	"testing"
+)
+
+func TestMQTTClient_CfgValidate(t *testing.T) {
+
+	type args struct {
+		props map[string]interface{}
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "config pass",
+			args: args{
+				props: map[string]interface{}{
+					"servers": []string{"tcp:127.0.0.1"},
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "config are not case sensitive",
+			args: args{
+				props: map[string]interface{}{
+					"SERVERS": []string{"tcp:127.0.0.1"},
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "config server addr key error",
+			args: args{
+				props: map[string]interface{}{
+					"server": []string{"tcp:127.0.0.1"},
+				},
+			},
+			wantErr: true,
+		},
+		{
+			name: "config have unwanted topic fields",
+			args: args{
+				props: map[string]interface{}{
+					"servers": []string{"tcp:127.0.0.1"},
+					"topic":   "demo",
+				},
+			},
+			wantErr: true,
+		},
+		{
+			name: "config no server addr",
+			args: args{
+				props: map[string]interface{}{
+					"username": "user1",
+				},
+			},
+			wantErr: true,
+		},
+		{
+			name: "config no server addr",
+			args: args{
+				props: map[string]interface{}{
+					"servers": []string{},
+				},
+			},
+			wantErr: true,
+		},
+		{
+			name: "config miss cert key file",
+			args: args{
+				props: map[string]interface{}{
+					"servers":           []string{"tcp:127.0.0.1"},
+					"certificationPath": "./not_exist.crt",
+					"privateKeyPath":    "./not_exist.key",
+				},
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ms := &MQTTClient{
+				selector: &ConSelector{
+					ConnSelectorCfg: "testSelector",
+				},
+			}
+			err := ms.CfgValidate(tt.args.props)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("CfgValidate() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+func TestMQTTClient_CfgResult(t *testing.T) {
+	props := map[string]interface{}{
+		"servers":  []string{"tcp:127.0.0.1:1883"},
+		"USERNAME": "demo",
+		"Password": "password",
+		"clientID": "clientid",
+	}
+
+	ms := &MQTTClient{
+		selector: &ConSelector{
+			ConnSelectorCfg: "testSelector",
+		},
+	}
+
+	_ = ms.CfgValidate(props)
+
+	if !reflect.DeepEqual("tcp:127.0.0.1:1883", ms.srv) {
+		t.Errorf("result mismatch:\n\n got=%#v\n\n", ms.srv)
+	}
+	if !reflect.DeepEqual("demo", ms.uName) {
+		t.Errorf("result mismatch:\n\n got=%#v\n\n", ms.uName)
+	}
+	if !reflect.DeepEqual("password", ms.password) {
+		t.Errorf("result mismatch:\n\n got=%#v\n\n", ms.password)
+	}
+	if !reflect.DeepEqual("clientid", ms.clientid) {
+		t.Errorf("result mismatch:\n\n got=%#v\n\n", ms.clientid)
+	}
+	if !reflect.DeepEqual(uint(3), ms.pVersion) {
+		t.Errorf("result mismatch:\n\n got=%#v\n\n", ms.pVersion)
+	}
+
+}

+ 60 - 0
internal/topo/connection/client_wrapper.go

@@ -0,0 +1,60 @@
+package connection
+
+type ClientFactoryFunc func(super *ConSelector) Client
+
+type Client interface {
+	CfgValidate(map[string]interface{}) error
+	GetClient() (interface{}, error)
+	CloseClient() error
+}
+
+type clientWrapper struct {
+	cli    Client
+	conn   interface{}
+	refCnt uint32
+}
+
+func NewClientWrapper(client Client, selector *ConSelector) (*clientWrapper, error) {
+	props, err := selector.ReadCfgFromYaml()
+	if err != nil {
+		return nil, err
+	}
+	err = client.CfgValidate(props)
+	if err != nil {
+		return nil, err
+	}
+	var con interface{}
+
+	con, err = client.GetClient()
+	if err != nil {
+		return nil, err
+	}
+
+	cliWpr := &clientWrapper{
+		cli:    client,
+		conn:   con,
+		refCnt: 1,
+	}
+
+	return cliWpr, nil
+}
+
+func (c *clientWrapper) addRef() {
+	c.refCnt = c.refCnt + 1
+}
+
+func (c *clientWrapper) subRef() {
+	c.refCnt = c.refCnt - 1
+}
+
+func (c *clientWrapper) IsRefEmpty() bool {
+	return c.refCnt == 0
+}
+
+func (c *clientWrapper) clean() {
+	_ = c.cli.CloseClient()
+}
+
+func (c *clientWrapper) getInstance() interface{} {
+	return c.conn
+}

+ 60 - 0
internal/topo/connection/connect_selector.go

@@ -0,0 +1,60 @@
+package connection
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"strings"
+)
+
+var SUPPORTE_CONTYPE = []string{"mqtt", "edgex"}
+
+type ConSelector struct {
+	ConnSelectorCfg string
+
+	Type          string // mqtt edgex
+	CfgKey        string // config key
+	SupportedType []string
+}
+
+func (c *ConSelector) Init() error {
+
+	c.SupportedType = SUPPORTE_CONTYPE
+
+	conTypeSel := strings.SplitN(c.ConnSelectorCfg, ".", 2)
+	if len(conTypeSel) != 2 {
+		return fmt.Errorf("not a valid connection selector : %s", c.ConnSelectorCfg)
+	}
+	c.Type = conTypeSel[0]
+	c.CfgKey = conTypeSel[1]
+	return nil
+}
+
+func (c *ConSelector) ReadCfgFromYaml() (props map[string]interface{}, err error) {
+
+	var (
+		found = false
+	)
+
+	cfg := make(map[string]interface{})
+	err = conf.LoadConfigByName("connection.yaml", &cfg)
+	if err != nil {
+		return nil, err
+	}
+
+	if cons, ok := cfg[c.Type]; ok {
+		if connItems, ok1 := cons.(map[string]interface{}); ok1 {
+			if conItem, ok := connItems[c.CfgKey]; ok {
+				if item, ok1 := conItem.(map[string]interface{}); ok1 {
+					props = item
+					found = true
+				}
+			}
+		}
+	}
+
+	if !found {
+		return nil, fmt.Errorf("not found connection Type and Selector:  %s.%s", c.Type, c.CfgKey)
+	}
+
+	return
+}

+ 167 - 0
internal/topo/connection/connect_selector_test.go

@@ -0,0 +1,167 @@
+package connection
+
+import (
+	"os"
+	"reflect"
+	"testing"
+)
+
+func Test_getConnectionConf(t *testing.T) {
+	type args struct {
+		connectionType     string
+		connectionSelector string
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    map[string]interface{}
+		wantErr bool
+	}{
+		{
+			name: "mqtt:mqtt_conf1",
+			args: args{
+				connectionType:     "mqtt",
+				connectionSelector: "mqtt_conf1",
+			},
+			want: map[string]interface{}{
+				"servers":  []interface{}{"tcp://127.0.0.1:1883"},
+				"username": "ekuiper",
+				"password": "password",
+				"clientid": "ekuiper",
+			},
+			wantErr: false,
+		},
+		{
+			name: "mqtt:mqtt_conf2",
+			args: args{
+				connectionType:     "mqtt",
+				connectionSelector: "mqtt_conf2",
+			},
+			want: map[string]interface{}{
+				"servers": []interface{}{"tcp://127.0.0.1:1883"},
+			},
+			wantErr: false,
+		},
+		{
+			name: "mqtt:mqtt_conf3 not exist",
+			args: args{
+				connectionType:     "mqtt",
+				connectionSelector: "mqtt_conf3",
+			},
+			wantErr: true,
+		},
+		{
+			name: "mqtts:mqtt_conf3 not exist",
+			args: args{
+				connectionType:     "mqtts",
+				connectionSelector: "mqtt_conf3",
+			},
+			wantErr: true,
+		},
+		{
+			name: "edgex:edgex_conf1",
+			args: args{
+				connectionType:     "edgex",
+				connectionSelector: "edgex_conf1",
+			},
+			want: map[string]interface{}{
+				"protocol": "redis",
+				"server":   "127.0.0.1",
+				"port":     6379,
+				"type":     "redis",
+			},
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			c := ConSelector{
+				Type:   tt.args.connectionType,
+				CfgKey: tt.args.connectionSelector,
+			}
+
+			got, err := c.ReadCfgFromYaml()
+			if (err != nil) != tt.wantErr {
+				t.Errorf("getConnectionConf() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("getConnectionConf() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_getConnectionConfWithEnv(t *testing.T) {
+	mqttServerKey := "CONNECTION__MQTT__MQTT_CONF1__SERVERS"
+	mqttServerValue := "[tcp://broker.emqx.io:1883]"
+
+	edgexPortKey := "CONNECTION__EDGEX__EDGEX_CONF1__PORT"
+	edgexPortValue := "6666"
+
+	err := os.Setenv(mqttServerKey, mqttServerValue)
+	if err != nil {
+		t.Error(err)
+	}
+	err = os.Setenv(edgexPortKey, edgexPortValue)
+	if err != nil {
+		t.Error(err)
+	}
+
+	type args struct {
+		connectionType     string
+		connectionSelector string
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    map[string]interface{}
+		wantErr bool
+	}{
+		{
+			name: "mqtt:mqtt_conf1",
+			args: args{
+				connectionType:     "mqtt",
+				connectionSelector: "mqtt_conf1",
+			},
+			want: map[string]interface{}{
+				"servers":  []interface{}{"tcp://broker.emqx.io:1883"},
+				"username": "ekuiper",
+				"password": "password",
+				"clientid": "ekuiper",
+			},
+			wantErr: false,
+		},
+		{
+			name: "edgex:edgex_conf1",
+			args: args{
+				connectionType:     "edgex",
+				connectionSelector: "edgex_conf1",
+			},
+			want: map[string]interface{}{
+				"protocol": "redis",
+				"server":   "127.0.0.1",
+				"port":     int64(6666),
+				"type":     "redis",
+			},
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			c := ConSelector{
+				Type:   tt.args.connectionType,
+				CfgKey: tt.args.connectionSelector,
+			}
+
+			got, err := c.ReadCfgFromYaml()
+			if (err != nil) != tt.wantErr {
+				t.Errorf("getConnectionConf() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("getConnectionConf() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}

+ 81 - 0
internal/topo/connection/manager.go

@@ -0,0 +1,81 @@
+package connection
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"sync"
+)
+
+var m = clientManager{
+	clientFactory: make(map[string]ClientFactoryFunc),
+	lock:          sync.Mutex{},
+	clientMap:     make(map[string]*clientWrapper),
+}
+
+type clientManager struct {
+	lock          sync.Mutex
+	clientFactory map[string]ClientFactoryFunc
+	clientMap     map[string]*clientWrapper
+}
+
+func registerClientFactory(clientType string, creatorFunc ClientFactoryFunc) {
+	m.lock.Lock()
+	m.clientFactory[clientType] = creatorFunc
+	m.lock.Unlock()
+}
+
+func GetConnection(connectSelector string) (interface{}, error) {
+
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	var cliWpr *clientWrapper
+	var found bool
+
+	cliWpr, found = m.clientMap[connectSelector]
+	if found {
+		cliWpr.addRef()
+	} else {
+		selectCfg := &ConSelector{
+			ConnSelectorCfg: connectSelector,
+		}
+		err := selectCfg.Init()
+		if err != nil {
+			conf.Log.Errorf("connection selector: %s have error %s.", connectSelector, err)
+			return nil, err
+		}
+
+		clientCreator, ok := m.clientFactory[selectCfg.Type]
+		if !ok {
+			conf.Log.Errorf("can not find clientCreator for connection selector : %s only support %s", connectSelector, selectCfg.SupportedType)
+			return nil, fmt.Errorf("can not find clientCreator for connection selector : %s. only support %s", connectSelector, selectCfg.SupportedType)
+		}
+
+		client := clientCreator(selectCfg)
+
+		cliWpr, err = NewClientWrapper(client, selectCfg)
+		if err != nil {
+			conf.Log.Errorf("can not create client for connection selector : %s have error %s", connectSelector, err)
+			return nil, err
+		}
+
+		m.clientMap[connectSelector] = cliWpr
+	}
+
+	conf.Log.Infof("connection selector: %s GetConnection count %d.", connectSelector, cliWpr.refCnt)
+
+	return cliWpr.getInstance(), nil
+}
+
+func ReleaseConnection(connectSelector string) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	if v, ok := m.clientMap[connectSelector]; ok {
+		v.subRef()
+		conf.Log.Infof("connection selector: %s ReleaseConnection count %d.", connectSelector, v.refCnt)
+		if v.IsRefEmpty() {
+			v.clean()
+			delete(m.clientMap, connectSelector)
+		}
+	}
+}

+ 91 - 0
internal/topo/connection/manager_test.go

@@ -0,0 +1,91 @@
+package connection
+
+import (
+	"reflect"
+	"testing"
+)
+
+const TestClient = "MockClient"
+
+var GetCalledNumber int
+var CloseCalledNumber int
+
+type MockClient struct {
+	selector *ConSelector
+
+	mockCon string
+}
+
+func (m *MockClient) CfgValidate(m2 map[string]interface{}) error {
+	return nil
+}
+
+func (m *MockClient) GetClient() (interface{}, error) {
+	GetCalledNumber = 1
+	m.mockCon = "MockClient"
+	return m.mockCon, nil
+}
+
+func (m *MockClient) CloseClient() error {
+	CloseCalledNumber = 1
+	m.mockCon = ""
+	return nil
+}
+
+func TestManager(t *testing.T) {
+	registerClientFactory("mqtt", func(super *ConSelector) Client {
+		return &MockClient{selector: super}
+	})
+
+	conSelector := "mqtt.mqtt_conf1"
+
+	connection, err := GetConnection(conSelector)
+	if err != nil {
+		t.Errorf("GetConnection Error")
+	}
+	value := connection.(string)
+	if !reflect.DeepEqual(value, TestClient) {
+		t.Errorf("Error")
+	}
+
+	connection, err = GetConnection(conSelector)
+	if err != nil {
+		t.Errorf("GetConnection Error")
+	}
+
+	wrapper := m.clientMap[conSelector]
+	if !reflect.DeepEqual(wrapper.refCnt, uint32(2)) {
+		t.Errorf("Error, ectual=%v, want=%v", wrapper.refCnt, 2)
+	}
+
+	connection, err = GetConnection(conSelector)
+	if err != nil {
+		t.Errorf("GetConnection Error")
+	}
+	if !reflect.DeepEqual(wrapper.refCnt, uint32(3)) {
+		t.Errorf("Error")
+	}
+
+	ReleaseConnection(conSelector)
+	if !reflect.DeepEqual(wrapper.refCnt, uint32(2)) {
+		t.Errorf("Error")
+	}
+
+	ReleaseConnection(conSelector)
+	if !reflect.DeepEqual(wrapper.refCnt, uint32(1)) {
+		t.Errorf("Error")
+	}
+
+	ReleaseConnection(conSelector)
+	if !reflect.DeepEqual(wrapper.refCnt, uint32(0)) {
+		t.Errorf("Error")
+	}
+
+	if _, ok := m.clientMap[conSelector]; ok {
+		t.Errorf("Error")
+	}
+
+	if !reflect.DeepEqual(GetCalledNumber, CloseCalledNumber) || !reflect.DeepEqual(GetCalledNumber, 1) {
+		t.Errorf("Error")
+	}
+}

+ 9 - 0
internal/topo/context/default.go

@@ -18,6 +18,7 @@ import (
 	"context"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/connection"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/sirupsen/logrus"
@@ -198,3 +199,11 @@ func (c *DefaultContext) SaveState(checkpointId int64) error {
 	c.snapshot = nil
 	return nil
 }
+
+func (c *DefaultContext) GetConnection(connectSelector string) (interface{}, error) {
+	return connection.GetConnection(connectSelector)
+}
+
+func (c *DefaultContext) ReleaseConnection(connectSelector string) {
+	connection.ReleaseConnection(connectSelector)
+}

+ 9 - 0
internal/topo/context/func_context.go

@@ -16,6 +16,7 @@ package context
 
 import (
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/topo/connection"
 	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
@@ -55,6 +56,14 @@ func (c *DefaultFuncContext) GetFuncId() int {
 	return c.funcId
 }
 
+func (c *DefaultFuncContext) GetConnection(connectSelector string) (interface{}, error) {
+	return connection.GetConnection(connectSelector)
+}
+
+func (c *DefaultFuncContext) ReleaseConnection(connectSelector string) {
+	connection.ReleaseConnection(connectSelector)
+}
+
 func (c *DefaultFuncContext) convertKey(key string) string {
 	return fmt.Sprintf("$$func%d_%s", c.funcId, key)
 }

+ 1 - 1
internal/topo/node/source_node_test.go

@@ -81,7 +81,7 @@ func TestGetConfAndConvert_Apply(t *testing.T) {
 		Interval: 100,
 		Seed:     1,
 		Pattern: map[string]interface{}{
-			"count": float64(50),
+			"count": 50,
 		},
 		Deduplicate: 50,
 	}

+ 93 - 38
internal/topo/sink/edgex_sink.go

@@ -40,19 +40,20 @@ const (
 )
 
 type EdgexConf struct {
-	Protocol    string            `json:"protocol"`
-	Host        string            `json:"host"`
-	Port        int               `json:"port"`
-	Topic       string            `json:"topic"`
-	TopicPrefix string            `json:"topicPrefix"`
-	Type        string            `json:"type"`
-	MessageType messageType       `json:"messageType"`
-	ContentType string            `json:"contentType"`
-	DeviceName  string            `json:"deviceName"`
-	ProfileName string            `json:"profileName"`
-	SourceName  string            `json:"sourceName"`
-	Metadata    string            `json:"metadata"`
-	Optional    map[string]string `json:"optional"`
+	Protocol           string            `json:"protocol"`
+	Host               string            `json:"host"`
+	Port               int               `json:"port"`
+	Topic              string            `json:"topic"`
+	TopicPrefix        string            `json:"topicPrefix"`
+	Type               string            `json:"type"`
+	MessageType        messageType       `json:"messageType"`
+	ContentType        string            `json:"contentType"`
+	DeviceName         string            `json:"deviceName"`
+	ProfileName        string            `json:"profileName"`
+	SourceName         string            `json:"sourceName"`
+	Metadata           string            `json:"metadata"`
+	Optional           map[string]string `json:"optional"`
+	ConnectionSelector string            `json:"connectionSelector"`
 }
 
 type EdgexMsgBusSink struct {
@@ -64,36 +65,68 @@ type EdgexMsgBusSink struct {
 }
 
 func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
+
+	var (
+		defaultProtocol = "redis"
+		defaultServer   = "localhost"
+		defaultType     = messaging.Redis
+		defaultPort     = 6379
+	)
+
 	c := &EdgexConf{
-		Protocol:    "redis",
-		Host:        "localhost",
-		Port:        6379,
-		Type:        messaging.Redis,
 		MessageType: MessageTypeEvent,
 		ContentType: "application/json",
 		DeviceName:  "ekuiper",
 		ProfileName: "ekuiperProfile",
-		// SourceName is set in open as the rule id
 	}
+
 	err := cast.MapToStruct(ps, c)
 	if err != nil {
 		return fmt.Errorf("read properties %v fail with error: %v", ps, err)
 	}
 
-	if c.Port < 0 {
-		return fmt.Errorf("specified wrong port value, expect positive integer but got %d", c.Port)
-	}
+	if c.ConnectionSelector != "" {
+		conf.Log.Infof("use connection selector %s for edgex sink", c.ConnectionSelector)
+		if c.Protocol != "" || c.Host != "" || c.Type != "" || c.Port != 0 {
+			return fmt.Errorf("connectionSelector can not coexist with other connection configs, properties %v", ps)
+		}
 
-	if c.Type != messaging.ZeroMQ && c.Type != messaging.MQTT && c.Type != messaging.Redis {
-		return fmt.Errorf("specified wrong type value %s", c.Type)
-	}
+		if c.MessageType != MessageTypeEvent && c.MessageType != MessageTypeRequest {
+			return fmt.Errorf("specified wrong messageType value %s", c.MessageType)
+		}
 
-	if c.MessageType != MessageTypeEvent && c.MessageType != MessageTypeRequest {
-		return fmt.Errorf("specified wrong messageType value %s", c.MessageType)
-	}
+		if c.MessageType == MessageTypeEvent && c.ContentType != "application/json" {
+			return fmt.Errorf("specified wrong contentType value %s: only 'application/json' is supported if messageType is event", c.ContentType)
+		}
+	} else {
 
-	if c.MessageType == MessageTypeEvent && c.ContentType != "application/json" {
-		return fmt.Errorf("specified wrong contentType value %s: only 'application/json' is supported if messageType is event", c.ContentType)
+		if c.Host == "" {
+			c.Host = defaultServer
+		}
+		if c.Protocol == "" {
+			c.Protocol = defaultProtocol
+		}
+		if c.Type == "" {
+			c.Type = defaultType
+		}
+		if c.Port == 0 {
+			c.Port = defaultPort
+		}
+		if c.Port < 0 {
+			return fmt.Errorf("specified wrong port value, expect positive integer but got %d", c.Port)
+		}
+
+		if c.Type != messaging.ZeroMQ && c.Type != messaging.MQTT && c.Type != messaging.Redis {
+			return fmt.Errorf("specified wrong type value %s", c.Type)
+		}
+
+		if c.MessageType != MessageTypeEvent && c.MessageType != MessageTypeRequest {
+			return fmt.Errorf("specified wrong messageType value %s", c.MessageType)
+		}
+
+		if c.MessageType == MessageTypeEvent && c.ContentType != "application/json" {
+			return fmt.Errorf("specified wrong contentType value %s: only 'application/json' is supported if messageType is event", c.ContentType)
+		}
 	}
 
 	if c.Topic != "" && c.TopicPrefix != "" {
@@ -114,18 +147,37 @@ func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
 	return nil
 }
 
-func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
+func (ems *EdgexMsgBusSink) getClient(ctx api.StreamContext) error {
 	log := ctx.GetLogger()
-	log.Infof("Using configuration for EdgeX message bus sink: %+v", ems.c)
-	if msgClient, err := messaging.NewMessageClient(*ems.conf); err != nil {
-		return err
+	if ems.c.ConnectionSelector != "" {
+		con, err := ctx.GetConnection(ems.c.ConnectionSelector)
+		if err != nil {
+			log.Errorf("The edgex client for connection selector %s get fail with error: %s", ems.c.ConnectionSelector, err)
+			return err
+		}
+		ems.client = con.(messaging.MessageClient)
+		log.Infof("The edge client for connection selector %s get successfully", ems.c.ConnectionSelector)
 	} else {
-		if ec := msgClient.Connect(); ec != nil {
-			return ec
-		} else {
-			ems.client = msgClient
+		c, err := messaging.NewMessageClient(*ems.conf)
+		if err != nil {
+			return err
+		}
+		ems.client = c
+		if err := ems.client.Connect(); err != nil {
+			return fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
 		}
+		log.Infof("The connection to edgex messagebus is established successfully.")
 	}
+	return nil
+}
+
+func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
+	log := ctx.GetLogger()
+	log.Infof("Using configuration for EdgeX message bus sink: %+v", ems.c)
+	if err := ems.getClient(ctx); err != nil {
+		return fmt.Errorf("Failed to get edgex message client: " + err.Error())
+	}
+
 	if ems.c.SourceName == "" {
 		ems.c.SourceName = ctx.GetRuleId()
 	}
@@ -431,11 +483,14 @@ func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) err
 func (ems *EdgexMsgBusSink) Close(ctx api.StreamContext) error {
 	logger := ctx.GetLogger()
 	logger.Infof("Closing edgex sink")
-	if ems.client != nil {
+	if ems.client != nil && ems.c.ConnectionSelector == "" {
 		if e := ems.client.Disconnect(); e != nil {
 			return e
 		}
 	}
+	if ems.c.ConnectionSelector != "" {
+		ctx.ReleaseConnection(ems.c.ConnectionSelector)
+	}
 	return nil
 }
 

+ 1 - 1
internal/topo/sink/edgex_sink_test.go

@@ -171,7 +171,7 @@ func TestConfigure(t *testing.T) {
 				"messageType": "requests",
 				"contentType": "application/json",
 			},
-			error: "read properties map[contentType:application/json host:edgex-redis messageType:requests port:6379 protocol:redis topicPrefix:edgex/events/device type:20] fail with error: json: cannot unmarshal number into Go struct field EdgexConf.type of type string",
+			error: "read properties map[contentType:application/json host:edgex-redis messageType:requests port:6379 protocol:redis topicPrefix:edgex/events/device type:20] fail with error: 1 error(s) decoding:\n\n* 'type' expected type 'string', got unconvertible type 'int', value: '20'",
 		}, { // 6
 			conf: map[string]interface{}{
 				"type":        "redis",

+ 152 - 107
internal/topo/sink/mqtt_sink.go

@@ -35,6 +35,7 @@ type MQTTSink struct {
 	password string
 	certPath string
 	pkeyPath string
+	conSel   string
 
 	insecureSkipVerify bool
 	retained           bool
@@ -42,83 +43,121 @@ type MQTTSink struct {
 	conn MQTT.Client
 }
 
-func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
-	srv, ok := ps["server"]
-	if !ok {
-		return fmt.Errorf("mqtt sink is missing property server")
-	}
-	tpc, ok := ps["topic"]
-	if !ok {
-		return fmt.Errorf("mqtt sink is missing property topic")
+func (ms *MQTTSink) hasKeys(str []string, ps map[string]interface{}) bool {
+	for _, v := range str {
+		if _, ok := ps[v]; ok {
+			return true
+		}
 	}
-	clientid, ok := ps["clientId"]
-	if !ok {
-		if uuid, err := uuid.NewUUID(); err != nil {
-			return fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
-		} else {
+	return false
+}
 
-			clientid = uuid.String()
+func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
+	conSelector := ""
+	if pk, ok := ps["connectionSelector"]; ok {
+		if v, ok := pk.(string); ok {
+			conSelector = v
 		}
-	}
-	var pVersion uint = 3
-	pVersionStr, ok := ps["protocolVersion"]
-	if ok {
-		v, _ := pVersionStr.(string)
-		if v == "3.1" {
-			pVersion = 3
-		} else if v == "3.1.1" {
-			pVersion = 4
+		keys := []string{"server", "clientId", "protocolVersion", "username", "password", "certificationPath", "privateKeyPath", "insecureSkipVerify"}
+		if ms.hasKeys(keys, ps) {
+			return fmt.Errorf("already have connection selector: %s, remove connection related config", conSelector)
+		}
+		ms.conSel = conSelector
+	} else {
+
+		srv := ""
+		if pk, ok := ps["server"]; ok {
+			if v, ok := pk.(string); ok {
+				srv = v
+			}
 		} else {
-			return fmt.Errorf("unknown protocol version %s, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4)", pVersionStr)
+			return fmt.Errorf("mqtt sink is missing property server")
 		}
-	}
 
-	var qos byte = 0
-	if qosRec, ok := ps["qos"]; ok {
-		if v, err := cast.ToInt(qosRec, cast.STRICT); err == nil {
-			qos = byte(v)
+		clientid, ok := ps["clientId"]
+		if !ok {
+			if uuid, err := uuid.NewUUID(); err != nil {
+				return fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
+			} else {
+
+				clientid = uuid.String()
+			}
 		}
-		if qos != 0 && qos != 1 && qos != 2 {
-			return fmt.Errorf("not valid qos value %v, the value could be only int 0 or 1 or 2", qos)
+		var pVersion uint = 3
+		pVersionStr, ok := ps["protocolVersion"]
+		if ok {
+			v, _ := pVersionStr.(string)
+			if v == "3.1" {
+				pVersion = 3
+			} else if v == "3.1.1" {
+				pVersion = 4
+			} else {
+				return fmt.Errorf("unknown protocol version %s, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4)", pVersionStr)
+			}
 		}
-	}
 
-	uName := ""
-	un, ok := ps["username"]
-	if ok {
-		v, _ := un.(string)
-		if strings.Trim(v, " ") != "" {
-			uName = v
+		uName := ""
+		un, ok := ps["username"]
+		if ok {
+			v, _ := un.(string)
+			if strings.Trim(v, " ") != "" {
+				uName = v
+			}
 		}
-	}
 
-	password := ""
-	pwd, ok := ps["password"]
-	if ok {
-		v, _ := pwd.(string)
-		if strings.Trim(v, " ") != "" {
-			password = v
+		password := ""
+		pwd, ok := ps["password"]
+		if ok {
+			v, _ := pwd.(string)
+			if strings.Trim(v, " ") != "" {
+				password = v
+			}
 		}
-	}
 
-	certPath := ""
-	if cp, ok := ps["certificationPath"]; ok {
-		if v, ok := cp.(string); ok {
-			certPath = v
+		certPath := ""
+		if cp, ok := ps["certificationPath"]; ok {
+			if v, ok := cp.(string); ok {
+				certPath = v
+			}
 		}
-	}
 
-	pKeyPath := ""
-	if pk, ok := ps["privateKeyPath"]; ok {
-		if v, ok := pk.(string); ok {
-			pKeyPath = v
+		pKeyPath := ""
+		if pk, ok := ps["privateKeyPath"]; ok {
+			if v, ok := pk.(string); ok {
+				pKeyPath = v
+			}
 		}
+
+		insecureSkipVerify := false
+		if pk, ok := ps["insecureSkipVerify"]; ok {
+			if v, ok := pk.(bool); ok {
+				insecureSkipVerify = v
+			}
+		}
+
+		ms.srv = srv
+		ms.clientid = clientid.(string)
+		ms.pVersion = pVersion
+
+		ms.uName = uName
+		ms.password = password
+		ms.certPath = certPath
+		ms.pkeyPath = pKeyPath
+		ms.insecureSkipVerify = insecureSkipVerify
 	}
 
-	insecureSkipVerify := false
-	if pk, ok := ps["insecureSkipVerify"]; ok {
-		if v, ok := pk.(bool); ok {
-			insecureSkipVerify = v
+	tpc, ok := ps["topic"]
+	if !ok {
+		return fmt.Errorf("mqtt sink is missing property topic")
+	}
+
+	var qos byte = 0
+	if qosRec, ok := ps["qos"]; ok {
+		if v, err := cast.ToInt(qosRec, cast.STRICT); err == nil {
+			qos = byte(v)
+		}
+		if qos != 0 && qos != 1 && qos != 2 {
+			return fmt.Errorf("not valid qos value %v, the value could be only int 0 or 1 or 2", qos)
 		}
 	}
 
@@ -129,73 +168,76 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 		}
 	}
 
-	ms.srv = srv.(string)
-	ms.tpc = tpc.(string)
-	ms.clientid = clientid.(string)
-	ms.pVersion = pVersion
 	ms.qos = qos
-	ms.uName = uName
-	ms.password = password
-	ms.certPath = certPath
-	ms.pkeyPath = pKeyPath
-	ms.insecureSkipVerify = insecureSkipVerify
+	ms.tpc = tpc.(string)
 	ms.retained = retained
 
 	return nil
 }
 
 func (ms *MQTTSink) Open(ctx api.StreamContext) error {
+	var client MQTT.Client
 	log := ctx.GetLogger()
-	log.Infof("Opening mqtt sink for rule %s.", ctx.GetRuleId())
-	opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
-
-	if ms.certPath != "" || ms.pkeyPath != "" {
-		log.Infof("Connect MQTT broker with certification and keys.")
-		if cp, err := conf.ProcessPath(ms.certPath); err == nil {
-			if kp, err1 := conf.ProcessPath(ms.pkeyPath); err1 == nil {
-				if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
-					return err2
+	if ms.conSel != "" {
+		con, err := ctx.GetConnection(ms.conSel)
+		if err != nil {
+			log.Errorf("The mqtt client for connection selector %s get fail with error: %s", ms.conSel, err)
+			return err
+		}
+		client = con.(MQTT.Client)
+		log.Infof("The mqtt client for connection selector %s get successfully", ms.conSel)
+	} else {
+		log.Infof("Opening mqtt sink for rule %s.", ctx.GetRuleId())
+		opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
+
+		if ms.certPath != "" || ms.pkeyPath != "" {
+			log.Infof("Connect MQTT broker with certification and keys.")
+			if cp, err := conf.ProcessPath(ms.certPath); err == nil {
+				if kp, err1 := conf.ProcessPath(ms.pkeyPath); err1 == nil {
+					if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
+						return err2
+					} else {
+						opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.insecureSkipVerify})
+					}
 				} else {
-					opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.insecureSkipVerify})
+					return err1
 				}
 			} else {
-				return err1
+				return err
 			}
 		} else {
-			return err
-		}
-	} else {
-		log.Infof("Connect MQTT broker with username and password.")
-		if ms.uName != "" {
-			opts = opts.SetUsername(ms.uName)
-		}
+			log.Infof("Connect MQTT broker with username and password.")
+			if ms.uName != "" {
+				opts = opts.SetUsername(ms.uName)
+			}
 
-		if ms.password != "" {
-			opts = opts.SetPassword(ms.password)
+			if ms.password != "" {
+				opts = opts.SetPassword(ms.password)
+			}
 		}
-	}
 
-	opts.SetAutoReconnect(true)
-	var reconn = false
-	opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
-		log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
-		ms.conn = client
-		reconn = true
-	})
+		opts.SetAutoReconnect(true)
+		var reconn = false
+		opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
+			log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
+			ms.conn = client
+			reconn = true
+		})
 
-	opts.SetOnConnectHandler(func(client MQTT.Client) {
-		if reconn {
-			log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
-		}
-	})
+		opts.SetOnConnectHandler(func(client MQTT.Client) {
+			if reconn {
+				log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
+			}
+		})
 
-	c := MQTT.NewClient(opts)
-	if token := c.Connect(); token.Wait() && token.Error() != nil {
-		return fmt.Errorf("Found error: %s", token.Error())
+		client = MQTT.NewClient(opts)
+		if token := client.Connect(); token.Wait() && token.Error() != nil {
+			return fmt.Errorf("Found error: %s", token.Error())
+		}
+		log.Infof("The connection to server %s:%d was established successfully", ms.srv, ms.clientid)
 	}
 
-	log.Infof("The connection to server %s was established successfully", ms.srv)
-	ms.conn = c
+	ms.conn = client
 	return nil
 }
 
@@ -212,8 +254,11 @@ func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
 func (ms *MQTTSink) Close(ctx api.StreamContext) error {
 	logger := ctx.GetLogger()
 	logger.Infof("Closing mqtt sink")
-	if ms.conn != nil && ms.conn.IsConnected() {
+	if ms.conn != nil && ms.conn.IsConnected() && ms.conSel == "" {
 		ms.conn.Disconnect(5000)
 	}
+	if ms.conSel != "" {
+		ctx.ReleaseConnection(ms.conSel)
+	}
 	return nil
 }

+ 53 - 20
internal/topo/source/edgex_source.go

@@ -33,6 +33,9 @@ import (
 )
 
 type EdgexSource struct {
+	mbconf types.MessageBusConfig
+	conSel string
+
 	client      messaging.MessageClient
 	subscribed  bool
 	topic       string
@@ -40,14 +43,15 @@ type EdgexSource struct {
 }
 
 type EdgexConf struct {
-	Format      string            `json:"format"`
-	Protocol    string            `json:"protocol"`
-	Server      string            `json:"server"`
-	Port        int               `json:"port"`
-	Topic       string            `json:"topic"`
-	Type        string            `json:"type"`
-	MessageType messageType       `json:"messageType"`
-	Optional    map[string]string `json:"optional"`
+	Format             string            `json:"format"`
+	Protocol           string            `json:"protocol"`
+	Server             string            `json:"server"`
+	Port               int               `json:"port"`
+	Topic              string            `json:"topic"`
+	Type               string            `json:"type"`
+	MessageType        messageType       `json:"messageType"`
+	Optional           map[string]string `json:"optional"`
+	ConnectionSelector string            `json:"connectionSelector"`
 }
 
 type messageType string
@@ -84,15 +88,17 @@ func (es *EdgexSource) Configure(_ string, props map[string]interface{}) error {
 
 	mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: c.Protocol, Host: c.Server, Port: c.Port}, Type: c.Type}
 	mbconf.Optional = c.Optional
-	printConf(mbconf)
-	if client, err := messaging.NewMessageClient(mbconf); err != nil {
-		return err
+	if c.ConnectionSelector == "" {
+		printConf(mbconf)
 	} else {
-		es.client = client
-		es.messageType = c.MessageType
-		es.topic = c.Topic
-		return nil
+		conf.Log.Infof("use connection selector %s for edgex source", c.ConnectionSelector)
 	}
+	es.conSel = c.ConnectionSelector
+	es.mbconf = mbconf
+	es.messageType = c.MessageType
+	es.topic = c.Topic
+
+	return nil
 }
 
 // Modify the copied conf to print no password.
@@ -109,15 +115,39 @@ func printConf(mbconf types.MessageBusConfig) {
 	conf.Log.Infof("Use configuration for edgex messagebus %v", mbconf)
 }
 
+func (es *EdgexSource) getClient(ctx api.StreamContext) error {
+	log := ctx.GetLogger()
+	if es.conSel != "" {
+		con, err := ctx.GetConnection(es.conSel)
+		if err != nil {
+			log.Errorf("The edgex client for connection selector %s get fail with error: %s", es.conSel, err)
+			return err
+		}
+		es.client = con.(messaging.MessageClient)
+		log.Infof("The edge client for connection selector %s get successfully", es.conSel)
+	} else {
+		c, err := messaging.NewMessageClient(es.mbconf)
+		if err != nil {
+			return err
+		}
+		es.client = c
+
+		if err := es.client.Connect(); err != nil {
+			return fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
+		}
+		log.Infof("The connection to edgex messagebus is established successfully.")
+	}
+	return nil
+}
+
 func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
 	log := ctx.GetLogger()
-	if err := es.client.Connect(); err != nil {
-		info := fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
+	if err := es.getClient(ctx); err != nil {
+		info := fmt.Errorf("Failed to get edgex message client: " + err.Error())
 		log.Errorf(info.Error())
 		errCh <- info
 		return
 	}
-	log.Infof("The connection to edgex messagebus is established successfully.")
 	messages := make(chan types.MessageEnvelope)
 	topics := []types.TopicChannel{{Topic: es.topic, Messages: messages}}
 	err := make(chan error)
@@ -315,11 +345,14 @@ func convertFloatArray(v string, bitSize int) (interface{}, error) {
 	}
 }
 
-func (es *EdgexSource) Close(_ api.StreamContext) error {
-	if es.subscribed {
+func (es *EdgexSource) Close(ctx api.StreamContext) error {
+	if es.subscribed && es.conSel == "" {
 		if e := es.client.Disconnect(); e != nil {
 			return e
 		}
 	}
+	if es.conSel != "" {
+		ctx.ReleaseConnection(es.conSel)
+	}
 	return nil
 }

+ 104 - 80
internal/topo/source/mqtt_source.go

@@ -30,6 +30,7 @@ import (
 
 type MQTTSource struct {
 	srv      string
+	qos      int
 	format   string
 	tpc      string
 	clientid string
@@ -38,6 +39,8 @@ type MQTTSource struct {
 	password string
 	certPath string
 	pkeyPath string
+	conSel   string
+	InSecure bool
 
 	model  modelVersion
 	schema map[string]interface{}
@@ -45,17 +48,19 @@ type MQTTSource struct {
 }
 
 type MQTTConfig struct {
-	Format            string   `json:"format"`
-	Qos               int      `json:"qos"`
-	Servers           []string `json:"servers"`
-	Clientid          string   `json:"clientid"`
-	PVersion          string   `json:"protocolVersion"`
-	Uname             string   `json:"username"`
-	Password          string   `json:"password"`
-	Certification     string   `json:"certificationPath"`
-	PrivateKPath      string   `json:"privateKeyPath"`
-	KubeedgeModelFile string   `json:"kubeedgeModelFile"`
-	KubeedgeVersion   string   `json:"kubeedgeVersion"`
+	Format             string   `json:"format"`
+	Qos                int      `json:"qos"`
+	Servers            []string `json:"servers"`
+	Clientid           string   `json:"clientid"`
+	PVersion           string   `json:"protocolVersion"`
+	Uname              string   `json:"username"`
+	Password           string   `json:"password"`
+	Certification      string   `json:"certificationPath"`
+	PrivateKPath       string   `json:"privateKeyPath"`
+	InsecureSkipVerify bool     `json:"insecureSkipVerify"`
+	KubeedgeModelFile  string   `json:"kubeedgeModelFile"`
+	KubeedgeVersion    string   `json:"kubeedgeVersion"`
+	ConnectionSelector string   `json:"connectionSelector"`
 }
 
 func (ms *MQTTSource) WithSchema(_ string) *MQTTSource {
@@ -69,14 +74,17 @@ func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) erro
 		return fmt.Errorf("read properties %v fail with error: %v", props, err)
 	}
 	ms.tpc = topic
-	if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
-		ms.srv = srvs[0]
-	} else {
-		return fmt.Errorf("missing server property")
+	if cfg.ConnectionSelector == "" {
+		if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
+			ms.srv = srvs[0]
+		} else {
+			return fmt.Errorf("missing server property")
+		}
 	}
-
+	ms.conSel = cfg.ConnectionSelector
 	ms.format = cfg.Format
 	ms.clientid = cfg.Clientid
+	ms.qos = cfg.Qos
 
 	ms.pVersion = 3
 	if cfg.PVersion == "3.1.1" {
@@ -100,89 +108,102 @@ func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) erro
 }
 
 func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
+	var client MQTT.Client
 	log := ctx.GetLogger()
 
-	opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
-	if ms.clientid == "" {
-		if newUUID, err := uuid.NewUUID(); err != nil {
-			errCh <- fmt.Errorf("failed to get uuid, the error is %s", err)
+	if ms.conSel != "" {
+		con, err := ctx.GetConnection(ms.conSel)
+		if err != nil {
+			log.Errorf("The mqtt client for connection selector %s get fail with error: %s", ms.conSel, err)
+			errCh <- err
 			return
-		} else {
-			ms.clientid = newUUID.String()
-			opts.SetClientID(newUUID.String())
 		}
+		client = con.(MQTT.Client)
+		log.Infof("The mqtt client for connection selector %s get successfully", ms.conSel)
 	} else {
-		opts.SetClientID(ms.clientid)
-	}
+		opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
+		if ms.clientid == "" {
+			if newUUID, err := uuid.NewUUID(); err != nil {
+				errCh <- fmt.Errorf("failed to get uuid, the error is %s", err)
+				return
+			} else {
+				ms.clientid = newUUID.String()
+				opts = opts.SetClientID(newUUID.String())
+			}
+		} else {
+			opts = opts.SetClientID(ms.clientid)
+		}
 
-	if ms.certPath != "" || ms.pkeyPath != "" {
-		log.Infof("Connect MQTT broker with certification and keys.")
-		if cp, err := conf.ProcessPath(ms.certPath); err == nil {
-			log.Infof("The certification file is %s.", cp)
-			if kp, err1 := conf.ProcessPath(ms.pkeyPath); err1 == nil {
-				log.Infof("The private key file is %s.", kp)
-				if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
-					errCh <- err2
-					return
+		if ms.certPath != "" || ms.pkeyPath != "" {
+			log.Infof("Connect MQTT broker with certification and keys.")
+			if cp, err := conf.ProcessPath(ms.certPath); err == nil {
+				log.Infof("The certification file is %s.", cp)
+				if kp, err1 := conf.ProcessPath(ms.pkeyPath); err1 == nil {
+					log.Infof("The private key file is %s.", kp)
+					if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
+						errCh <- err2
+						return
+					} else {
+						opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}, InsecureSkipVerify: ms.InSecure})
+					}
 				} else {
-					opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
+					errCh <- err1
+					return
 				}
 			} else {
-				errCh <- err1
+				errCh <- err
 				return
 			}
 		} else {
-			errCh <- err
-			return
-		}
-	} else {
-		log.Infof("Connect MQTT broker with username and password.")
-		if ms.uName != "" {
-			opts = opts.SetUsername(ms.uName)
-		} else {
-			log.Infof("The username is empty.")
+			log.Infof("Connect MQTT broker with username and password.")
+			if ms.uName != "" {
+				opts = opts.SetUsername(ms.uName)
+			} else {
+				log.Infof("The username is empty.")
+			}
+
+			if ms.password != "" {
+				opts = opts.SetPassword(ms.password)
+			} else {
+				log.Infof("The password is empty.")
+			}
 		}
+		opts.SetAutoReconnect(true)
+		var reconn = false
+		opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
+			log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
+			reconn = true
+			subscribe(ms, client, ctx, consumer)
+		})
+
+		opts.SetOnConnectHandler(func(client MQTT.Client) {
+			if reconn {
+				log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
+				subscribe(ms, client, ctx, consumer)
+			}
+		})
 
-		if ms.password != "" {
-			opts = opts.SetPassword(ms.password)
-		} else {
-			log.Infof("The password is empty.")
+		client = MQTT.NewClient(opts)
+
+		if token := client.Connect(); token.Wait() && token.Error() != nil {
+			errCh <- fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error())
+			return
 		}
+		log.Infof("The connection to server %s:%s was established successfully", ms.srv, ms.clientid)
 	}
-	opts.SetAutoReconnect(true)
-	var reconn = false
-	opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
-		log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
-		reconn = true
-		subscribe(ms.tpc, client, ctx, consumer, ms.model, ms.format)
-	})
-
-	opts.SetOnConnectHandler(func(client MQTT.Client) {
-		if reconn {
-			log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
-			subscribe(ms.tpc, client, ctx, consumer, ms.model, ms.format)
-		}
-	})
 
-	c := MQTT.NewClient(opts)
-	if token := c.Connect(); token.Wait() && token.Error() != nil {
-		errCh <- fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error())
-		return
-	}
-	log.Infof("The connection to server %s was established successfully", ms.srv)
-	ms.conn = c
-	subscribe(ms.tpc, c, ctx, consumer, ms.model, ms.format)
-	log.Infof("Successfully subscribe to topic %s", ms.srv+": "+ms.clientid)
+	ms.conn = client
+	subscribe(ms, client, ctx, consumer)
 }
 
-func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consumer chan<- api.SourceTuple, model modelVersion, format string) {
+func subscribe(ms *MQTTSource, client MQTT.Client, ctx api.StreamContext, consumer chan<- api.SourceTuple) {
 	log := ctx.GetLogger()
 	h := func(client MQTT.Client, msg MQTT.Message) {
 		log.Debugf("instance %d received %s", ctx.GetInstanceId(), msg.Payload())
-		result, e := message.Decode(msg.Payload(), format)
+		result, e := message.Decode(msg.Payload(), ms.format)
 		//The unmarshal type can only be bool, float64, string, []interface{}, map[string]interface{}, nil
 		if e != nil {
-			log.Errorf("Invalid data format, cannot decode %s to %s format with error %s", string(msg.Payload()), format, e)
+			log.Errorf("Invalid data format, cannot decode %s to %s format with error %s", string(msg.Payload()), ms.format, e)
 			return
 		}
 
@@ -190,8 +211,8 @@ func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consumer
 		meta["topic"] = msg.Topic()
 		meta["messageid"] = strconv.Itoa(int(msg.MessageID()))
 
-		if nil != model {
-			sliErr := model.checkType(result, msg.Topic())
+		if nil != ms.model {
+			sliErr := ms.model.checkType(result, msg.Topic())
 			for _, v := range sliErr {
 				log.Errorf(v)
 			}
@@ -205,17 +226,20 @@ func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consumer
 		}
 	}
 
-	if token := client.Subscribe(topic, 0, h); token.Wait() && token.Error() != nil {
+	if token := client.Subscribe(ms.tpc, byte(ms.qos), h); token.Wait() && token.Error() != nil {
 		log.Errorf("Found error: %s", token.Error())
 	} else {
-		log.Infof("Successfully subscribe to topic %s", topic)
+		log.Infof("Successfully subscribe to topic %s", ms.tpc)
 	}
 }
 
 func (ms *MQTTSource) Close(ctx api.StreamContext) error {
 	ctx.GetLogger().Infof("Mqtt Source instance %d Done", ctx.GetInstanceId())
-	if ms.conn != nil && ms.conn.IsConnected() {
+	if ms.conn != nil && ms.conn.IsConnected() && ms.conSel == "" {
 		ms.conn.Disconnect(5000)
 	}
+	if ms.conSel != "" {
+		ctx.ReleaseConnection(ms.conSel)
+	}
 	return nil
 }

+ 2 - 0
pkg/api/stream.go

@@ -149,6 +149,8 @@ type StreamContext interface {
 	PutState(key string, value interface{}) error
 	GetState(key string) (interface{}, error)
 	DeleteState(key string) error
+	GetConnection(connectSelector string) (interface{}, error)
+	ReleaseConnection(connectSelector string)
 }
 
 type Operator interface {

+ 27 - 5
pkg/cast/cast.go

@@ -15,8 +15,8 @@
 package cast
 
 import (
-	"encoding/json"
 	"fmt"
+	"github.com/mitchellh/mapstructure"
 	"html/template"
 	"reflect"
 	"strconv"
@@ -952,19 +952,41 @@ func ToBytesSlice(input interface{}, sn Strictness) ([][]byte, error) {
 	return result, nil
 }
 
+//MapToStruct
 /*
 *   Convert a map into a struct. The output parameter must be a pointer to a struct
 *   The struct can have the json meta data
  */
 func MapToStruct(input, output interface{}) error {
-	// convert map to json
-	jsonString, err := json.Marshal(input)
+	config := &mapstructure.DecoderConfig{
+		TagName: "json",
+		Result:  output,
+	}
+	decoder, err := mapstructure.NewDecoder(config)
+	if err != nil {
+		return err
+	}
+
+	return decoder.Decode(input)
+}
+
+// MapToStructStrict
+/*
+*   Convert a map into a struct. The output parameter must be a pointer to a struct
+*   If the input have key/value pair output do not defined, will report error
+ */
+func MapToStructStrict(input, output interface{}) error {
+	config := &mapstructure.DecoderConfig{
+		ErrorUnused: true,
+		TagName:     "json",
+		Result:      output,
+	}
+	decoder, err := mapstructure.NewDecoder(config)
 	if err != nil {
 		return err
 	}
 
-	// convert json to struct
-	return json.Unmarshal(jsonString, output)
+	return decoder.Decode(input)
 }
 
 func ConvertMap(s map[interface{}]interface{}) map[string]interface{} {

+ 306 - 0
pkg/cast/cast_test.go

@@ -90,3 +90,309 @@ func errstring(err error) string {
 	}
 	return ""
 }
+
+func TestMapToStructStrict(t *testing.T) {
+	type args struct {
+		input  interface{}
+		output interface{}
+		expect interface{}
+	}
+
+	type Result struct {
+		Foo string `json:"foo"`
+		Bar string `json:"bar"`
+	}
+
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "normal parse",
+			args: args{
+				input: map[string]interface{}{
+					"foo": "foo",
+					"bar": "bar",
+				},
+				output: &Result{},
+				expect: &Result{
+					Foo: "foo",
+					Bar: "bar",
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "input have more than keys",
+			args: args{
+				input: map[string]interface{}{
+					"foo":    "foo",
+					"bar":    "bar",
+					"foobar": "foobar",
+				},
+				output: &Result{},
+			},
+			wantErr: true,
+		},
+		{
+			name: "input have less keys",
+			args: args{
+				input: map[string]interface{}{
+					"foo": "foo",
+				},
+				output: &Result{},
+				expect: &Result{
+					Foo: "foo",
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "input have unused keys",
+			args: args{
+				input: map[string]interface{}{
+					"foo":    "foo",
+					"foobar": "foobar",
+				},
+				output: &Result{},
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := MapToStructStrict(tt.args.input, tt.args.output)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("MapToStructure() error = %v, wantErr %v", err, tt.wantErr)
+			}
+			if tt.wantErr == false && !reflect.DeepEqual(tt.args.output, tt.args.expect) {
+				t.Errorf(" got = %v, want %v", tt.args.output, tt.args.expect)
+			}
+		})
+	}
+}
+
+func TestMapToStruct(t *testing.T) {
+	type args struct {
+		input  interface{}
+		output interface{}
+		expect interface{}
+	}
+
+	type Result struct {
+		Foo string `json:"foo"`
+		Bar string `json:"bar"`
+	}
+
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "normal parse",
+			args: args{
+				input: map[string]interface{}{
+					"foo": "foo",
+					"bar": "bar",
+				},
+				output: &Result{},
+				expect: &Result{
+					Foo: "foo",
+					Bar: "bar",
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "input have more than keys",
+			args: args{
+				input: map[string]interface{}{
+					"foo":    "foo",
+					"bar":    "bar",
+					"foobar": "foobar",
+				},
+				output: &Result{},
+				expect: &Result{
+					Foo: "foo",
+					Bar: "bar",
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "input have less keys",
+			args: args{
+				input: map[string]interface{}{
+					"foo": "foo",
+				},
+				output: &Result{},
+				expect: &Result{
+					Foo: "foo",
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "input have unused keys",
+			args: args{
+				input: map[string]interface{}{
+					"foo":    "foo",
+					"foobar": "foobar",
+				},
+				output: &Result{},
+				expect: &Result{
+					Foo: "foo",
+				},
+			},
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if err := MapToStruct(tt.args.input, tt.args.output); (err != nil) != tt.wantErr {
+				t.Errorf("MapToStructure() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+func TestMapToStructNotCaseSensitive(t *testing.T) {
+	type args struct {
+		input  interface{}
+		output interface{}
+		expect interface{}
+	}
+
+	type Result struct {
+		Foo string `json:"foo"`
+		Bar string
+	}
+
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "normal parse",
+			args: args{
+				input: map[string]interface{}{
+					"foo": "foo",
+					"bar": "bar",
+				},
+				output: &Result{},
+				expect: &Result{
+					Foo: "foo",
+					Bar: "bar",
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "not case sensitive",
+			args: args{
+				input: map[string]interface{}{
+					"FOO": "foo",
+					"BAR": "bar",
+				},
+				output: &Result{},
+				expect: &Result{
+					Foo: "foo",
+					Bar: "bar",
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "keys must match",
+			args: args{
+				input: map[string]interface{}{
+					"foo":  "foo",
+					"BARS": "bars",
+				},
+				output: &Result{},
+				expect: &Result{
+					Foo: "foo",
+				},
+			},
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if err := MapToStruct(tt.args.input, tt.args.output); (err != nil) != tt.wantErr {
+				t.Errorf("MapToStructure() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+func TestMapToStructTag(t *testing.T) {
+	type args struct {
+		input  interface{}
+		output interface{}
+		expect interface{}
+	}
+
+	type Result struct {
+		Foo string `json:"fo"`
+		Bar string
+	}
+
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "normal parse",
+			args: args{
+				input: map[string]interface{}{
+					"fo":  "foo",
+					"bar": "bar",
+				},
+				output: &Result{},
+				expect: &Result{
+					Foo: "foo",
+					Bar: "bar",
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "key tag not match",
+			args: args{
+				input: map[string]interface{}{
+					"FOO": "foo",
+					"BAR": "bar",
+				},
+				output: &Result{},
+				expect: &Result{
+					Bar: "bar",
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "key tag not match",
+			args: args{
+				input: map[string]interface{}{
+					"foo":  "foo",
+					"BARS": "bars",
+				},
+				output: &Result{},
+				expect: &Result{},
+			},
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if err := MapToStruct(tt.args.input, tt.args.output); (err != nil) != tt.wantErr {
+				t.Errorf("MapToStructure() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}