filter_test.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. package plans
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestFilterPlan_Apply(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data interface{}
  16. result interface{}
  17. }{
  18. {
  19. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  20. data: &xsql.Tuple{
  21. Emitter: "tbl",
  22. Message: xsql.Message{
  23. "a": int64(6),
  24. },
  25. },
  26. result: nil,
  27. },
  28. // nil equals nil?
  29. {
  30. sql: "SELECT a FROM tbl WHERE def = ghi",
  31. data: &xsql.Tuple{
  32. Emitter: "tbl",
  33. Message: xsql.Message{
  34. "a": int64(6),
  35. },
  36. },
  37. result: &xsql.Tuple{
  38. Emitter: "tbl",
  39. Message: xsql.Message{
  40. "a": int64(6),
  41. },
  42. },
  43. },
  44. {
  45. sql: "SELECT * FROM tbl WHERE abc > def and abc <= ghi",
  46. data: &xsql.Tuple{
  47. Emitter: "tbl",
  48. Message: xsql.Message{
  49. "abc": common.TimeFromUnixMilli(1568854515000),
  50. "def": common.TimeFromUnixMilli(1568853515000),
  51. "ghi": common.TimeFromUnixMilli(1568854515000),
  52. },
  53. },
  54. result: &xsql.Tuple{
  55. Emitter: "tbl",
  56. Message: xsql.Message{
  57. "abc": common.TimeFromUnixMilli(1568854515000),
  58. "def": common.TimeFromUnixMilli(1568853515000),
  59. "ghi": common.TimeFromUnixMilli(1568854515000),
  60. },
  61. },
  62. },
  63. {
  64. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  65. data: &xsql.Tuple{
  66. Emitter: "tbl",
  67. Message: xsql.Message{
  68. "abc": int64(6),
  69. },
  70. },
  71. result: &xsql.Tuple{
  72. Emitter: "tbl",
  73. Message: xsql.Message{
  74. "abc": int64(6),
  75. },
  76. },
  77. },
  78. {
  79. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  80. data: &xsql.Tuple{
  81. Emitter: "tbl",
  82. Message: xsql.Message{
  83. "abc": int64(34),
  84. "def": "hello",
  85. },
  86. },
  87. result: &xsql.Tuple{
  88. Emitter: "tbl",
  89. Message: xsql.Message{
  90. "abc": int64(34),
  91. "def": "hello",
  92. },
  93. },
  94. },
  95. {
  96. sql: "SELECT abc FROM tbl WHERE abc > \"2019-09-19T00:55:15.000Z\"",
  97. data: &xsql.Tuple{
  98. Emitter: "tbl",
  99. Message: xsql.Message{
  100. "abc": common.TimeFromUnixMilli(1568854515678),
  101. "def": "hello",
  102. },
  103. },
  104. result: &xsql.Tuple{
  105. Emitter: "tbl",
  106. Message: xsql.Message{
  107. "abc": common.TimeFromUnixMilli(1568854515678),
  108. "def": "hello",
  109. },
  110. },
  111. },
  112. {
  113. sql: "SELECT abc FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  114. data: xsql.WindowTuplesSet{
  115. xsql.WindowTuples{
  116. Emitter: "src1",
  117. Tuples: []xsql.Tuple{
  118. {
  119. Emitter: "src1",
  120. Message: xsql.Message{"id1": 1, "f1": "v1"},
  121. }, {
  122. Emitter: "src1",
  123. Message: xsql.Message{"id1": 2, "f1": "v2"},
  124. }, {
  125. Emitter: "src1",
  126. Message: xsql.Message{"id1": 3, "f1": "v1"},
  127. },
  128. },
  129. },
  130. },
  131. result: xsql.WindowTuplesSet{
  132. xsql.WindowTuples{
  133. Emitter: "src1",
  134. Tuples: []xsql.Tuple{
  135. {
  136. Emitter: "src1",
  137. Message: xsql.Message{"id1": 1, "f1": "v1"},
  138. }, {
  139. Emitter: "src1",
  140. Message: xsql.Message{"id1": 3, "f1": "v1"},
  141. },
  142. },
  143. },
  144. },
  145. },
  146. {
  147. sql: "SELECT abc FROM src1 WHERE f1 = \"v8\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  148. data: xsql.WindowTuplesSet{
  149. xsql.WindowTuples{
  150. Emitter: "src1",
  151. Tuples: []xsql.Tuple{
  152. {
  153. Emitter: "src1",
  154. Message: xsql.Message{"id1": 1, "f1": "v1"},
  155. }, {
  156. Emitter: "src1",
  157. Message: xsql.Message{"id1": 2, "f1": "v2"},
  158. }, {
  159. Emitter: "src1",
  160. Message: xsql.Message{"id1": 3, "f1": "v1"},
  161. },
  162. },
  163. },
  164. },
  165. result: nil,
  166. },
  167. {
  168. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  169. data: xsql.JoinTupleSets{
  170. xsql.JoinTuple{
  171. Tuples: []xsql.Tuple{
  172. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  173. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  174. },
  175. },
  176. xsql.JoinTuple{
  177. Tuples: []xsql.Tuple{
  178. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  179. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  180. },
  181. },
  182. xsql.JoinTuple{
  183. Tuples: []xsql.Tuple{
  184. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  185. },
  186. },
  187. },
  188. result: xsql.JoinTupleSets{
  189. xsql.JoinTuple{
  190. Tuples: []xsql.Tuple{
  191. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  192. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  193. },
  194. },
  195. xsql.JoinTuple{
  196. Tuples: []xsql.Tuple{
  197. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  198. },
  199. },
  200. },
  201. },
  202. {
  203. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v22\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  204. data: xsql.JoinTupleSets{
  205. xsql.JoinTuple{
  206. Tuples: []xsql.Tuple{
  207. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  208. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  209. },
  210. },
  211. xsql.JoinTuple{
  212. Tuples: []xsql.Tuple{
  213. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  214. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  215. },
  216. },
  217. xsql.JoinTuple{
  218. Tuples: []xsql.Tuple{
  219. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  220. },
  221. },
  222. },
  223. result: nil,
  224. },
  225. {
  226. sql: "SELECT abc FROM tbl WHERE meta(topic) = \"topic1\" ",
  227. data: &xsql.Tuple{
  228. Emitter: "tbl",
  229. Message: xsql.Message{
  230. "a": int64(6),
  231. },
  232. Metadata: xsql.Metadata{
  233. "topic": "topic1",
  234. },
  235. },
  236. result: &xsql.Tuple{
  237. Emitter: "tbl",
  238. Message: xsql.Message{
  239. "a": int64(6),
  240. },
  241. Metadata: xsql.Metadata{
  242. "topic": "topic1",
  243. },
  244. },
  245. },
  246. }
  247. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  248. contextLogger := common.Log.WithField("rule", "TestAggregatePlan_Apply")
  249. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  250. for i, tt := range tests {
  251. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  252. if err != nil {
  253. t.Errorf("statement parse error %s", err)
  254. break
  255. }
  256. fv, afv := xsql.NewAggregateFunctionValuers()
  257. pp := &FilterPlan{Condition: stmt.Condition}
  258. result := pp.Apply(ctx, tt.data, fv, afv)
  259. if !reflect.DeepEqual(tt.result, result) {
  260. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  261. }
  262. }
  263. }
  264. func TestFilterPlanError(t *testing.T) {
  265. tests := []struct {
  266. sql string
  267. data interface{}
  268. result interface{}
  269. }{
  270. {
  271. sql: "SELECT a FROM tbl WHERE a = b",
  272. data: &xsql.Tuple{
  273. Emitter: "tbl",
  274. Message: xsql.Message{
  275. "a": int64(6),
  276. "b": "astring",
  277. },
  278. },
  279. result: errors.New("run Where error: invalid operation int64(6) = string(astring)"),
  280. },
  281. {
  282. sql: "SELECT a FROM tbl WHERE def = ghi",
  283. data: errors.New("an error from upstream"),
  284. result: errors.New("an error from upstream"),
  285. },
  286. {
  287. sql: "SELECT abc FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  288. data: xsql.WindowTuplesSet{
  289. xsql.WindowTuples{
  290. Emitter: "src1",
  291. Tuples: []xsql.Tuple{
  292. {
  293. Emitter: "src1",
  294. Message: xsql.Message{"id1": 1, "f1": "v1"},
  295. }, {
  296. Emitter: "src1",
  297. Message: xsql.Message{"id1": 2, "f1": "v2"},
  298. }, {
  299. Emitter: "src1",
  300. Message: xsql.Message{"id1": 3, "f1": "v1"},
  301. },
  302. },
  303. },
  304. xsql.WindowTuples{
  305. Emitter: "src2",
  306. Tuples: []xsql.Tuple{
  307. {
  308. Emitter: "src1",
  309. Message: xsql.Message{"id1": 1, "f1": "v1"},
  310. }, {
  311. Emitter: "src1",
  312. Message: xsql.Message{"id1": 2, "f1": "v2"},
  313. }, {
  314. Emitter: "src1",
  315. Message: xsql.Message{"id1": 3, "f1": "v1"},
  316. },
  317. },
  318. },
  319. },
  320. result: errors.New("run Where error: the input WindowTuplesSet with multiple tuples cannot be evaluated"),
  321. },
  322. {
  323. sql: "SELECT abc FROM src1 WHERE f1 = \"v8\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  324. data: xsql.WindowTuplesSet{
  325. xsql.WindowTuples{
  326. Emitter: "src1",
  327. Tuples: []xsql.Tuple{
  328. {
  329. Emitter: "src1",
  330. Message: xsql.Message{"id1": 1, "f1": "v1"},
  331. }, {
  332. Emitter: "src1",
  333. Message: xsql.Message{"id1": 2, "f1": 3},
  334. }, {
  335. Emitter: "src1",
  336. Message: xsql.Message{"id1": 3, "f1": "v1"},
  337. },
  338. },
  339. },
  340. },
  341. result: errors.New("run Where error: invalid operation int64(3) = string(v8)"),
  342. },
  343. {
  344. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  345. data: xsql.JoinTupleSets{
  346. xsql.JoinTuple{
  347. Tuples: []xsql.Tuple{
  348. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": 50}},
  349. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  350. },
  351. },
  352. xsql.JoinTuple{
  353. Tuples: []xsql.Tuple{
  354. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  355. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  356. },
  357. },
  358. xsql.JoinTuple{
  359. Tuples: []xsql.Tuple{
  360. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  361. },
  362. },
  363. },
  364. result: errors.New("run Where error: invalid operation int64(50) = string(v1)"),
  365. },
  366. }
  367. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  368. contextLogger := common.Log.WithField("rule", "TestAggregatePlan_Apply")
  369. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  370. for i, tt := range tests {
  371. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  372. if err != nil {
  373. t.Errorf("statement parse error %s", err)
  374. break
  375. }
  376. fv, afv := xsql.NewAggregateFunctionValuers()
  377. pp := &FilterPlan{Condition: stmt.Condition}
  378. result := pp.Apply(ctx, tt.data, fv, afv)
  379. if !reflect.DeepEqual(tt.result, result) {
  380. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  381. }
  382. }
  383. }