parser_stream_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package xsql
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/testx"
  18. "github.com/lf-edge/ekuiper/pkg/ast"
  19. "reflect"
  20. "strings"
  21. "testing"
  22. )
  23. func TestParser_ParseCreateStream(t *testing.T) {
  24. var tests = []struct {
  25. s string
  26. stmt *ast.StreamStmt
  27. err string
  28. }{
  29. {
  30. s: `CREATE STREAM demo (
  31. USERID BIGINT,
  32. FIRST_NAME STRING,
  33. LAST_NAME STRING,
  34. NICKNAMES ARRAY(STRING),
  35. data bytea,
  36. Gender BOOLEAN,
  37. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  38. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", CONF_KEY="srv1", type="MQTT", TIMESTAMP="USERID", TIMESTAMP_FORMAT="yyyy-MM-dd''T''HH:mm:ssX'");`,
  39. stmt: &ast.StreamStmt{
  40. Name: ast.StreamName("demo"),
  41. StreamFields: []ast.StreamField{
  42. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  43. {Name: "FIRST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  44. {Name: "LAST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  45. {Name: "NICKNAMES", FieldType: &ast.ArrayType{Type: ast.STRINGS}},
  46. {Name: "data", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  47. {Name: "Gender", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  48. {Name: "ADDRESS", FieldType: &ast.RecType{
  49. StreamFields: []ast.StreamField{
  50. {Name: "STREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  51. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  52. },
  53. }},
  54. },
  55. Options: &ast.Options{
  56. DATASOURCE: "users",
  57. FORMAT: "JSON",
  58. KEY: "USERID",
  59. CONF_KEY: "srv1",
  60. TYPE: "MQTT",
  61. TIMESTAMP: "USERID",
  62. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  63. STRICT_VALIDATION: true,
  64. },
  65. },
  66. },
  67. {
  68. s: `CREATE STREAM demo (
  69. USERID BIGINT,
  70. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true", SHARED="true");`,
  71. stmt: &ast.StreamStmt{
  72. Name: ast.StreamName("demo"),
  73. StreamFields: []ast.StreamField{
  74. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  75. },
  76. Options: &ast.Options{
  77. DATASOURCE: "users",
  78. FORMAT: "JSON",
  79. KEY: "USERID",
  80. STRICT_VALIDATION: true,
  81. SHARED: true,
  82. },
  83. },
  84. },
  85. {
  86. s: `CREATE STREAM demo (
  87. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  88. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="FAlse");`,
  89. stmt: &ast.StreamStmt{
  90. Name: ast.StreamName("demo"),
  91. StreamFields: []ast.StreamField{
  92. {Name: "ADDRESSES", FieldType: &ast.ArrayType{
  93. Type: ast.STRUCT,
  94. FieldType: &ast.RecType{
  95. StreamFields: []ast.StreamField{
  96. {Name: "STREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  97. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  98. },
  99. },
  100. }},
  101. },
  102. Options: &ast.Options{
  103. DATASOURCE: "users",
  104. FORMAT: "JSON",
  105. KEY: "USERID",
  106. STRICT_VALIDATION: false,
  107. },
  108. },
  109. },
  110. {
  111. s: `CREATE STREAM demo (
  112. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  113. birthday datetime,
  114. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  115. stmt: &ast.StreamStmt{
  116. Name: ast.StreamName("demo"),
  117. StreamFields: []ast.StreamField{
  118. {Name: "ADDRESSES", FieldType: &ast.ArrayType{
  119. Type: ast.STRUCT,
  120. FieldType: &ast.RecType{
  121. StreamFields: []ast.StreamField{
  122. {Name: "STREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  123. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  124. },
  125. },
  126. }},
  127. {Name: "birthday", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  128. },
  129. Options: &ast.Options{
  130. DATASOURCE: "users",
  131. FORMAT: "JSON",
  132. KEY: "USERID",
  133. STRICT_VALIDATION: true,
  134. },
  135. },
  136. },
  137. {
  138. s: `CREATE STREAM demo (
  139. NAME string,
  140. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  141. birthday datetime,
  142. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  143. stmt: &ast.StreamStmt{
  144. Name: ast.StreamName("demo"),
  145. StreamFields: []ast.StreamField{
  146. {Name: "NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  147. {Name: "ADDRESSES", FieldType: &ast.ArrayType{
  148. Type: ast.STRUCT,
  149. FieldType: &ast.RecType{
  150. StreamFields: []ast.StreamField{
  151. {Name: "STREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  152. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  153. },
  154. },
  155. }},
  156. {Name: "birthday", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  157. },
  158. Options: &ast.Options{
  159. DATASOURCE: "users",
  160. FORMAT: "JSON",
  161. KEY: "USERID",
  162. STRICT_VALIDATION: true,
  163. },
  164. },
  165. },
  166. {
  167. s: `CREATE STREAM demo (
  168. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  169. stmt: &ast.StreamStmt{
  170. Name: ast.StreamName("demo"),
  171. StreamFields: nil,
  172. Options: &ast.Options{
  173. DATASOURCE: "users",
  174. FORMAT: "JSON",
  175. KEY: "USERID",
  176. STRICT_VALIDATION: true,
  177. },
  178. },
  179. },
  180. {
  181. s: `CREATE STREAM demo() WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  182. stmt: &ast.StreamStmt{
  183. Name: ast.StreamName("demo"),
  184. StreamFields: nil,
  185. Options: &ast.Options{
  186. DATASOURCE: "users",
  187. FORMAT: "JSON",
  188. KEY: "USERID",
  189. STRICT_VALIDATION: true,
  190. },
  191. },
  192. },
  193. {
  194. s: `CREATE STREAM demo (NAME string)
  195. WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true1");`, //Invalid STRICT_VALIDATION value
  196. stmt: nil,
  197. err: `found "true1", expect TRUE/FALSE value in STRICT_VALIDATION option.`,
  198. },
  199. {
  200. s: `CREATE STREAM demo (NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  201. stmt: &ast.StreamStmt{
  202. Name: ast.StreamName("demo"),
  203. StreamFields: []ast.StreamField{
  204. {Name: "NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  205. },
  206. Options: &ast.Options{
  207. DATASOURCE: "users",
  208. FORMAT: "JSON",
  209. KEY: "USERID",
  210. STRICT_VALIDATION: true,
  211. },
  212. },
  213. },
  214. {
  215. s: `CREATE STREAM demo (NAME string)) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  216. stmt: &ast.StreamStmt{
  217. Name: ast.StreamName("demo"),
  218. StreamFields: nil,
  219. Options: nil,
  220. },
  221. err: `found ")", expect stream options.`,
  222. },
  223. {
  224. s: `CREATE STREAM demo (NAME string) WITHs (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  225. stmt: &ast.StreamStmt{
  226. Name: ast.StreamName("demo"),
  227. StreamFields: nil,
  228. Options: nil,
  229. },
  230. err: `found "WITHs", expected is with.`,
  231. },
  232. {
  233. s: `CREATE STREAM demo (NAME integer) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  234. stmt: &ast.StreamStmt{
  235. Name: "demo",
  236. StreamFields: nil,
  237. Options: nil,
  238. },
  239. err: `found "integer", expect valid stream field types(BIGINT | FLOAT | STRINGS | DATETIME | BOOLEAN | BYTEA | ARRAY | STRUCT).`,
  240. },
  241. {
  242. s: `CREATE STREAM demo (NAME string) WITH (sources="users", FORMAT="JSON", KEY="USERID");`,
  243. stmt: &ast.StreamStmt{
  244. Name: "demo",
  245. StreamFields: nil,
  246. Options: nil,
  247. },
  248. err: `found "sources", unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|SHARED|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE|SCHEMAID).`,
  249. },
  250. {
  251. s: `CREATE STREAM demo ((NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  252. stmt: &ast.StreamStmt{
  253. Name: "demo",
  254. StreamFields: nil,
  255. Options: nil,
  256. },
  257. err: `found "(", expect stream field name.`,
  258. },
  259. {
  260. s: `CREATE STREAM demo (
  261. USERID BIGINT,
  262. ) WITH ();`,
  263. stmt: &ast.StreamStmt{
  264. Name: "demo",
  265. StreamFields: []ast.StreamField{
  266. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  267. },
  268. Options: &ast.Options{STRICT_VALIDATION: true},
  269. },
  270. },
  271. {
  272. s: `CREATE STREAM demo (
  273. USERID BIGINT,
  274. ) WITH ());`,
  275. stmt: &ast.StreamStmt{
  276. Name: "",
  277. StreamFields: nil,
  278. Options: nil,
  279. },
  280. err: `found ")", expected semicolon or EOF.`,
  281. },
  282. {
  283. s: `CREATE STREAM demo (
  284. USERID BIGINT,
  285. ) WITH DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  286. stmt: &ast.StreamStmt{
  287. Name: "",
  288. StreamFields: nil,
  289. Options: nil,
  290. },
  291. //TODO The error string should be more accurate
  292. err: `found "DATASOURCE", expect stream options.`,
  293. },
  294. {
  295. s: `CREATE STREAM test(
  296. userID bigint,
  297. username string,
  298. NICKNAMES array(string),
  299. Gender boolean,
  300. ADDRESS struct(
  301. TREET_NAME string, NUMBER bigint
  302. ),
  303. INFO struct(
  304. INFO_NAME string, NUMBER bigint
  305. )
  306. ) WITH (DATASOURCE="test", FORMAT="JSON", CONF_KEY="democonf", TYPE="MQTT");`,
  307. stmt: &ast.StreamStmt{
  308. Name: ast.StreamName("test"),
  309. StreamFields: []ast.StreamField{
  310. {Name: "userID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  311. {Name: "username", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  312. {Name: "NICKNAMES", FieldType: &ast.ArrayType{Type: ast.STRINGS}},
  313. {Name: "Gender", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  314. {Name: "ADDRESS", FieldType: &ast.RecType{
  315. StreamFields: []ast.StreamField{
  316. {Name: "TREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  317. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  318. },
  319. }},
  320. {Name: "INFO", FieldType: &ast.RecType{
  321. StreamFields: []ast.StreamField{
  322. {Name: "INFO_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  323. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  324. },
  325. }},
  326. },
  327. Options: &ast.Options{
  328. DATASOURCE: "test",
  329. FORMAT: "JSON",
  330. CONF_KEY: "democonf",
  331. TYPE: "MQTT",
  332. STRICT_VALIDATION: true,
  333. },
  334. },
  335. }, {
  336. s: `CREATE STREAM demo (
  337. USERID BIGINT,
  338. FIRST_NAME STRING,
  339. LAST_NAME STRING,
  340. PICTURE BYTEA,
  341. ) WITH (DATASOURCE="users", FORMAT="JSON");`,
  342. stmt: &ast.StreamStmt{
  343. Name: ast.StreamName("demo"),
  344. StreamFields: []ast.StreamField{
  345. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  346. {Name: "FIRST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  347. {Name: "LAST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  348. {Name: "PICTURE", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  349. },
  350. Options: &ast.Options{
  351. DATASOURCE: "users",
  352. FORMAT: "JSON",
  353. STRICT_VALIDATION: true,
  354. },
  355. },
  356. }, {
  357. s: `CREATE STREAM demo (
  358. USERID BIGINT,
  359. FIRST_NAME STRING,
  360. LAST_NAME STRING,
  361. PICTURE BYTEA,
  362. ) WITH (DATASOURCE="users", FORMAT="JSON");`,
  363. stmt: &ast.StreamStmt{
  364. Name: ast.StreamName("demo"),
  365. StreamFields: []ast.StreamField{
  366. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  367. {Name: "FIRST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  368. {Name: "LAST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  369. {Name: "PICTURE", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  370. },
  371. Options: &ast.Options{
  372. DATASOURCE: "users",
  373. FORMAT: "JSON",
  374. STRICT_VALIDATION: true,
  375. },
  376. },
  377. }, {
  378. s: `CREATE STREAM demo (
  379. USERID BIGINT,
  380. FIRST_NAME STRING,
  381. LAST_NAME STRING,
  382. PICTURE BYTEA,
  383. ) WITH (DATASOURCE="users", format="BINARY");`,
  384. stmt: &ast.StreamStmt{
  385. Name: "",
  386. StreamFields: nil,
  387. Options: nil,
  388. },
  389. err: "'binary' format stream can have only one field",
  390. }, {
  391. s: `CREATE STREAM demo (
  392. image BYTEA
  393. ) WITH (DATASOURCE="users", FORMAT="BINARY");`,
  394. stmt: &ast.StreamStmt{
  395. Name: ast.StreamName("demo"),
  396. StreamFields: []ast.StreamField{
  397. {Name: "image", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  398. },
  399. Options: &ast.Options{
  400. DATASOURCE: "users",
  401. FORMAT: "BINARY",
  402. STRICT_VALIDATION: true,
  403. },
  404. },
  405. },
  406. }
  407. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  408. for i, tt := range tests {
  409. stmt, err := NewParser(strings.NewReader(tt.s)).ParseCreateStmt()
  410. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  411. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  412. } else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
  413. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt)
  414. }
  415. }
  416. }