xsql_stream_test.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package xsql
  2. import (
  3. "fmt"
  4. "reflect"
  5. "strings"
  6. "testing"
  7. )
  8. func TestParser_ParseCreateStream(t *testing.T) {
  9. var tests = []struct {
  10. s string
  11. stmt *StreamStmt
  12. err string
  13. }{
  14. {
  15. s: `CREATE STREAM demo (
  16. USERID BIGINT,
  17. FIRST_NAME STRING,
  18. LAST_NAME STRING,
  19. NICKNAMES ARRAY(STRING),
  20. Gender BOOLEAN,
  21. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  22. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID", CONF_KEY="srv1", type="MQTT");`,
  23. stmt: &StreamStmt{
  24. Name: StreamName("demo"),
  25. StreamFields: []StreamField{
  26. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  27. {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
  28. {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
  29. {Name: "NICKNAMES", FieldType: &ArrayType{Type: STRINGS}},
  30. {Name: "Gender", FieldType: &BasicType{Type: BOOLEAN}},
  31. {Name: "ADDRESS", FieldType: &RecType{
  32. StreamFields: []StreamField{
  33. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  34. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  35. },
  36. }},
  37. },
  38. Options: map[string]string{
  39. "DATASOURCE" : "users",
  40. "FORMAT" : "AVRO",
  41. "KEY" : "USERID",
  42. "CONF_KEY" : "srv1",
  43. "TYPE" : "MQTT",
  44. },
  45. },
  46. },
  47. {
  48. s: `CREATE STREAM demo (
  49. USERID BIGINT,
  50. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true");`,
  51. stmt: &StreamStmt{
  52. Name: StreamName("demo"),
  53. StreamFields: []StreamField{
  54. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  55. },
  56. Options: map[string]string{
  57. "DATASOURCE" : "users",
  58. "FORMAT" : "JSON",
  59. "KEY" : "USERID",
  60. "STRICT_VALIDATION" : "true",
  61. },
  62. },
  63. },
  64. {
  65. s: `CREATE STREAM demo (
  66. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  67. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID", STRICT_VALIDATION="FAlse");`,
  68. stmt: &StreamStmt{
  69. Name: StreamName("demo"),
  70. StreamFields: []StreamField{
  71. {Name: "ADDRESSES", FieldType: &ArrayType{
  72. Type: STRUCT,
  73. FieldType: &RecType{
  74. StreamFields: []StreamField{
  75. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  76. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  77. },
  78. },
  79. }},
  80. },
  81. Options: map[string]string{
  82. "DATASOURCE" : "users",
  83. "FORMAT" : "AVRO",
  84. "KEY" : "USERID",
  85. "STRICT_VALIDATION": "FAlse",
  86. },
  87. },
  88. },
  89. {
  90. s: `CREATE STREAM demo (
  91. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  92. birthday datetime,
  93. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  94. stmt: &StreamStmt{
  95. Name: StreamName("demo"),
  96. StreamFields: []StreamField{
  97. {Name: "ADDRESSES", FieldType: &ArrayType{
  98. Type: STRUCT,
  99. FieldType: &RecType{
  100. StreamFields: []StreamField{
  101. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  102. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  103. },
  104. },
  105. }},
  106. {Name: "birthday", FieldType: &BasicType{Type: DATETIME}},
  107. },
  108. Options: map[string]string{
  109. "DATASOURCE" : "users",
  110. "FORMAT" : "AVRO",
  111. "KEY" : "USERID",
  112. },
  113. },
  114. },
  115. {
  116. s: `CREATE STREAM demo (
  117. NAME string,
  118. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  119. birthday datetime,
  120. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  121. stmt: &StreamStmt{
  122. Name: StreamName("demo"),
  123. StreamFields: []StreamField{
  124. {Name: "NAME", FieldType: &BasicType{Type: STRINGS}},
  125. {Name: "ADDRESSES", FieldType: &ArrayType{
  126. Type: STRUCT,
  127. FieldType: &RecType{
  128. StreamFields: []StreamField{
  129. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  130. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  131. },
  132. },
  133. }},
  134. {Name: "birthday", FieldType: &BasicType{Type: DATETIME}},
  135. },
  136. Options: map[string]string{
  137. "DATASOURCE" : "users",
  138. "FORMAT" : "AVRO",
  139. "KEY" : "USERID",
  140. },
  141. },
  142. },
  143. {
  144. s: `CREATE STREAM demo (
  145. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  146. stmt: nil,
  147. err: `found ")", expect stream field name.`,
  148. },
  149. {
  150. s: `CREATE STREAM demo (NAME string)
  151. WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true1");`, //Invalid STRICT_VALIDATION value
  152. stmt: nil,
  153. err: `found "true1", expect TRUE/FALSE value in STRICT_VALIDATION option.`,
  154. },
  155. {
  156. s: `CREATE STREAM demo (NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  157. stmt: &StreamStmt{
  158. Name: StreamName("demo"),
  159. StreamFields: []StreamField{
  160. {Name: "NAME", FieldType: &BasicType{Type: STRINGS}},
  161. },
  162. Options: map[string]string{
  163. "DATASOURCE" : "users",
  164. "FORMAT" : "JSON",
  165. "KEY" : "USERID",
  166. },
  167. },
  168. },
  169. {
  170. s: `CREATE STREAM demo (NAME string)) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  171. stmt: &StreamStmt{
  172. Name: StreamName("demo"),
  173. StreamFields: nil,
  174. Options: nil,
  175. },
  176. err: `found ")", expect stream options.`,
  177. },
  178. {
  179. s: `CREATE STREAM demo (NAME string) WITHs (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  180. stmt: &StreamStmt{
  181. Name: StreamName("demo"),
  182. StreamFields: nil,
  183. Options: nil,
  184. },
  185. err: `found "WITHs", expected is with.`,
  186. },
  187. {
  188. s: `CREATE STREAM demo (NAME integer) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  189. stmt: &StreamStmt{
  190. Name: "demo",
  191. StreamFields: nil,
  192. Options: nil,
  193. },
  194. err: `found "integer", expect valid stream field types(BIGINT | FLOAT | STRINGS | DATETIME | BOOLEAN | ARRAY | STRUCT).`,
  195. },
  196. {
  197. s: `CREATE STREAM demo (NAME string) WITH (sources="users", FORMAT="JSON", KEY="USERID");`,
  198. stmt: &StreamStmt{
  199. Name: "demo",
  200. StreamFields: nil,
  201. Options: nil,
  202. },
  203. err: `found "sources", unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|STRICT_VALIDATION|TYPE).`,
  204. },
  205. {
  206. s: `CREATE STREAM demo ((NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  207. stmt: &StreamStmt{
  208. Name: "demo",
  209. StreamFields: nil,
  210. Options: nil,
  211. },
  212. err: `found "(", expect stream field name.`,
  213. },
  214. {
  215. s: `CREATE STREAM demo (
  216. USERID BIGINT,
  217. ) WITH ();`,
  218. stmt: &StreamStmt{
  219. Name: "demo",
  220. StreamFields: []StreamField{
  221. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  222. },
  223. Options: map[string]string{},
  224. },
  225. },
  226. {
  227. s: `CREATE STREAM demo (
  228. USERID BIGINT,
  229. ) WITH ());`,
  230. stmt: &StreamStmt{
  231. Name: "",
  232. StreamFields: nil,
  233. Options: nil,
  234. },
  235. err: `found ")", expected semicolon or EOF.`,
  236. },
  237. {
  238. s: `CREATE STREAM demo (
  239. USERID BIGINT,
  240. ) WITH DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  241. stmt: &StreamStmt{
  242. Name: "",
  243. StreamFields: nil,
  244. Options: nil,
  245. },
  246. //TODO The error string should be more accurate
  247. err: `found "DATASOURCE", expect stream options.`,
  248. },
  249. }
  250. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  251. for i, tt := range tests {
  252. stmt, err := NewParser(strings.NewReader(tt.s)).ParseCreateStreamStmt()
  253. if !reflect.DeepEqual(tt.err, errstring(err)) {
  254. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  255. } else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
  256. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt)
  257. }
  258. }
  259. }
  260. // errstring returns the string representation of an error.
  261. //func errstring(err error) string {
  262. // if err != nil {
  263. // return err.Error()
  264. // }
  265. // return ""
  266. //}