Refer to Azure IoT, boolean type is cast to int.
# | Data type | Description |
---|---|---|
1 | bigint | |
2 | float | |
3 | string | |
4 | datetime | Need to specify the date format?? Such as "yyyy-MM-dd" |
5 | boolean | |
6 | array | The array type, can be any types from simple data or struct type (#1 - #5, and #7). |
7 | struct | The complex type. |
CREATE STREAM
stream_name
( column_name <data_type> [ ,...n ] )
WITH ( property_name = expression [, ...] );
The supported property names.
Property name | Optional | Description |
---|---|---|
DATASOURCE | false | The topic names list if it's a MQTT data source. |
FORMAT | false | json or Avro. Currently, we just support the JSON type ? |
KEY | true | It will be used in future for GROUP BY statements ?? |
TYPE | false | The type of source to be used. The value must be camel case. For example, if the user creates a customized source MySource with MySource.so inside the plugins/sources folder, set TYPE=mySource to use that extended source. By default, it would be MQTT type. |
StrictValidation | false | To control validation behavior of message field against stream schema. |
CONF_KEY | false | If additional configuration items are requied to be configured, then specify the config key here. eKuiper currently propose yaml file format. |
Introduction for StrictValidation
The value of StrictValidation can be true or false.
1) True: Drop the message if the message is not satisfy with the stream definition.
2) False: Keep the message, but fill the missing field with default empty value.
bigint: 0
float: 0.0
string: ""
datetime: ??
boolean: false
array: zero length array
struct: null value
Example 1,
CREATE STREAM demo (
USERID BIGINT,
FIRST_NAME STRING,
LAST_NAME STRING,
NICKNAMES ARRAY(STRING),
Gender BOOLEAN,
ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
) WITH (DATASOURCE="topics:test/, demo/test", FORMAT="AVRO", KEY="USERID", CONF_KEY="democonf");
Example 2,
CREATE STREAM my_stream
(id int, name string, score float)
WITH ( datasource = "topic/temperature", FORMAT = "json", KEY = "id");
The configuration of MQTT source is specified with yaml format, and the configuration file location is at $ekuiper/etc/mqtt_source.yaml
. Below is the file format.
#Global MQTT configurations
QoS: 1
Share-subscription: true
Servers:
-
tcp://127.0.0.1:1883
#TODO: Other global configurations
#Override the global configurations
demo: #Conf_key
QoS: 0
Servers:
-
tls://10.211.55.6:1883
DROP the stream.
DROP STREAM my_stream
Print the stream definition.
DESC STREAM my_stream
Fields
-----------
id int
name string
score float
SOURCE: topic/temperature
Format: json
Key: id
Print the detailed runtime infomation of the stream.
EXPLAIN STREAM my_stream
Print all defined streams in system.
SHOW STREAMS
my_stream, iot_stream
A simple command line tool is implemented in stream/cli/main.go
.
Run kuiper stream
command, after kuiper >
prompt shown, enter stream related sql statements such as create, drop, describe, explain and show stream statements to execute.
cli stream
kuiper > CREATE STREAM sname '(count bigint) WITH (source="users", FORMAT="AVRO", KEY="USERID")'
kuiper > DESCRIBE STREAM sname
...
cli query
kuiper > select USERID from demo;
...
Refer to below, a storage is required for saving the stream definitions.