package xsql import ( "fmt" "reflect" "strings" "testing" ) func TestParser_ParseCreateStream(t *testing.T) { var tests = []struct { s string stmt *StreamStmt err string }{ { s: `CREATE STREAM demo ( USERID BIGINT, FIRST_NAME STRING, LAST_NAME STRING, NICKNAMES ARRAY(STRING), Gender BOOLEAN, ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT), ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID", CONF_KEY="srv1", type="MQTT", TIMESTAMP="USERID", TIMESTAMP_FORMAT="yyyy-MM-dd''T''HH:mm:ssX'");`, stmt: &StreamStmt{ Name: StreamName("demo"), StreamFields: []StreamField{ {Name: "USERID", FieldType: &BasicType{Type: BIGINT}}, {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}}, {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}}, {Name: "NICKNAMES", FieldType: &ArrayType{Type: STRINGS}}, {Name: "Gender", FieldType: &BasicType{Type: BOOLEAN}}, {Name: "ADDRESS", FieldType: &RecType{ StreamFields: []StreamField{ {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}}, {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}}, }, }}, }, Options: map[string]string{ "DATASOURCE" : "users", "FORMAT" : "AVRO", "KEY" : "USERID", "CONF_KEY" : "srv1", "TYPE" : "MQTT", "TIMESTAMP" : "USERID", "TIMESTAMP_FORMAT" : "yyyy-MM-dd''T''HH:mm:ssX'", }, }, }, { s: `CREATE STREAM demo ( USERID BIGINT, ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true");`, stmt: &StreamStmt{ Name: StreamName("demo"), StreamFields: []StreamField{ {Name: "USERID", FieldType: &BasicType{Type: BIGINT}}, }, Options: map[string]string{ "DATASOURCE" : "users", "FORMAT" : "JSON", "KEY" : "USERID", "STRICT_VALIDATION" : "true", }, }, }, { s: `CREATE STREAM demo ( ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)), ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID", STRICT_VALIDATION="FAlse");`, stmt: &StreamStmt{ Name: StreamName("demo"), StreamFields: []StreamField{ {Name: "ADDRESSES", FieldType: &ArrayType{ Type: STRUCT, FieldType: &RecType{ StreamFields: []StreamField{ {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}}, {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}}, }, }, }}, }, Options: map[string]string{ "DATASOURCE" : "users", "FORMAT" : "AVRO", "KEY" : "USERID", "STRICT_VALIDATION": "FAlse", }, }, }, { s: `CREATE STREAM demo ( ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)), birthday datetime, ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`, stmt: &StreamStmt{ Name: StreamName("demo"), StreamFields: []StreamField{ {Name: "ADDRESSES", FieldType: &ArrayType{ Type: STRUCT, FieldType: &RecType{ StreamFields: []StreamField{ {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}}, {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}}, }, }, }}, {Name: "birthday", FieldType: &BasicType{Type: DATETIME}}, }, Options: map[string]string{ "DATASOURCE" : "users", "FORMAT" : "AVRO", "KEY" : "USERID", }, }, }, { s: `CREATE STREAM demo ( NAME string, ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)), birthday datetime, ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`, stmt: &StreamStmt{ Name: StreamName("demo"), StreamFields: []StreamField{ {Name: "NAME", FieldType: &BasicType{Type: STRINGS}}, {Name: "ADDRESSES", FieldType: &ArrayType{ Type: STRUCT, FieldType: &RecType{ StreamFields: []StreamField{ {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}}, {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}}, }, }, }}, {Name: "birthday", FieldType: &BasicType{Type: DATETIME}}, }, Options: map[string]string{ "DATASOURCE" : "users", "FORMAT" : "AVRO", "KEY" : "USERID", }, }, }, { s: `CREATE STREAM demo ( ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`, stmt: nil, err: `found ")", expect stream field name.`, }, { s: `CREATE STREAM demo (NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true1");`, //Invalid STRICT_VALIDATION value stmt: nil, err: `found "true1", expect TRUE/FALSE value in STRICT_VALIDATION option.`, }, { s: `CREATE STREAM demo (NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`, stmt: &StreamStmt{ Name: StreamName("demo"), StreamFields: []StreamField{ {Name: "NAME", FieldType: &BasicType{Type: STRINGS}}, }, Options: map[string]string{ "DATASOURCE" : "users", "FORMAT" : "JSON", "KEY" : "USERID", }, }, }, { s: `CREATE STREAM demo (NAME string)) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`, stmt: &StreamStmt{ Name: StreamName("demo"), StreamFields: nil, Options: nil, }, err: `found ")", expect stream options.`, }, { s: `CREATE STREAM demo (NAME string) WITHs (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`, stmt: &StreamStmt{ Name: StreamName("demo"), StreamFields: nil, Options: nil, }, err: `found "WITHs", expected is with.`, }, { s: `CREATE STREAM demo (NAME integer) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`, stmt: &StreamStmt{ Name: "demo", StreamFields: nil, Options: nil, }, err: `found "integer", expect valid stream field types(BIGINT | FLOAT | STRINGS | DATETIME | BOOLEAN | ARRAY | STRUCT).`, }, { s: `CREATE STREAM demo (NAME string) WITH (sources="users", FORMAT="JSON", KEY="USERID");`, stmt: &StreamStmt{ Name: "demo", StreamFields: nil, Options: nil, }, err: `found "sources", unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|STRICT_VALIDATION|TYPE).`, }, { s: `CREATE STREAM demo ((NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`, stmt: &StreamStmt{ Name: "demo", StreamFields: nil, Options: nil, }, err: `found "(", expect stream field name.`, }, { s: `CREATE STREAM demo ( USERID BIGINT, ) WITH ();`, stmt: &StreamStmt{ Name: "demo", StreamFields: []StreamField{ {Name: "USERID", FieldType: &BasicType{Type: BIGINT}}, }, Options: map[string]string{}, }, }, { s: `CREATE STREAM demo ( USERID BIGINT, ) WITH ());`, stmt: &StreamStmt{ Name: "", StreamFields: nil, Options: nil, }, err: `found ")", expected semicolon or EOF.`, }, { s: `CREATE STREAM demo ( USERID BIGINT, ) WITH DATASOURCE="users", FORMAT="JSON", KEY="USERID");`, stmt: &StreamStmt{ Name: "", StreamFields: nil, Options: nil, }, //TODO The error string should be more accurate err: `found "DATASOURCE", expect stream options.`, }, } fmt.Printf("The test bucket size is %d.\n\n", len(tests)) for i, tt := range tests { stmt, err := NewParser(strings.NewReader(tt.s)).ParseCreateStreamStmt() if !reflect.DeepEqual(tt.err, errstring(err)) { t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err) } else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) { t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt) } } } // errstring returns the string representation of an error. //func errstring(err error) string { // if err != nil { // return err.Error() // } // return "" //}