parser_stream_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. package xsql
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/internal/testx"
  5. "github.com/emqx/kuiper/pkg/ast"
  6. "reflect"
  7. "strings"
  8. "testing"
  9. )
  10. func TestParser_ParseCreateStream(t *testing.T) {
  11. var tests = []struct {
  12. s string
  13. stmt *ast.StreamStmt
  14. err string
  15. }{
  16. {
  17. s: `CREATE STREAM demo (
  18. USERID BIGINT,
  19. FIRST_NAME STRING,
  20. LAST_NAME STRING,
  21. NICKNAMES ARRAY(STRING),
  22. data bytea,
  23. Gender BOOLEAN,
  24. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  25. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", CONF_KEY="srv1", type="MQTT", TIMESTAMP="USERID", TIMESTAMP_FORMAT="yyyy-MM-dd''T''HH:mm:ssX'");`,
  26. stmt: &ast.StreamStmt{
  27. Name: ast.StreamName("demo"),
  28. StreamFields: []ast.StreamField{
  29. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  30. {Name: "FIRST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  31. {Name: "LAST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  32. {Name: "NICKNAMES", FieldType: &ast.ArrayType{Type: ast.STRINGS}},
  33. {Name: "data", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  34. {Name: "Gender", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  35. {Name: "ADDRESS", FieldType: &ast.RecType{
  36. StreamFields: []ast.StreamField{
  37. {Name: "STREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  38. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  39. },
  40. }},
  41. },
  42. Options: &ast.Options{
  43. DATASOURCE: "users",
  44. FORMAT: "JSON",
  45. KEY: "USERID",
  46. CONF_KEY: "srv1",
  47. TYPE: "MQTT",
  48. TIMESTAMP: "USERID",
  49. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  50. },
  51. },
  52. },
  53. {
  54. s: `CREATE STREAM demo (
  55. USERID BIGINT,
  56. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true", SHARED="true");`,
  57. stmt: &ast.StreamStmt{
  58. Name: ast.StreamName("demo"),
  59. StreamFields: []ast.StreamField{
  60. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  61. },
  62. Options: &ast.Options{
  63. DATASOURCE: "users",
  64. FORMAT: "JSON",
  65. KEY: "USERID",
  66. STRICT_VALIDATION: true,
  67. SHARED: true,
  68. },
  69. },
  70. },
  71. {
  72. s: `CREATE STREAM demo (
  73. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  74. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="FAlse");`,
  75. stmt: &ast.StreamStmt{
  76. Name: ast.StreamName("demo"),
  77. StreamFields: []ast.StreamField{
  78. {Name: "ADDRESSES", FieldType: &ast.ArrayType{
  79. Type: ast.STRUCT,
  80. FieldType: &ast.RecType{
  81. StreamFields: []ast.StreamField{
  82. {Name: "STREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  83. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  84. },
  85. },
  86. }},
  87. },
  88. Options: &ast.Options{
  89. DATASOURCE: "users",
  90. FORMAT: "JSON",
  91. KEY: "USERID",
  92. STRICT_VALIDATION: false,
  93. },
  94. },
  95. },
  96. {
  97. s: `CREATE STREAM demo (
  98. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  99. birthday datetime,
  100. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  101. stmt: &ast.StreamStmt{
  102. Name: ast.StreamName("demo"),
  103. StreamFields: []ast.StreamField{
  104. {Name: "ADDRESSES", FieldType: &ast.ArrayType{
  105. Type: ast.STRUCT,
  106. FieldType: &ast.RecType{
  107. StreamFields: []ast.StreamField{
  108. {Name: "STREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  109. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  110. },
  111. },
  112. }},
  113. {Name: "birthday", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  114. },
  115. Options: &ast.Options{
  116. DATASOURCE: "users",
  117. FORMAT: "JSON",
  118. KEY: "USERID",
  119. },
  120. },
  121. },
  122. {
  123. s: `CREATE STREAM demo (
  124. NAME string,
  125. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  126. birthday datetime,
  127. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  128. stmt: &ast.StreamStmt{
  129. Name: ast.StreamName("demo"),
  130. StreamFields: []ast.StreamField{
  131. {Name: "NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  132. {Name: "ADDRESSES", FieldType: &ast.ArrayType{
  133. Type: ast.STRUCT,
  134. FieldType: &ast.RecType{
  135. StreamFields: []ast.StreamField{
  136. {Name: "STREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  137. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  138. },
  139. },
  140. }},
  141. {Name: "birthday", FieldType: &ast.BasicType{Type: ast.DATETIME}},
  142. },
  143. Options: &ast.Options{
  144. DATASOURCE: "users",
  145. FORMAT: "JSON",
  146. KEY: "USERID",
  147. },
  148. },
  149. },
  150. {
  151. s: `CREATE STREAM demo (
  152. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  153. stmt: &ast.StreamStmt{
  154. Name: ast.StreamName("demo"),
  155. StreamFields: nil,
  156. Options: &ast.Options{
  157. DATASOURCE: "users",
  158. FORMAT: "JSON",
  159. KEY: "USERID",
  160. },
  161. },
  162. },
  163. {
  164. s: `CREATE STREAM demo() WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  165. stmt: &ast.StreamStmt{
  166. Name: ast.StreamName("demo"),
  167. StreamFields: nil,
  168. Options: &ast.Options{
  169. DATASOURCE: "users",
  170. FORMAT: "JSON",
  171. KEY: "USERID",
  172. },
  173. },
  174. },
  175. {
  176. s: `CREATE STREAM demo (NAME string)
  177. WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true1");`, //Invalid STRICT_VALIDATION value
  178. stmt: nil,
  179. err: `found "true1", expect TRUE/FALSE value in STRICT_VALIDATION option.`,
  180. },
  181. {
  182. s: `CREATE STREAM demo (NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  183. stmt: &ast.StreamStmt{
  184. Name: ast.StreamName("demo"),
  185. StreamFields: []ast.StreamField{
  186. {Name: "NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  187. },
  188. Options: &ast.Options{
  189. DATASOURCE: "users",
  190. FORMAT: "JSON",
  191. KEY: "USERID",
  192. },
  193. },
  194. },
  195. {
  196. s: `CREATE STREAM demo (NAME string)) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  197. stmt: &ast.StreamStmt{
  198. Name: ast.StreamName("demo"),
  199. StreamFields: nil,
  200. Options: nil,
  201. },
  202. err: `found ")", expect stream options.`,
  203. },
  204. {
  205. s: `CREATE STREAM demo (NAME string) WITHs (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  206. stmt: &ast.StreamStmt{
  207. Name: ast.StreamName("demo"),
  208. StreamFields: nil,
  209. Options: nil,
  210. },
  211. err: `found "WITHs", expected is with.`,
  212. },
  213. {
  214. s: `CREATE STREAM demo (NAME integer) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  215. stmt: &ast.StreamStmt{
  216. Name: "demo",
  217. StreamFields: nil,
  218. Options: nil,
  219. },
  220. err: `found "integer", expect valid stream field types(BIGINT | FLOAT | STRINGS | DATETIME | BOOLEAN | BYTEA | ARRAY | STRUCT).`,
  221. },
  222. {
  223. s: `CREATE STREAM demo (NAME string) WITH (sources="users", FORMAT="JSON", KEY="USERID");`,
  224. stmt: &ast.StreamStmt{
  225. Name: "demo",
  226. StreamFields: nil,
  227. Options: nil,
  228. },
  229. err: `found "sources", unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|SHARED|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE).`,
  230. },
  231. {
  232. s: `CREATE STREAM demo ((NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  233. stmt: &ast.StreamStmt{
  234. Name: "demo",
  235. StreamFields: nil,
  236. Options: nil,
  237. },
  238. err: `found "(", expect stream field name.`,
  239. },
  240. {
  241. s: `CREATE STREAM demo (
  242. USERID BIGINT,
  243. ) WITH ();`,
  244. stmt: &ast.StreamStmt{
  245. Name: "demo",
  246. StreamFields: []ast.StreamField{
  247. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  248. },
  249. Options: &ast.Options{},
  250. },
  251. },
  252. {
  253. s: `CREATE STREAM demo (
  254. USERID BIGINT,
  255. ) WITH ());`,
  256. stmt: &ast.StreamStmt{
  257. Name: "",
  258. StreamFields: nil,
  259. Options: nil,
  260. },
  261. err: `found ")", expected semicolon or EOF.`,
  262. },
  263. {
  264. s: `CREATE STREAM demo (
  265. USERID BIGINT,
  266. ) WITH DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  267. stmt: &ast.StreamStmt{
  268. Name: "",
  269. StreamFields: nil,
  270. Options: nil,
  271. },
  272. //TODO The error string should be more accurate
  273. err: `found "DATASOURCE", expect stream options.`,
  274. },
  275. {
  276. s: `CREATE STREAM test(
  277. userID bigint,
  278. username string,
  279. NICKNAMES array(string),
  280. Gender boolean,
  281. ADDRESS struct(
  282. TREET_NAME string, NUMBER bigint
  283. ),
  284. INFO struct(
  285. INFO_NAME string, NUMBER bigint
  286. )
  287. ) WITH (DATASOURCE="test", FORMAT="JSON", CONF_KEY="democonf", TYPE="MQTT");`,
  288. stmt: &ast.StreamStmt{
  289. Name: ast.StreamName("test"),
  290. StreamFields: []ast.StreamField{
  291. {Name: "userID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  292. {Name: "username", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  293. {Name: "NICKNAMES", FieldType: &ast.ArrayType{Type: ast.STRINGS}},
  294. {Name: "Gender", FieldType: &ast.BasicType{Type: ast.BOOLEAN}},
  295. {Name: "ADDRESS", FieldType: &ast.RecType{
  296. StreamFields: []ast.StreamField{
  297. {Name: "TREET_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  298. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  299. },
  300. }},
  301. {Name: "INFO", FieldType: &ast.RecType{
  302. StreamFields: []ast.StreamField{
  303. {Name: "INFO_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  304. {Name: "NUMBER", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  305. },
  306. }},
  307. },
  308. Options: &ast.Options{
  309. DATASOURCE: "test",
  310. FORMAT: "JSON",
  311. CONF_KEY: "democonf",
  312. TYPE: "MQTT",
  313. },
  314. },
  315. }, {
  316. s: `CREATE STREAM demo (
  317. USERID BIGINT,
  318. FIRST_NAME STRING,
  319. LAST_NAME STRING,
  320. PICTURE BYTEA,
  321. ) WITH (DATASOURCE="users", FORMAT="JSON");`,
  322. stmt: &ast.StreamStmt{
  323. Name: ast.StreamName("demo"),
  324. StreamFields: []ast.StreamField{
  325. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  326. {Name: "FIRST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  327. {Name: "LAST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  328. {Name: "PICTURE", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  329. },
  330. Options: &ast.Options{
  331. DATASOURCE: "users",
  332. FORMAT: "JSON",
  333. },
  334. },
  335. }, {
  336. s: `CREATE STREAM demo (
  337. USERID BIGINT,
  338. FIRST_NAME STRING,
  339. LAST_NAME STRING,
  340. PICTURE BYTEA,
  341. ) WITH (DATASOURCE="users", FORMAT="JSON");`,
  342. stmt: &ast.StreamStmt{
  343. Name: ast.StreamName("demo"),
  344. StreamFields: []ast.StreamField{
  345. {Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
  346. {Name: "FIRST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  347. {Name: "LAST_NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
  348. {Name: "PICTURE", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  349. },
  350. Options: &ast.Options{
  351. DATASOURCE: "users",
  352. FORMAT: "JSON",
  353. },
  354. },
  355. }, {
  356. s: `CREATE STREAM demo (
  357. USERID BIGINT,
  358. FIRST_NAME STRING,
  359. LAST_NAME STRING,
  360. PICTURE BYTEA,
  361. ) WITH (DATASOURCE="users", format="BINARY");`,
  362. stmt: &ast.StreamStmt{
  363. Name: "",
  364. StreamFields: nil,
  365. Options: nil,
  366. },
  367. err: "'binary' format stream can have only one field",
  368. }, {
  369. s: `CREATE STREAM demo (
  370. image BYTEA
  371. ) WITH (DATASOURCE="users", FORMAT="BINARY");`,
  372. stmt: &ast.StreamStmt{
  373. Name: ast.StreamName("demo"),
  374. StreamFields: []ast.StreamField{
  375. {Name: "image", FieldType: &ast.BasicType{Type: ast.BYTEA}},
  376. },
  377. Options: &ast.Options{
  378. DATASOURCE: "users",
  379. FORMAT: "BINARY",
  380. },
  381. },
  382. },
  383. }
  384. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  385. for i, tt := range tests {
  386. stmt, err := NewParser(strings.NewReader(tt.s)).ParseCreateStmt()
  387. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  388. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  389. } else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
  390. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt)
  391. }
  392. }
  393. }