# MQTT 数据源 stream source scan table source MQTT(Message Queuing Telemetry Transport)是一种轻量级的通信协议,用于在物联网设备之间进行可靠的消息传递。eKuiper 内置 MQTT 连接器,方便订阅来自 MQTT 代理的消息并输入 eKuiper 处理管道,实现对指定 MQTT 主题的实时数据处理。 在 eKuiper 中,MQTT 连接器可以作为源连接器(从 MQTT 代理获取数据)或 [Sink 连接器](../../sinks/builtin/mqtt.md)(将数据发布到 MQTT 代理),本节重点介绍 MQTT 源连接器。 ## 配置 eKuiper 连接器可以通过[环境变量](../../../configuration/configuration.md#environment-variable-syntax)、[REST API](../../../api/restapi/configKey.md) 或配置文件进行配置,本节将介绍配置文件的使用方法。 MQTT 源连接器的配置文件位于:`$ekuiper/etc/mqtt_source.yaml`,其中: - default:对应全局连接配置。 - 自定义部分:适用于需要自定义连接参数的场景,该部分的配置将覆盖全局连接配置。 - 连接器重用:eKuiper 还支持通过 [`connectionSelector`](../../connector.md#connection-selector) 配置项在不同的配置中复用某个连接配置。 以下示例包括一个全局配置和自定义配置 `demo_conf`: ```yaml #全局 MQTT 配置 default: qos: 1 server: "tcp://127.0.0.1:1883" #username: user1 #password: password #certificationPath: /var/kuiper/xyz-certificate.pem #privateKeyPath: /var/kuiper/xyz-private.pem.key #rootCaPath: /var/kuiper/xyz-rootca.pem #insecureSkipVerify: true #connectionSelector: mqtt.mqtt_conf1 # 使用指定的压缩方法解压缩。现在支持`gzip`、`zstd` # decompression: "" #覆盖全局配置 demo_conf: #Conf_key qos: 0 server: "tcp://10.211.55.6:1883" ``` ## 全局配置 用户可在 `default` 部分指定全局设置。 ### 连接相关配置 - `qos`:默认订阅 QoS 级别。 - `server`:MQTT 服务器。 - `username`:MQTT 连接用户名。 - `password`:MQTT 连接密码。 - `protocolVersion`:MQTT 协议版本。可选值:3.1 (MQTT 3) 或 3.1.1 (也被称为 MQTT 4)。如未指定,则将使用缺省值:3.1。 - `clientid`:MQTT 连接的客户端 ID。如未指定,将使用 uuid。 ### 安全和认证配置 - `certificationPath`: 证书路径,示例值:`d3807d9fa5-certificate.pem`。可以是绝对路径,也可以是相对路径。如指定相对路径,那么父目录为执行 `kuiperd` 命令的路径,例如: - 如果在 `/var/kuiper` 中运行 `bin/kuiperd` ,那么父目录为 `/var/kuiper`。 - 如果运行从`/var/kuiper/bin`中运行`./kuiperd`,那么父目录为 `/var/kuiper/bin`。 - `privateKeyPath`:私钥路径,示例值:`d3807d9fa5-private.pem.key`。可以是绝对路径,也可以是相对路径,具体可参考 `certificationPath`。 - `rootCaPath`:根证书路径。可以是绝对路径,也可以是相对路径。 - `insecureSkipVerify`:是否跳过证书验证。如设置为 `true`,TLS 接受服务器提供的任何证书以及该证书中的任何主机名。注意:此时,TLS 容易受到中间人攻击。默认值:`false`。 ### 连接重用 - `connectionSelector`: 重用 MQTT 数据源连接,如下方配置示例中的 `mqtt.localConnection`。注意:连接配置文件位于 `connections/connection.yaml`。有关连接重用的详细解释,见[连接器的重用](../../connector.md#连接器的重用)。 ```yaml #全局 MQTT 连接 default: qos: 1 server: "tcp://127.0.0.1:1883" #username: user1 #password: password #certificationPath: /var/kuiper/xyz-certificate.pem #privateKeyPath: /var/kuiper/xyz-private.pem.key connectionSelector: mqtt.localConnection ``` ::: tip 指定 `connectionSelector` 参数后,所有关于连接的参数都会被忽略,例如上例中的 `server:"tcp://127.0.0.1:1883"`。 ::: ### **负载相关配置** - `decompression`:使用指定的压缩方法解压缩,支持 `gzip`、`zstd`。 - `bufferLength`:指定最大缓存消息数目。该参数主要用于防止内存溢出。实际内存用量会根据当前缓存消息数目动态变化。增大该参数不会增加初始内存分配量,因此建议设为较大的数值。默认值为102400;如果每条消息为100字节,则默认情况下,缓存最大占用内存量为102400 * 100B ~= 10MB. ### **KubeEdge 集成** - `kubeedgeVersion`:KubeEdge 版本号,不同的版本号对应的文件内容不同。 - `kubeedgeModelFile`:KubeEdge 模版文件名,文件路径为:`etc/sources`,样例格式如下: ```yaml { "deviceModels": [{ "name": "device1", "properties": [{ "name": "temperature", "dataType": "int" }, { "name": "temperature-enable", "dataType": "string" }] }] } ``` 其中, - `deviceModels.name`:设备名称,与订阅主题中的字段匹配,为第三和第四个“/”之间的内容。例如:$ke/events/device/device1/data/update,设备名称为 `device1`。 - `properties.name`:字段名称。 - `properties.dataType`:预期字段类型。 ## 自定义配置 对于需要自定义某些连接参数的场景,eKuiper 支持用户创建自定义模块来实现全局配置的重载。 **配置示例** ```yaml #覆盖全局配置 demo_conf: #Conf_key qos: 0 server: "tcp://10.211.55.6:1883" ``` 定义 `demo_conf` 配置组后,如希望在创建流时使用此配置,可通过 `CONF_KEY` 选项并指定配置名称,此时,在自定义配置中定义的参数将覆盖 `default` 配置中的相应参数。详细步骤,可参考 [流语句](../../../sqls/streams.md)。 **示例** ```sql demo ( ... ) WITH (DATASOURCE="test/", FORMAT="JSON", KEY="USERID", CONF_KEY="demo_conf"); ``` ## 创建流类型源 完成连接器的配置后,后续可通过创建流将其与 eKuiper 规则集成。MQTT 源连接器可以作为[流式](../../streams/overview.md)或[扫描表数据源](../../tables/scan.md)使用,本节将以流类型源为例进行说明。 您可通过 REST API 或 CLI 工具在 eKuiper 中创建 MQTT 数据源。 ### 通过 REST API 创建 REST API 为 eKuiper 提供了一种可编程的交互方式,适用于自动化或需要将 eKuiper 集成到其他系统中的场景。 **示例** ```sql {"sql":"create stream my_stream (id bigint, name string, score float) WITH ( datasource = \"topic/temperature\", FORMAT = \"json\", KEY = \"id\")"} ``` 详细操作步骤及命令解释,可参考 [通过 REST API 进行流管理](../../../api/restapi/streams.md)。 ### 通过 CLI 创建 用户也可以通过命令行界面(CLI)直接访问 eKuiper。 1. 进入 eKuiper `bin` 目录: ```bash cd path_to_eKuiper_directory/bin ``` 2. 使用 `create` 命令创建规则,指定 MQTT 数据源,如: ```bash bin/kuiper create stream my_stream '(id bigint, name string, score float) WITH ( datasource = "topic/temperature", FORMAT = "json", KEY = "id")' ``` 详细操作步骤及命令解释,可参考 [通过 CLI 进行流管理](../../../api/cli/streams.md)。 ## 迁移指南 从 eKuiper 1.5.0 开始,eKuiper 将 MQTT 源地址配置从 `servers` 更改为 `server`,即用户只能配置一个 MQTT 源地址而不是一个地址数组。对需要进行版本升级的用户: - 如希望通过配置文件配置,请确保 `etc/mqtt_source.yaml` 文件内的 `server` 已正确配置。 - 如希望通过环境变量配置,例如针对 `tcp://broker.emqx.io:1883` 地址,配置命令应从 `MQTT_SOURCE__DEFAULT__SERVERS=[tcp://broker.emqx.io:1883]` 改为 `MQTT_SOURCE__DEFAULT__SERVER="tcp://broker.emqx.io:1883"`。