参考 Azure IoT, 将布尔类型强制转换为 int。
# | 数据类型 | 说明 |
---|---|---|
1 | bigint | |
2 | float | |
3 | string | |
4 | datetime | 需要指定日期格式? 例如 "yyyy-MM-dd" |
5 | boolean | |
6 | array | 数组类型可以是简单数据或结构类型中的任何类型(#1-#5和#7)。 |
7 | struct | 复杂类型 |
CREATE STREAM
stream_name
( column_name <data_type> [ ,...n ] )
WITH ( property_name = expression [, ...] );
支持的属性名称。
属性名 | 可选 | 说明 |
---|---|---|
DATASOURCE | 否 | 如果是 MQTT 数据源,则列出主题名称。 |
FORMAT | 否 | json 或 Avro. 目前,我们仅支持 JSON 类型 ? |
KEY | 是 | 将来将用于 GROUP BY 语句 ?? |
TYPE | 否 | 如果支持越来越多的源,将来是否需要? 默认情况下,它将是 MQTT 类型。 |
StrictValidation | 否 | 根据流模式控制消息字段的验证行为。 |
CONF_KEY | 否 | 如果需要配置其他配置项,请在此处指定配置键。 Kuiper当前建议使用yaml文件格式。 |
StrictValidation介绍
StrictValidation 的值可以为 true 或 false。
1)True:如果消息不符合流定义,则删除消息。
2)False:保留消息,但用默认的空值填充缺少的字段。
bigint: 0
float: 0.0
string: ""
datetime: ??
boolean: false
array: zero length array
struct: null value
示例 1,
CREATE STREAM demo (
USERID BIGINT,
FIRST_NAME STRING,
LAST_NAME STRING,
NICKNAMES ARRAY(STRING),
Gender BOOLEAN,
ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
) WITH (DATASOURCE="topics:test/, demo/test", FORMAT="AVRO", KEY="USERID", CONF_KEY="democonf");
示例 2,
CREATE STREAM my_stream
(id int, name string, score float)
WITH ( datasource = "topic/temperature", FORMAT = "json", KEY = "id");
MQTT 源的配置指定以 yaml 格式,并且配置文件位置在$kuiper/etc/mqtt_source.yaml
处。 以下是文件格式。
#全局MQTT配置
QoS: 1
Share-subscription: true
Servers:
-
tcp://127.0.0.1:1883
#TODO: 其他全局配置
#重载全局配置
demo: #Conf_key
QoS: 0
Servers:
-
tls://10.211.55.6:1883
删除流。
DROP STREAM my_stream
打印流定义。
DESC STREAM my_stream
Fields
-----------
id int
name string
score float
SOURCE: topic/temperature
Format: json
Key: id
打印流的详细运行时信息。
EXPLAIN STREAM my_stream
打印系统中所有已定义的流。
SHOW STREAMS
my_stream, iot_stream
一个简单的命令行工具在 stream/cli/main.go
中实现。
运行 kuiper stream
命令,在显示 kuiper>
提示后,输入与流相关的 sql 语句,例如create,drop,description,explain和show stream语句以执行操作。
cli stream
kuiper > CREATE STREAM sname (count bigint) WITH (source="users", FORMAT="AVRO", KEY="USERID")
kuiper > DESCRIBE STREAM sname
...
cli query
kuiper > select USERID from demo;
...
请参阅下面的内容,需要使用存储器来保存流定义。