# EdgeX 数据源 stream source scan table source eKuiper 内置支持 EdgeX 数据源,支持订阅来自于 [EdgeX 消息总线](https://github.com/edgexfoundry/go-mod-messaging)的数据,并将数据放入 eKuiper 数据处理流水线中。用户可直接通过 EdgeX 数据源消费 EdgeX 中的事件,[无需任何手动模式定义](#拓展阅读-edgex-中的流定义)。 在 eKuiper 中,EdgeX 连接器可以作为源连接器(从 EdgeX 获取数据)或 [Sink 连接器](../../sinks/builtin/mqtt.md)(将数据发布到 EdgeX),本节重点介绍 EdgeX 源连接器。 ## Configurations eKuiper 连接器可以通过[环境变量](../../../configuration/configuration.md#environment-variable-syntax)、[REST API](../../../api/restapi/configKey.md) 或配置文件进行配置,本节将介绍配置文件的使用方法。 EdgeX 源连接器的配置文件位于: `$ekuiper/etc/sources/edgex.yaml`,其中: - default:对应全局连接配置。 - 自定义部分:适用于需要自定义连接参数的场景,该部分的配置将覆盖全局连接配置。 - 连接器重用:eKuiper 还支持通过 [`connectionSelector`](../../connector.md#connection-selector) 配置项在不同的配置中复用某个连接配置。 以下示例包括一个全局配置和自定义配置 `demo1`: ```yaml #全局 Edgex 配置 default: protocol: tcp server: localhost port: 5573 topic: rules-events messageType: event # optional: # ClientId: client1 # Username: user1 # Password: password #覆盖全局配置 demo1: #Conf_key protocol: tcp server: 10.211.55.6 port: 5571 topic: rules-events ``` ## 全局配置 用户可在 `default` 部分指定全局设置。 ### 连接相关配置 - `protocol`:连接到 EdgeX 消息总线的协议,缺省为 `tcp` - `server`:EdgeX 消息总线的地址,缺省为 `localhost` - `port`:EdgeX 消息总线的端口,缺省为 `5573` ### 连接重用 - `connectionSelector`:重用 EdgeX 数据源连接,如下方配置示例中的 `edgex.redisMsgBus`。注意:连接配置文件位于 `connections/connection.yaml`。有关连接重用的详细解释,见[连接器的重用](../../connector.md#连接器的重用)。 ```yaml #全局 Edgex 配置 default: protocol: tcp server: localhost port: 5573 connectionSelector: edgex.redisMsgBus topic: rules-events messageType: event # optional: # ClientId: client1 # Username: user1 # Password: password ``` ::: tip 指定 `connectionSelector` 参数后,所有关于连接的参数都会被忽略,包括 `protocol`、`server` 和 `port` 配置。本例中,`protocol: tcp | server: localhost | port: 5573`的值都将被忽略。 ::: ### 主题和消息配置 - `topic`:EdgeX 消息总线上监听的主题名称,缺省为 `rules-events`。用户可以直接连接到 EdgeX 消息总线上的主题也可以连接到 application service 暴露的主题。需要注意的是,两种主题的消息数据类型不同,需要设置正确的 messageType 类型。 - `type`:EdgeX 消息总线类型,目前支持三种消息总线。如果指定的消息总线类型不支持,将使用缺省 `zero` 类型。 - `zero`:使用 ZeroMQ 类型的消息总线。 - `mqtt`:使用 MQTT 服务器作为消息总线,如选择 MQTT 总线类型,eKuiper 支持更多的 MQTT 配置项,具体请查看 [其他配置(MQTT 相关配置)](#其他配置-mqtt-相关配置)。 - `redis`:使用 Redis 服务器作为消息总线。使用 EdgeX docker compose 启动时,type 参数会默认设置为该类型。 EdgeX Levski 引入了两种信息消息总线类型,eKuiper 从 1.7.1 开始支持这两种新的类型,分别为 - `nats-jetstream` - `nats-core` - `messageType`:EdgeX 消息模型类型。该参数支持两种类型: - `event`:如果连接到 EdgeX application service 的主题、则消息为 "event" 类型;消息将会解码为 `dtos.Event` 类型。该选项为默认值。 - `request`:如果直接连接到消息总线的主题,接收 device service 或者 core data 发出的数据,则消息类型为 "request"。消息将会解码为 `requests.AddEventRequest` 类型。 ### 其他配置(MQTT 相关配置) 如使用 MQTT 消息总线,eKuiper 还支持其他一些可选配置项。请注意,所有可选配置都应为**字符类型**,`KeepAlive: "5000"` ,有关各配置项的详细解释,可参考 MQTT 协议。 - `ClientId` - `Username` - `Password` - `Qos` - `KeepAlive` - `Retained` - `ConnectionPayload` - `CertFile` - `KeyFile` - `CertPEMBlock` - `KeyPEMBlock` - `SkipCertVerify` ## 自定义配置 对于需要自定义某些连接参数的场景,eKuiper 支持用户创建自定义模块来实现全局配置的重载。 **配置示例** ```yaml #覆盖全局配置 demo1: #Conf_key protocol: tcp server: 10.211.55.6 port: 5571 topic: rules-events ``` 定义 `demo1` 配置组后,如希望在创建流时使用此配置,可通过 `CONF_KEY` 选项并指定配置名称,此时,在自定义配置中定义的参数将覆盖 `default` 配置中的相应参数。详细步骤,可参考 [流语句](../../../sqls/streams.md)。 **示例** ```sql create stream demo1() WITH (FORMAT="JSON"、type="edgex"、CONF_KEY="demo1"); ``` ## 创建流类型源 完成连接器的配置后,后续可通过创建流将其与 eKuiper 规则集成。EdgeX 源连接器可以作为[流类型](../../streams/overview.md)或[扫描表类型数据源](../../tables/scan.md)使用,本节将以流类型源为例进行说明。 ### 通过 REST API 创建 REST API 为 eKuiper 提供了一种可编程的交互方式,适用于自动化或需要将 eKuiper 集成到其他系统中的场景。 **示例** ```sql create stream demo1() WITH (FORMAT="JSON"、type="edgex"、CONF_KEY="demo1"); ``` 详细操作步骤及命令解释,可参考 [通过 REST API 进行流管理](../../../api/restapi/streams.md)。 ### 通过 CLI 创建 用户也可以通过命令行界面(CLI)直接访问 eKuiper。 1. 进入 eKuiper `bin` 目录: ```bash cd path_to_eKuiper_directory/bin ``` 2. 使用 `create` 命令创建规则,指定 EdgeX 连接器为数据源,例如: ```bash bin/kuiper CREATE STREAM demo'() with(format="json"、datasource="demo" type="edgex")' ``` 详细操作步骤及命令解释,可参考 [通过 CLI 进行流管理](../../../api/cli/streams.md)。 ### 拓展阅读:EdgeX 中的流定义 EdgeX 在 [reading objects](https://docs.edgexfoundry.org/2.0/microservices/core/data/Ch-CoreData/#events-and-readings) 已经定义了数据类型,因此在 eKuiper 中建议采用 [schema-less 方式](../../streams/overview.md#schema)的 EdgeX 流式定义,如下所示: ```shell # cd $eKuiper_base # bin/kuiper CREATE STREAM demo'() with(format="json"、datasource="demo" type="edgex")' ``` ### 自动数据类型转换 eKuiper 在处理 EdgeX 事件时,会根据 EdgeX `ValueType` 字段自动管理数据类型转换。 **数据转换原则:** - 如果在 reading 的值类型中可以找到支持的数据类型,执行数据类型转换;guas - 如果在 reading 的值类型中找不到支持的数据类型,将保留原值; - 如果类型转换失败,该值将被**丢弃**,并在日志上打印一条告警消息; #### Boolean 如果 `reading` 中 `ValueType` 的值为 `Bool` ,那么 eKuiper 会试着将其转换为 `boolean` 类型: - 转换为 `true`: "1"、"t"、"T"、"true"、"TRUE"、"True" - 转换为 `false`:"0"、"f"、"F"、"false"、"FALSE"、"False" #### Bigint 如果 `reading` 中 `ValueType` 的值为 `INT8`、`INT16`、`INT32`、`INT64`、`UINT8`、`UINT16`、`UINT32`、`UINT64` 那么 eKuiper 会试着将其转换为 `Bigint` 类型。 #### Float 如果 `reading` 中 `ValueType` 的值为 `FLOAT32`、`FLOAT64` ,那么 eKuiper 会试着将其转换为 `Float` 类型。 #### String 如果 `reading` 中 `ValueType` 的值为 `String`,那么 eKuiper 会试着将其转换为 `String` 类型。 #### Boolean 数组 EdgeX 中的 `Bool` 数组类型会被转换为 `boolean` 数组。 #### Bigint 数组 EdgeX 中所有的 `INT8`,`INT16`,`INT32`,`INT64`,`UINT8`,`UINT16`,`UINT32`,`UINT64` 数组类型会被转换为 `Bigint` 数组。 #### Float 数组 EdgeX 中所有的 `FLOAT32`,`FLOAT64` 数组类型会被转换为 `Float` 数组。