Kuiper 提供了内置的 EdgeX 源支持,它可以被用来订阅来自于EdgeX 消息总线的数据,并且将数据放入 Kuiper 数据处理流水线中。
EdgeX 在 value descriptors 已经定义了数据类型,因此在 Kuiper 中建议采用 schema-less 方式的 EdgeX 流式定义,如下所示。
# cd $kuiper_base
# bin/cli CREATE STREAM demo'() with(format="json", datasource="demo" type="edgex")'
EdgeX 源会试图取得某个字段的类型,
在 EdgeX value descriptors 中定义的数据类型,将被转换为 Kuiper 中相应支持的数据类型。
如果 ValueDescriptor
中 Type
的值为 Bool
,那么 Kuiper 会试着将其转换为 boolean
类型,以下的值将被转化为 true
。
以下值将被转换为 false
。
如果 ValueDescriptor
中 Type
的值为 INT8
, INT16
, INT32
, INT64
, UINT8
, UINT16
, UINT32
, UINT64
那么 Kuiper 会试着将其转换为 Bigint
类型。
如果 ValueDescriptor
中 Type
的值为 FLOAT32
, FLOAT64
,那么 Kuiper 会试着将其转换为 Float
类型。
如果 ValueDescriptor
中 Type
的值为 String
,那么 Kuiper 会试着将其转换为 String
类型。
EdgeX 中的 Bool
数组类型会被转换为 boolean
数组。
EdgeX 中所有的 INT8
, INT16
, INT32
, INT64
, UINT8
, UINT16
, UINT32
, UINT64
数组类型会被转换为 Bigint
数组。
EdgeX 中所有的 FLOAT32
, FLOAT64
数组类型会被转换为 Float
数组。
EdgeX 源配置文件为 $kuiper/etc/sources/edgex.yaml
,以下配置文件内容。
#Global Edgex configurations
default:
protocol: tcp
server: localhost
port: 5573
topic: events
serviceServer: http://localhost:48080
# optional:
# ClientId: client1
# Username: user1
# Password: password
用户可以在此指定全局的 EdgeX 配置。在 default
部分中指定的配置将作为所有 EdgeX 源的缺省配置。
连接到 EdgeX 消息总线的协议,缺省为 tcp
EdgeX 消息总线的地址,缺省为 localhost
EdgeX 消息总线的端口,缺省为 5573
EdgeX 消息总线上监听的主题名称,缺省为 events
访问 value descriptors 的基础服务地址,配置项 serviceServer
的值与 /api/v1/valuedescriptor
拼接后,用于获取 EdgeX 服务器上定义的所有 value descriptors。
EdgeX 消息总线类型,目前支持两种消息总线。如果指定了错误的消息总线类型,那么会使用缺省 zero
类型。
zero
:使用 ZeroMQ 类型的消息总线mqtt
:使用 MQTT 服务器作为消息总线如果使用了 MQTT 消息总线,还可以指定别的一些可选配置项。请注意,所有在可选的配置项里指定的值都必须为字符类型,因此这里出现的所有的配置应该是字符类型的 - 例如 KeepAlive: "5000"
。以下为支持的可选的配置列表,您可以参考 MQTT 协议规范来获取更详尽的信息。
ClientId
Username
Password
Qos
KeepAlive
Retained
ConnectionPayload
CertFile
KeyFile
CertPEMBlock
KeyPEMBlock
SkipCertVerify
在某些情况下,你可能想消费来自于多个主题的数据。Kuiper 支持指定别的配置,并且在创建流定义的时候使用 CONF_KEY
来指定新的配置。
#覆盖全局配置
demo1: #Conf_key
protocol: tcp
server: 10.211.55.6
port: 5571
topic: events
如果你有个特定的源需要覆盖缺省的设置,你可以定义一个自定义的配置段。在上面的例子中,我们创建了一个新的配置 demo1
,然后你在创建流定义的时候可以使用选项 CONF_KEY
来使用新的配置 (参考 流定义规范 获取更多详细信息)。
例子
create stream demo1() WITH (FORMAT="JSON", type="edgex", CONF_KEY="demo1");
在自定义的配置中,能够使用的配置项与 default
部分的是一样的,任何在自定义段中设置的值将覆盖 default
部分里的配置。