filter_test.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. package plans
  2. import (
  3. "engine/common"
  4. "engine/xsql"
  5. "fmt"
  6. "reflect"
  7. "strings"
  8. "testing"
  9. )
  10. func TestFilterPlan_Apply(t *testing.T) {
  11. var tests = []struct {
  12. sql string
  13. data interface{}
  14. result interface{}
  15. }{
  16. {
  17. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  18. data: &xsql.Tuple{
  19. Emitter: "tbl",
  20. Message: xsql.Message{
  21. "a" : int64(6),
  22. },
  23. },
  24. result: nil,
  25. },
  26. {
  27. sql: "SELECT * FROM tbl WHERE abc > def and abc <= ghi",
  28. data: &xsql.Tuple{
  29. Emitter: "tbl",
  30. Message: xsql.Message{
  31. "abc" : common.TimeFromUnixMilli(1568854515000),
  32. "def" : common.TimeFromUnixMilli(1568853515000),
  33. "ghi" : common.TimeFromUnixMilli(1568854515000),
  34. },
  35. },
  36. result: &xsql.Tuple{
  37. Emitter: "tbl",
  38. Message: xsql.Message{
  39. "abc" : common.TimeFromUnixMilli(1568854515000),
  40. "def" : common.TimeFromUnixMilli(1568853515000),
  41. "ghi" : common.TimeFromUnixMilli(1568854515000),
  42. },
  43. },
  44. },
  45. {
  46. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  47. data: &xsql.Tuple{
  48. Emitter: "tbl",
  49. Message: xsql.Message{
  50. "abc" : int64(6),
  51. },
  52. },
  53. result: &xsql.Tuple{
  54. Emitter: "tbl",
  55. Message: xsql.Message{
  56. "abc": int64(6),
  57. },
  58. },
  59. },
  60. {
  61. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  62. data: &xsql.Tuple{
  63. Emitter: "tbl",
  64. Message: xsql.Message{
  65. "abc" : int64(34),
  66. "def" : "hello",
  67. },
  68. },
  69. result: &xsql.Tuple{
  70. Emitter: "tbl",
  71. Message: xsql.Message{
  72. "abc" : int64(34),
  73. "def" : "hello",
  74. },
  75. },
  76. },
  77. {
  78. sql: "SELECT abc FROM tbl WHERE abc > \"2019-09-19T00:55:15.000Z\"",
  79. data: &xsql.Tuple{
  80. Emitter: "tbl",
  81. Message: xsql.Message{
  82. "abc" : common.TimeFromUnixMilli(1568854515678),
  83. "def" : "hello",
  84. },
  85. },
  86. result: &xsql.Tuple{
  87. Emitter: "tbl",
  88. Message: xsql.Message{
  89. "abc" : common.TimeFromUnixMilli(1568854515678),
  90. "def" : "hello",
  91. },
  92. },
  93. },
  94. {
  95. sql: "SELECT abc FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  96. data: xsql.WindowTuplesSet{
  97. xsql.WindowTuples{
  98. Emitter:"src1",
  99. Tuples:[]xsql.Tuple{
  100. {
  101. Emitter: "src1",
  102. Message: xsql.Message{"id1" : 1, "f1" : "v1"},
  103. },{
  104. Emitter: "src1",
  105. Message: xsql.Message{"id1" : 2, "f1" : "v2"},
  106. },{
  107. Emitter: "src1",
  108. Message: xsql.Message{"id1" : 3, "f1" : "v1"},
  109. },
  110. },
  111. },
  112. },
  113. result: xsql.WindowTuplesSet{
  114. xsql.WindowTuples{
  115. Emitter:"src1",
  116. Tuples:[]xsql.Tuple{
  117. {
  118. Emitter: "src1",
  119. Message: xsql.Message{"id1" : 1, "f1" : "v1"},
  120. },{
  121. Emitter: "src1",
  122. Message: xsql.Message{"id1" : 3, "f1" : "v1"},
  123. },
  124. },
  125. },
  126. },
  127. },
  128. {
  129. sql: "SELECT abc FROM src1 WHERE f1 = \"v8\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  130. data: xsql.WindowTuplesSet{
  131. xsql.WindowTuples{
  132. Emitter:"src1",
  133. Tuples:[]xsql.Tuple{
  134. {
  135. Emitter: "src1",
  136. Message: xsql.Message{ "id1" : 1, "f1" : "v1"},
  137. },{
  138. Emitter: "src1",
  139. Message: xsql.Message{ "id1" : 2, "f1" : "v2"},
  140. },{
  141. Emitter: "src1",
  142. Message: xsql.Message{ "id1" : 3, "f1" : "v1"},
  143. },
  144. },
  145. },
  146. },
  147. result: nil,
  148. },
  149. {
  150. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  151. data: xsql.JoinTupleSets{
  152. xsql.JoinTuple{
  153. Tuples: []xsql.Tuple{
  154. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  155. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  156. },
  157. },
  158. xsql.JoinTuple{
  159. Tuples: []xsql.Tuple{
  160. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  161. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  162. },
  163. },
  164. xsql.JoinTuple{
  165. Tuples: []xsql.Tuple{
  166. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  167. },
  168. },
  169. },
  170. result: xsql.JoinTupleSets{
  171. xsql.JoinTuple{
  172. Tuples: []xsql.Tuple{
  173. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  174. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  175. },
  176. },
  177. xsql.JoinTuple{
  178. Tuples: []xsql.Tuple{
  179. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  180. },
  181. },
  182. },
  183. },
  184. {
  185. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v22\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  186. data: xsql.JoinTupleSets{
  187. xsql.JoinTuple{
  188. Tuples: []xsql.Tuple{
  189. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  190. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  191. },
  192. },
  193. xsql.JoinTuple{
  194. Tuples: []xsql.Tuple{
  195. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  196. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  197. },
  198. },
  199. xsql.JoinTuple{
  200. Tuples: []xsql.Tuple{
  201. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  202. },
  203. },
  204. },
  205. result: nil,
  206. },
  207. }
  208. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  209. for i, tt := range tests {
  210. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  211. if err != nil {
  212. t.Errorf("statement parse error %s", err)
  213. break
  214. }
  215. pp := &FilterPlan{Condition:stmt.Condition}
  216. result := pp.Apply(nil, tt.data)
  217. if !reflect.DeepEqual(tt.result, result) {
  218. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  219. }
  220. }
  221. }