## 流规格 ### 数据类型 参考 [Azure IoT](https://docs.microsoft.com/en-us/stream-analytics-query/data-types-azure-stream-analytics), 将布尔类型强制转换为 int。 | # | 数据类型 | 说明 | | ---- | -------- | ------------------------------------------------------------ | | 1 | bigint | | | 2 | float | | | 3 | string | | | 4 | datetime | 需要指定日期格式? 例如 "yyyy-MM-dd" | | 5 | boolean | | | 6 | array | 数组类型可以是简单数据或结构类型中的任何类型(#1-#5和#7)。 | | 7 | struct | 复杂类型 | ### 语言定义 #### 创建流 ```sql CREATE STREAM stream_name ( column_name [ ,...n ] ) WITH ( property_name = expression [, ...] ); ``` **支持的属性名称。** | 属性名 | 可选 | 说明 | | ------------- | -------- | ------------------------------------------------------------ | | DATASOURCE | 否 | 如果是 MQTT 数据源,则列出主题名称。 | | FORMAT | 否 | json 或 Avro.
目前,我们仅支持 JSON 类型 ? | | KEY | 是 | 将来将用于 GROUP BY 语句 ?? | | TYPE | 否 | 如果支持越来越多的源,将来是否需要? 默认情况下,它将是 MQTT 类型。 | | StrictValidation | 否 | 根据流模式控制消息字段的验证行为。 | | CONF_KEY | 否 | 如果需要配置其他配置项,请在此处指定配置键。
Kuiper当前建议使用yaml文件格式。 | **StrictValidation介绍** ``` StrictValidation 的值可以为 true 或 false。 1)True:如果消息不符合流定义,则删除消息。 2)False:保留消息,但用默认的空值填充缺少的字段。 bigint: 0 float: 0.0 string: "" datetime: ?? boolean: false array: zero length array struct: null value ``` 示例 1, ```sql CREATE STREAM demo ( USERID BIGINT, FIRST_NAME STRING, LAST_NAME STRING, NICKNAMES ARRAY(STRING), Gender BOOLEAN, ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT), ) WITH (DATASOURCE="topics:test/, demo/test", FORMAT="AVRO", KEY="USERID", CONF_KEY="democonf"); ``` 示例 2, ```sql CREATE STREAM my_stream (id int, name string, score float) WITH ( datasource = "topic/temperature", FORMAT = "json", KEY = "id"); ``` MQTT 源的配置指定以 yaml 格式,并且配置文件位置在`$kuiper/etc/mqtt_source.yaml` 处。 以下是文件格式。 ```yaml #全局MQTT配置 QoS: 1 Share-subscription: true Servers: - tcp://127.0.0.1:1883 #TODO: 其他全局配置 #重载全局配置 demo: #Conf_key QoS: 0 Servers: - tls://10.211.55.6:1883 ``` #### 删除流 删除流。 ``` DROP STREAM my_stream ``` #### 描述流 打印流定义。 ``` DESC STREAM my_stream Fields ----------- id int name string score float SOURCE: topic/temperature Format: json Key: id ``` #### 解释流 打印流的详细运行时信息。 ``` EXPLAIN STREAM my_stream ``` #### 显示流 打印系统中所有已定义的流。 ``` SHOW STREAMS my_stream, iot_stream ``` ### 一个简单的 CLI 一个简单的命令行工具在 `stream/cli/main.go` 中实现。 #### 运行 SQL 来管理流 运行 `kuiper stream` 命令,在显示 `kuiper>` 提示后,输入与流相关的 sql 语句,例如create,drop,description,explain和show stream语句以执行操作。 ```bash cli stream kuiper > CREATE STREAM sname (count bigint) WITH (source="users", FORMAT="AVRO", KEY="USERID") kuiper > DESCRIBE STREAM sname ... ``` #### 查询 ```bash cli query kuiper > select USERID from demo; ... ``` ### 实现 ##### 如何保存流定义? 请参阅下面的内容,需要使用存储器来保存流定义。 ![stream_storage](./resources/stream_storage.png)