xsql_stream_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. package xsql
  2. import (
  3. "fmt"
  4. "reflect"
  5. "strings"
  6. "testing"
  7. )
  8. func TestParser_ParseCreateStream(t *testing.T) {
  9. var tests = []struct {
  10. s string
  11. stmt *StreamStmt
  12. err string
  13. }{
  14. {
  15. s: `CREATE STREAM demo (
  16. USERID BIGINT,
  17. FIRST_NAME STRING,
  18. LAST_NAME STRING,
  19. NICKNAMES ARRAY(STRING),
  20. Gender BOOLEAN,
  21. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  22. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", CONF_KEY="srv1", type="MQTT", TIMESTAMP="USERID", TIMESTAMP_FORMAT="yyyy-MM-dd''T''HH:mm:ssX'");`,
  23. stmt: &StreamStmt{
  24. Name: StreamName("demo"),
  25. StreamFields: []StreamField{
  26. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  27. {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
  28. {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
  29. {Name: "NICKNAMES", FieldType: &ArrayType{Type: STRINGS}},
  30. {Name: "Gender", FieldType: &BasicType{Type: BOOLEAN}},
  31. {Name: "ADDRESS", FieldType: &RecType{
  32. StreamFields: []StreamField{
  33. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  34. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  35. },
  36. }},
  37. },
  38. Options: map[string]string{
  39. "DATASOURCE": "users",
  40. "FORMAT": "JSON",
  41. "KEY": "USERID",
  42. "CONF_KEY": "srv1",
  43. "TYPE": "MQTT",
  44. "TIMESTAMP": "USERID",
  45. "TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
  46. },
  47. },
  48. },
  49. {
  50. s: `CREATE STREAM demo (
  51. USERID BIGINT,
  52. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true");`,
  53. stmt: &StreamStmt{
  54. Name: StreamName("demo"),
  55. StreamFields: []StreamField{
  56. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  57. },
  58. Options: map[string]string{
  59. "DATASOURCE": "users",
  60. "FORMAT": "JSON",
  61. "KEY": "USERID",
  62. "STRICT_VALIDATION": "true",
  63. },
  64. },
  65. },
  66. {
  67. s: `CREATE STREAM demo (
  68. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  69. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="FAlse");`,
  70. stmt: &StreamStmt{
  71. Name: StreamName("demo"),
  72. StreamFields: []StreamField{
  73. {Name: "ADDRESSES", FieldType: &ArrayType{
  74. Type: STRUCT,
  75. FieldType: &RecType{
  76. StreamFields: []StreamField{
  77. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  78. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  79. },
  80. },
  81. }},
  82. },
  83. Options: map[string]string{
  84. "DATASOURCE": "users",
  85. "FORMAT": "JSON",
  86. "KEY": "USERID",
  87. "STRICT_VALIDATION": "FAlse",
  88. },
  89. },
  90. },
  91. {
  92. s: `CREATE STREAM demo (
  93. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  94. birthday datetime,
  95. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  96. stmt: &StreamStmt{
  97. Name: StreamName("demo"),
  98. StreamFields: []StreamField{
  99. {Name: "ADDRESSES", FieldType: &ArrayType{
  100. Type: STRUCT,
  101. FieldType: &RecType{
  102. StreamFields: []StreamField{
  103. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  104. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  105. },
  106. },
  107. }},
  108. {Name: "birthday", FieldType: &BasicType{Type: DATETIME}},
  109. },
  110. Options: map[string]string{
  111. "DATASOURCE": "users",
  112. "FORMAT": "JSON",
  113. "KEY": "USERID",
  114. },
  115. },
  116. },
  117. {
  118. s: `CREATE STREAM demo (
  119. NAME string,
  120. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  121. birthday datetime,
  122. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  123. stmt: &StreamStmt{
  124. Name: StreamName("demo"),
  125. StreamFields: []StreamField{
  126. {Name: "NAME", FieldType: &BasicType{Type: STRINGS}},
  127. {Name: "ADDRESSES", FieldType: &ArrayType{
  128. Type: STRUCT,
  129. FieldType: &RecType{
  130. StreamFields: []StreamField{
  131. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  132. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  133. },
  134. },
  135. }},
  136. {Name: "birthday", FieldType: &BasicType{Type: DATETIME}},
  137. },
  138. Options: map[string]string{
  139. "DATASOURCE": "users",
  140. "FORMAT": "JSON",
  141. "KEY": "USERID",
  142. },
  143. },
  144. },
  145. {
  146. s: `CREATE STREAM demo (
  147. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  148. stmt: &StreamStmt{
  149. Name: StreamName("demo"),
  150. StreamFields: nil,
  151. Options: map[string]string{
  152. "DATASOURCE": "users",
  153. "FORMAT": "JSON",
  154. "KEY": "USERID",
  155. },
  156. },
  157. },
  158. {
  159. s: `CREATE STREAM demo() WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  160. stmt: &StreamStmt{
  161. Name: StreamName("demo"),
  162. StreamFields: nil,
  163. Options: map[string]string{
  164. "DATASOURCE": "users",
  165. "FORMAT": "JSON",
  166. "KEY": "USERID",
  167. },
  168. },
  169. },
  170. {
  171. s: `CREATE STREAM demo (NAME string)
  172. WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true1");`, //Invalid STRICT_VALIDATION value
  173. stmt: nil,
  174. err: `found "true1", expect TRUE/FALSE value in STRICT_VALIDATION option.`,
  175. },
  176. {
  177. s: `CREATE STREAM demo (NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  178. stmt: &StreamStmt{
  179. Name: StreamName("demo"),
  180. StreamFields: []StreamField{
  181. {Name: "NAME", FieldType: &BasicType{Type: STRINGS}},
  182. },
  183. Options: map[string]string{
  184. "DATASOURCE": "users",
  185. "FORMAT": "JSON",
  186. "KEY": "USERID",
  187. },
  188. },
  189. },
  190. {
  191. s: `CREATE STREAM demo (NAME string)) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  192. stmt: &StreamStmt{
  193. Name: StreamName("demo"),
  194. StreamFields: nil,
  195. Options: nil,
  196. },
  197. err: `found ")", expect stream options.`,
  198. },
  199. {
  200. s: `CREATE STREAM demo (NAME string) WITHs (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  201. stmt: &StreamStmt{
  202. Name: StreamName("demo"),
  203. StreamFields: nil,
  204. Options: nil,
  205. },
  206. err: `found "WITHs", expected is with.`,
  207. },
  208. {
  209. s: `CREATE STREAM demo (NAME integer) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  210. stmt: &StreamStmt{
  211. Name: "demo",
  212. StreamFields: nil,
  213. Options: nil,
  214. },
  215. err: `found "integer", expect valid stream field types(BIGINT | FLOAT | STRINGS | DATETIME | BOOLEAN | ARRAY | STRUCT).`,
  216. },
  217. {
  218. s: `CREATE STREAM demo (NAME string) WITH (sources="users", FORMAT="JSON", KEY="USERID");`,
  219. stmt: &StreamStmt{
  220. Name: "demo",
  221. StreamFields: nil,
  222. Options: nil,
  223. },
  224. err: `found "sources", unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|STRICT_VALIDATION|TYPE).`,
  225. },
  226. {
  227. s: `CREATE STREAM demo ((NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  228. stmt: &StreamStmt{
  229. Name: "demo",
  230. StreamFields: nil,
  231. Options: nil,
  232. },
  233. err: `found "(", expect stream field name.`,
  234. },
  235. {
  236. s: `CREATE STREAM demo (
  237. USERID BIGINT,
  238. ) WITH ();`,
  239. stmt: &StreamStmt{
  240. Name: "demo",
  241. StreamFields: []StreamField{
  242. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  243. },
  244. Options: map[string]string{},
  245. },
  246. },
  247. {
  248. s: `CREATE STREAM demo (
  249. USERID BIGINT,
  250. ) WITH ());`,
  251. stmt: &StreamStmt{
  252. Name: "",
  253. StreamFields: nil,
  254. Options: nil,
  255. },
  256. err: `found ")", expected semicolon or EOF.`,
  257. },
  258. {
  259. s: `CREATE STREAM demo (
  260. USERID BIGINT,
  261. ) WITH DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  262. stmt: &StreamStmt{
  263. Name: "",
  264. StreamFields: nil,
  265. Options: nil,
  266. },
  267. //TODO The error string should be more accurate
  268. err: `found "DATASOURCE", expect stream options.`,
  269. },
  270. {
  271. s: `CREATE STREAM test(
  272. userID bigint,
  273. username string,
  274. NICKNAMES array(string),
  275. Gender boolean,
  276. ADDRESS struct(
  277. TREET_NAME string, NUMBER bigint
  278. ),
  279. INFO struct(
  280. INFO_NAME string, NUMBER bigint
  281. )
  282. ) WITH (DATASOURCE="test", FORMAT="JSON", CONF_KEY="democonf", TYPE="MQTT");`,
  283. stmt: &StreamStmt{
  284. Name: StreamName("test"),
  285. StreamFields: []StreamField{
  286. {Name: "userID", FieldType: &BasicType{Type: BIGINT}},
  287. {Name: "username", FieldType: &BasicType{Type: STRINGS}},
  288. {Name: "NICKNAMES", FieldType: &ArrayType{Type: STRINGS}},
  289. {Name: "Gender", FieldType: &BasicType{Type: BOOLEAN}},
  290. {Name: "ADDRESS", FieldType: &RecType{
  291. StreamFields: []StreamField{
  292. {Name: "TREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  293. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  294. },
  295. }},
  296. {Name: "INFO", FieldType: &RecType{
  297. StreamFields: []StreamField{
  298. {Name: "INFO_NAME", FieldType: &BasicType{Type: STRINGS}},
  299. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  300. },
  301. }},
  302. },
  303. Options: map[string]string{
  304. "DATASOURCE": "test",
  305. "FORMAT": "JSON",
  306. "CONF_KEY": "democonf",
  307. "TYPE": "MQTT",
  308. },
  309. },
  310. }, {
  311. s: `CREATE STREAM demo (
  312. USERID BIGINT,
  313. FIRST_NAME STRING,
  314. LAST_NAME STRING,
  315. PICTURE BYTEA,
  316. ) WITH (DATASOURCE="users", FORMAT="JSON");`,
  317. stmt: &StreamStmt{
  318. Name: StreamName("demo"),
  319. StreamFields: []StreamField{
  320. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  321. {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
  322. {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
  323. {Name: "PICTURE", FieldType: &BasicType{Type: BYTEA}},
  324. },
  325. Options: map[string]string{
  326. "DATASOURCE": "users",
  327. "FORMAT": "JSON",
  328. },
  329. },
  330. }, {
  331. s: `CREATE STREAM demo (
  332. USERID BIGINT,
  333. FIRST_NAME STRING,
  334. LAST_NAME STRING,
  335. PICTURE BYTEA,
  336. ) WITH (DATASOURCE="users", FORMAT="JSON");`,
  337. stmt: &StreamStmt{
  338. Name: StreamName("demo"),
  339. StreamFields: []StreamField{
  340. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  341. {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
  342. {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
  343. {Name: "PICTURE", FieldType: &BasicType{Type: BYTEA}},
  344. },
  345. Options: map[string]string{
  346. "DATASOURCE": "users",
  347. "FORMAT": "JSON",
  348. },
  349. },
  350. }, {
  351. s: `CREATE STREAM demo (
  352. USERID BIGINT,
  353. FIRST_NAME STRING,
  354. LAST_NAME STRING,
  355. PICTURE BYTEA,
  356. ) WITH (DATASOURCE="users", format="BINARY");`,
  357. stmt: &StreamStmt{
  358. Name: "",
  359. StreamFields: nil,
  360. Options: nil,
  361. },
  362. err: "'binary' format stream can have only one field",
  363. }, {
  364. s: `CREATE STREAM demo (
  365. image BYTEA
  366. ) WITH (DATASOURCE="users", FORMAT="BINARY");`,
  367. stmt: &StreamStmt{
  368. Name: StreamName("demo"),
  369. StreamFields: []StreamField{
  370. {Name: "image", FieldType: &BasicType{Type: BYTEA}},
  371. },
  372. Options: map[string]string{
  373. "DATASOURCE": "users",
  374. "FORMAT": "BINARY",
  375. },
  376. },
  377. },
  378. }
  379. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  380. for i, tt := range tests {
  381. stmt, err := NewParser(strings.NewReader(tt.s)).ParseCreateStreamStmt()
  382. if !reflect.DeepEqual(tt.err, errstring(err)) {
  383. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  384. } else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
  385. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt)
  386. }
  387. }
  388. }
  389. // errstring returns the string representation of an error.
  390. //func errstring(err error) string {
  391. // if err != nil {
  392. // return err.Error()
  393. // }
  394. // return ""
  395. //}