project_test.go 35 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456
  1. package plans
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/contexts"
  9. "reflect"
  10. "strings"
  11. "testing"
  12. )
  13. func TestProjectPlan_Apply1(t *testing.T) {
  14. var tests = []struct {
  15. sql string
  16. data *xsql.Tuple
  17. result []map[string]interface{}
  18. }{
  19. {
  20. sql: "SELECT a FROM test",
  21. data: &xsql.Tuple{
  22. Emitter: "test",
  23. Message: xsql.Message{
  24. "a": "val_a",
  25. },
  26. },
  27. result: []map[string]interface{}{{
  28. "a": "val_a",
  29. }},
  30. },
  31. {
  32. sql: "SELECT b FROM test",
  33. data: &xsql.Tuple{
  34. Emitter: "test",
  35. Message: xsql.Message{
  36. "a": "val_a",
  37. },
  38. },
  39. result: []map[string]interface{}{{}},
  40. },
  41. {
  42. sql: "SELECT ts FROM test",
  43. data: &xsql.Tuple{
  44. Emitter: "test",
  45. Message: xsql.Message{
  46. "a": "val_a",
  47. "ts": common.TimeFromUnixMilli(1568854573431),
  48. },
  49. },
  50. result: []map[string]interface{}{{
  51. "ts": "2019-09-19T00:56:13.431Z",
  52. }},
  53. },
  54. //Schemaless may return a message without selecting column
  55. {
  56. sql: "SELECT ts FROM test",
  57. data: &xsql.Tuple{
  58. Emitter: "test",
  59. Message: xsql.Message{
  60. "a": "val_a",
  61. "ts2": common.TimeFromUnixMilli(1568854573431),
  62. },
  63. },
  64. result: []map[string]interface{}{{}},
  65. },
  66. {
  67. sql: "SELECT A FROM test",
  68. data: &xsql.Tuple{
  69. Emitter: "test",
  70. Message: xsql.Message{
  71. "a": "val_a",
  72. },
  73. },
  74. result: []map[string]interface{}{{
  75. "A": "val_a",
  76. }},
  77. },
  78. {
  79. sql: `SELECT "value" FROM test`,
  80. data: &xsql.Tuple{
  81. Emitter: "test",
  82. Message: xsql.Message{},
  83. },
  84. result: []map[string]interface{}{{
  85. DEFAULT_FIELD_NAME_PREFIX + "0": "value",
  86. }},
  87. },
  88. {
  89. sql: `SELECT 3.4 FROM test`,
  90. data: &xsql.Tuple{
  91. Emitter: "test",
  92. Message: xsql.Message{},
  93. },
  94. result: []map[string]interface{}{{
  95. DEFAULT_FIELD_NAME_PREFIX + "0": 3.4,
  96. }},
  97. },
  98. {
  99. sql: `SELECT 5 FROM test`,
  100. data: &xsql.Tuple{
  101. Emitter: "test",
  102. Message: xsql.Message{},
  103. },
  104. result: []map[string]interface{}{{
  105. DEFAULT_FIELD_NAME_PREFIX + "0": 5.0,
  106. }},
  107. },
  108. {
  109. sql: `SELECT a, "value" AS b FROM test`,
  110. data: &xsql.Tuple{
  111. Emitter: "test",
  112. Message: xsql.Message{
  113. "a": "val_a",
  114. },
  115. },
  116. result: []map[string]interface{}{{
  117. "a": "val_a",
  118. "b": "value",
  119. }},
  120. },
  121. {
  122. sql: `SELECT a, "value" AS b, 3.14 as Pi, 0 as Zero FROM test`,
  123. data: &xsql.Tuple{
  124. Emitter: "test",
  125. Message: xsql.Message{
  126. "a": "val_a",
  127. },
  128. },
  129. result: []map[string]interface{}{{
  130. "a": "val_a",
  131. "b": "value",
  132. "Pi": 3.14,
  133. "Zero": 0.0,
  134. }},
  135. },
  136. {
  137. sql: `SELECT a->b AS ab FROM test`,
  138. data: &xsql.Tuple{
  139. Emitter: "test",
  140. Message: xsql.Message{
  141. "a": map[string]interface{}{"b": "hello"},
  142. },
  143. },
  144. result: []map[string]interface{}{{
  145. "ab": "hello",
  146. }},
  147. },
  148. {
  149. sql: `SELECT a->b AS ab FROM test`,
  150. data: &xsql.Tuple{
  151. Emitter: "test",
  152. Message: xsql.Message{
  153. "name": "name",
  154. },
  155. },
  156. result: []map[string]interface{}{{}},
  157. },
  158. {
  159. sql: `SELECT a->b AS ab FROM test`,
  160. data: &xsql.Tuple{
  161. Emitter: "test",
  162. Message: xsql.Message{
  163. "a": "commonstring",
  164. },
  165. },
  166. result: []map[string]interface{}{{}},
  167. },
  168. {
  169. sql: `SELECT a[0]->b AS ab FROM test`,
  170. data: &xsql.Tuple{
  171. Emitter: "test",
  172. Message: xsql.Message{
  173. "a": []interface{}{
  174. map[string]interface{}{"b": "hello1"},
  175. map[string]interface{}{"b": "hello2"},
  176. },
  177. },
  178. },
  179. result: []map[string]interface{}{{
  180. "ab": "hello1",
  181. }},
  182. },
  183. {
  184. sql: `SELECT a->c->d AS f1 FROM test`,
  185. data: &xsql.Tuple{
  186. Emitter: "test",
  187. Message: xsql.Message{
  188. "a": map[string]interface{}{
  189. "b": "hello",
  190. "c": map[string]interface{}{
  191. "d": 35.2,
  192. },
  193. },
  194. },
  195. },
  196. result: []map[string]interface{}{{
  197. "f1": 35.2,
  198. }},
  199. },
  200. {
  201. sql: `SELECT a->c->d AS f1 FROM test`,
  202. data: &xsql.Tuple{
  203. Emitter: "test",
  204. Message: xsql.Message{
  205. "a": map[string]interface{}{
  206. "b": "hello",
  207. "c": map[string]interface{}{
  208. "e": 35.2,
  209. },
  210. },
  211. },
  212. },
  213. result: []map[string]interface{}{{}},
  214. },
  215. {
  216. sql: `SELECT a->c->d AS f1 FROM test`,
  217. data: &xsql.Tuple{
  218. Emitter: "test",
  219. Message: xsql.Message{
  220. "a": map[string]interface{}{
  221. "b": "hello",
  222. },
  223. },
  224. },
  225. result: []map[string]interface{}{{}},
  226. },
  227. //The int type is not supported yet, the json parser returns float64 for int values
  228. {
  229. sql: `SELECT a->c->d AS f1 FROM test`,
  230. data: &xsql.Tuple{
  231. Emitter: "test",
  232. Message: xsql.Message{
  233. "a": map[string]interface{}{
  234. "b": "hello",
  235. "c": map[string]interface{}{
  236. "d": float64(35),
  237. },
  238. },
  239. },
  240. },
  241. result: []map[string]interface{}{{
  242. "f1": float64(35),
  243. }},
  244. },
  245. {
  246. sql: "SELECT a FROM test",
  247. data: &xsql.Tuple{
  248. Emitter: "test",
  249. Message: xsql.Message{},
  250. },
  251. result: []map[string]interface{}{
  252. {},
  253. },
  254. },
  255. {
  256. sql: "SELECT * FROM test",
  257. data: &xsql.Tuple{
  258. Emitter: "test",
  259. Message: xsql.Message{},
  260. },
  261. result: []map[string]interface{}{
  262. {},
  263. },
  264. },
  265. {
  266. sql: `SELECT * FROM test`,
  267. data: &xsql.Tuple{
  268. Emitter: "test",
  269. Message: xsql.Message{
  270. "a": map[string]interface{}{
  271. "b": "hello",
  272. "c": map[string]interface{}{
  273. "d": 35.2,
  274. },
  275. },
  276. },
  277. },
  278. result: []map[string]interface{}{{
  279. "a": map[string]interface{}{
  280. "b": "hello",
  281. "c": map[string]interface{}{
  282. "d": 35.2,
  283. },
  284. },
  285. }},
  286. },
  287. {
  288. sql: `SELECT * FROM test`,
  289. data: &xsql.Tuple{
  290. Emitter: "test",
  291. Message: xsql.Message{
  292. "a": "val1",
  293. "b": 3.14,
  294. },
  295. },
  296. result: []map[string]interface{}{{
  297. "a": "val1",
  298. "b": 3.14,
  299. }},
  300. },
  301. {
  302. sql: `SELECT 3*4 AS f1 FROM test`,
  303. data: &xsql.Tuple{
  304. Emitter: "test",
  305. Message: xsql.Message{},
  306. },
  307. result: []map[string]interface{}{{
  308. "f1": float64(12),
  309. }},
  310. },
  311. {
  312. sql: `SELECT 4.5*2 AS f1 FROM test`,
  313. data: &xsql.Tuple{
  314. Emitter: "test",
  315. Message: xsql.Message{},
  316. },
  317. result: []map[string]interface{}{{
  318. "f1": float64(9),
  319. }},
  320. },
  321. }
  322. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  323. contextLogger := common.Log.WithField("rule", "TestProjectPlan_Apply1")
  324. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  325. for i, tt := range tests {
  326. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  327. pp := &ProjectPlan{Fields: stmt.Fields}
  328. pp.isTest = true
  329. result := pp.Apply(ctx, tt.data)
  330. var mapRes []map[string]interface{}
  331. if v, ok := result.([]byte); ok {
  332. err := json.Unmarshal(v, &mapRes)
  333. if err != nil {
  334. t.Errorf("Failed to parse the input into map.\n")
  335. continue
  336. }
  337. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  338. if !reflect.DeepEqual(tt.result, mapRes) {
  339. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  340. }
  341. } else {
  342. t.Errorf("%d. The returned result is not type of []byte\n", i)
  343. }
  344. }
  345. }
  346. func TestProjectPlan_MultiInput(t *testing.T) {
  347. var tests = []struct {
  348. sql string
  349. data interface{}
  350. result []map[string]interface{}
  351. }{
  352. {
  353. sql: "SELECT * FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  354. data: &xsql.Tuple{
  355. Emitter: "tbl",
  356. Message: xsql.Message{
  357. "abc": int64(6),
  358. },
  359. },
  360. result: []map[string]interface{}{{
  361. "abc": float64(6), //json marshall problem
  362. }},
  363. },
  364. {
  365. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  366. data: &xsql.Tuple{
  367. Emitter: "tbl",
  368. Message: xsql.Message{
  369. "abc": int64(34),
  370. "def": "hello",
  371. },
  372. },
  373. result: []map[string]interface{}{{
  374. "abc": float64(34),
  375. }},
  376. },
  377. {
  378. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  379. data: xsql.WindowTuplesSet{
  380. xsql.WindowTuples{
  381. Emitter: "src1",
  382. Tuples: []xsql.Tuple{
  383. {
  384. Emitter: "src1",
  385. Message: xsql.Message{"id1": 1, "f1": "v1"},
  386. }, {
  387. Emitter: "src1",
  388. Message: xsql.Message{"id1": 2, "f1": "v2"},
  389. }, {
  390. Emitter: "src1",
  391. Message: xsql.Message{"id1": 3, "f1": "v1"},
  392. },
  393. },
  394. },
  395. },
  396. result: []map[string]interface{}{{
  397. "id1": float64(1),
  398. }, {
  399. "id1": float64(2),
  400. }, {
  401. "id1": float64(3),
  402. }},
  403. },
  404. {
  405. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  406. data: xsql.WindowTuplesSet{
  407. xsql.WindowTuples{
  408. Emitter: "src1",
  409. Tuples: []xsql.Tuple{
  410. {
  411. Emitter: "src1",
  412. Message: xsql.Message{"id1": 1, "f1": "v1"},
  413. }, {
  414. Emitter: "src1",
  415. Message: xsql.Message{"id2": 2, "f1": "v2"},
  416. }, {
  417. Emitter: "src1",
  418. Message: xsql.Message{"id1": 3, "f1": "v1"},
  419. },
  420. },
  421. },
  422. },
  423. result: []map[string]interface{}{{
  424. "id1": float64(1),
  425. }, {}, {
  426. "id1": float64(3),
  427. }},
  428. },
  429. {
  430. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  431. data: xsql.WindowTuplesSet{
  432. xsql.WindowTuples{
  433. Emitter: "src1",
  434. Tuples: []xsql.Tuple{
  435. {
  436. Emitter: "src1",
  437. Message: xsql.Message{"id1": 1, "f1": "v1"},
  438. }, {
  439. Emitter: "src1",
  440. Message: xsql.Message{"id1": 2, "f1": "v2"},
  441. }, {
  442. Emitter: "src1",
  443. Message: xsql.Message{"id1": 3, "f1": "v1"},
  444. },
  445. },
  446. },
  447. },
  448. result: []map[string]interface{}{{
  449. "id1": float64(1),
  450. "f1": "v1",
  451. }, {
  452. "id1": float64(2),
  453. "f1": "v2",
  454. }, {
  455. "id1": float64(3),
  456. "f1": "v1",
  457. }},
  458. },
  459. {
  460. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  461. data: xsql.WindowTuplesSet{
  462. xsql.WindowTuples{
  463. Emitter: "src1",
  464. Tuples: []xsql.Tuple{
  465. {
  466. Emitter: "src1",
  467. Message: xsql.Message{"id1": 1, "f1": "v1"},
  468. }, {
  469. Emitter: "src1",
  470. Message: xsql.Message{"id2": 2, "f2": "v2"},
  471. }, {
  472. Emitter: "src1",
  473. Message: xsql.Message{"id1": 3, "f1": "v1"},
  474. },
  475. },
  476. },
  477. },
  478. result: []map[string]interface{}{{
  479. "id1": float64(1),
  480. "f1": "v1",
  481. }, {
  482. "id2": float64(2),
  483. "f2": "v2",
  484. }, {
  485. "id1": float64(3),
  486. "f1": "v1",
  487. }},
  488. },
  489. {
  490. sql: "SELECT src1.* FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  491. data: xsql.WindowTuplesSet{
  492. xsql.WindowTuples{
  493. Emitter: "src1",
  494. Tuples: []xsql.Tuple{
  495. {
  496. Emitter: "src1",
  497. Message: xsql.Message{"id1": 1, "f1": "v1"},
  498. }, {
  499. Emitter: "src1",
  500. Message: xsql.Message{"id1": 2, "f1": "v2"},
  501. }, {
  502. Emitter: "src1",
  503. Message: xsql.Message{"id1": 3, "f1": "v1"},
  504. },
  505. },
  506. },
  507. },
  508. result: []map[string]interface{}{{
  509. "id1": float64(1),
  510. "f1": "v1",
  511. }, {
  512. "id1": float64(2),
  513. "f1": "v2",
  514. }, {
  515. "id1": float64(3),
  516. "f1": "v1",
  517. }},
  518. },
  519. {
  520. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  521. data: xsql.JoinTupleSets{
  522. xsql.JoinTuple{
  523. Tuples: []xsql.Tuple{
  524. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  525. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  526. },
  527. },
  528. xsql.JoinTuple{
  529. Tuples: []xsql.Tuple{
  530. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  531. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  532. },
  533. },
  534. xsql.JoinTuple{
  535. Tuples: []xsql.Tuple{
  536. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  537. },
  538. },
  539. },
  540. result: []map[string]interface{}{{
  541. "id1": float64(1),
  542. }, {
  543. "id1": float64(2),
  544. }, {
  545. "id1": float64(3),
  546. }},
  547. },
  548. {
  549. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  550. data: xsql.JoinTupleSets{
  551. xsql.JoinTuple{
  552. Tuples: []xsql.Tuple{
  553. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  554. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  555. },
  556. },
  557. xsql.JoinTuple{
  558. Tuples: []xsql.Tuple{
  559. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  560. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  561. },
  562. },
  563. xsql.JoinTuple{
  564. Tuples: []xsql.Tuple{
  565. {Emitter: "src1", Message: xsql.Message{"id2": 3, "f1": "v1"}},
  566. },
  567. },
  568. },
  569. result: []map[string]interface{}{{
  570. "id1": float64(1),
  571. }, {
  572. "id1": float64(2),
  573. }, {}},
  574. },
  575. {
  576. sql: "SELECT abc FROM tbl group by abc",
  577. data: xsql.GroupedTuplesSet{
  578. {
  579. &xsql.Tuple{
  580. Emitter: "tbl",
  581. Message: xsql.Message{
  582. "abc": int64(6),
  583. "def": "hello",
  584. },
  585. },
  586. },
  587. },
  588. result: []map[string]interface{}{{
  589. "abc": float64(6),
  590. }},
  591. },
  592. {
  593. sql: "SELECT abc FROM tbl group by abc",
  594. data: xsql.GroupedTuplesSet{
  595. {
  596. &xsql.Tuple{
  597. Emitter: "tbl",
  598. Message: xsql.Message{
  599. "def": "hello",
  600. },
  601. },
  602. },
  603. },
  604. result: []map[string]interface{}{{}},
  605. },
  606. {
  607. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  608. data: xsql.GroupedTuplesSet{
  609. {
  610. &xsql.Tuple{
  611. Emitter: "src1",
  612. Message: xsql.Message{"id1": 1, "f1": "v1"},
  613. },
  614. &xsql.Tuple{
  615. Emitter: "src1",
  616. Message: xsql.Message{"id1": 3, "f1": "v1"},
  617. },
  618. },
  619. {
  620. &xsql.Tuple{
  621. Emitter: "src1",
  622. Message: xsql.Message{"id1": 2, "f1": "v2"},
  623. },
  624. },
  625. },
  626. result: []map[string]interface{}{{
  627. "id1": float64(1),
  628. }, {
  629. "id1": float64(2),
  630. }},
  631. },
  632. {
  633. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  634. data: xsql.GroupedTuplesSet{
  635. {
  636. &xsql.Tuple{
  637. Emitter: "src1",
  638. Message: xsql.Message{"id1": 1, "f1": "v1"},
  639. },
  640. &xsql.Tuple{
  641. Emitter: "src1",
  642. Message: xsql.Message{"id1": 3, "f1": "v1"},
  643. },
  644. },
  645. {
  646. &xsql.Tuple{
  647. Emitter: "src1",
  648. Message: xsql.Message{"id2": 2, "f1": "v2"},
  649. },
  650. },
  651. },
  652. result: []map[string]interface{}{{
  653. "id1": float64(1),
  654. }, {}},
  655. },
  656. {
  657. sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  658. data: xsql.GroupedTuplesSet{
  659. {
  660. &xsql.JoinTuple{
  661. Tuples: []xsql.Tuple{
  662. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  663. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  664. },
  665. },
  666. },
  667. {
  668. &xsql.JoinTuple{
  669. Tuples: []xsql.Tuple{
  670. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  671. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  672. },
  673. },
  674. },
  675. {
  676. &xsql.JoinTuple{
  677. Tuples: []xsql.Tuple{
  678. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  679. },
  680. },
  681. },
  682. },
  683. result: []map[string]interface{}{{
  684. "id2": float64(2),
  685. }, {
  686. "id2": float64(4),
  687. }, {}},
  688. },
  689. {
  690. sql: "SELECT src1.*, f2 FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  691. data: xsql.JoinTupleSets{
  692. xsql.JoinTuple{
  693. Tuples: []xsql.Tuple{
  694. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  695. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  696. },
  697. },
  698. xsql.JoinTuple{
  699. Tuples: []xsql.Tuple{
  700. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  701. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  702. },
  703. },
  704. xsql.JoinTuple{
  705. Tuples: []xsql.Tuple{
  706. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  707. },
  708. },
  709. },
  710. result: []map[string]interface{}{{
  711. "id1": float64(1),
  712. "f1": "v1",
  713. "f2": "w2",
  714. }, {
  715. "id1": float64(2),
  716. "f1": "v2",
  717. "f2": "w3",
  718. }, {
  719. "id1": float64(3),
  720. "f1": "v1",
  721. }},
  722. },
  723. {
  724. sql: "SELECT * FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  725. data: xsql.JoinTupleSets{
  726. xsql.JoinTuple{
  727. Tuples: []xsql.Tuple{
  728. {Emitter: "src1", Message: xsql.Message{"id": 1, "f1": "v1"}},
  729. {Emitter: "src2", Message: xsql.Message{"id": 2, "f2": "w2"}},
  730. },
  731. },
  732. xsql.JoinTuple{
  733. Tuples: []xsql.Tuple{
  734. {Emitter: "src1", Message: xsql.Message{"id": 2, "f1": "v2"}},
  735. {Emitter: "src2", Message: xsql.Message{"id": 4, "f2": "w3"}},
  736. },
  737. },
  738. xsql.JoinTuple{
  739. Tuples: []xsql.Tuple{
  740. {Emitter: "src1", Message: xsql.Message{"id": 3, "f1": "v1"}},
  741. },
  742. },
  743. },
  744. result: []map[string]interface{}{{
  745. "id": float64(1),
  746. "f1": "v1",
  747. "f2": "w2",
  748. }, {
  749. "id": float64(2),
  750. "f1": "v2",
  751. "f2": "w3",
  752. }, {
  753. "id": float64(3),
  754. "f1": "v1",
  755. }},
  756. },
  757. {
  758. sql: "SELECT src1.* FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  759. data: xsql.GroupedTuplesSet{
  760. {
  761. &xsql.Tuple{
  762. Emitter: "src1",
  763. Message: xsql.Message{"id1": 1, "f1": "v1"},
  764. },
  765. &xsql.Tuple{
  766. Emitter: "src1",
  767. Message: xsql.Message{"id1": 3, "f1": "v1"},
  768. },
  769. },
  770. {
  771. &xsql.Tuple{
  772. Emitter: "src1",
  773. Message: xsql.Message{"id1": 2, "f1": "v2"},
  774. },
  775. },
  776. },
  777. result: []map[string]interface{}{{
  778. "id1": float64(1),
  779. "f1": "v1",
  780. }, {
  781. "id1": float64(2),
  782. "f1": "v2",
  783. }},
  784. },
  785. {
  786. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  787. data: xsql.GroupedTuplesSet{
  788. {
  789. &xsql.JoinTuple{
  790. Tuples: []xsql.Tuple{
  791. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  792. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  793. },
  794. },
  795. },
  796. {
  797. &xsql.JoinTuple{
  798. Tuples: []xsql.Tuple{
  799. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  800. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  801. },
  802. },
  803. },
  804. {
  805. &xsql.JoinTuple{
  806. Tuples: []xsql.Tuple{
  807. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  808. },
  809. },
  810. },
  811. },
  812. result: []map[string]interface{}{{
  813. "id2": float64(2),
  814. "id1": float64(1),
  815. "f1": "v1",
  816. }, {
  817. "id2": float64(4),
  818. "id1": float64(2),
  819. "f1": "v2",
  820. }, {
  821. "id1": float64(3),
  822. "f1": "v1",
  823. }},
  824. },
  825. {
  826. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  827. data: xsql.GroupedTuplesSet{
  828. {
  829. &xsql.JoinTuple{
  830. Tuples: []xsql.Tuple{
  831. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  832. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  833. },
  834. },
  835. },
  836. {
  837. &xsql.JoinTuple{
  838. Tuples: []xsql.Tuple{
  839. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  840. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  841. },
  842. },
  843. },
  844. {
  845. &xsql.JoinTuple{
  846. Tuples: []xsql.Tuple{
  847. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  848. },
  849. },
  850. },
  851. },
  852. result: []map[string]interface{}{{
  853. "id2": float64(2),
  854. "id1": float64(1),
  855. "f1": "v1",
  856. }, {
  857. "id2": float64(4),
  858. "id1": float64(2),
  859. "f1": "v2",
  860. }, {
  861. "id1": float64(3),
  862. "f1": "v1",
  863. }},
  864. },
  865. }
  866. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  867. contextLogger := common.Log.WithField("rule", "TestProjectPlan_MultiInput")
  868. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  869. for i, tt := range tests {
  870. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  871. pp := &ProjectPlan{Fields: stmt.Fields}
  872. pp.isTest = true
  873. result := pp.Apply(ctx, tt.data)
  874. var mapRes []map[string]interface{}
  875. if v, ok := result.([]byte); ok {
  876. err := json.Unmarshal(v, &mapRes)
  877. if err != nil {
  878. t.Errorf("Failed to parse the input into map.\n")
  879. continue
  880. }
  881. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  882. if !reflect.DeepEqual(tt.result, mapRes) {
  883. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  884. }
  885. } else {
  886. t.Errorf("The returned result is not type of []byte\n")
  887. }
  888. }
  889. }
  890. func TestProjectPlan_Funcs(t *testing.T) {
  891. var tests = []struct {
  892. sql string
  893. data interface{}
  894. result []map[string]interface{}
  895. }{
  896. {
  897. sql: "SELECT round(a) as r FROM test",
  898. data: &xsql.Tuple{
  899. Emitter: "test",
  900. Message: xsql.Message{
  901. "a": 47.5,
  902. },
  903. },
  904. result: []map[string]interface{}{{
  905. "r": float64(48),
  906. }},
  907. }, {
  908. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  909. data: xsql.WindowTuplesSet{
  910. xsql.WindowTuples{
  911. Emitter: "test",
  912. Tuples: []xsql.Tuple{
  913. {
  914. Emitter: "src1",
  915. Message: xsql.Message{"a": 53.1},
  916. }, {
  917. Emitter: "src1",
  918. Message: xsql.Message{"a": 27.4},
  919. }, {
  920. Emitter: "src1",
  921. Message: xsql.Message{"a": 123123.7},
  922. },
  923. },
  924. },
  925. },
  926. result: []map[string]interface{}{{
  927. "r": float64(53),
  928. }, {
  929. "r": float64(27),
  930. }, {
  931. "r": float64(123124),
  932. }},
  933. }, {
  934. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  935. data: xsql.WindowTuplesSet{
  936. xsql.WindowTuples{
  937. Emitter: "test",
  938. Tuples: []xsql.Tuple{
  939. {
  940. Emitter: "src1",
  941. Message: xsql.Message{"a": 53.1},
  942. }, {
  943. Emitter: "src1",
  944. Message: xsql.Message{"a": 27.4},
  945. }, {
  946. Emitter: "src1",
  947. Message: xsql.Message{"a": 123123.7},
  948. },
  949. },
  950. },
  951. },
  952. result: []map[string]interface{}{{
  953. "r": float64(53),
  954. }, {
  955. "r": float64(27),
  956. }, {
  957. "r": float64(123124),
  958. }},
  959. }, {
  960. sql: "SELECT round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  961. data: xsql.JoinTupleSets{
  962. xsql.JoinTuple{
  963. Tuples: []xsql.Tuple{
  964. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  965. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  966. },
  967. },
  968. xsql.JoinTuple{
  969. Tuples: []xsql.Tuple{
  970. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  971. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  972. },
  973. },
  974. xsql.JoinTuple{
  975. Tuples: []xsql.Tuple{
  976. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  977. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  978. },
  979. },
  980. },
  981. result: []map[string]interface{}{{
  982. "r": float64(66),
  983. }, {
  984. "r": float64(73),
  985. }, {
  986. "r": float64(89),
  987. }},
  988. }, {
  989. sql: "SELECT CONCAT(test.id, test.a, test1.b) as concat FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  990. data: xsql.JoinTupleSets{
  991. xsql.JoinTuple{
  992. Tuples: []xsql.Tuple{
  993. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  994. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  995. },
  996. },
  997. xsql.JoinTuple{
  998. Tuples: []xsql.Tuple{
  999. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1000. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1001. },
  1002. },
  1003. xsql.JoinTuple{
  1004. Tuples: []xsql.Tuple{
  1005. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1006. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1007. },
  1008. },
  1009. },
  1010. result: []map[string]interface{}{{
  1011. "concat": "165.5512",
  1012. }, {
  1013. "concat": "273.49934",
  1014. }, {
  1015. "concat": "388.886",
  1016. }},
  1017. }, {
  1018. sql: "SELECT count(a) as r FROM test",
  1019. data: &xsql.Tuple{
  1020. Emitter: "test",
  1021. Message: xsql.Message{
  1022. "a": 47.5,
  1023. },
  1024. },
  1025. result: []map[string]interface{}{{
  1026. "r": float64(1),
  1027. }},
  1028. },
  1029. }
  1030. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1031. contextLogger := common.Log.WithField("rule", "TestProjectPlan_Funcs")
  1032. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1033. for i, tt := range tests {
  1034. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1035. if err != nil {
  1036. t.Error(err)
  1037. }
  1038. pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
  1039. pp.isTest = true
  1040. result := pp.Apply(ctx, tt.data)
  1041. var mapRes []map[string]interface{}
  1042. if v, ok := result.([]byte); ok {
  1043. err := json.Unmarshal(v, &mapRes)
  1044. if err != nil {
  1045. t.Errorf("Failed to parse the input into map.\n")
  1046. continue
  1047. }
  1048. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  1049. if !reflect.DeepEqual(tt.result, mapRes) {
  1050. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1051. }
  1052. } else {
  1053. t.Errorf("%d. The returned result is not type of []byte\n", i)
  1054. }
  1055. }
  1056. }
  1057. func TestProjectPlan_AggFuncs(t *testing.T) {
  1058. var tests = []struct {
  1059. sql string
  1060. data interface{}
  1061. result []map[string]interface{}
  1062. }{
  1063. {
  1064. sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1065. data: xsql.GroupedTuplesSet{
  1066. {
  1067. &xsql.JoinTuple{
  1068. Tuples: []xsql.Tuple{
  1069. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1070. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1071. },
  1072. },
  1073. &xsql.JoinTuple{
  1074. Tuples: []xsql.Tuple{
  1075. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1076. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1077. },
  1078. },
  1079. },
  1080. {
  1081. &xsql.JoinTuple{
  1082. Tuples: []xsql.Tuple{
  1083. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1084. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1085. },
  1086. },
  1087. &xsql.JoinTuple{
  1088. Tuples: []xsql.Tuple{
  1089. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1090. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1091. },
  1092. },
  1093. },
  1094. },
  1095. result: []map[string]interface{}{{
  1096. "c": float64(2),
  1097. "r": float64(122),
  1098. }, {
  1099. "c": float64(2),
  1100. "r": float64(89),
  1101. }},
  1102. },
  1103. {
  1104. sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1105. data: xsql.GroupedTuplesSet{
  1106. {
  1107. &xsql.JoinTuple{
  1108. Tuples: []xsql.Tuple{
  1109. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1110. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1111. },
  1112. },
  1113. &xsql.JoinTuple{
  1114. Tuples: []xsql.Tuple{
  1115. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  1116. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1117. },
  1118. },
  1119. &xsql.JoinTuple{
  1120. Tuples: []xsql.Tuple{
  1121. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 98.31}},
  1122. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  1123. },
  1124. },
  1125. &xsql.JoinTuple{
  1126. Tuples: []xsql.Tuple{
  1127. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  1128. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1129. },
  1130. },
  1131. },
  1132. {
  1133. &xsql.JoinTuple{
  1134. Tuples: []xsql.Tuple{
  1135. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1136. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1137. },
  1138. },
  1139. &xsql.JoinTuple{
  1140. Tuples: []xsql.Tuple{
  1141. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1142. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1143. },
  1144. },
  1145. },
  1146. },
  1147. result: []map[string]interface{}{{
  1148. "avg": 116.68,
  1149. }, {
  1150. "avg": 51.815,
  1151. }},
  1152. },
  1153. {
  1154. sql: "SELECT max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1155. data: xsql.GroupedTuplesSet{
  1156. {
  1157. &xsql.JoinTuple{
  1158. Tuples: []xsql.Tuple{
  1159. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1160. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1161. },
  1162. },
  1163. &xsql.JoinTuple{
  1164. Tuples: []xsql.Tuple{
  1165. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1166. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1167. },
  1168. },
  1169. &xsql.JoinTuple{
  1170. Tuples: []xsql.Tuple{
  1171. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1172. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1173. },
  1174. },
  1175. },
  1176. {
  1177. &xsql.JoinTuple{
  1178. Tuples: []xsql.Tuple{
  1179. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1180. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1181. },
  1182. },
  1183. &xsql.JoinTuple{
  1184. Tuples: []xsql.Tuple{
  1185. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1186. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1187. },
  1188. },
  1189. },
  1190. },
  1191. result: []map[string]interface{}{{
  1192. "max": 177.51,
  1193. }, {
  1194. "max": 89.03,
  1195. }},
  1196. },
  1197. {
  1198. sql: "SELECT min(a) as min FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1199. data: xsql.JoinTupleSets{
  1200. xsql.JoinTuple{
  1201. Tuples: []xsql.Tuple{
  1202. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1203. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1204. },
  1205. },
  1206. xsql.JoinTuple{
  1207. Tuples: []xsql.Tuple{
  1208. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1209. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1210. },
  1211. },
  1212. xsql.JoinTuple{
  1213. Tuples: []xsql.Tuple{
  1214. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1215. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1216. },
  1217. },
  1218. },
  1219. result: []map[string]interface{}{{
  1220. "min": 68.55,
  1221. }},
  1222. }, {
  1223. sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
  1224. data: xsql.WindowTuplesSet{
  1225. xsql.WindowTuples{
  1226. Emitter: "test",
  1227. Tuples: []xsql.Tuple{
  1228. {
  1229. Emitter: "src1",
  1230. Message: xsql.Message{"a": 53},
  1231. }, {
  1232. Emitter: "src1",
  1233. Message: xsql.Message{"a": 27},
  1234. }, {
  1235. Emitter: "src1",
  1236. Message: xsql.Message{"a": 123123},
  1237. },
  1238. },
  1239. },
  1240. },
  1241. result: []map[string]interface{}{{
  1242. "sum": float64(123203),
  1243. }},
  1244. }, {
  1245. sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
  1246. data: xsql.WindowTuplesSet{
  1247. xsql.WindowTuples{
  1248. Emitter: "test",
  1249. Tuples: []xsql.Tuple{
  1250. {
  1251. Emitter: "src1",
  1252. Message: xsql.Message{"a": 53},
  1253. }, {
  1254. Emitter: "src1",
  1255. Message: xsql.Message{"a": 27},
  1256. }, {
  1257. Emitter: "src1",
  1258. Message: xsql.Message{"a": 123123},
  1259. },
  1260. },
  1261. },
  1262. },
  1263. result: []map[string]interface{}{{
  1264. "sum": float64(123203),
  1265. }},
  1266. },
  1267. }
  1268. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1269. contextLogger := common.Log.WithField("rule", "TestProjectPlan_AggFuncs")
  1270. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1271. for i, tt := range tests {
  1272. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1273. if err != nil {
  1274. t.Error(err)
  1275. }
  1276. pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: true}
  1277. pp.isTest = true
  1278. result := pp.Apply(ctx, tt.data)
  1279. var mapRes []map[string]interface{}
  1280. if v, ok := result.([]byte); ok {
  1281. err := json.Unmarshal(v, &mapRes)
  1282. if err != nil {
  1283. t.Errorf("Failed to parse the input into map.\n")
  1284. continue
  1285. }
  1286. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  1287. if !reflect.DeepEqual(tt.result, mapRes) {
  1288. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1289. }
  1290. } else {
  1291. t.Errorf("%d. %q\n\nThe returned result is not type of []byte: %#v\n", i, tt.sql, result)
  1292. }
  1293. }
  1294. }
  1295. func TestProjectPlanError(t *testing.T) {
  1296. var tests = []struct {
  1297. sql string
  1298. data interface{}
  1299. result interface{}
  1300. }{
  1301. {
  1302. sql: "SELECT a FROM test",
  1303. data: errors.New("an error from upstream"),
  1304. result: errors.New("an error from upstream"),
  1305. }, {
  1306. sql: "SELECT a * 5 FROM test",
  1307. data: &xsql.Tuple{
  1308. Emitter: "test",
  1309. Message: xsql.Message{
  1310. "a": "val_a",
  1311. },
  1312. },
  1313. result: errors.New("run Select error: invalid operation string(val_a) * int64(5)"),
  1314. }, {
  1315. sql: `SELECT a[0]->b AS ab FROM test`,
  1316. data: &xsql.Tuple{
  1317. Emitter: "test",
  1318. Message: xsql.Message{
  1319. "a": "common string",
  1320. },
  1321. },
  1322. result: errors.New("run Select error: invalid operation string(common string) [] *xsql.BracketEvalResult(&{0 0})"),
  1323. }, {
  1324. sql: `SELECT round(a) as r FROM test`,
  1325. data: &xsql.Tuple{
  1326. Emitter: "test",
  1327. Message: xsql.Message{
  1328. "a": "common string",
  1329. },
  1330. },
  1331. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  1332. }, {
  1333. sql: `SELECT round(a) as r FROM test`,
  1334. data: &xsql.Tuple{
  1335. Emitter: "test",
  1336. Message: xsql.Message{
  1337. "abc": "common string",
  1338. },
  1339. },
  1340. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  1341. }, {
  1342. sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1343. data: xsql.GroupedTuplesSet{
  1344. {
  1345. &xsql.JoinTuple{
  1346. Tuples: []xsql.Tuple{
  1347. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1348. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1349. },
  1350. },
  1351. &xsql.JoinTuple{
  1352. Tuples: []xsql.Tuple{
  1353. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  1354. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1355. },
  1356. },
  1357. &xsql.JoinTuple{
  1358. Tuples: []xsql.Tuple{
  1359. {Emitter: "test", Message: xsql.Message{"id": 4, "a": "dde"}},
  1360. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  1361. },
  1362. },
  1363. &xsql.JoinTuple{
  1364. Tuples: []xsql.Tuple{
  1365. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  1366. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1367. },
  1368. },
  1369. },
  1370. {
  1371. &xsql.JoinTuple{
  1372. Tuples: []xsql.Tuple{
  1373. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1374. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1375. },
  1376. },
  1377. &xsql.JoinTuple{
  1378. Tuples: []xsql.Tuple{
  1379. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1380. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1381. },
  1382. },
  1383. },
  1384. },
  1385. result: errors.New("run Select error: call func avg error: requires float64 but found string(dde)"),
  1386. }, {
  1387. sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
  1388. data: xsql.WindowTuplesSet{
  1389. xsql.WindowTuples{
  1390. Emitter: "test",
  1391. Tuples: []xsql.Tuple{
  1392. {
  1393. Emitter: "src1",
  1394. Message: xsql.Message{"a": 53},
  1395. }, {
  1396. Emitter: "src1",
  1397. Message: xsql.Message{"a": "ddd"},
  1398. }, {
  1399. Emitter: "src1",
  1400. Message: xsql.Message{"a": 123123},
  1401. },
  1402. },
  1403. },
  1404. },
  1405. result: errors.New("run Select error: call func sum error: requires int but found string(ddd)"),
  1406. },
  1407. }
  1408. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1409. contextLogger := common.Log.WithField("rule", "TestProjectPlanError")
  1410. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1411. for i, tt := range tests {
  1412. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1413. pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
  1414. pp.isTest = true
  1415. result := pp.Apply(ctx, tt.data)
  1416. if !reflect.DeepEqual(tt.result, result) {
  1417. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  1418. }
  1419. }
  1420. }