filter_test.go 5.7 KB


  1. package plans
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/contexts"
  7. "reflect"
  8. "strings"
  9. "testing"
  10. )
  11. func TestFilterPlan_Apply(t *testing.T) {
  12. var tests = []struct {
  13. sql string
  14. data interface{}
  15. result interface{}
  16. }{
  17. {
  18. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  19. data: &xsql.Tuple{
  20. Emitter: "tbl",
  21. Message: xsql.Message{
  22. "a" : int64(6),
  23. },
  24. },
  25. result: nil,
  26. },
  27. {
  28. sql: "SELECT * FROM tbl WHERE abc > def and abc <= ghi",
  29. data: &xsql.Tuple{
  30. Emitter: "tbl",
  31. Message: xsql.Message{
  32. "abc" : common.TimeFromUnixMilli(1568854515000),
  33. "def" : common.TimeFromUnixMilli(1568853515000),
  34. "ghi" : common.TimeFromUnixMilli(1568854515000),
  35. },
  36. },
  37. result: &xsql.Tuple{
  38. Emitter: "tbl",
  39. Message: xsql.Message{
  40. "abc" : common.TimeFromUnixMilli(1568854515000),
  41. "def" : common.TimeFromUnixMilli(1568853515000),
  42. "ghi" : common.TimeFromUnixMilli(1568854515000),
  43. },
  44. },
  45. },
  46. {
  47. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  48. data: &xsql.Tuple{
  49. Emitter: "tbl",
  50. Message: xsql.Message{
  51. "abc" : int64(6),
  52. },
  53. },
  54. result: &xsql.Tuple{
  55. Emitter: "tbl",
  56. Message: xsql.Message{
  57. "abc": int64(6),
  58. },
  59. },
  60. },
  61. {
  62. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  63. data: &xsql.Tuple{
  64. Emitter: "tbl",
  65. Message: xsql.Message{
  66. "abc" : int64(34),
  67. "def" : "hello",
  68. },
  69. },
  70. result: &xsql.Tuple{
  71. Emitter: "tbl",
  72. Message: xsql.Message{
  73. "abc" : int64(34),
  74. "def" : "hello",
  75. },
  76. },
  77. },
  78. {
  79. sql: "SELECT abc FROM tbl WHERE abc > \"2019-09-19T00:55:15.000Z\"",
  80. data: &xsql.Tuple{
  81. Emitter: "tbl",
  82. Message: xsql.Message{
  83. "abc" : common.TimeFromUnixMilli(1568854515678),
  84. "def" : "hello",
  85. },
  86. },
  87. result: &xsql.Tuple{
  88. Emitter: "tbl",
  89. Message: xsql.Message{
  90. "abc" : common.TimeFromUnixMilli(1568854515678),
  91. "def" : "hello",
  92. },
  93. },
  94. },
  95. {
  96. sql: "SELECT abc FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  97. data: xsql.WindowTuplesSet{
  98. xsql.WindowTuples{
  99. Emitter:"src1",
  100. Tuples:[]xsql.Tuple{
  101. {
  102. Emitter: "src1",
  103. Message: xsql.Message{"id1" : 1, "f1" : "v1"},
  104. },{
  105. Emitter: "src1",
  106. Message: xsql.Message{"id1" : 2, "f1" : "v2"},
  107. },{
  108. Emitter: "src1",
  109. Message: xsql.Message{"id1" : 3, "f1" : "v1"},
  110. },
  111. },
  112. },
  113. },
  114. result: xsql.WindowTuplesSet{
  115. xsql.WindowTuples{
  116. Emitter:"src1",
  117. Tuples:[]xsql.Tuple{
  118. {
  119. Emitter: "src1",
  120. Message: xsql.Message{"id1" : 1, "f1" : "v1"},
  121. },{
  122. Emitter: "src1",
  123. Message: xsql.Message{"id1" : 3, "f1" : "v1"},
  124. },
  125. },
  126. },
  127. },
  128. },
  129. {
  130. sql: "SELECT abc FROM src1 WHERE f1 = \"v8\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  131. data: xsql.WindowTuplesSet{
  132. xsql.WindowTuples{
  133. Emitter:"src1",
  134. Tuples:[]xsql.Tuple{
  135. {
  136. Emitter: "src1",
  137. Message: xsql.Message{ "id1" : 1, "f1" : "v1"},
  138. },{
  139. Emitter: "src1",
  140. Message: xsql.Message{ "id1" : 2, "f1" : "v2"},
  141. },{
  142. Emitter: "src1",
  143. Message: xsql.Message{ "id1" : 3, "f1" : "v1"},
  144. },
  145. },
  146. },
  147. },
  148. result: nil,
  149. },
  150. {
  151. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  152. data: xsql.JoinTupleSets{
  153. xsql.JoinTuple{
  154. Tuples: []xsql.Tuple{
  155. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  156. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  157. },
  158. },
  159. xsql.JoinTuple{
  160. Tuples: []xsql.Tuple{
  161. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  162. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  163. },
  164. },
  165. xsql.JoinTuple{
  166. Tuples: []xsql.Tuple{
  167. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  168. },
  169. },
  170. },
  171. result: xsql.JoinTupleSets{
  172. xsql.JoinTuple{
  173. Tuples: []xsql.Tuple{
  174. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  175. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  176. },
  177. },
  178. xsql.JoinTuple{
  179. Tuples: []xsql.Tuple{
  180. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  181. },
  182. },
  183. },
  184. },
  185. {
  186. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v22\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  187. data: xsql.JoinTupleSets{
  188. xsql.JoinTuple{
  189. Tuples: []xsql.Tuple{
  190. {Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1",},},
  191. {Emitter: "src2", Message: xsql.Message{ "id2" : 2, "f2" : "w2",},},
  192. },
  193. },
  194. xsql.JoinTuple{
  195. Tuples: []xsql.Tuple{
  196. {Emitter: "src1", Message: xsql.Message{ "id1" : 2, "f1" : "v2",},},
  197. {Emitter: "src2", Message: xsql.Message{ "id2" : 4, "f2" : "w3",},},
  198. },
  199. },
  200. xsql.JoinTuple{
  201. Tuples: []xsql.Tuple{
  202. {Emitter: "src1", Message: xsql.Message{ "id1" : 3, "f1" : "v1",},},
  203. },
  204. },
  205. },
  206. result: nil,
  207. },
  208. }
  209. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  210. contextLogger := common.Log.WithField("rule", "TestAggregatePlan_Apply")
  211. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  212. for i, tt := range tests {
  213. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  214. if err != nil {
  215. t.Errorf("statement parse error %s", err)
  216. break
  217. }
  218. pp := &FilterPlan{Condition:stmt.Condition}
  219. result := pp.Apply(ctx, tt.data)
  220. if !reflect.DeepEqual(tt.result, result) {
  221. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  222. }
  223. }
  224. }