streams.md 4.3 KB

流规格

数据类型

参考 Azure IoT, 将布尔类型强制转换为 int。

# 数据类型 说明
1 bigint
2 float
3 string
4 datetime 需要指定日期格式? 例如 "yyyy-MM-dd"
5 boolean
6 array 数组类型可以是简单数据或结构类型中的任何类型(#1-#5和#7)。
7 struct 复杂类型

语言定义

创建流

CREATE STREAM   
    stream_name   
    ( column_name <data_type> [ ,...n ] )
    WITH ( property_name = expression [, ...] );

支持的属性名称。

属性名 可选 说明
DATASOURCE 如果是 MQTT 数据源,则列出主题名称。
FORMAT json 或 Avro.
目前,我们仅支持 JSON 类型 ?
KEY 将来将用于 GROUP BY 语句 ??
TYPE 如果支持越来越多的源,将来是否需要? 默认情况下,它将是 MQTT 类型。
StrictValidation 根据流模式控制消息字段的验证行为。
CONF_KEY 如果需要配置其他配置项,请在此处指定配置键。
Kuiper当前建议使用yaml文件格式。

StrictValidation介绍

StrictValidation 的值可以为 true 或 false。
1)True:如果消息不符合流定义,则删除消息。
2)False:保留消息,但用默认的空值填充缺少的字段。

bigint: 0
float: 0.0
string: ""
datetime: ??
boolean: false
array: zero length array
struct: null value

示例 1,

CREATE STREAM demo (
        USERID BIGINT,
        FIRST_NAME STRING,
        LAST_NAME STRING,
        NICKNAMES ARRAY(STRING),
        Gender BOOLEAN,
        ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
    ) WITH (DATASOURCE="topics:test/, demo/test", FORMAT="AVRO", KEY="USERID", CONF_KEY="democonf");

示例 2,

CREATE STREAM my_stream   
    (id int, name string, score float)
    WITH ( datasource = "topic/temperature", FORMAT = "json", KEY = "id");

MQTT 源的配置指定以 yaml 格式,并且配置文件位置在$kuiper/etc/mqtt_source.yaml 处。 以下是文件格式。

#全局MQTT配置
QoS: 1
Share-subscription: true
Servers:
  - 
     tcp://127.0.0.1:1883
#TODO: 其他全局配置


#重载全局配置
demo: #Conf_key
  QoS: 0
  Servers:
    - 
      tls://10.211.55.6:1883


删除流

删除流。

DROP STREAM my_stream

描述流

打印流定义。

DESC STREAM my_stream

Fields
-----------
id     int
name   string
score  float

SOURCE: topic/temperature
Format: json
Key: id

解释流

打印流的详细运行时信息。

EXPLAIN STREAM my_stream

显示流

打印系统中所有已定义的流。

SHOW STREAMS

my_stream, iot_stream

一个简单的 CLI

一个简单的命令行工具在 stream/cli/main.go 中实现。

运行 SQL 来管理流

运行 cli stream 命令,在显示 kuiper> 提示后,输入与流相关的 sql 语句,例如create,drop,description,explain和show stream语句以执行操作。

cli stream
kuiper > CREATE STREAM sname (count bigint) WITH (source="users", FORMAT="AVRO", KEY="USERID")
kuiper > DESCRIBE STREAM sname
...

查询

cli query
kuiper > select USERID from demo;
...

实现

如何保存流定义?

请参阅下面的内容,需要使用存储器来保存流定义。

stream_storage