# MQTT源 eKuiper 为 MQTT 源流提供了内置支持,流可以订阅来自 MQTT 代理的消息并输入eKuiper 处理管道。 MQTT 源的配置文件位于 `$ekuiper/etc/mqtt_source.yaml`。 以下是文件格式。 ```yaml #全局MQTT配置 default: qos: 1 servers: [tcp://127.0.0.1:1883] #username: user1 #password: password #certificationPath: /var/kuiper/xyz-certificate.pem #privateKeyPath: /var/kuiper/xyz-private.pem.key kubeedgeVersion: "1.0" kubeedgeModelFile: "mqtt_model.json" #重载全局配置 demo: #Conf_key qos: 0 servers: [tcp://10.211.55.6:1883] ``` ## 全局 MQTT 配置 用户可以在此处指定全局 MQTT 设置。`default` 部分中指定的配置项将用作所有MQTT 连接的默认设置。 ### qos 默认订阅QoS级别。 ### servers MQTT 消息代理的服务器列表。 当前,只能指定一个服务器。 ### username MQTT 连接用户名。 ### password MQTT 连接密码。 ### protocolVersion MQTT 协议版本。3.1 (也被称为 MQTT 3) 或者 3.1.1 (也被称为 MQTT 4)。 如果未指定,缺省值为 3.1。 ### clientid MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid。 ### certificationPath 证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 `kuiperd` 命令的路径。比如,如果你在 `/var/kuiper` 中运行 `bin/kuiperd` ,那么父目录为 `/var/kuiper`; 如果运行从`/var/kuiper/bin`中运行`./kuiperd`,那么父目录为 `/var/kuiper/bin`。 比如 `d3807d9fa5-certificate.pem`。 ### privateKeyPath 私钥路径。可以为绝对路径,也可以为相对路径。更详细的信息,请参考 `certificationPath`,比如 `d3807d9fa5-private.pem.key`。 ### rootCaPath 根证书路径。可以为绝对路径,也可以为相对路径。 ### insecureSkipVerify 如果 InsecureSkipVerify 设置为 true, TLS接受服务器提供的任何证书以及该证书中的任何主机名。 在这种模式下,TLS容易受到中间人攻击。默认值为false。配置项只能用于TLS连接 ### connectionSelector 重用 MQTT 源连接。连接配置信息位于 ``connections/connection.yaml``. ```yaml mqtt: localConnection: #connection key servers: [tcp://127.0.0.1:1883] username: ekuiper password: password #certificationPath: /var/kuiper/xyz-certificate.pem #privateKeyPath: /var/kuiper/xyz-private.pem.ke #insecureSkipVerify: false #protocolVersion: 3 clientid: ekuiper cloudConnection: #connection key servers: ["tcp://broker.emqx.io:1883"] username: user1 password: password #certificationPath: /var/kuiper/xyz-certificate.pem #privateKeyPath: /var/kuiper/xyz-private.pem.ke #insecureSkipVerify: false #protocolVersion: 3 ``` 对于 MQTT 连接,这里有两个配置组。用户应该使用 ``mqtt.localConnection`` 或者 ``mqtt.cloudConnection`` 来作为参数。举例如下: ```yaml #Global MQTT configurations default: qos: 1 servers: [tcp://127.0.0.1:1883] #username: user1 #password: password #certificationPath: /var/kuiper/xyz-certificate.pem #privateKeyPath: /var/kuiper/xyz-private.pem.key connectionSelector: mqtt.localConnection ``` *注意*: 相应配置组一旦指定 connectionSelector 参数,所有关于连接的参数都会被忽略. 上面例子中,`` servers: [tcp://127.0.0.1:1883]`` 会被忽略。 ### bufferLength 指定最大缓存消息数目。该参数主要用于防止内存溢出。实际内存用量会根据当前缓存消息数目动态变化。增大该参数不会增加初始内存分配量,因此设置较大的数值是安全的。该参数默认值为102400;如果每条消息为100字节,则默认情况下,缓存最大占用内存量为102400 * 100B ~= 10MB. ### kubeedgeVersion kubeedge 版本号,不同的版本号对应的文件内容不同。 ### kubeedgeModelFile kubeedge 模版文件名,文件指定放在 etc/sources 文件夹中,样例格式如下: ```json { "deviceModels": [{ "name": "device1", "properties": [{ "name": "temperature", "dataType": "int" }, { "name": "temperature-enable", "dataType": "string" }] }] } ``` #### deviceModels.name 设备名称,与订阅主题中的字段匹配,位于第三和第四个“/”之间的内容。例如:$ke/events/device/device1/data/update #### properties.name 字段名称 #### properties.dataType 期望的字段类型 ## 重载默认设置 如果您有一个特定连接需要重载默认设置,则可以创建一个自定义模块。 在上一个示例中,我们创建一个名为 `demo` 的特定设置。 然后,您可以在创建流定义时使用选项 `CONF_KEY` 指定配置(有关更多信息,请参见 [stream specs](../../sqls/streams.md) )。 **示例** ``` demo ( ... ) WITH (DATASOURCE="test/", FORMAT="JSON", KEY="USERID", CONF_KEY="demo"); ``` 这些特定设置使用的配置键与 `default` 设置中的配置键相同,在特定设置中指定的任何值都将覆盖 `default` 部分中的值。