Explorar o código

fix(mqtt): remove mqtt server array (#1143)

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
superxan %!s(int64=3) %!d(string=hai) anos
pai
achega
c0afe82f64

+ 1 - 1
.github/workflows/run_fvt_tests.yaml

@@ -278,7 +278,7 @@ jobs:
 
         sed -i -r "s/^appVersion: .*$/appVersion: \"${version}\"/g" deploy/chart/kuiper/Chart.yaml
         sed -i -r 's/  pullPolicy: .*$/  pullPolicy: Never/g' deploy/chart/kuiper/values.yaml
-        sed -i -r "s/      servers: \[.*\]$/      servers: \[tcp:\/\/${emqx_address}:1883\]/g" deploy/chart/kuiper/values.yaml
+        sed -i -r "s/      server: .*$/      server: tcp:\/\/${emqx_address}:1883/g" deploy/chart/kuiper/values.yaml
         
         helm install kuiper deploy/chart/kuiper --debug --dry-run
         helm install kuiper deploy/chart/kuiper

+ 1 - 1
deploy/chart/kuiper/README.md

@@ -178,7 +178,7 @@ eKuiper can be deployed at k3s/k8s cluster through Helm chart. Below takes k3s a
       default:
         qos: 1
         sharedSubscription: true
-        servers: [tcp://127.0.0.1:1883]
+        server: tcp://127.0.0.1:1883
         concurrency: 1
         #username: user1
         #password: password

+ 1 - 1
deploy/chart/kuiper/README_zh.md

@@ -178,7 +178,7 @@ eKuiper 可以通过 Helm chart 部署在 k3s / k8s 集群上。下面以 k3s 
       default:
         qos: 1
         sharedSubscription: true
-        servers: [tcp://127.0.0.1:1883]
+        server: tcp://127.0.0.1:1883
         concurrency: 1
         #username: user1
         #password: password

+ 4 - 4
deploy/chart/kuiper/values.yaml

@@ -153,7 +153,7 @@ kuiperConfig:
   "connections/connection.yaml":
     mqtt:
       localConnection: #connection key
-        servers: [ tcp://127.0.0.1:1883 ]
+        server: "tcp://127.0.0.1:1883"
         username: ekuiper
         password: password
         #certificationPath: /var/kuiper/xyz-certificate.pem
@@ -163,7 +163,7 @@ kuiperConfig:
         #protocolVersion: 3
         clientid: ekuiper
       cloudConnection: #connection key
-        servers: [ "tcp://broker.emqx.io:1883" ]
+        server: "tcp://broker.emqx.io:1883"
         username: user1
         password: password
         #certificationPath: /var/kuiper/xyz-certificate.pem
@@ -204,7 +204,7 @@ kuiperConfig:
     #Global MQTT configurations
     default:
       qos: 1
-      servers: [ tcp://127.0.0.1:1883 ]
+      server: "tcp://127.0.0.1:1883"
       #username: user1
       #password: password
       #certificationPath: /var/kuiper/xyz-certificate.pem
@@ -217,7 +217,7 @@ kuiperConfig:
 
     demo_conf: #Conf_key
       qos: 0
-      servers: [ tcp://10.211.55.6:1883, tcp://127.0.0.1 ]
+      server: "tcp://10.211.55.6:1883"
   "sources/edgex.yaml":
     #Global Edgex configurations
     default:

A diferenza do arquivo foi suprimida porque é demasiado grande
+ 10 - 2
deploy/docker/README.md


+ 1 - 1
deploy/docker/test/mqtt_source.yaml

@@ -1,4 +1,4 @@
 #Global MQTT configurations
 default:
-  servers: [tcp://127.0.0.1:1883]
+  server: "tcp://127.0.0.1:1883"
   test: [0,1]

+ 2 - 2
docs/en_US/getting_started.md

@@ -104,13 +104,13 @@ We create a stream named `demo` which consumes MQTT `demo` topic as specified in
 ```sh
 $ bin/kuiper create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="demo")'
 ```
-The MQTT source will connect to MQTT broker at `tcp://localhost:1883`. If your MQTT broker is in another location, specify it in the `etc/mqtt_source.yaml`.  You can change the servers configuration as in below.
+The MQTT source will connect to MQTT broker at `tcp://localhost:1883`. If your MQTT broker is in another location, specify it in the `etc/mqtt_source.yaml`.  You can change the server configuration as in below.
 
 ```yaml
 default:
   qos: 1
   sharedsubscription: true
-  servers: [tcp://127.0.0.1:1883]
+  server: "tcp://127.0.0.1:1883"
 ```
 
 You can use command `kuiper show streams` to see if the `demo` stream was created or not.

+ 3 - 4
docs/en_US/operation/manager-ui/overview.md

@@ -33,7 +33,7 @@ From eKuiper version 0.9.1, whenever a new version of eKuiper is released, the c
 - Run the eKuiper container (for convenience, we will use the public MQTT server provided by [EMQ](https://www.emqx.io), and the address can be set by the `-e` option when running the container). If you want to access the eKuiper instance through the host, you can expose port 9081 by adding the `-p 9081:9081` parameter when starting the container.
 
   ```shell
-  # docker run -d --name kuiper -e MQTT_SOURCE__DEFAULT__SERVERS=[tcp://broker.emqx.io:1883] lfedge/ekuiper:1.4-slim
+  # docker run -d --name kuiper -e MQTT_SOURCE__DEFAULT__SERVER="tcp://broker.emqx.io:1883" lfedge/ekuiper:1.4-slim
   ```
   
   When the container is running, the MQTT server address can be set through the `-e` option, and the data is written to the MQTT source configuration file, which can be viewed by the following command:
@@ -43,14 +43,13 @@ From eKuiper version 0.9.1, whenever a new version of eKuiper is released, the c
   # cat etc/mqtt_source.yaml
   ```
   
-  Some output of this file is shown below, and the value of `servers` is set to `tcp://broker.emqx.io:1883`.
+  Some output of this file is shown below, and the value of `server` is set to `tcp://broker.emqx.io:1883`.
   
   ```yaml
   default:
     concurrency: 1
     qos: 1
-    servers:
-    - tcp://broker.emqx.io:1883
+    server: "tcp://broker.emqx.io:1883"
     sharedSubscription: true
   ....
   ```

+ 1 - 1
docs/en_US/quick_start_docker.md

@@ -5,7 +5,7 @@
 2. Set eKuiper source to an MQTT server. This sample uses server locating at `tcp://broker.emqx.io:1883`. `broker.emqx.io` is a public MQTT test server hosted by [EMQ](https://www.emqx.io).
 
    ```shell
-   docker run -p 9081:9081 -d --name kuiper -e MQTT_SOURCE__DEFAULT__SERVERS=[tcp://broker.emqx.io:1883] lfedge/ekuiper:$tag
+   docker run -p 9081:9081 -d --name kuiper -e MQTT_SOURCE__DEFAULT__SERVER="tcp://broker.emqx.io:1883" lfedge/ekuiper:$tag
    ```
 
 3. Create a stream - the stream is your stream data schema, similar to table definition in database. Let's say the temperature & humidity data are sent to `broker.emqx.io`, and those data will be processed in your **LOCAL RUN** eKuiper docker instance.  Below steps will create a stream named `demo`, and data are sent to `devices/device_001/messages` topic, while `device_001` could be other devices, such as `device_002`, all of those data will be subscribed and handled by `demo` stream.

+ 15 - 8
docs/en_US/rules/sources/mqtt.md

@@ -6,7 +6,7 @@ eKuiper provides built-in support for MQTT source stream, which can subscribe th
 #Global MQTT configurations
 default:
   qos: 1
-  servers: [tcp://127.0.0.1:1883]
+  server: "tcp://127.0.0.1:1883"
   #username: user1
   #password: password
   #certificationPath: /var/kuiper/xyz-certificate.pem
@@ -19,7 +19,7 @@ default:
 #Override the global configurations
 demo_conf: #Conf_key
   qos: 0
-  servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]
+  server: "tcp://10.211.55.6:1883"
 
 ```
 
@@ -31,9 +31,9 @@ Use can specify the global MQTT settings here. The configuration items specified
 
 The default subscription QoS level.
 
-### servers
+### server
 
-The server list for MQTT message broker. Currently, only `ONE` server can be specified.
+The server for MQTT message broker. 
 
 ### username
 
@@ -74,7 +74,7 @@ specify the stream to reuse the connection to mqtt broker. The connection profil
 ```yaml
 mqtt:
   localConnection: #connection key
-    servers: [tcp://127.0.0.1:1883]
+    server: "tcp://127.0.0.1:1883"
     username: ekuiper
     password: password
     #certificationPath: /var/kuiper/xyz-certificate.pem
@@ -83,7 +83,7 @@ mqtt:
     #protocolVersion: 3
     clientid: ekuiper
   cloudConnection: #connection key
-    servers: ["tcp://broker.emqx.io:1883"]
+    server: "tcp://broker.emqx.io:1883"
     username: user1
     password: password
     #certificationPath: /var/kuiper/xyz-certificate.pem
@@ -98,14 +98,14 @@ For example
 #Global MQTT configurations
 default:
   qos: 1
-  servers: [tcp://127.0.0.1:1883]
+  server: "tcp://127.0.0.1:1883"
   #username: user1
   #password: password
   #certificationPath: /var/kuiper/xyz-certificate.pem
   #privateKeyPath: /var/kuiper/xyz-private.pem.key
   connectionSelector: mqtt.localConnection
 ```
-*Note*: once specify the connectionSelector in specific configuration group , all connection related parameters will be ignored , in this case `servers: [tcp://127.0.0.1:1883]`
+*Note*: once specify the connectionSelector in specific configuration group , all connection related parameters will be ignored , in this case ``servers: [tcp://127.0.0.1:1883]``
 
 ### bufferLength
 
@@ -193,3 +193,10 @@ When create rules using the defined streams, the rules will share the same conne
 The `DATASOURCE` here will be used as mqtt subscription topics, and subscription  `Qos` defined in config section.
 So stream `demo` will subscribe to topic `test/` with Qos 0 and stream `demo2` will subscribe to topic `test2/` with Qos 0 in this example.
 But if  `DATASOURCE` is same and `qos` not, will only subscribe one time when the first rule starts.       
+
+## Migration Guide
+
+Since 1.5.0, eKuiper changes the mqtt source broker configuration from `servers` to `server` and users can only configure a mqtt broker address instead of address array.
+Users who are using mqtt broker as stream source in previous release and want to migrate to 1.5.0 release or later, need make sure the ``etc/mqtt_source.yaml`` file ``server`` 's configuration is right.
+Users who are using environment variable to configure the mqtt source address need change their ENV successfully, for example, their broker address is ``tcp://broker.emqx.io:1883``. They need change the ENV from
+``MQTT_SOURCE__DEFAULT__SERVERS=[tcp://broker.emqx.io:1883]`` to ``MQTT_SOURCE__DEFAULT__SERVER="tcp://broker.emqx.io:1883"``

+ 1 - 1
docs/en_US/sqls/streams.md

@@ -48,7 +48,7 @@ my_stream
 WITH ( datasource = "topic/temperature", FORMAT = "json", KEY = "id");
 ```
 
-The stream will subscribe to MQTT topic `topic/temperature`, the server connection uses `servers` key of `default` section in configuration file `$ekuiper/etc/mqtt_source.yaml`. 
+The stream will subscribe to MQTT topic ``topic/temperature``, the server connection uses ``server`` key of ``default`` section in configuration file ``$ekuiper/etc/mqtt_source.yaml``. 
 
 - See [MQTT source](../rules/sources/mqtt.md) for more info.
 

+ 1 - 1
docs/zh_CN/getting_started.md

@@ -106,7 +106,7 @@ MQTT 源将通过`tcp://localhost:1883`连接到 MQTT 消息服务器,如果
 default:
   qos: 1
   sharedsubscription: true
-  servers: [tcp://127.0.0.1:1883]
+  server: "tcp://127.0.0.1:1883"
 ```
 
 您可以使用`kuiper show streams` 命令来查看是否创建了 `demo` 流。

+ 3 - 4
docs/zh_CN/operation/manager-ui/overview.md

@@ -33,7 +33,7 @@
 - 运行 eKuiper 容器(为了方便,我们将使用由 [EMQ](https://www.emqx.cn) 提供的公有 MQTT 服务器,在运行容器时可通过 `-e` 选项设置地址)。如果您想通过主机访问 eKuiper 实例,可以通过在启动容器的时候加入 `-p 9081:9081` 参数来暴露 9081 端口。
 
   ```shell
-  # docker run -d --name kuiper -e MQTT_SOURCE__DEFAULT__SERVERS=[tcp://broker.emqx.io:1883] lfedge/ekuiper:1.4-slim
+  # docker run -d --name kuiper -e MQTT_SOURCE__DEFAULT__SERVER="tcp://broker.emqx.io:1883" lfedge/ekuiper:1.4-slim
   ```
   
   在运行容器时通过 `-e` 选项设置了 MQTT 服务器地址,数据写到了 MQTT 源配置文件中,通过以下命令可以查看:
@@ -43,14 +43,13 @@
   # cat etc/mqtt_source.yaml
   ```
   
-  该文件的部分输出如下所示,`servers` 的值被设置为 `tcp://broker.emqx.io:1883`。
+  该文件的部分输出如下所示,`server` 的值被设置为 `tcp://broker.emqx.io:1883`。
   
   ```yaml
   default:
     concurrency: 1
     qos: 1
-    servers:
-    - tcp://broker.emqx.io:1883
+    server: tcp://broker.emqx.io:1883
     sharedSubscription: true
   ....
   ```

+ 1 - 1
docs/zh_CN/quick_start_docker.md

@@ -5,7 +5,7 @@
 2. 设置 eKuiper 源为一个 MQTT 服务器。本例使用位于 `tcp://broker.emqx.io:1883` 的 MQTT 服务器, `broker.emqx.io` 是一个由 [EMQ](https://www.emqx.cn) 提供的公有 MQTT 服务器。
 
    ```shell
-   docker run -p 9081:9081 -d --name kuiper -e MQTT_SOURCE__DEFAULT__SERVERS=[tcp://broker.emqx.io:1883] lfedge/ekuiper:$tag
+   docker run -p 9081:9081 -d --name kuiper -e MQTT_SOURCE__DEFAULT__SERVER="tcp://broker.emqx.io:1883" lfedge/ekuiper:$tag
    ```
 
 3. 创建流(stream)- 流式数据的结构定义,类似于数据库中的表格类型定义。比如说要发送温度与湿度的数据到 `broker.emqx.io`,这些数据将会被在**本地运行的** eKuiper docker 实例中处理。以下的步骤将创建一个名字为 `demo` 的流,并且数据将会被发送至 `devices/device_001/messages` 主题,这里的 `device_001` 可以是别的设备,比如 `device_002`,所有的这些数据会被 `demo` 流订阅并处理。

+ 16 - 9
docs/zh_CN/rules/sources/mqtt.md

@@ -6,7 +6,7 @@ eKuiper 为 MQTT 源流提供了内置支持,流可以订阅来自 MQTT 代理
 #全局MQTT配置
 default:
   qos: 1
-  servers: [tcp://127.0.0.1:1883]
+  server: "tcp://127.0.0.1:1883"
   #username: user1
   #password: password
   #certificationPath: /var/kuiper/xyz-certificate.pem
@@ -18,7 +18,7 @@ default:
 #重载全局配置
 demo: #Conf_key
   qos: 0
-  servers: [tcp://10.211.55.6:1883]
+  server: "tcp://10.211.55.6:1883"
 ```
 
 ## 全局 MQTT 配置
@@ -29,9 +29,9 @@ demo: #Conf_key
 
 默认订阅QoS级别。
 
-### servers
+### server
 
-MQTT 消息代理的服务器列表。 当前,只能指定一个服务器
+MQTT 消息代理的服务器。
 
 ### username
 
@@ -71,7 +71,7 @@ MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid。
 ```yaml
 mqtt:
   localConnection: #connection key
-    servers: [tcp://127.0.0.1:1883]
+    server: "tcp://127.0.0.1:1883"
     username: ekuiper
     password: password
     #certificationPath: /var/kuiper/xyz-certificate.pem
@@ -80,7 +80,7 @@ mqtt:
     #protocolVersion: 3
     clientid: ekuiper
   cloudConnection: #connection key
-    servers: ["tcp://broker.emqx.io:1883"]
+    server: "tcp://broker.emqx.io:1883"
     username: user1
     password: password
     #certificationPath: /var/kuiper/xyz-certificate.pem
@@ -93,14 +93,14 @@ mqtt:
 #Global MQTT configurations
 default:
   qos: 1
-  servers: [tcp://127.0.0.1:1883]
+  server: "tcp://127.0.0.1:1883"
   #username: user1
   #password: password
   #certificationPath: /var/kuiper/xyz-certificate.pem
   #privateKeyPath: /var/kuiper/xyz-private.pem.key
   connectionSelector: mqtt.localConnection
 ```
-*注意*: 相应配置组一旦指定 connectionSelector 参数,所有关于连接的参数都会被忽略. 上面例子中,` servers: [tcp://127.0.0.1:1883]` 会被忽略。
+*注意*: 相应配置组一旦指定 connectionSelector 参数,所有关于连接的参数都会被忽略. 上面例子中,`` server: "tcp://127.0.0.1:1883"`` 会被忽略。
 
 ### bufferLength
 
@@ -187,4 +187,11 @@ demo2 (
 当相应的规则分别引用以上数据流时,规则之间的源部分将共享连接。
 在这里 `DATASOURCE` 对应 mqtt 订阅的 topic, 配置项中的 `qos` 将用作订阅时的 `Qos`.
 在这个例子中,`demo` 以 Qos 0 订阅 topic `test/`,`demo2` 以 Qos 0 订阅 topic `test2/`
-值得注意的是如果两个规则订阅的 `topic` 完全一样而 `Qos` 不同,那么只会订阅一次并以首先启动的规则订阅为准。
+值得注意的是如果两个规则订阅的 `topic` 完全一样而 `Qos` 不同,那么只会订阅一次并以首先启动的规则订阅为准。
+
+## 迁移指南
+
+从 1.5.0 开始,eKuiper 将 mqtt 源地址配置从 `servers` 更改为 `server`,用户只能配置一个 mqtt 源地址而不是一个地址数组。
+使用之前版本并把 mqtt broker 作为数据源的用户,想要迁移到 1.5.0 或更高版本,需要确保 ``etc/mqtt_source.yaml`` 文件 ``server`` 的配置是正确的。
+使用环境变量配置 mqtt 源地址的用户需要成功更改配置,假设其地址为 ``tcp://broker.emqx.io:1883``。他们需要将环境变量 从
+``MQTT_SOURCE__DEFAULT__SERVERS=[tcp://broker.emqx.io:1883]`` 改为 ``MQTT_SOURCE__DEFAULT__SERVER="tcp://broker.emqx.io:1883"``

+ 1 - 1
docs/zh_CN/sqls/streams.md

@@ -50,7 +50,7 @@ my_stream
 WITH ( datasource = "topic/temperature", FORMAT = "json", KEY = "id");
 ```
 
-该流将订阅 MQTT 主题`topic/temperature`,服务器连接使用配置文件`$ekuiper/etc/mqtt_source.yaml` 中默认部分的 servers 键。
+该流将订阅 MQTT 主题`topic/temperature`,服务器连接使用配置文件`$ekuiper/etc/mqtt_source.yaml` 中默认部分的 server 键。
 
 - 有关更多信息,请参见 [MQTT source](../rules/sources/mqtt.md) 
 

+ 2 - 2
etc/connections/connection.yaml

@@ -1,6 +1,6 @@
 mqtt:
   localConnection: #connection key
-    servers: [tcp://127.0.0.1:1883]
+    server: "tcp://127.0.0.1:1883"
     username: ekuiper
     password: password
     #certificationPath: /var/kuiper/xyz-certificate.pem
@@ -9,7 +9,7 @@ mqtt:
     #insecureSkipVerify: false
     #protocolVersion: 3
   cloudConnection: #connection key
-    servers: ["tcp://broker.emqx.io:1883"]
+    server: "tcp://broker.emqx.io:1883"
     username: user1
     password: password
     #certificationPath: /var/kuiper/xyz-certificate.pem

+ 8 - 8
etc/mqtt_source.json

@@ -34,19 +34,19 @@
 				"zh_CN": "复用连接信息"
 			}
 		}, {
-			"name": "servers",
-			"default": ["tcp://127.0.0.1:1883"],
+			"name": "server",
+			"default": "tcp://127.0.0.1:1883",
 			"optional": true,
 			"connection_related": true,
-			"control": "list",
-			"type": "list_string",
+			"control": "text",
+			"type": "string",
 			"hint": {
-				"en_US": "The server list for MQTT message broker. Currently, only ONE server can be specified.",
-				"zh_CN": "MQTT 消息代理的服务器列表。 当前,只能指定一个服务器。"
+				"en_US": "The server for MQTT message broker.",
+				"zh_CN": "MQTT 消息代理的服务器。"
 			},
 			"label": {
-				"en_US": "Server list",
-				"zh_CN": "服务器列表"
+				"en_US": "Server Address",
+				"zh_CN": "服务器地址"
 			}
 		}, {
 			"name": "username",

+ 2 - 2
etc/mqtt_source.yaml

@@ -1,7 +1,7 @@
 #Global MQTT configurations
 default:
   qos: 1
-  servers: [tcp://127.0.0.1:1883]
+  server: "tcp://127.0.0.1:1883"
   #username: user1
   #password: password
   #certificationPath: /var/kuiper/xyz-certificate.pem
@@ -14,4 +14,4 @@ default:
 
 demo_conf: #Conf_key
   qos: 0
-  servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]
+  server: "tcp://10.211.55.6:1883"

+ 1 - 1
internal/topo/connection/clients/mqtt/client_mqtt_test.go

@@ -51,7 +51,7 @@ func TestMQTTClient_CfgValidate(t *testing.T) {
 			name: "config server addr key error",
 			args: args{
 				props: map[string]interface{}{
-					"servers": []string{"tcp:127.0.0.1"},
+					"server": "tcp:127.0.0.1",
 				},
 			},
 			wantErr: false,

+ 10 - 13
internal/topo/connection/clients/mqtt/mqtt.go

@@ -27,16 +27,15 @@ import (
 )
 
 type MQTTConnectionConfig struct {
-	Servers            []string `json:"servers"`
-	Server             string   `json:"server"`
-	PVersion           string   `json:"protocolVersion"`
-	ClientId           string   `json:"clientid"`
-	Uname              string   `json:"username"`
-	Password           string   `json:"password"`
-	Certification      string   `json:"certificationPath"`
-	PrivateKPath       string   `json:"privateKeyPath"`
-	RootCaPath         string   `json:"rootCaPath"`
-	InsecureSkipVerify bool     `json:"insecureSkipVerify"`
+	Server             string `json:"server"`
+	PVersion           string `json:"protocolVersion"`
+	ClientId           string `json:"clientid"`
+	Uname              string `json:"username"`
+	Password           string `json:"password"`
+	Certification      string `json:"certificationPath"`
+	PrivateKPath       string `json:"privateKeyPath"`
+	RootCaPath         string `json:"rootCaPath"`
+	InsecureSkipVerify bool   `json:"insecureSkipVerify"`
 }
 
 type MQTTClient struct {
@@ -59,9 +58,7 @@ func (ms *MQTTClient) CfgValidate(props map[string]interface{}) error {
 		return fmt.Errorf("failed to get config, the error is %s", err)
 	}
 
-	if srv := cfg.Servers; len(srv) != 0 {
-		ms.srv = srv[0]
-	} else if cfg.Server != "" {
+	if cfg.Server != "" {
 		ms.srv = cfg.Server
 	} else {
 		return fmt.Errorf("missing server property")