|
@@ -1,12 +1,26 @@
|
|
-# MQTT源
|
|
|
|
|
|
+# MQTT 数据源
|
|
|
|
|
|
<span style="background:green;color:white;">stream source</span>
|
|
<span style="background:green;color:white;">stream source</span>
|
|
<span style="background:green;color:white">scan table source</span>
|
|
<span style="background:green;color:white">scan table source</span>
|
|
|
|
|
|
-eKuiper 为 MQTT 源流提供了内置支持,流可以订阅来自 MQTT 代理的消息并输入 eKuiper 处理管道。 MQTT 源的配置文件位于 `$ekuiper/etc/mqtt_source.yaml`。 以下是文件格式。
|
|
|
|
|
|
+MQTT(Message Queuing Telemetry Transport)是一种轻量级的通信协议,用于在物联网设备之间进行可靠的消息传递。eKuiper 内置 MQTT 连接器,方便订阅来自 MQTT 代理的消息并输入 eKuiper 处理管道,实现对指定 MQTT 主题的实时数据处理。
|
|
|
|
+
|
|
|
|
+在 eKuiper 中,MQTT 连接器可以作为源连接器(从 MQTT 代理获取数据)或 [Sink 连接器](../../sinks/builtin/mqtt.md)(将数据发布到 MQTT 代理),本节重点介绍 MQTT 源连接器。
|
|
|
|
+
|
|
|
|
+## 配置
|
|
|
|
+
|
|
|
|
+eKuiper 连接器可以通过[环境变量](../../../configuration/configuration.md#environment-variable-syntax)、[REST API](../../../api/restapi/configKey.md) 或配置文件进行配置,本节将介绍配置文件的使用方法。
|
|
|
|
+
|
|
|
|
+MQTT 源连接器的配置文件位于:`$ekuiper/etc/mqtt_source.yaml`,其中:
|
|
|
|
+
|
|
|
|
+- default:对应全局连接配置。
|
|
|
|
+- 自定义部分:适用于需要自定义连接参数的场景,该部分的配置将覆盖全局连接配置。
|
|
|
|
+- 连接器重用:eKuiper 还支持通过 [`connectionSelector`](../../connector.md#connection-selector) 配置项在不同的配置中复用某个连接配置。
|
|
|
|
+
|
|
|
|
+以下示例包括一个全局配置和自定义配置 `demo_conf`:
|
|
|
|
|
|
```yaml
|
|
```yaml
|
|
-#全局MQTT配置
|
|
|
|
|
|
+#全局 MQTT 配置
|
|
default:
|
|
default:
|
|
qos: 1
|
|
qos: 1
|
|
server: "tcp://127.0.0.1:1883"
|
|
server: "tcp://127.0.0.1:1883"
|
|
@@ -14,199 +28,157 @@ default:
|
|
#password: password
|
|
#password: password
|
|
#certificationPath: /var/kuiper/xyz-certificate.pem
|
|
#certificationPath: /var/kuiper/xyz-certificate.pem
|
|
#privateKeyPath: /var/kuiper/xyz-private.pem.key
|
|
#privateKeyPath: /var/kuiper/xyz-private.pem.key
|
|
- kubeedgeVersion: "1.0"
|
|
|
|
- kubeedgeModelFile: "mqtt_model.json"
|
|
|
|
- # 使用指定的压缩方法解压缩。现在支持`gzip`、`zstd` 。
|
|
|
|
|
|
+ #rootCaPath: /var/kuiper/xyz-rootca.pem
|
|
|
|
+ #insecureSkipVerify: true
|
|
|
|
+ #connectionSelector: mqtt.mqtt_conf1
|
|
|
|
+ # 使用指定的压缩方法解压缩。现在支持`gzip`、`zstd`
|
|
# decompression: ""
|
|
# decompression: ""
|
|
|
|
|
|
|
|
|
|
-#重载全局配置
|
|
|
|
-demo: #Conf_key
|
|
|
|
|
|
+#覆盖全局配置
|
|
|
|
+demo_conf: #Conf_key
|
|
qos: 0
|
|
qos: 0
|
|
server: "tcp://10.211.55.6:1883"
|
|
server: "tcp://10.211.55.6:1883"
|
|
```
|
|
```
|
|
|
|
|
|
-## 全局 MQTT 配置
|
|
|
|
|
|
+## 全局配置
|
|
|
|
|
|
-用户可以在此处指定全局 MQTT 设置。`default` 部分中指定的配置项将用作所有MQTT 连接的默认设置。
|
|
|
|
|
|
+用户可在 `default` 部分指定全局设置。
|
|
|
|
|
|
-### qos
|
|
|
|
|
|
+### 连接相关配置
|
|
|
|
|
|
-默认订阅QoS级别。
|
|
|
|
|
|
+- `qos`:默认订阅 QoS 级别。
|
|
|
|
+- `server`:MQTT 服务器。
|
|
|
|
+- `username`:MQTT 连接用户名。
|
|
|
|
+- `password`:MQTT 连接密码。
|
|
|
|
+- `protocolVersion`:MQTT 协议版本。可选值:3.1 (MQTT 3) 或 3.1.1 (也被称为 MQTT 4)。如未指定,则将使用缺省值:3.1。
|
|
|
|
+- `clientid`:MQTT 连接的客户端 ID。如未指定,将使用 uuid。
|
|
|
|
|
|
-### server
|
|
|
|
|
|
+### 安全和认证配置
|
|
|
|
|
|
-MQTT 消息代理的服务器。
|
|
|
|
|
|
+- `certificationPath`: 证书路径,示例值:`d3807d9fa5-certificate.pem`。可以是绝对路径,也可以是相对路径。如指定相对路径,那么父目录为执行 `kuiperd` 命令的路径,例如:
|
|
|
|
+ - 如果在 `/var/kuiper` 中运行 `bin/kuiperd` ,那么父目录为 `/var/kuiper`。
|
|
|
|
+ - 如果运行从`/var/kuiper/bin`中运行`./kuiperd`,那么父目录为 `/var/kuiper/bin`。
|
|
|
|
+- `privateKeyPath`:私钥路径,示例值:`d3807d9fa5-private.pem.key`。可以是绝对路径,也可以是相对路径,具体可参考 `certificationPath`。
|
|
|
|
+- `rootCaPath`:根证书路径。可以是绝对路径,也可以是相对路径。
|
|
|
|
+- `insecureSkipVerify`:是否跳过证书验证。如设置为 `true`,TLS 接受服务器提供的任何证书以及该证书中的任何主机名。注意:此时,TLS 容易受到中间人攻击。默认值:`false`。
|
|
|
|
|
|
-### username
|
|
|
|
|
|
+### 连接重用
|
|
|
|
|
|
-MQTT 连接用户名。
|
|
|
|
|
|
+- `connectionSelector`: 重用 MQTT 数据源连接,如下方配置示例中的 `mqtt.localConnection`。注意:连接配置文件位于 `connections/connection.yaml`。有关连接重用的详细解释,见[连接器的重用](../../connector.md#连接器的重用)。
|
|
|
|
|
|
-### password
|
|
|
|
-
|
|
|
|
-MQTT 连接密码。
|
|
|
|
-
|
|
|
|
-### decompression
|
|
|
|
-
|
|
|
|
-使用指定的压缩方法解压缩。现在支持 `gzip`、`zstd`。
|
|
|
|
-
|
|
|
|
-### protocolVersion
|
|
|
|
-
|
|
|
|
-MQTT 协议版本。3.1 (也被称为 MQTT 3) 或者 3.1.1 (也被称为 MQTT 4)。 如果未指定,缺省值为 3.1。
|
|
|
|
|
|
+ ```yaml
|
|
|
|
+ #全局 MQTT 连接
|
|
|
|
+ default:
|
|
|
|
+ qos: 1
|
|
|
|
+ server: "tcp://127.0.0.1:1883"
|
|
|
|
+ #username: user1
|
|
|
|
+ #password: password
|
|
|
|
+ #certificationPath: /var/kuiper/xyz-certificate.pem
|
|
|
|
+ #privateKeyPath: /var/kuiper/xyz-private.pem.key
|
|
|
|
+ connectionSelector: mqtt.localConnection
|
|
|
|
+ ```
|
|
|
|
|
|
-### clientid
|
|
|
|
|
|
+ ::: tip
|
|
|
|
|
|
-MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid。
|
|
|
|
|
|
+ 指定 `connectionSelector` 参数后,所有关于连接的参数都会被忽略,例如上例中的 `server:"tcp://127.0.0.1:1883"`。
|
|
|
|
|
|
-### certificationPath
|
|
|
|
|
|
+ :::
|
|
|
|
|
|
-证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 `kuiperd` 命令的路径。比如,如果你在 `/var/kuiper` 中运行 `bin/kuiperd` ,那么父目录为 `/var/kuiper`; 如果运行从`/var/kuiper/bin`中运行`./kuiperd`,那么父目录为 `/var/kuiper/bin`。 比如 `d3807d9fa5-certificate.pem`。
|
|
|
|
|
|
+### **负载相关配置**
|
|
|
|
|
|
-### privateKeyPath
|
|
|
|
|
|
+- `decompression`:使用指定的压缩方法解压缩,支持 `gzip`、`zstd`。
|
|
|
|
+- `bufferLength`:指定最大缓存消息数目。该参数主要用于防止内存溢出。实际内存用量会根据当前缓存消息数目动态变化。增大该参数不会增加初始内存分配量,因此建议设为较大的数值。默认值为102400;如果每条消息为100字节,则默认情况下,缓存最大占用内存量为102400 * 100B ~= 10MB.
|
|
|
|
|
|
-私钥路径。可以为绝对路径,也可以为相对路径。更详细的信息,请参考 `certificationPath`,比如 `d3807d9fa5-private.pem.key`。
|
|
|
|
|
|
+### **KubeEdge 集成**
|
|
|
|
|
|
-### rootCaPath
|
|
|
|
|
|
+- `kubeedgeVersion`:KubeEdge 版本号,不同的版本号对应的文件内容不同。
|
|
|
|
|
|
-根证书路径。可以为绝对路径,也可以为相对路径。
|
|
|
|
|
|
+- `kubeedgeModelFile`:KubeEdge 模版文件名,文件路径为:`etc/sources`,样例格式如下:
|
|
|
|
|
|
-### insecureSkipVerify
|
|
|
|
|
|
+ ```yaml
|
|
|
|
+ {
|
|
|
|
+ "deviceModels": [{
|
|
|
|
+ "name": "device1",
|
|
|
|
+ "properties": [{
|
|
|
|
+ "name": "temperature",
|
|
|
|
+ "dataType": "int"
|
|
|
|
+ }, {
|
|
|
|
+ "name": "temperature-enable",
|
|
|
|
+ "dataType": "string"
|
|
|
|
+ }]
|
|
|
|
+ }]
|
|
|
|
+ }
|
|
|
|
+ ```
|
|
|
|
|
|
-如果 InsecureSkipVerify 设置为 true,TLS 接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为false。配置项只能用于 TLS 连接
|
|
|
|
|
|
+ 其中,
|
|
|
|
|
|
-### connectionSelector
|
|
|
|
|
|
+ - `deviceModels.name`:设备名称,与订阅主题中的字段匹配,为第三和第四个“/”之间的内容。例如:$ke/events/device/device1/data/update,设备名称为 `device1`。
|
|
|
|
+ - `properties.name`:字段名称。
|
|
|
|
+ - `properties.dataType`:预期字段类型。
|
|
|
|
|
|
-重用 MQTT 源连接。连接配置信息位于 `connections/connection.yaml`.
|
|
|
|
|
|
+## 自定义配置
|
|
|
|
|
|
-```yaml
|
|
|
|
-mqtt:
|
|
|
|
- localConnection: #connection key
|
|
|
|
- server: "tcp://127.0.0.1:1883"
|
|
|
|
- username: ekuiper
|
|
|
|
- password: password
|
|
|
|
- #certificationPath: /var/kuiper/xyz-certificate.pem
|
|
|
|
- #privateKeyPath: /var/kuiper/xyz-private.pem.ke
|
|
|
|
- #insecureSkipVerify: false
|
|
|
|
- #protocolVersion: 3
|
|
|
|
- clientid: ekuiper
|
|
|
|
- cloudConnection: #connection key
|
|
|
|
- server: "tcp://broker.emqx.io:1883"
|
|
|
|
- username: user1
|
|
|
|
- password: password
|
|
|
|
- #certificationPath: /var/kuiper/xyz-certificate.pem
|
|
|
|
- #privateKeyPath: /var/kuiper/xyz-private.pem.ke
|
|
|
|
- #insecureSkipVerify: false
|
|
|
|
- #protocolVersion: 3
|
|
|
|
-```
|
|
|
|
|
|
+对于需要自定义某些连接参数的场景,eKuiper 支持用户创建自定义模块来实现全局配置的重载。
|
|
|
|
|
|
-对于 MQTT 连接,这里有两个配置组。用户应该使用 `mqtt.localConnection` 或者 `mqtt.cloudConnection` 来作为参数。举例如下:
|
|
|
|
|
|
+**配置示例**
|
|
|
|
|
|
```yaml
|
|
```yaml
|
|
-#Global MQTT configurations
|
|
|
|
-default:
|
|
|
|
- qos: 1
|
|
|
|
- server: "tcp://127.0.0.1:1883"
|
|
|
|
- #username: user1
|
|
|
|
- #password: password
|
|
|
|
- #certificationPath: /var/kuiper/xyz-certificate.pem
|
|
|
|
- #privateKeyPath: /var/kuiper/xyz-private.pem.key
|
|
|
|
- connectionSelector: mqtt.localConnection
|
|
|
|
|
|
+#覆盖全局配置
|
|
|
|
+demo_conf: #Conf_key
|
|
|
|
+ qos: 0
|
|
|
|
+ server: "tcp://10.211.55.6:1883"
|
|
```
|
|
```
|
|
|
|
|
|
-*注意*: 相应配置组一旦指定 connectionSelector 参数,所有关于连接的参数都会被忽略. 上面例子中,``server: "tcp://127.0.0.1:1883"`` 会被忽略。
|
|
|
|
-
|
|
|
|
-### bufferLength
|
|
|
|
-
|
|
|
|
-指定最大缓存消息数目。该参数主要用于防止内存溢出。实际内存用量会根据当前缓存消息数目动态变化。增大该参数不会增加初始内存分配量,因此设置较大的数值是安全的。该参数默认值为102400;如果每条消息为100字节,则默认情况下,缓存最大占用内存量为102400 * 100B ~= 10MB.
|
|
|
|
-
|
|
|
|
-### kubeedgeVersion
|
|
|
|
-
|
|
|
|
-kubeedge 版本号,不同的版本号对应的文件内容不同。
|
|
|
|
-
|
|
|
|
-### kubeedgeModelFile
|
|
|
|
|
|
+定义 `demo_conf` 配置组后,如希望在创建流时使用此配置,可通过 `CONF_KEY` 选项并指定配置名称,此时,在自定义配置中定义的参数将覆盖 `default` 配置中的相应参数。详细步骤,可参考 [流语句](../../../sqls/streams.md)。
|
|
|
|
|
|
-kubeedge 模版文件名,文件指定放在 etc/sources 文件夹中,样例格式如下:
|
|
|
|
|
|
+**示例**
|
|
|
|
|
|
-```json
|
|
|
|
-{
|
|
|
|
- "deviceModels": [{
|
|
|
|
- "name": "device1",
|
|
|
|
- "properties": [{
|
|
|
|
- "name": "temperature",
|
|
|
|
- "dataType": "int"
|
|
|
|
- }, {
|
|
|
|
- "name": "temperature-enable",
|
|
|
|
- "dataType": "string"
|
|
|
|
- }]
|
|
|
|
- }]
|
|
|
|
-}
|
|
|
|
|
|
+```sql
|
|
|
|
+demo (
|
|
|
|
+ ...
|
|
|
|
+ ) WITH (DATASOURCE="test/", FORMAT="JSON", KEY="USERID", CONF_KEY="demo_conf");
|
|
```
|
|
```
|
|
|
|
|
|
-#### deviceModels.name
|
|
|
|
-
|
|
|
|
-设备名称,与订阅主题中的字段匹配,位于第三和第四个“/”之间的内容。例如:$ke/events/device/device1/data/update
|
|
|
|
-
|
|
|
|
-#### properties.name
|
|
|
|
-
|
|
|
|
-字段名称
|
|
|
|
|
|
+## 创建流类型源
|
|
|
|
|
|
-#### properties.dataType
|
|
|
|
|
|
+完成连接器的配置后,后续可通过创建流将其与 eKuiper 规则集成。MQTT 源连接器可以作为[流式](../../streams/overview.md)或[扫描表数据源](../../tables/scan.md)使用,本节将以流类型源为例进行说明。
|
|
|
|
|
|
-期望的字段类型
|
|
|
|
|
|
+您可通过 REST API 或 CLI 工具在 eKuiper 中创建 MQTT 数据源。
|
|
|
|
|
|
-## 重载默认设置
|
|
|
|
|
|
+### 通过 REST API 创建
|
|
|
|
|
|
-如果您有一个特定连接需要重载默认设置,则可以创建一个自定义模块。 在上一个示例中,我们创建一个名为 `demo` 的特定设置。 然后,您可以在创建流定义时使用选项 `CONF_KEY` 指定配置(有关更多信息,请参见 [stream specs](../../../sqls/streams.md) )。
|
|
|
|
|
|
+REST API 为 eKuiper 提供了一种可编程的交互方式,适用于自动化或需要将 eKuiper 集成到其他系统中的场景。
|
|
|
|
|
|
**示例**
|
|
**示例**
|
|
|
|
|
|
-```text
|
|
|
|
-demo (
|
|
|
|
- ...
|
|
|
|
- ) WITH (DATASOURCE="test/", FORMAT="JSON", KEY="USERID", CONF_KEY="demo");
|
|
|
|
|
|
+```sql
|
|
|
|
+{"sql":"create stream my_stream (id bigint, name string, score float) WITH ( datasource = \"topic/temperature\", FORMAT = \"json\", KEY = \"id\")"}
|
|
```
|
|
```
|
|
|
|
|
|
-这些特定设置使用的配置键与 `default` 设置中的配置键相同,在特定设置中指定的任何值都将覆盖 `default` 部分中的值。
|
|
|
|
-
|
|
|
|
-## 在多个配置块中引用同一个 connectionSelector
|
|
|
|
-
|
|
|
|
-如下所示,用户创建了两个配置项
|
|
|
|
|
|
+详细操作步骤及命令解释,可参考 [通过 REST API 进行流管理](../../../api/restapi/streams.md)。
|
|
|
|
|
|
-```yaml
|
|
|
|
-#Override the global configurations
|
|
|
|
-demo_conf: #Conf_key
|
|
|
|
- qos: 0
|
|
|
|
- connectionSelector: mqtt.localConnection
|
|
|
|
- servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]
|
|
|
|
|
|
+### 通过 CLI 创建
|
|
|
|
|
|
-#Override the global configurations
|
|
|
|
-demo2_conf: #Conf_key
|
|
|
|
- qos: 0
|
|
|
|
- connentionSelector: mqtt.localConnection
|
|
|
|
- servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]
|
|
|
|
-```
|
|
|
|
|
|
+用户也可以通过命令行界面(CLI)直接访问 eKuiper。
|
|
|
|
|
|
-基于以上配置,创建了两个数据流
|
|
|
|
|
|
+1. 进入 eKuiper `bin` 目录:
|
|
|
|
|
|
-```text
|
|
|
|
-demo (
|
|
|
|
- ...
|
|
|
|
- ) WITH (DATASOURCE="test/", FORMAT="JSON", CONF_KEY="demo_conf");
|
|
|
|
|
|
+ ```bash
|
|
|
|
+ cd path_to_eKuiper_directory/bin
|
|
|
|
+ ```
|
|
|
|
|
|
-demo2 (
|
|
|
|
- ...
|
|
|
|
- ) WITH (DATASOURCE="test2/", FORMAT="JSON", CONF_KEY="demo2_conf");
|
|
|
|
|
|
+2. 使用 `create` 命令创建规则,指定 MQTT 数据源,如:
|
|
|
|
|
|
-```
|
|
|
|
|
|
+ ```bash
|
|
|
|
+ bin/kuiper create stream my_stream '(id bigint, name string, score float) WITH ( datasource = "topic/temperature", FORMAT = "json", KEY = "id")'
|
|
|
|
+ ```
|
|
|
|
|
|
-当相应的规则分别引用以上数据流时,规则之间的源部分将共享连接。
|
|
|
|
-在这里 `DATASOURCE` 对应 mqtt 订阅的 topic,配置项中的 `qos` 将用作订阅时的 `Qos`.
|
|
|
|
-在这个例子中,`demo` 以 Qos 0 订阅 topic `test/`,`demo2` 以 Qos 0 订阅 topic `test2/`
|
|
|
|
-值得注意的是如果两个规则订阅的 `topic` 完全一样而 `Qos` 不同,那么只会订阅一次并以首先启动的规则订阅为准。
|
|
|
|
|
|
+详细操作步骤及命令解释,可参考 [通过 CLI 进行流管理](../../../api/cli/streams.md)。
|
|
|
|
|
|
## 迁移指南
|
|
## 迁移指南
|
|
|
|
|
|
-从 1.5.0 开始,eKuiper 将 mqtt 源地址配置从 `servers` 更改为 `server`,用户只能配置一个 mqtt 源地址而不是一个地址数组。
|
|
|
|
-使用之前版本并把 mqtt broker 作为数据源的用户,想要迁移到 1.5.0 或更高版本,需要确保 ``etc/mqtt_source.yaml`` 文件 ``server`` 的配置是正确的。
|
|
|
|
-使用环境变量配置 mqtt 源地址的用户需要成功更改配置,假设其地址为 ``tcp://broker.emqx.io:1883``。他们需要将环境变量 从
|
|
|
|
-``MQTT_SOURCE__DEFAULT__SERVERS=[tcp://broker.emqx.io:1883]`` 改为 ``MQTT_SOURCE__DEFAULT__SERVER="tcp://broker.emqx.io:1883"``
|
|
|
|
|
|
+从 eKuiper 1.5.0 开始,eKuiper 将 MQTT 源地址配置从 `servers` 更改为 `server`,即用户只能配置一个 MQTT 源地址而不是一个地址数组。对需要进行版本升级的用户:
|
|
|
|
+
|
|
|
|
+- 如希望通过配置文件配置,请确保 `etc/mqtt_source.yaml` 文件内的 `server` 已正确配置。
|
|
|
|
+- 如希望通过环境变量配置,例如针对 `tcp://broker.emqx.io:1883` 地址,配置命令应从 `MQTT_SOURCE__DEFAULT__SERVERS=[tcp://broker.emqx.io:1883]` 改为 `MQTT_SOURCE__DEFAULT__SERVER="tcp://broker.emqx.io:1883"`。
|