filter_test.go 11 KB


  1. package plans
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestFilterPlan_Apply(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data interface{}
  16. result interface{}
  17. }{
  18. {
  19. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  20. data: &xsql.Tuple{
  21. Emitter: "tbl",
  22. Message: xsql.Message{
  23. "a": int64(6),
  24. },
  25. },
  26. result: nil,
  27. },
  28. // nil equals nil?
  29. {
  30. sql: "SELECT a FROM tbl WHERE def = ghi",
  31. data: &xsql.Tuple{
  32. Emitter: "tbl",
  33. Message: xsql.Message{
  34. "a": int64(6),
  35. },
  36. },
  37. result: &xsql.Tuple{
  38. Emitter: "tbl",
  39. Message: xsql.Message{
  40. "a": int64(6),
  41. },
  42. },
  43. },
  44. {
  45. sql: "SELECT * FROM tbl WHERE abc > def and abc <= ghi",
  46. data: &xsql.Tuple{
  47. Emitter: "tbl",
  48. Message: xsql.Message{
  49. "abc": common.TimeFromUnixMilli(1568854515000),
  50. "def": common.TimeFromUnixMilli(1568853515000),
  51. "ghi": common.TimeFromUnixMilli(1568854515000),
  52. },
  53. },
  54. result: &xsql.Tuple{
  55. Emitter: "tbl",
  56. Message: xsql.Message{
  57. "abc": common.TimeFromUnixMilli(1568854515000),
  58. "def": common.TimeFromUnixMilli(1568853515000),
  59. "ghi": common.TimeFromUnixMilli(1568854515000),
  60. },
  61. },
  62. },
  63. {
  64. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  65. data: &xsql.Tuple{
  66. Emitter: "tbl",
  67. Message: xsql.Message{
  68. "abc": int64(6),
  69. },
  70. },
  71. result: &xsql.Tuple{
  72. Emitter: "tbl",
  73. Message: xsql.Message{
  74. "abc": int64(6),
  75. },
  76. },
  77. },
  78. {
  79. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  80. data: &xsql.Tuple{
  81. Emitter: "tbl",
  82. Message: xsql.Message{
  83. "abc": int64(34),
  84. "def": "hello",
  85. },
  86. },
  87. result: &xsql.Tuple{
  88. Emitter: "tbl",
  89. Message: xsql.Message{
  90. "abc": int64(34),
  91. "def": "hello",
  92. },
  93. },
  94. },
  95. {
  96. sql: "SELECT abc FROM tbl WHERE abc > \"2019-09-19T00:55:15.000Z\"",
  97. data: &xsql.Tuple{
  98. Emitter: "tbl",
  99. Message: xsql.Message{
  100. "abc": common.TimeFromUnixMilli(1568854515678),
  101. "def": "hello",
  102. },
  103. },
  104. result: &xsql.Tuple{
  105. Emitter: "tbl",
  106. Message: xsql.Message{
  107. "abc": common.TimeFromUnixMilli(1568854515678),
  108. "def": "hello",
  109. },
  110. },
  111. },
  112. {
  113. sql: "SELECT abc FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  114. data: xsql.WindowTuplesSet{
  115. xsql.WindowTuples{
  116. Emitter: "src1",
  117. Tuples: []xsql.Tuple{
  118. {
  119. Emitter: "src1",
  120. Message: xsql.Message{"id1": 1, "f1": "v1"},
  121. }, {
  122. Emitter: "src1",
  123. Message: xsql.Message{"id1": 2, "f1": "v2"},
  124. }, {
  125. Emitter: "src1",
  126. Message: xsql.Message{"id1": 3, "f1": "v1"},
  127. },
  128. },
  129. },
  130. },
  131. result: xsql.WindowTuplesSet{
  132. xsql.WindowTuples{
  133. Emitter: "src1",
  134. Tuples: []xsql.Tuple{
  135. {
  136. Emitter: "src1",
  137. Message: xsql.Message{"id1": 1, "f1": "v1"},
  138. }, {
  139. Emitter: "src1",
  140. Message: xsql.Message{"id1": 3, "f1": "v1"},
  141. },
  142. },
  143. },
  144. },
  145. },
  146. {
  147. sql: "SELECT abc FROM src1 WHERE f1 = \"v8\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  148. data: xsql.WindowTuplesSet{
  149. xsql.WindowTuples{
  150. Emitter: "src1",
  151. Tuples: []xsql.Tuple{
  152. {
  153. Emitter: "src1",
  154. Message: xsql.Message{"id1": 1, "f1": "v1"},
  155. }, {
  156. Emitter: "src1",
  157. Message: xsql.Message{"id1": 2, "f1": "v2"},
  158. }, {
  159. Emitter: "src1",
  160. Message: xsql.Message{"id1": 3, "f1": "v1"},
  161. },
  162. },
  163. },
  164. },
  165. result: nil,
  166. },
  167. {
  168. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  169. data: xsql.JoinTupleSets{
  170. xsql.JoinTuple{
  171. Tuples: []xsql.Tuple{
  172. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  173. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  174. },
  175. },
  176. xsql.JoinTuple{
  177. Tuples: []xsql.Tuple{
  178. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  179. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  180. },
  181. },
  182. xsql.JoinTuple{
  183. Tuples: []xsql.Tuple{
  184. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  185. },
  186. },
  187. },
  188. result: xsql.JoinTupleSets{
  189. xsql.JoinTuple{
  190. Tuples: []xsql.Tuple{
  191. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  192. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  193. },
  194. },
  195. xsql.JoinTuple{
  196. Tuples: []xsql.Tuple{
  197. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  198. },
  199. },
  200. },
  201. },
  202. {
  203. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v22\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  204. data: xsql.JoinTupleSets{
  205. xsql.JoinTuple{
  206. Tuples: []xsql.Tuple{
  207. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  208. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  209. },
  210. },
  211. xsql.JoinTuple{
  212. Tuples: []xsql.Tuple{
  213. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  214. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  215. },
  216. },
  217. xsql.JoinTuple{
  218. Tuples: []xsql.Tuple{
  219. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  220. },
  221. },
  222. },
  223. result: nil,
  224. },
  225. {
  226. sql: "SELECT abc FROM tbl WHERE meta(topic) = \"topic1\" ",
  227. data: &xsql.Tuple{
  228. Emitter: "tbl",
  229. Message: xsql.Message{
  230. "a": int64(6),
  231. },
  232. Metadata: xsql.Metadata{
  233. "topic": "topic1",
  234. },
  235. },
  236. result: &xsql.Tuple{
  237. Emitter: "tbl",
  238. Message: xsql.Message{
  239. "a": int64(6),
  240. },
  241. Metadata: xsql.Metadata{
  242. "topic": "topic1",
  243. },
  244. },
  245. },
  246. {
  247. sql: `SELECT abc FROM tbl WHERE json_path_exists(samplers, "$[? @.result.throughput==30]")`,
  248. data: &xsql.Tuple{
  249. Emitter: "tbl",
  250. Message: xsql.Message{
  251. "samplers": []interface{}{
  252. map[string]interface{}{
  253. "name": "page1",
  254. "result": map[string]interface{}{
  255. "throughput": float64(25),
  256. "rt": float64(20),
  257. },
  258. },
  259. map[string]interface{}{
  260. "name": "page2",
  261. "result": map[string]interface{}{
  262. "throughput": float64(30),
  263. "rt": float64(20),
  264. },
  265. },
  266. },
  267. },
  268. },
  269. result: &xsql.Tuple{
  270. Emitter: "tbl",
  271. Message: xsql.Message{
  272. "samplers": []interface{}{
  273. map[string]interface{}{
  274. "name": "page1",
  275. "result": map[string]interface{}{
  276. "throughput": float64(25),
  277. "rt": float64(20),
  278. },
  279. },
  280. map[string]interface{}{
  281. "name": "page2",
  282. "result": map[string]interface{}{
  283. "throughput": float64(30),
  284. "rt": float64(20),
  285. },
  286. },
  287. },
  288. },
  289. },
  290. },
  291. {
  292. sql: `SELECT abc FROM tbl WHERE json_path_exists(samplers, "$[? @.result.throughput<20]")`,
  293. data: &xsql.Tuple{
  294. Emitter: "tbl",
  295. Message: xsql.Message{
  296. "samplers": []interface{}{
  297. map[string]interface{}{
  298. "name": "page1",
  299. "result": map[string]interface{}{
  300. "throughput": 25,
  301. "rt": 20,
  302. },
  303. },
  304. map[string]interface{}{
  305. "name": "page2",
  306. "result": map[string]interface{}{
  307. "throughput": 30,
  308. "rt": 20,
  309. },
  310. },
  311. },
  312. },
  313. },
  314. result: nil,
  315. },
  316. }
  317. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  318. contextLogger := common.Log.WithField("rule", "TestAggregatePlan_Apply")
  319. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  320. for i, tt := range tests {
  321. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  322. if err != nil {
  323. t.Errorf("statement parse error %s", err)
  324. break
  325. }
  326. fv, afv := xsql.NewFunctionValuersForOp(nil)
  327. pp := &FilterPlan{Condition: stmt.Condition}
  328. result := pp.Apply(ctx, tt.data, fv, afv)
  329. if !reflect.DeepEqual(tt.result, result) {
  330. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  331. }
  332. }
  333. }
  334. func TestFilterPlanError(t *testing.T) {
  335. tests := []struct {
  336. sql string
  337. data interface{}
  338. result interface{}
  339. }{
  340. {
  341. sql: "SELECT a FROM tbl WHERE a = b",
  342. data: &xsql.Tuple{
  343. Emitter: "tbl",
  344. Message: xsql.Message{
  345. "a": int64(6),
  346. "b": "astring",
  347. },
  348. },
  349. result: errors.New("run Where error: invalid operation int64(6) = string(astring)"),
  350. },
  351. {
  352. sql: "SELECT a FROM tbl WHERE def = ghi",
  353. data: errors.New("an error from upstream"),
  354. result: errors.New("an error from upstream"),
  355. },
  356. {
  357. sql: "SELECT abc FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  358. data: xsql.WindowTuplesSet{
  359. xsql.WindowTuples{
  360. Emitter: "src1",
  361. Tuples: []xsql.Tuple{
  362. {
  363. Emitter: "src1",
  364. Message: xsql.Message{"id1": 1, "f1": "v1"},
  365. }, {
  366. Emitter: "src1",
  367. Message: xsql.Message{"id1": 2, "f1": "v2"},
  368. }, {
  369. Emitter: "src1",
  370. Message: xsql.Message{"id1": 3, "f1": "v1"},
  371. },
  372. },
  373. },
  374. xsql.WindowTuples{
  375. Emitter: "src2",
  376. Tuples: []xsql.Tuple{
  377. {
  378. Emitter: "src1",
  379. Message: xsql.Message{"id1": 1, "f1": "v1"},
  380. }, {
  381. Emitter: "src1",
  382. Message: xsql.Message{"id1": 2, "f1": "v2"},
  383. }, {
  384. Emitter: "src1",
  385. Message: xsql.Message{"id1": 3, "f1": "v1"},
  386. },
  387. },
  388. },
  389. },
  390. result: errors.New("run Where error: the input WindowTuplesSet with multiple tuples cannot be evaluated"),
  391. },
  392. {
  393. sql: "SELECT abc FROM src1 WHERE f1 = \"v8\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  394. data: xsql.WindowTuplesSet{
  395. xsql.WindowTuples{
  396. Emitter: "src1",
  397. Tuples: []xsql.Tuple{
  398. {
  399. Emitter: "src1",
  400. Message: xsql.Message{"id1": 1, "f1": "v1"},
  401. }, {
  402. Emitter: "src1",
  403. Message: xsql.Message{"id1": 2, "f1": 3},
  404. }, {
  405. Emitter: "src1",
  406. Message: xsql.Message{"id1": 3, "f1": "v1"},
  407. },
  408. },
  409. },
  410. },
  411. result: errors.New("run Where error: invalid operation int64(3) = string(v8)"),
  412. },
  413. {
  414. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  415. data: xsql.JoinTupleSets{
  416. xsql.JoinTuple{
  417. Tuples: []xsql.Tuple{
  418. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": 50}},
  419. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  420. },
  421. },
  422. xsql.JoinTuple{
  423. Tuples: []xsql.Tuple{
  424. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  425. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  426. },
  427. },
  428. xsql.JoinTuple{
  429. Tuples: []xsql.Tuple{
  430. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  431. },
  432. },
  433. },
  434. result: errors.New("run Where error: invalid operation int64(50) = string(v1)"),
  435. },
  436. }
  437. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  438. contextLogger := common.Log.WithField("rule", "TestAggregatePlan_Apply")
  439. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  440. for i, tt := range tests {
  441. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  442. if err != nil {
  443. t.Errorf("statement parse error %s", err)
  444. break
  445. }
  446. fv, afv := xsql.NewFunctionValuersForOp(nil)
  447. pp := &FilterPlan{Condition: stmt.Condition}
  448. result := pp.Apply(ctx, tt.data, fv, afv)
  449. if !reflect.DeepEqual(tt.result, result) {
  450. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  451. }
  452. }
  453. }