filter_test.go 9.7 KB


  1. package plans
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestFilterPlan_Apply(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data interface{}
  16. result interface{}
  17. }{
  18. {
  19. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  20. data: &xsql.Tuple{
  21. Emitter: "tbl",
  22. Message: xsql.Message{
  23. "a": int64(6),
  24. },
  25. },
  26. result: nil,
  27. },
  28. // nil equals nil?
  29. {
  30. sql: "SELECT a FROM tbl WHERE def = ghi",
  31. data: &xsql.Tuple{
  32. Emitter: "tbl",
  33. Message: xsql.Message{
  34. "a": int64(6),
  35. },
  36. },
  37. result: &xsql.Tuple{
  38. Emitter: "tbl",
  39. Message: xsql.Message{
  40. "a": int64(6),
  41. },
  42. },
  43. },
  44. {
  45. sql: "SELECT * FROM tbl WHERE abc > def and abc <= ghi",
  46. data: &xsql.Tuple{
  47. Emitter: "tbl",
  48. Message: xsql.Message{
  49. "abc": common.TimeFromUnixMilli(1568854515000),
  50. "def": common.TimeFromUnixMilli(1568853515000),
  51. "ghi": common.TimeFromUnixMilli(1568854515000),
  52. },
  53. },
  54. result: &xsql.Tuple{
  55. Emitter: "tbl",
  56. Message: xsql.Message{
  57. "abc": common.TimeFromUnixMilli(1568854515000),
  58. "def": common.TimeFromUnixMilli(1568853515000),
  59. "ghi": common.TimeFromUnixMilli(1568854515000),
  60. },
  61. },
  62. },
  63. {
  64. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  65. data: &xsql.Tuple{
  66. Emitter: "tbl",
  67. Message: xsql.Message{
  68. "abc": int64(6),
  69. },
  70. },
  71. result: &xsql.Tuple{
  72. Emitter: "tbl",
  73. Message: xsql.Message{
  74. "abc": int64(6),
  75. },
  76. },
  77. },
  78. {
  79. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  80. data: &xsql.Tuple{
  81. Emitter: "tbl",
  82. Message: xsql.Message{
  83. "abc": int64(34),
  84. "def": "hello",
  85. },
  86. },
  87. result: &xsql.Tuple{
  88. Emitter: "tbl",
  89. Message: xsql.Message{
  90. "abc": int64(34),
  91. "def": "hello",
  92. },
  93. },
  94. },
  95. {
  96. sql: "SELECT abc FROM tbl WHERE abc > \"2019-09-19T00:55:15.000Z\"",
  97. data: &xsql.Tuple{
  98. Emitter: "tbl",
  99. Message: xsql.Message{
  100. "abc": common.TimeFromUnixMilli(1568854515678),
  101. "def": "hello",
  102. },
  103. },
  104. result: &xsql.Tuple{
  105. Emitter: "tbl",
  106. Message: xsql.Message{
  107. "abc": common.TimeFromUnixMilli(1568854515678),
  108. "def": "hello",
  109. },
  110. },
  111. },
  112. {
  113. sql: "SELECT abc FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  114. data: xsql.WindowTuplesSet{
  115. xsql.WindowTuples{
  116. Emitter: "src1",
  117. Tuples: []xsql.Tuple{
  118. {
  119. Emitter: "src1",
  120. Message: xsql.Message{"id1": 1, "f1": "v1"},
  121. }, {
  122. Emitter: "src1",
  123. Message: xsql.Message{"id1": 2, "f1": "v2"},
  124. }, {
  125. Emitter: "src1",
  126. Message: xsql.Message{"id1": 3, "f1": "v1"},
  127. },
  128. },
  129. },
  130. },
  131. result: xsql.WindowTuplesSet{
  132. xsql.WindowTuples{
  133. Emitter: "src1",
  134. Tuples: []xsql.Tuple{
  135. {
  136. Emitter: "src1",
  137. Message: xsql.Message{"id1": 1, "f1": "v1"},
  138. }, {
  139. Emitter: "src1",
  140. Message: xsql.Message{"id1": 3, "f1": "v1"},
  141. },
  142. },
  143. },
  144. },
  145. },
  146. {
  147. sql: "SELECT abc FROM src1 WHERE f1 = \"v8\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  148. data: xsql.WindowTuplesSet{
  149. xsql.WindowTuples{
  150. Emitter: "src1",
  151. Tuples: []xsql.Tuple{
  152. {
  153. Emitter: "src1",
  154. Message: xsql.Message{"id1": 1, "f1": "v1"},
  155. }, {
  156. Emitter: "src1",
  157. Message: xsql.Message{"id1": 2, "f1": "v2"},
  158. }, {
  159. Emitter: "src1",
  160. Message: xsql.Message{"id1": 3, "f1": "v1"},
  161. },
  162. },
  163. },
  164. },
  165. result: nil,
  166. },
  167. {
  168. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  169. data: xsql.JoinTupleSets{
  170. xsql.JoinTuple{
  171. Tuples: []xsql.Tuple{
  172. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  173. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  174. },
  175. },
  176. xsql.JoinTuple{
  177. Tuples: []xsql.Tuple{
  178. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  179. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  180. },
  181. },
  182. xsql.JoinTuple{
  183. Tuples: []xsql.Tuple{
  184. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  185. },
  186. },
  187. },
  188. result: xsql.JoinTupleSets{
  189. xsql.JoinTuple{
  190. Tuples: []xsql.Tuple{
  191. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  192. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  193. },
  194. },
  195. xsql.JoinTuple{
  196. Tuples: []xsql.Tuple{
  197. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  198. },
  199. },
  200. },
  201. },
  202. {
  203. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v22\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  204. data: xsql.JoinTupleSets{
  205. xsql.JoinTuple{
  206. Tuples: []xsql.Tuple{
  207. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  208. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  209. },
  210. },
  211. xsql.JoinTuple{
  212. Tuples: []xsql.Tuple{
  213. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  214. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  215. },
  216. },
  217. xsql.JoinTuple{
  218. Tuples: []xsql.Tuple{
  219. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  220. },
  221. },
  222. },
  223. result: nil,
  224. },
  225. {
  226. sql: "SELECT abc FROM tbl WHERE meta(topic) = \"topic1\" ",
  227. data: &xsql.Tuple{
  228. Emitter: "tbl",
  229. Message: xsql.Message{
  230. "a": int64(6),
  231. },
  232. Metadata: xsql.Metadata{
  233. "topic": "topic1",
  234. },
  235. },
  236. result: &xsql.Tuple{
  237. Emitter: "tbl",
  238. Message: xsql.Message{
  239. "a": int64(6),
  240. },
  241. Metadata: xsql.Metadata{
  242. "topic": "topic1",
  243. },
  244. },
  245. },
  246. }
  247. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  248. contextLogger := common.Log.WithField("rule", "TestAggregatePlan_Apply")
  249. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  250. for i, tt := range tests {
  251. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  252. if err != nil {
  253. t.Errorf("statement parse error %s", err)
  254. break
  255. }
  256. pp := &FilterPlan{Condition: stmt.Condition}
  257. result := pp.Apply(ctx, tt.data)
  258. if !reflect.DeepEqual(tt.result, result) {
  259. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  260. }
  261. }
  262. }
  263. func TestFilterPlanError(t *testing.T) {
  264. tests := []struct {
  265. sql string
  266. data interface{}
  267. result interface{}
  268. }{
  269. {
  270. sql: "SELECT a FROM tbl WHERE a = b",
  271. data: &xsql.Tuple{
  272. Emitter: "tbl",
  273. Message: xsql.Message{
  274. "a": int64(6),
  275. "b": "astring",
  276. },
  277. },
  278. result: errors.New("run Where error: invalid operation int64(6) = string(astring)"),
  279. },
  280. {
  281. sql: "SELECT a FROM tbl WHERE def = ghi",
  282. data: errors.New("an error from upstream"),
  283. result: errors.New("an error from upstream"),
  284. },
  285. {
  286. sql: "SELECT abc FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  287. data: xsql.WindowTuplesSet{
  288. xsql.WindowTuples{
  289. Emitter: "src1",
  290. Tuples: []xsql.Tuple{
  291. {
  292. Emitter: "src1",
  293. Message: xsql.Message{"id1": 1, "f1": "v1"},
  294. }, {
  295. Emitter: "src1",
  296. Message: xsql.Message{"id1": 2, "f1": "v2"},
  297. }, {
  298. Emitter: "src1",
  299. Message: xsql.Message{"id1": 3, "f1": "v1"},
  300. },
  301. },
  302. },
  303. xsql.WindowTuples{
  304. Emitter: "src2",
  305. Tuples: []xsql.Tuple{
  306. {
  307. Emitter: "src1",
  308. Message: xsql.Message{"id1": 1, "f1": "v1"},
  309. }, {
  310. Emitter: "src1",
  311. Message: xsql.Message{"id1": 2, "f1": "v2"},
  312. }, {
  313. Emitter: "src1",
  314. Message: xsql.Message{"id1": 3, "f1": "v1"},
  315. },
  316. },
  317. },
  318. },
  319. result: errors.New("run Where error: the input WindowTuplesSet with multiple tuples cannot be evaluated"),
  320. },
  321. {
  322. sql: "SELECT abc FROM src1 WHERE f1 = \"v8\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  323. data: xsql.WindowTuplesSet{
  324. xsql.WindowTuples{
  325. Emitter: "src1",
  326. Tuples: []xsql.Tuple{
  327. {
  328. Emitter: "src1",
  329. Message: xsql.Message{"id1": 1, "f1": "v1"},
  330. }, {
  331. Emitter: "src1",
  332. Message: xsql.Message{"id1": 2, "f1": 3},
  333. }, {
  334. Emitter: "src1",
  335. Message: xsql.Message{"id1": 3, "f1": "v1"},
  336. },
  337. },
  338. },
  339. },
  340. result: errors.New("run Where error: invalid operation int64(3) = string(v8)"),
  341. },
  342. {
  343. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  344. data: xsql.JoinTupleSets{
  345. xsql.JoinTuple{
  346. Tuples: []xsql.Tuple{
  347. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": 50}},
  348. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  349. },
  350. },
  351. xsql.JoinTuple{
  352. Tuples: []xsql.Tuple{
  353. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  354. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  355. },
  356. },
  357. xsql.JoinTuple{
  358. Tuples: []xsql.Tuple{
  359. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  360. },
  361. },
  362. },
  363. result: errors.New("run Where error: invalid operation int64(50) = string(v1)"),
  364. },
  365. }
  366. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  367. contextLogger := common.Log.WithField("rule", "TestAggregatePlan_Apply")
  368. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  369. for i, tt := range tests {
  370. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  371. if err != nil {
  372. t.Errorf("statement parse error %s", err)
  373. break
  374. }
  375. pp := &FilterPlan{Condition: stmt.Condition}
  376. result := pp.Apply(ctx, tt.data)
  377. if !reflect.DeepEqual(tt.result, result) {
  378. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  379. }
  380. }
  381. }