xsql_stream_test.go 11 KB


  1. package xsql
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "reflect"
  6. "strings"
  7. "testing"
  8. )
  9. func TestParser_ParseCreateStream(t *testing.T) {
  10. var tests = []struct {
  11. s string
  12. stmt *StreamStmt
  13. err string
  14. }{
  15. {
  16. s: `CREATE STREAM demo (
  17. USERID BIGINT,
  18. FIRST_NAME STRING,
  19. LAST_NAME STRING,
  20. NICKNAMES ARRAY(STRING),
  21. data bytea,
  22. Gender BOOLEAN,
  23. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  24. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", CONF_KEY="srv1", type="MQTT", TIMESTAMP="USERID", TIMESTAMP_FORMAT="yyyy-MM-dd''T''HH:mm:ssX'");`,
  25. stmt: &StreamStmt{
  26. Name: StreamName("demo"),
  27. StreamFields: []StreamField{
  28. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  29. {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
  30. {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
  31. {Name: "NICKNAMES", FieldType: &ArrayType{Type: STRINGS}},
  32. {Name: "data", FieldType: &BasicType{Type: BYTEA}},
  33. {Name: "Gender", FieldType: &BasicType{Type: BOOLEAN}},
  34. {Name: "ADDRESS", FieldType: &RecType{
  35. StreamFields: []StreamField{
  36. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  37. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  38. },
  39. }},
  40. },
  41. Options: &Options{
  42. DATASOURCE: "users",
  43. FORMAT: "JSON",
  44. KEY: "USERID",
  45. CONF_KEY: "srv1",
  46. TYPE: "MQTT",
  47. TIMESTAMP: "USERID",
  48. TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
  49. },
  50. },
  51. },
  52. {
  53. s: `CREATE STREAM demo (
  54. USERID BIGINT,
  55. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true");`,
  56. stmt: &StreamStmt{
  57. Name: StreamName("demo"),
  58. StreamFields: []StreamField{
  59. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  60. },
  61. Options: &Options{
  62. DATASOURCE: "users",
  63. FORMAT: "JSON",
  64. KEY: "USERID",
  65. STRICT_VALIDATION: true,
  66. },
  67. },
  68. },
  69. {
  70. s: `CREATE STREAM demo (
  71. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  72. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="FAlse");`,
  73. stmt: &StreamStmt{
  74. Name: StreamName("demo"),
  75. StreamFields: []StreamField{
  76. {Name: "ADDRESSES", FieldType: &ArrayType{
  77. Type: STRUCT,
  78. FieldType: &RecType{
  79. StreamFields: []StreamField{
  80. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  81. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  82. },
  83. },
  84. }},
  85. },
  86. Options: &Options{
  87. DATASOURCE: "users",
  88. FORMAT: "JSON",
  89. KEY: "USERID",
  90. STRICT_VALIDATION: false,
  91. },
  92. },
  93. },
  94. {
  95. s: `CREATE STREAM demo (
  96. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  97. birthday datetime,
  98. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  99. stmt: &StreamStmt{
  100. Name: StreamName("demo"),
  101. StreamFields: []StreamField{
  102. {Name: "ADDRESSES", FieldType: &ArrayType{
  103. Type: STRUCT,
  104. FieldType: &RecType{
  105. StreamFields: []StreamField{
  106. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  107. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  108. },
  109. },
  110. }},
  111. {Name: "birthday", FieldType: &BasicType{Type: DATETIME}},
  112. },
  113. Options: &Options{
  114. DATASOURCE: "users",
  115. FORMAT: "JSON",
  116. KEY: "USERID",
  117. },
  118. },
  119. },
  120. {
  121. s: `CREATE STREAM demo (
  122. NAME string,
  123. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  124. birthday datetime,
  125. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  126. stmt: &StreamStmt{
  127. Name: StreamName("demo"),
  128. StreamFields: []StreamField{
  129. {Name: "NAME", FieldType: &BasicType{Type: STRINGS}},
  130. {Name: "ADDRESSES", FieldType: &ArrayType{
  131. Type: STRUCT,
  132. FieldType: &RecType{
  133. StreamFields: []StreamField{
  134. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  135. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  136. },
  137. },
  138. }},
  139. {Name: "birthday", FieldType: &BasicType{Type: DATETIME}},
  140. },
  141. Options: &Options{
  142. DATASOURCE: "users",
  143. FORMAT: "JSON",
  144. KEY: "USERID",
  145. },
  146. },
  147. },
  148. {
  149. s: `CREATE STREAM demo (
  150. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  151. stmt: &StreamStmt{
  152. Name: StreamName("demo"),
  153. StreamFields: nil,
  154. Options: &Options{
  155. DATASOURCE: "users",
  156. FORMAT: "JSON",
  157. KEY: "USERID",
  158. },
  159. },
  160. },
  161. {
  162. s: `CREATE STREAM demo() WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  163. stmt: &StreamStmt{
  164. Name: StreamName("demo"),
  165. StreamFields: nil,
  166. Options: &Options{
  167. DATASOURCE: "users",
  168. FORMAT: "JSON",
  169. KEY: "USERID",
  170. },
  171. },
  172. },
  173. {
  174. s: `CREATE STREAM demo (NAME string)
  175. WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true1");`, //Invalid STRICT_VALIDATION value
  176. stmt: nil,
  177. err: `found "true1", expect TRUE/FALSE value in STRICT_VALIDATION option.`,
  178. },
  179. {
  180. s: `CREATE STREAM demo (NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  181. stmt: &StreamStmt{
  182. Name: StreamName("demo"),
  183. StreamFields: []StreamField{
  184. {Name: "NAME", FieldType: &BasicType{Type: STRINGS}},
  185. },
  186. Options: &Options{
  187. DATASOURCE: "users",
  188. FORMAT: "JSON",
  189. KEY: "USERID",
  190. },
  191. },
  192. },
  193. {
  194. s: `CREATE STREAM demo (NAME string)) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  195. stmt: &StreamStmt{
  196. Name: StreamName("demo"),
  197. StreamFields: nil,
  198. Options: nil,
  199. },
  200. err: `found ")", expect stream options.`,
  201. },
  202. {
  203. s: `CREATE STREAM demo (NAME string) WITHs (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  204. stmt: &StreamStmt{
  205. Name: StreamName("demo"),
  206. StreamFields: nil,
  207. Options: nil,
  208. },
  209. err: `found "WITHs", expected is with.`,
  210. },
  211. {
  212. s: `CREATE STREAM demo (NAME integer) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  213. stmt: &StreamStmt{
  214. Name: "demo",
  215. StreamFields: nil,
  216. Options: nil,
  217. },
  218. err: `found "integer", expect valid stream field types(BIGINT | FLOAT | STRINGS | DATETIME | BOOLEAN | BYTEA | ARRAY | STRUCT).`,
  219. },
  220. {
  221. s: `CREATE STREAM demo (NAME string) WITH (sources="users", FORMAT="JSON", KEY="USERID");`,
  222. stmt: &StreamStmt{
  223. Name: "demo",
  224. StreamFields: nil,
  225. Options: nil,
  226. },
  227. err: `found "sources", unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE).`,
  228. },
  229. {
  230. s: `CREATE STREAM demo ((NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  231. stmt: &StreamStmt{
  232. Name: "demo",
  233. StreamFields: nil,
  234. Options: nil,
  235. },
  236. err: `found "(", expect stream field name.`,
  237. },
  238. {
  239. s: `CREATE STREAM demo (
  240. USERID BIGINT,
  241. ) WITH ();`,
  242. stmt: &StreamStmt{
  243. Name: "demo",
  244. StreamFields: []StreamField{
  245. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  246. },
  247. Options: &Options{},
  248. },
  249. },
  250. {
  251. s: `CREATE STREAM demo (
  252. USERID BIGINT,
  253. ) WITH ());`,
  254. stmt: &StreamStmt{
  255. Name: "",
  256. StreamFields: nil,
  257. Options: nil,
  258. },
  259. err: `found ")", expected semicolon or EOF.`,
  260. },
  261. {
  262. s: `CREATE STREAM demo (
  263. USERID BIGINT,
  264. ) WITH DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  265. stmt: &StreamStmt{
  266. Name: "",
  267. StreamFields: nil,
  268. Options: nil,
  269. },
  270. //TODO The error string should be more accurate
  271. err: `found "DATASOURCE", expect stream options.`,
  272. },
  273. {
  274. s: `CREATE STREAM test(
  275. userID bigint,
  276. username string,
  277. NICKNAMES array(string),
  278. Gender boolean,
  279. ADDRESS struct(
  280. TREET_NAME string, NUMBER bigint
  281. ),
  282. INFO struct(
  283. INFO_NAME string, NUMBER bigint
  284. )
  285. ) WITH (DATASOURCE="test", FORMAT="JSON", CONF_KEY="democonf", TYPE="MQTT");`,
  286. stmt: &StreamStmt{
  287. Name: StreamName("test"),
  288. StreamFields: []StreamField{
  289. {Name: "userID", FieldType: &BasicType{Type: BIGINT}},
  290. {Name: "username", FieldType: &BasicType{Type: STRINGS}},
  291. {Name: "NICKNAMES", FieldType: &ArrayType{Type: STRINGS}},
  292. {Name: "Gender", FieldType: &BasicType{Type: BOOLEAN}},
  293. {Name: "ADDRESS", FieldType: &RecType{
  294. StreamFields: []StreamField{
  295. {Name: "TREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  296. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  297. },
  298. }},
  299. {Name: "INFO", FieldType: &RecType{
  300. StreamFields: []StreamField{
  301. {Name: "INFO_NAME", FieldType: &BasicType{Type: STRINGS}},
  302. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  303. },
  304. }},
  305. },
  306. Options: &Options{
  307. DATASOURCE: "test",
  308. FORMAT: "JSON",
  309. CONF_KEY: "democonf",
  310. TYPE: "MQTT",
  311. },
  312. },
  313. }, {
  314. s: `CREATE STREAM demo (
  315. USERID BIGINT,
  316. FIRST_NAME STRING,
  317. LAST_NAME STRING,
  318. PICTURE BYTEA,
  319. ) WITH (DATASOURCE="users", FORMAT="JSON");`,
  320. stmt: &StreamStmt{
  321. Name: StreamName("demo"),
  322. StreamFields: []StreamField{
  323. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  324. {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
  325. {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
  326. {Name: "PICTURE", FieldType: &BasicType{Type: BYTEA}},
  327. },
  328. Options: &Options{
  329. DATASOURCE: "users",
  330. FORMAT: "JSON",
  331. },
  332. },
  333. }, {
  334. s: `CREATE STREAM demo (
  335. USERID BIGINT,
  336. FIRST_NAME STRING,
  337. LAST_NAME STRING,
  338. PICTURE BYTEA,
  339. ) WITH (DATASOURCE="users", FORMAT="JSON");`,
  340. stmt: &StreamStmt{
  341. Name: StreamName("demo"),
  342. StreamFields: []StreamField{
  343. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  344. {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
  345. {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
  346. {Name: "PICTURE", FieldType: &BasicType{Type: BYTEA}},
  347. },
  348. Options: &Options{
  349. DATASOURCE: "users",
  350. FORMAT: "JSON",
  351. },
  352. },
  353. }, {
  354. s: `CREATE STREAM demo (
  355. USERID BIGINT,
  356. FIRST_NAME STRING,
  357. LAST_NAME STRING,
  358. PICTURE BYTEA,
  359. ) WITH (DATASOURCE="users", format="BINARY");`,
  360. stmt: &StreamStmt{
  361. Name: "",
  362. StreamFields: nil,
  363. Options: nil,
  364. },
  365. err: "'binary' format stream can have only one field",
  366. }, {
  367. s: `CREATE STREAM demo (
  368. image BYTEA
  369. ) WITH (DATASOURCE="users", FORMAT="BINARY");`,
  370. stmt: &StreamStmt{
  371. Name: StreamName("demo"),
  372. StreamFields: []StreamField{
  373. {Name: "image", FieldType: &BasicType{Type: BYTEA}},
  374. },
  375. Options: &Options{
  376. DATASOURCE: "users",
  377. FORMAT: "BINARY",
  378. },
  379. },
  380. },
  381. }
  382. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  383. for i, tt := range tests {
  384. stmt, err := NewParser(strings.NewReader(tt.s)).ParseCreateStmt()
  385. if !reflect.DeepEqual(tt.err, common.Errstring(err)) {
  386. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  387. } else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
  388. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt)
  389. }
  390. }
  391. }