filter_test.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. package plans
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestFilterPlan_Apply(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data interface{}
  16. result interface{}
  17. }{
  18. {
  19. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  20. data: &xsql.Tuple{
  21. Emitter: "tbl",
  22. Message: xsql.Message{
  23. "a": int64(6),
  24. },
  25. },
  26. result: nil,
  27. },
  28. // nil equals nil?
  29. {
  30. sql: "SELECT a FROM tbl WHERE def = ghi",
  31. data: &xsql.Tuple{
  32. Emitter: "tbl",
  33. Message: xsql.Message{
  34. "a": int64(6),
  35. },
  36. },
  37. result: &xsql.Tuple{
  38. Emitter: "tbl",
  39. Message: xsql.Message{
  40. "a": int64(6),
  41. },
  42. },
  43. },
  44. {
  45. sql: "SELECT * FROM tbl WHERE abc > def and abc <= ghi",
  46. data: &xsql.Tuple{
  47. Emitter: "tbl",
  48. Message: xsql.Message{
  49. "abc": common.TimeFromUnixMilli(1568854515000),
  50. "def": common.TimeFromUnixMilli(1568853515000),
  51. "ghi": common.TimeFromUnixMilli(1568854515000),
  52. },
  53. },
  54. result: &xsql.Tuple{
  55. Emitter: "tbl",
  56. Message: xsql.Message{
  57. "abc": common.TimeFromUnixMilli(1568854515000),
  58. "def": common.TimeFromUnixMilli(1568853515000),
  59. "ghi": common.TimeFromUnixMilli(1568854515000),
  60. },
  61. },
  62. },
  63. {
  64. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  65. data: &xsql.Tuple{
  66. Emitter: "tbl",
  67. Message: xsql.Message{
  68. "abc": int64(6),
  69. },
  70. },
  71. result: &xsql.Tuple{
  72. Emitter: "tbl",
  73. Message: xsql.Message{
  74. "abc": int64(6),
  75. },
  76. },
  77. },
  78. {
  79. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  80. data: &xsql.Tuple{
  81. Emitter: "tbl",
  82. Message: xsql.Message{
  83. "abc": int64(34),
  84. "def": "hello",
  85. },
  86. },
  87. result: &xsql.Tuple{
  88. Emitter: "tbl",
  89. Message: xsql.Message{
  90. "abc": int64(34),
  91. "def": "hello",
  92. },
  93. },
  94. },
  95. {
  96. sql: "SELECT abc FROM tbl WHERE abc > \"2019-09-19T00:55:15.000Z\"",
  97. data: &xsql.Tuple{
  98. Emitter: "tbl",
  99. Message: xsql.Message{
  100. "abc": common.TimeFromUnixMilli(1568854515678),
  101. "def": "hello",
  102. },
  103. },
  104. result: &xsql.Tuple{
  105. Emitter: "tbl",
  106. Message: xsql.Message{
  107. "abc": common.TimeFromUnixMilli(1568854515678),
  108. "def": "hello",
  109. },
  110. },
  111. },
  112. {
  113. sql: "SELECT abc FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  114. data: xsql.WindowTuplesSet{
  115. xsql.WindowTuples{
  116. Emitter: "src1",
  117. Tuples: []xsql.Tuple{
  118. {
  119. Emitter: "src1",
  120. Message: xsql.Message{"id1": 1, "f1": "v1"},
  121. }, {
  122. Emitter: "src1",
  123. Message: xsql.Message{"id1": 2, "f1": "v2"},
  124. }, {
  125. Emitter: "src1",
  126. Message: xsql.Message{"id1": 3, "f1": "v1"},
  127. },
  128. },
  129. },
  130. },
  131. result: xsql.WindowTuplesSet{
  132. xsql.WindowTuples{
  133. Emitter: "src1",
  134. Tuples: []xsql.Tuple{
  135. {
  136. Emitter: "src1",
  137. Message: xsql.Message{"id1": 1, "f1": "v1"},
  138. }, {
  139. Emitter: "src1",
  140. Message: xsql.Message{"id1": 3, "f1": "v1"},
  141. },
  142. },
  143. },
  144. },
  145. },
  146. {
  147. sql: "SELECT abc FROM src1 WHERE f1 = \"v8\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  148. data: xsql.WindowTuplesSet{
  149. xsql.WindowTuples{
  150. Emitter: "src1",
  151. Tuples: []xsql.Tuple{
  152. {
  153. Emitter: "src1",
  154. Message: xsql.Message{"id1": 1, "f1": "v1"},
  155. }, {
  156. Emitter: "src1",
  157. Message: xsql.Message{"id1": 2, "f1": "v2"},
  158. }, {
  159. Emitter: "src1",
  160. Message: xsql.Message{"id1": 3, "f1": "v1"},
  161. },
  162. },
  163. },
  164. },
  165. result: nil,
  166. },
  167. {
  168. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  169. data: xsql.JoinTupleSets{
  170. xsql.JoinTuple{
  171. Tuples: []xsql.Tuple{
  172. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  173. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  174. },
  175. },
  176. xsql.JoinTuple{
  177. Tuples: []xsql.Tuple{
  178. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  179. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  180. },
  181. },
  182. xsql.JoinTuple{
  183. Tuples: []xsql.Tuple{
  184. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  185. },
  186. },
  187. },
  188. result: xsql.JoinTupleSets{
  189. xsql.JoinTuple{
  190. Tuples: []xsql.Tuple{
  191. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  192. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  193. },
  194. },
  195. xsql.JoinTuple{
  196. Tuples: []xsql.Tuple{
  197. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  198. },
  199. },
  200. },
  201. },
  202. {
  203. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v22\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  204. data: xsql.JoinTupleSets{
  205. xsql.JoinTuple{
  206. Tuples: []xsql.Tuple{
  207. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  208. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  209. },
  210. },
  211. xsql.JoinTuple{
  212. Tuples: []xsql.Tuple{
  213. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  214. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  215. },
  216. },
  217. xsql.JoinTuple{
  218. Tuples: []xsql.Tuple{
  219. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  220. },
  221. },
  222. },
  223. result: nil,
  224. },
  225. }
  226. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  227. contextLogger := common.Log.WithField("rule", "TestAggregatePlan_Apply")
  228. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  229. for i, tt := range tests {
  230. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  231. if err != nil {
  232. t.Errorf("statement parse error %s", err)
  233. break
  234. }
  235. pp := &FilterPlan{Condition: stmt.Condition}
  236. result := pp.Apply(ctx, tt.data)
  237. if !reflect.DeepEqual(tt.result, result) {
  238. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  239. }
  240. }
  241. }
  242. func TestFilterPlanError(t *testing.T) {
  243. tests := []struct {
  244. sql string
  245. data interface{}
  246. result interface{}
  247. }{
  248. {
  249. sql: "SELECT a FROM tbl WHERE a = b",
  250. data: &xsql.Tuple{
  251. Emitter: "tbl",
  252. Message: xsql.Message{
  253. "a": int64(6),
  254. "b": "astring",
  255. },
  256. },
  257. result: errors.New("invalid operation int64 = string"),
  258. },
  259. {
  260. sql: "SELECT a FROM tbl WHERE def = ghi",
  261. data: errors.New("an error from upstream"),
  262. result: errors.New("an error from upstream"),
  263. },
  264. {
  265. sql: "SELECT abc FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  266. data: xsql.WindowTuplesSet{
  267. xsql.WindowTuples{
  268. Emitter: "src1",
  269. Tuples: []xsql.Tuple{
  270. {
  271. Emitter: "src1",
  272. Message: xsql.Message{"id1": 1, "f1": "v1"},
  273. }, {
  274. Emitter: "src1",
  275. Message: xsql.Message{"id1": 2, "f1": "v2"},
  276. }, {
  277. Emitter: "src1",
  278. Message: xsql.Message{"id1": 3, "f1": "v1"},
  279. },
  280. },
  281. },
  282. xsql.WindowTuples{
  283. Emitter: "src2",
  284. Tuples: []xsql.Tuple{
  285. {
  286. Emitter: "src1",
  287. Message: xsql.Message{"id1": 1, "f1": "v1"},
  288. }, {
  289. Emitter: "src1",
  290. Message: xsql.Message{"id1": 2, "f1": "v2"},
  291. }, {
  292. Emitter: "src1",
  293. Message: xsql.Message{"id1": 3, "f1": "v1"},
  294. },
  295. },
  296. },
  297. },
  298. result: errors.New("WindowTuplesSet with multiple tuples cannot be evaluated"),
  299. },
  300. {
  301. sql: "SELECT abc FROM src1 WHERE f1 = \"v8\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  302. data: xsql.WindowTuplesSet{
  303. xsql.WindowTuples{
  304. Emitter: "src1",
  305. Tuples: []xsql.Tuple{
  306. {
  307. Emitter: "src1",
  308. Message: xsql.Message{"id1": 1, "f1": "v1"},
  309. }, {
  310. Emitter: "src1",
  311. Message: xsql.Message{"id1": 2, "f1": 3},
  312. }, {
  313. Emitter: "src1",
  314. Message: xsql.Message{"id1": 3, "f1": "v1"},
  315. },
  316. },
  317. },
  318. },
  319. result: errors.New("invalid operation int64 = string"),
  320. },
  321. {
  322. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  323. data: xsql.JoinTupleSets{
  324. xsql.JoinTuple{
  325. Tuples: []xsql.Tuple{
  326. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": 50}},
  327. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  328. },
  329. },
  330. xsql.JoinTuple{
  331. Tuples: []xsql.Tuple{
  332. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  333. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  334. },
  335. },
  336. xsql.JoinTuple{
  337. Tuples: []xsql.Tuple{
  338. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  339. },
  340. },
  341. },
  342. result: errors.New("invalid operation int64 = string"),
  343. },
  344. }
  345. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  346. contextLogger := common.Log.WithField("rule", "TestAggregatePlan_Apply")
  347. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  348. for i, tt := range tests {
  349. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  350. if err != nil {
  351. t.Errorf("statement parse error %s", err)
  352. break
  353. }
  354. pp := &FilterPlan{Condition: stmt.Condition}
  355. result := pp.Apply(ctx, tt.data)
  356. if !reflect.DeepEqual(tt.result, result) {
  357. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  358. }
  359. }
  360. }