parser_stream_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package xsql
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/testx"
  18. "github.com/lf-edge/ekuiper/pkg/ast"
  19. "reflect"
  20. "strings"
  21. "testing"
  22. )
  23. func TestParser_ParseCreateStream(t *testing.T) {
  24. var tests = []struct {
  25. s string
  26. stmt *ast.StreamStmt
  27. err string
  28. }{
  29. {
  30. s: `CREATE STREAM demo (
  31. USERID BIGINT,
  32. FIRST_NAME STRING,
  33. LAST_NAME STRING,
  34. NICKNAMES ARRAY(STRING),
  35. data bytea,
  36. Gender BOOLEAN,
  37. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  38. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", CONF_KEY="srv1", type="MQTT", TIMESTAMP="USERID", TIMESTAMP_FORMAT="yyyy-MM-dd''T''HH:mm:ssX'");`,
  39. stmt: &ast.StreamStmt{
  40. Name: ast.StreamName("demo"),
  41. StreamFields: []ast.StreamField{
  42. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  43. {Name: "FIRST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  44. {Name: "LAST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  45. {Name: "NICKNAMES", FieldType: &ast.ArrayType{Type: ast.STRINGS}},
  46. {Name: "data", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  47. {Name: "Gender", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  48. {Name: "ADDRESS", FieldType: &ast.RecType{
  49. StreamFields: []ast.StreamField{
  50. {Name: "STREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  51. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  52. },
  53. }},
  54. },
  55. Options: &ast.Options{
  56. DATASOURCE: "users",
  57. FORMAT: "JSON",
  58. KEY: "USERID",
  59. CONF_KEY: "srv1",
  60. TYPE: "MQTT",
  61. TIMESTAMP: "USERID",
  62. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  63. },
  64. },
  65. },
  66. {
  67. s: `CREATE STREAM demo (
  68. USERID BIGINT,
  69. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true", SHARED="true");`,
  70. stmt: &ast.StreamStmt{
  71. Name: ast.StreamName("demo"),
  72. StreamFields: []ast.StreamField{
  73. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  74. },
  75. Options: &ast.Options{
  76. DATASOURCE: "users",
  77. FORMAT: "JSON",
  78. KEY: "USERID",
  79. STRICT_VALIDATION: true,
  80. SHARED: true,
  81. },
  82. },
  83. },
  84. {
  85. s: `CREATE STREAM demo (
  86. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  87. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="FAlse");`,
  88. stmt: &ast.StreamStmt{
  89. Name: ast.StreamName("demo"),
  90. StreamFields: []ast.StreamField{
  91. {Name: "ADDRESSES", FieldType: &ast.ArrayType{
  92. Type: ast.STRUCT,
  93. FieldType: &ast.RecType{
  94. StreamFields: []ast.StreamField{
  95. {Name: "STREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  96. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  97. },
  98. },
  99. }},
  100. },
  101. Options: &ast.Options{
  102. DATASOURCE: "users",
  103. FORMAT: "JSON",
  104. KEY: "USERID",
  105. STRICT_VALIDATION: false,
  106. },
  107. },
  108. },
  109. {
  110. s: `CREATE STREAM demo (
  111. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  112. birthday datetime,
  113. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  114. stmt: &ast.StreamStmt{
  115. Name: ast.StreamName("demo"),
  116. StreamFields: []ast.StreamField{
  117. {Name: "ADDRESSES", FieldType: &ast.ArrayType{
  118. Type: ast.STRUCT,
  119. FieldType: &ast.RecType{
  120. StreamFields: []ast.StreamField{
  121. {Name: "STREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  122. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  123. },
  124. },
  125. }},
  126. {Name: "birthday", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  127. },
  128. Options: &ast.Options{
  129. DATASOURCE: "users",
  130. FORMAT: "JSON",
  131. KEY: "USERID",
  132. },
  133. },
  134. },
  135. {
  136. s: `CREATE STREAM demo (
  137. NAME string,
  138. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  139. birthday datetime,
  140. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  141. stmt: &ast.StreamStmt{
  142. Name: ast.StreamName("demo"),
  143. StreamFields: []ast.StreamField{
  144. {Name: "NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  145. {Name: "ADDRESSES", FieldType: &ast.ArrayType{
  146. Type: ast.STRUCT,
  147. FieldType: &ast.RecType{
  148. StreamFields: []ast.StreamField{
  149. {Name: "STREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  150. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  151. },
  152. },
  153. }},
  154. {Name: "birthday", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  155. },
  156. Options: &ast.Options{
  157. DATASOURCE: "users",
  158. FORMAT: "JSON",
  159. KEY: "USERID",
  160. },
  161. },
  162. },
  163. {
  164. s: `CREATE STREAM demo (
  165. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  166. stmt: &ast.StreamStmt{
  167. Name: ast.StreamName("demo"),
  168. StreamFields: nil,
  169. Options: &ast.Options{
  170. DATASOURCE: "users",
  171. FORMAT: "JSON",
  172. KEY: "USERID",
  173. },
  174. },
  175. },
  176. {
  177. s: `CREATE STREAM demo() WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  178. stmt: &ast.StreamStmt{
  179. Name: ast.StreamName("demo"),
  180. StreamFields: nil,
  181. Options: &ast.Options{
  182. DATASOURCE: "users",
  183. FORMAT: "JSON",
  184. KEY: "USERID",
  185. },
  186. },
  187. },
  188. {
  189. s: `CREATE STREAM demo (NAME string)
  190. WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true1");`, //Invalid STRICT_VALIDATION value
  191. stmt: nil,
  192. err: `found "true1", expect TRUE/FALSE value in STRICT_VALIDATION option.`,
  193. },
  194. {
  195. s: `CREATE STREAM demo (NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  196. stmt: &ast.StreamStmt{
  197. Name: ast.StreamName("demo"),
  198. StreamFields: []ast.StreamField{
  199. {Name: "NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  200. },
  201. Options: &ast.Options{
  202. DATASOURCE: "users",
  203. FORMAT: "JSON",
  204. KEY: "USERID",
  205. },
  206. },
  207. },
  208. {
  209. s: `CREATE STREAM demo (NAME string)) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  210. stmt: &ast.StreamStmt{
  211. Name: ast.StreamName("demo"),
  212. StreamFields: nil,
  213. Options: nil,
  214. },
  215. err: `found ")", expect stream options.`,
  216. },
  217. {
  218. s: `CREATE STREAM demo (NAME string) WITHs (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  219. stmt: &ast.StreamStmt{
  220. Name: ast.StreamName("demo"),
  221. StreamFields: nil,
  222. Options: nil,
  223. },
  224. err: `found "WITHS", expected is with.`,
  225. },
  226. {
  227. s: `CREATE STREAM demo (NAME integer) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  228. stmt: &ast.StreamStmt{
  229. Name: "demo",
  230. StreamFields: nil,
  231. Options: nil,
  232. },
  233. err: `found "integer", expect valid stream field types(BIGINT | FLOAT | STRINGS | DATETIME | BOOLEAN | BYTEA | ARRAY | STRUCT).`,
  234. },
  235. {
  236. s: `CREATE STREAM demo (NAME string) WITH (sources="users", FORMAT="JSON", KEY="USERID");`,
  237. stmt: &ast.StreamStmt{
  238. Name: "demo",
  239. StreamFields: nil,
  240. Options: nil,
  241. },
  242. err: `found "SOURCES", unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|SHARED|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE|SCHEMAID).`,
  243. },
  244. {
  245. s: `CREATE STREAM demo ((NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  246. stmt: &ast.StreamStmt{
  247. Name: "demo",
  248. StreamFields: nil,
  249. Options: nil,
  250. },
  251. err: `found "(", expect stream field name.`,
  252. },
  253. {
  254. s: `CREATE STREAM demo (
  255. USERID BIGINT,
  256. ) WITH ();`,
  257. stmt: &ast.StreamStmt{
  258. Name: "demo",
  259. StreamFields: []ast.StreamField{
  260. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  261. },
  262. Options: &ast.Options{},
  263. },
  264. },
  265. {
  266. s: `CREATE STREAM demo (
  267. USERID BIGINT,
  268. ) WITH ());`,
  269. stmt: &ast.StreamStmt{
  270. Name: "",
  271. StreamFields: nil,
  272. Options: nil,
  273. },
  274. err: `found ")", expected semicolon or EOF.`,
  275. },
  276. {
  277. s: `CREATE STREAM demo (
  278. USERID BIGINT,
  279. ) WITH DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  280. stmt: &ast.StreamStmt{
  281. Name: "",
  282. StreamFields: nil,
  283. Options: nil,
  284. },
  285. //TODO The error string should be more accurate
  286. err: `found "DATASOURCE", expect stream options.`,
  287. },
  288. {
  289. s: `CREATE STREAM test(
  290. userID bigint,
  291. username string,
  292. NICKNAMES array(string),
  293. Gender boolean,
  294. ADDRESS struct(
  295. TREET_NAME string, NUMBER bigint
  296. ),
  297. INFO struct(
  298. INFO_NAME string, NUMBER bigint
  299. )
  300. ) WITH (DATASOURCE="test", FORMAT="JSON", CONF_KEY="democonf", TYPE="MQTT");`,
  301. stmt: &ast.StreamStmt{
  302. Name: ast.StreamName("test"),
  303. StreamFields: []ast.StreamField{
  304. {Name: "userID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  305. {Name: "username", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  306. {Name: "NICKNAMES", FieldType: &ast.ArrayType{Type: ast.STRINGS}},
  307. {Name: "Gender", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  308. {Name: "ADDRESS", FieldType: &ast.RecType{
  309. StreamFields: []ast.StreamField{
  310. {Name: "TREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  311. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  312. },
  313. }},
  314. {Name: "INFO", FieldType: &ast.RecType{
  315. StreamFields: []ast.StreamField{
  316. {Name: "INFO_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  317. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  318. },
  319. }},
  320. },
  321. Options: &ast.Options{
  322. DATASOURCE: "test",
  323. FORMAT: "JSON",
  324. CONF_KEY: "democonf",
  325. TYPE: "MQTT",
  326. },
  327. },
  328. }, {
  329. s: `CREATE STREAM demo (
  330. USERID BIGINT,
  331. FIRST_NAME STRING,
  332. LAST_NAME STRING,
  333. PICTURE BYTEA,
  334. ) WITH (DATASOURCE="users", FORMAT="JSON");`,
  335. stmt: &ast.StreamStmt{
  336. Name: ast.StreamName("demo"),
  337. StreamFields: []ast.StreamField{
  338. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  339. {Name: "FIRST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  340. {Name: "LAST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  341. {Name: "PICTURE", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  342. },
  343. Options: &ast.Options{
  344. DATASOURCE: "users",
  345. FORMAT: "JSON",
  346. },
  347. },
  348. }, {
  349. s: `CREATE STREAM demo (
  350. USERID BIGINT,
  351. FIRST_NAME STRING,
  352. LAST_NAME STRING,
  353. PICTURE BYTEA,
  354. ) WITH (DATASOURCE="users", FORMAT="JSON");`,
  355. stmt: &ast.StreamStmt{
  356. Name: ast.StreamName("demo"),
  357. StreamFields: []ast.StreamField{
  358. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  359. {Name: "FIRST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  360. {Name: "LAST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  361. {Name: "PICTURE", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  362. },
  363. Options: &ast.Options{
  364. DATASOURCE: "users",
  365. FORMAT: "JSON",
  366. },
  367. },
  368. }, {
  369. s: `CREATE STREAM demo (
  370. USERID BIGINT,
  371. FIRST_NAME STRING,
  372. LAST_NAME STRING,
  373. PICTURE BYTEA,
  374. ) WITH (DATASOURCE="users", format="BINARY");`,
  375. stmt: &ast.StreamStmt{
  376. Name: "",
  377. StreamFields: nil,
  378. Options: nil,
  379. },
  380. err: "'binary' format stream can have only one field",
  381. }, {
  382. s: `CREATE STREAM demo (
  383. image BYTEA
  384. ) WITH (DATASOURCE="users", FORMAT="BINARY");`,
  385. stmt: &ast.StreamStmt{
  386. Name: ast.StreamName("demo"),
  387. StreamFields: []ast.StreamField{
  388. {Name: "image", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  389. },
  390. Options: &ast.Options{
  391. DATASOURCE: "users",
  392. FORMAT: "BINARY",
  393. },
  394. },
  395. }, {
  396. s: `CREATE STREAM demo (
  397. ) WITH (DATASOURCE="users", FORMAT="DELIMITED", Delimiter=" ");`,
  398. stmt: &ast.StreamStmt{
  399. Name: ast.StreamName("demo"),
  400. StreamFields: nil,
  401. Options: &ast.Options{
  402. DATASOURCE: "users",
  403. FORMAT: "DELIMITED",
  404. DELIMITER: " ",
  405. },
  406. },
  407. },
  408. }
  409. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  410. for i, tt := range tests {
  411. stmt, err := NewParser(strings.NewReader(tt.s)).ParseCreateStmt()
  412. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  413. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  414. } else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
  415. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt)
  416. }
  417. }
  418. }