xsql_stream_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. package xsql
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "reflect"
  6. "strings"
  7. "testing"
  8. )
  9. func TestParser_ParseCreateStream(t *testing.T) {
  10. var tests = []struct {
  11. s string
  12. stmt *StreamStmt
  13. err string
  14. }{
  15. {
  16. s: `CREATE STREAM demo (
  17. USERID BIGINT,
  18. FIRST_NAME STRING,
  19. LAST_NAME STRING,
  20. NICKNAMES ARRAY(STRING),
  21. data bytea,
  22. Gender BOOLEAN,
  23. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  24. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", CONF_KEY="srv1", type="MQTT", TIMESTAMP="USERID", TIMESTAMP_FORMAT="yyyy-MM-dd''T''HH:mm:ssX'");`,
  25. stmt: &StreamStmt{
  26. Name: StreamName("demo"),
  27. StreamFields: []StreamField{
  28. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  29. {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
  30. {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
  31. {Name: "NICKNAMES", FieldType: &ArrayType{Type: STRINGS}},
  32. {Name: "data", FieldType: &BasicType{Type: BYTEA}},
  33. {Name: "Gender", FieldType: &BasicType{Type: BOOLEAN}},
  34. {Name: "ADDRESS", FieldType: &RecType{
  35. StreamFields: []StreamField{
  36. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  37. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  38. },
  39. }},
  40. },
  41. Options: &Options{
  42. DATASOURCE: "users",
  43. FORMAT: "JSON",
  44. KEY: "USERID",
  45. CONF_KEY: "srv1",
  46. TYPE: "MQTT",
  47. TIMESTAMP: "USERID",
  48. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  49. },
  50. },
  51. },
  52. {
  53. s: `CREATE STREAM demo (
  54. USERID BIGINT,
  55. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true", SHARED="true");`,
  56. stmt: &StreamStmt{
  57. Name: StreamName("demo"),
  58. StreamFields: []StreamField{
  59. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  60. },
  61. Options: &Options{
  62. DATASOURCE: "users",
  63. FORMAT: "JSON",
  64. KEY: "USERID",
  65. STRICT_VALIDATION: true,
  66. SHARED: true,
  67. },
  68. },
  69. },
  70. {
  71. s: `CREATE STREAM demo (
  72. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  73. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="FAlse");`,
  74. stmt: &StreamStmt{
  75. Name: StreamName("demo"),
  76. StreamFields: []StreamField{
  77. {Name: "ADDRESSES", FieldType: &ArrayType{
  78. Type: STRUCT,
  79. FieldType: &RecType{
  80. StreamFields: []StreamField{
  81. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  82. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  83. },
  84. },
  85. }},
  86. },
  87. Options: &Options{
  88. DATASOURCE: "users",
  89. FORMAT: "JSON",
  90. KEY: "USERID",
  91. STRICT_VALIDATION: false,
  92. },
  93. },
  94. },
  95. {
  96. s: `CREATE STREAM demo (
  97. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  98. birthday datetime,
  99. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  100. stmt: &StreamStmt{
  101. Name: StreamName("demo"),
  102. StreamFields: []StreamField{
  103. {Name: "ADDRESSES", FieldType: &ArrayType{
  104. Type: STRUCT,
  105. FieldType: &RecType{
  106. StreamFields: []StreamField{
  107. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  108. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  109. },
  110. },
  111. }},
  112. {Name: "birthday", FieldType: &BasicType{Type: DATETIME}},
  113. },
  114. Options: &Options{
  115. DATASOURCE: "users",
  116. FORMAT: "JSON",
  117. KEY: "USERID",
  118. },
  119. },
  120. },
  121. {
  122. s: `CREATE STREAM demo (
  123. NAME string,
  124. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  125. birthday datetime,
  126. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  127. stmt: &StreamStmt{
  128. Name: StreamName("demo"),
  129. StreamFields: []StreamField{
  130. {Name: "NAME", FieldType: &BasicType{Type: STRINGS}},
  131. {Name: "ADDRESSES", FieldType: &ArrayType{
  132. Type: STRUCT,
  133. FieldType: &RecType{
  134. StreamFields: []StreamField{
  135. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  136. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  137. },
  138. },
  139. }},
  140. {Name: "birthday", FieldType: &BasicType{Type: DATETIME}},
  141. },
  142. Options: &Options{
  143. DATASOURCE: "users",
  144. FORMAT: "JSON",
  145. KEY: "USERID",
  146. },
  147. },
  148. },
  149. {
  150. s: `CREATE STREAM demo (
  151. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  152. stmt: &StreamStmt{
  153. Name: StreamName("demo"),
  154. StreamFields: nil,
  155. Options: &Options{
  156. DATASOURCE: "users",
  157. FORMAT: "JSON",
  158. KEY: "USERID",
  159. },
  160. },
  161. },
  162. {
  163. s: `CREATE STREAM demo() WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  164. stmt: &StreamStmt{
  165. Name: StreamName("demo"),
  166. StreamFields: nil,
  167. Options: &Options{
  168. DATASOURCE: "users",
  169. FORMAT: "JSON",
  170. KEY: "USERID",
  171. },
  172. },
  173. },
  174. {
  175. s: `CREATE STREAM demo (NAME string)
  176. WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true1");`, //Invalid STRICT_VALIDATION value
  177. stmt: nil,
  178. err: `found "true1", expect TRUE/FALSE value in STRICT_VALIDATION option.`,
  179. },
  180. {
  181. s: `CREATE STREAM demo (NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  182. stmt: &StreamStmt{
  183. Name: StreamName("demo"),
  184. StreamFields: []StreamField{
  185. {Name: "NAME", FieldType: &BasicType{Type: STRINGS}},
  186. },
  187. Options: &Options{
  188. DATASOURCE: "users",
  189. FORMAT: "JSON",
  190. KEY: "USERID",
  191. },
  192. },
  193. },
  194. {
  195. s: `CREATE STREAM demo (NAME string)) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  196. stmt: &StreamStmt{
  197. Name: StreamName("demo"),
  198. StreamFields: nil,
  199. Options: nil,
  200. },
  201. err: `found ")", expect stream options.`,
  202. },
  203. {
  204. s: `CREATE STREAM demo (NAME string) WITHs (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  205. stmt: &StreamStmt{
  206. Name: StreamName("demo"),
  207. StreamFields: nil,
  208. Options: nil,
  209. },
  210. err: `found "WITHs", expected is with.`,
  211. },
  212. {
  213. s: `CREATE STREAM demo (NAME integer) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  214. stmt: &StreamStmt{
  215. Name: "demo",
  216. StreamFields: nil,
  217. Options: nil,
  218. },
  219. err: `found "integer", expect valid stream field types(BIGINT | FLOAT | STRINGS | DATETIME | BOOLEAN | BYTEA | ARRAY | STRUCT).`,
  220. },
  221. {
  222. s: `CREATE STREAM demo (NAME string) WITH (sources="users", FORMAT="JSON", KEY="USERID");`,
  223. stmt: &StreamStmt{
  224. Name: "demo",
  225. StreamFields: nil,
  226. Options: nil,
  227. },
  228. err: `found "sources", unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|SHARED|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE).`,
  229. },
  230. {
  231. s: `CREATE STREAM demo ((NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  232. stmt: &StreamStmt{
  233. Name: "demo",
  234. StreamFields: nil,
  235. Options: nil,
  236. },
  237. err: `found "(", expect stream field name.`,
  238. },
  239. {
  240. s: `CREATE STREAM demo (
  241. USERID BIGINT,
  242. ) WITH ();`,
  243. stmt: &StreamStmt{
  244. Name: "demo",
  245. StreamFields: []StreamField{
  246. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  247. },
  248. Options: &Options{},
  249. },
  250. },
  251. {
  252. s: `CREATE STREAM demo (
  253. USERID BIGINT,
  254. ) WITH ());`,
  255. stmt: &StreamStmt{
  256. Name: "",
  257. StreamFields: nil,
  258. Options: nil,
  259. },
  260. err: `found ")", expected semicolon or EOF.`,
  261. },
  262. {
  263. s: `CREATE STREAM demo (
  264. USERID BIGINT,
  265. ) WITH DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  266. stmt: &StreamStmt{
  267. Name: "",
  268. StreamFields: nil,
  269. Options: nil,
  270. },
  271. //TODO The error string should be more accurate
  272. err: `found "DATASOURCE", expect stream options.`,
  273. },
  274. {
  275. s: `CREATE STREAM test(
  276. userID bigint,
  277. username string,
  278. NICKNAMES array(string),
  279. Gender boolean,
  280. ADDRESS struct(
  281. TREET_NAME string, NUMBER bigint
  282. ),
  283. INFO struct(
  284. INFO_NAME string, NUMBER bigint
  285. )
  286. ) WITH (DATASOURCE="test", FORMAT="JSON", CONF_KEY="democonf", TYPE="MQTT");`,
  287. stmt: &StreamStmt{
  288. Name: StreamName("test"),
  289. StreamFields: []StreamField{
  290. {Name: "userID", FieldType: &BasicType{Type: BIGINT}},
  291. {Name: "username", FieldType: &BasicType{Type: STRINGS}},
  292. {Name: "NICKNAMES", FieldType: &ArrayType{Type: STRINGS}},
  293. {Name: "Gender", FieldType: &BasicType{Type: BOOLEAN}},
  294. {Name: "ADDRESS", FieldType: &RecType{
  295. StreamFields: []StreamField{
  296. {Name: "TREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  297. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  298. },
  299. }},
  300. {Name: "INFO", FieldType: &RecType{
  301. StreamFields: []StreamField{
  302. {Name: "INFO_NAME", FieldType: &BasicType{Type: STRINGS}},
  303. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  304. },
  305. }},
  306. },
  307. Options: &Options{
  308. DATASOURCE: "test",
  309. FORMAT: "JSON",
  310. CONF_KEY: "democonf",
  311. TYPE: "MQTT",
  312. },
  313. },
  314. }, {
  315. s: `CREATE STREAM demo (
  316. USERID BIGINT,
  317. FIRST_NAME STRING,
  318. LAST_NAME STRING,
  319. PICTURE BYTEA,
  320. ) WITH (DATASOURCE="users", FORMAT="JSON");`,
  321. stmt: &StreamStmt{
  322. Name: StreamName("demo"),
  323. StreamFields: []StreamField{
  324. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  325. {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
  326. {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
  327. {Name: "PICTURE", FieldType: &BasicType{Type: BYTEA}},
  328. },
  329. Options: &Options{
  330. DATASOURCE: "users",
  331. FORMAT: "JSON",
  332. },
  333. },
  334. }, {
  335. s: `CREATE STREAM demo (
  336. USERID BIGINT,
  337. FIRST_NAME STRING,
  338. LAST_NAME STRING,
  339. PICTURE BYTEA,
  340. ) WITH (DATASOURCE="users", FORMAT="JSON");`,
  341. stmt: &StreamStmt{
  342. Name: StreamName("demo"),
  343. StreamFields: []StreamField{
  344. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  345. {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
  346. {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
  347. {Name: "PICTURE", FieldType: &BasicType{Type: BYTEA}},
  348. },
  349. Options: &Options{
  350. DATASOURCE: "users",
  351. FORMAT: "JSON",
  352. },
  353. },
  354. }, {
  355. s: `CREATE STREAM demo (
  356. USERID BIGINT,
  357. FIRST_NAME STRING,
  358. LAST_NAME STRING,
  359. PICTURE BYTEA,
  360. ) WITH (DATASOURCE="users", format="BINARY");`,
  361. stmt: &StreamStmt{
  362. Name: "",
  363. StreamFields: nil,
  364. Options: nil,
  365. },
  366. err: "'binary' format stream can have only one field",
  367. }, {
  368. s: `CREATE STREAM demo (
  369. image BYTEA
  370. ) WITH (DATASOURCE="users", FORMAT="BINARY");`,
  371. stmt: &StreamStmt{
  372. Name: StreamName("demo"),
  373. StreamFields: []StreamField{
  374. {Name: "image", FieldType: &BasicType{Type: BYTEA}},
  375. },
  376. Options: &Options{
  377. DATASOURCE: "users",
  378. FORMAT: "BINARY",
  379. },
  380. },
  381. },
  382. }
  383. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  384. for i, tt := range tests {
  385. stmt, err := NewParser(strings.NewReader(tt.s)).ParseCreateStmt()
  386. if !reflect.DeepEqual(tt.err, common.Errstring(err)) {
  387. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  388. } else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
  389. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt)
  390. }
  391. }
  392. }