xsql_stream_test.go 7.7 KB


  1. package xsql
  2. import (
  3. "fmt"
  4. "reflect"
  5. "strings"
  6. "testing"
  7. )
  8. func TestParser_ParseCreateStream(t *testing.T) {
  9. var tests = []struct {
  10. s string
  11. stmt *StreamStmt
  12. err string
  13. }{
  14. {
  15. s: `CREATE STREAM demo (
  16. USERID BIGINT,
  17. FIRST_NAME STRING,
  18. LAST_NAME STRING,
  19. NICKNAMES ARRAY(STRING),
  20. Gender BOOLEAN,
  21. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  22. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID", CONF_KEY="srv1", type="MQTT", TIMESTAMP="USERID", TIMESTAMP_FORMAT="yyyy-MM-dd''T''HH:mm:ssX'");`,
  23. stmt: &StreamStmt{
  24. Name: StreamName("demo"),
  25. StreamFields: []StreamField{
  26. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  27. {Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
  28. {Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
  29. {Name: "NICKNAMES", FieldType: &ArrayType{Type: STRINGS}},
  30. {Name: "Gender", FieldType: &BasicType{Type: BOOLEAN}},
  31. {Name: "ADDRESS", FieldType: &RecType{
  32. StreamFields: []StreamField{
  33. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  34. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  35. },
  36. }},
  37. },
  38. Options: map[string]string{
  39. "DATASOURCE" : "users",
  40. "FORMAT" : "AVRO",
  41. "KEY" : "USERID",
  42. "CONF_KEY" : "srv1",
  43. "TYPE" : "MQTT",
  44. "TIMESTAMP" : "USERID",
  45. "TIMESTAMP_FORMAT" : "yyyy-MM-dd''T''HH:mm:ssX'",
  46. },
  47. },
  48. },
  49. {
  50. s: `CREATE STREAM demo (
  51. USERID BIGINT,
  52. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true");`,
  53. stmt: &StreamStmt{
  54. Name: StreamName("demo"),
  55. StreamFields: []StreamField{
  56. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  57. },
  58. Options: map[string]string{
  59. "DATASOURCE" : "users",
  60. "FORMAT" : "JSON",
  61. "KEY" : "USERID",
  62. "STRICT_VALIDATION" : "true",
  63. },
  64. },
  65. },
  66. {
  67. s: `CREATE STREAM demo (
  68. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  69. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID", STRICT_VALIDATION="FAlse");`,
  70. stmt: &StreamStmt{
  71. Name: StreamName("demo"),
  72. StreamFields: []StreamField{
  73. {Name: "ADDRESSES", FieldType: &ArrayType{
  74. Type: STRUCT,
  75. FieldType: &RecType{
  76. StreamFields: []StreamField{
  77. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  78. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  79. },
  80. },
  81. }},
  82. },
  83. Options: map[string]string{
  84. "DATASOURCE" : "users",
  85. "FORMAT" : "AVRO",
  86. "KEY" : "USERID",
  87. "STRICT_VALIDATION": "FAlse",
  88. },
  89. },
  90. },
  91. {
  92. s: `CREATE STREAM demo (
  93. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  94. birthday datetime,
  95. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  96. stmt: &StreamStmt{
  97. Name: StreamName("demo"),
  98. StreamFields: []StreamField{
  99. {Name: "ADDRESSES", FieldType: &ArrayType{
  100. Type: STRUCT,
  101. FieldType: &RecType{
  102. StreamFields: []StreamField{
  103. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  104. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  105. },
  106. },
  107. }},
  108. {Name: "birthday", FieldType: &BasicType{Type: DATETIME}},
  109. },
  110. Options: map[string]string{
  111. "DATASOURCE" : "users",
  112. "FORMAT" : "AVRO",
  113. "KEY" : "USERID",
  114. },
  115. },
  116. },
  117. {
  118. s: `CREATE STREAM demo (
  119. NAME string,
  120. ADDRESSES ARRAY(STRUCT(STREET_NAME STRING, NUMBER BIGINT)),
  121. birthday datetime,
  122. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  123. stmt: &StreamStmt{
  124. Name: StreamName("demo"),
  125. StreamFields: []StreamField{
  126. {Name: "NAME", FieldType: &BasicType{Type: STRINGS}},
  127. {Name: "ADDRESSES", FieldType: &ArrayType{
  128. Type: STRUCT,
  129. FieldType: &RecType{
  130. StreamFields: []StreamField{
  131. {Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
  132. {Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
  133. },
  134. },
  135. }},
  136. {Name: "birthday", FieldType: &BasicType{Type: DATETIME}},
  137. },
  138. Options: map[string]string{
  139. "DATASOURCE" : "users",
  140. "FORMAT" : "AVRO",
  141. "KEY" : "USERID",
  142. },
  143. },
  144. },
  145. {
  146. s: `CREATE STREAM demo (
  147. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  148. stmt: nil,
  149. err: `found ")", expect stream field name.`,
  150. },
  151. {
  152. s: `CREATE STREAM demo (NAME string)
  153. WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", STRICT_VALIDATION="true1");`, //Invalid STRICT_VALIDATION value
  154. stmt: nil,
  155. err: `found "true1", expect TRUE/FALSE value in STRICT_VALIDATION option.`,
  156. },
  157. {
  158. s: `CREATE STREAM demo (NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  159. stmt: &StreamStmt{
  160. Name: StreamName("demo"),
  161. StreamFields: []StreamField{
  162. {Name: "NAME", FieldType: &BasicType{Type: STRINGS}},
  163. },
  164. Options: map[string]string{
  165. "DATASOURCE" : "users",
  166. "FORMAT" : "JSON",
  167. "KEY" : "USERID",
  168. },
  169. },
  170. },
  171. {
  172. s: `CREATE STREAM demo (NAME string)) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  173. stmt: &StreamStmt{
  174. Name: StreamName("demo"),
  175. StreamFields: nil,
  176. Options: nil,
  177. },
  178. err: `found ")", expect stream options.`,
  179. },
  180. {
  181. s: `CREATE STREAM demo (NAME string) WITHs (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  182. stmt: &StreamStmt{
  183. Name: StreamName("demo"),
  184. StreamFields: nil,
  185. Options: nil,
  186. },
  187. err: `found "WITHs", expected is with.`,
  188. },
  189. {
  190. s: `CREATE STREAM demo (NAME integer) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  191. stmt: &StreamStmt{
  192. Name: "demo",
  193. StreamFields: nil,
  194. Options: nil,
  195. },
  196. err: `found "integer", expect valid stream field types(BIGINT | FLOAT | STRINGS | DATETIME | BOOLEAN | ARRAY | STRUCT).`,
  197. },
  198. {
  199. s: `CREATE STREAM demo (NAME string) WITH (sources="users", FORMAT="JSON", KEY="USERID");`,
  200. stmt: &StreamStmt{
  201. Name: "demo",
  202. StreamFields: nil,
  203. Options: nil,
  204. },
  205. err: `found "sources", unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|STRICT_VALIDATION|TYPE).`,
  206. },
  207. {
  208. s: `CREATE STREAM demo ((NAME string) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  209. stmt: &StreamStmt{
  210. Name: "demo",
  211. StreamFields: nil,
  212. Options: nil,
  213. },
  214. err: `found "(", expect stream field name.`,
  215. },
  216. {
  217. s: `CREATE STREAM demo (
  218. USERID BIGINT,
  219. ) WITH ();`,
  220. stmt: &StreamStmt{
  221. Name: "demo",
  222. StreamFields: []StreamField{
  223. {Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
  224. },
  225. Options: map[string]string{},
  226. },
  227. },
  228. {
  229. s: `CREATE STREAM demo (
  230. USERID BIGINT,
  231. ) WITH ());`,
  232. stmt: &StreamStmt{
  233. Name: "",
  234. StreamFields: nil,
  235. Options: nil,
  236. },
  237. err: `found ")", expected semicolon or EOF.`,
  238. },
  239. {
  240. s: `CREATE STREAM demo (
  241. USERID BIGINT,
  242. ) WITH DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  243. stmt: &StreamStmt{
  244. Name: "",
  245. StreamFields: nil,
  246. Options: nil,
  247. },
  248. //TODO The error string should be more accurate
  249. err: `found "DATASOURCE", expect stream options.`,
  250. },
  251. }
  252. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  253. for i, tt := range tests {
  254. stmt, err := NewParser(strings.NewReader(tt.s)).ParseCreateStreamStmt()
  255. if !reflect.DeepEqual(tt.err, errstring(err)) {
  256. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  257. } else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
  258. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt)
  259. }
  260. }
  261. }
  262. // errstring returns the string representation of an error.
  263. //func errstring(err error) string {
  264. // if err != nil {
  265. // return err.Error()
  266. // }
  267. // return ""
  268. //}