project_test.go 57 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299
  1. package operator
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/lf-edge/ekuiper/internal/conf"
  7. "github.com/lf-edge/ekuiper/internal/topo/context"
  8. "github.com/lf-edge/ekuiper/internal/xsql"
  9. "github.com/lf-edge/ekuiper/pkg/ast"
  10. "github.com/lf-edge/ekuiper/pkg/cast"
  11. "reflect"
  12. "strings"
  13. "testing"
  14. )
  15. func TestProjectPlan_Apply1(t *testing.T) {
  16. var tests = []struct {
  17. sql string
  18. data *xsql.Tuple
  19. result []map[string]interface{}
  20. }{
  21. {
  22. sql: "SELECT a FROM test",
  23. data: &xsql.Tuple{
  24. Emitter: "test",
  25. Message: xsql.Message{
  26. "a": "val_a",
  27. },
  28. Metadata: xsql.Metadata{
  29. "id": 45,
  30. "other": "mock",
  31. },
  32. },
  33. result: []map[string]interface{}{{
  34. "a": "val_a",
  35. "__meta": map[string]interface{}{
  36. "id": float64(45),
  37. "other": "mock",
  38. },
  39. }},
  40. },
  41. {
  42. sql: "SELECT b FROM test",
  43. data: &xsql.Tuple{
  44. Emitter: "test",
  45. Message: xsql.Message{
  46. "a": "val_a",
  47. },
  48. },
  49. result: []map[string]interface{}{{}},
  50. },
  51. {
  52. sql: "SELECT ts FROM test",
  53. data: &xsql.Tuple{
  54. Emitter: "test",
  55. Message: xsql.Message{
  56. "a": "val_a",
  57. "ts": cast.TimeFromUnixMilli(1568854573431),
  58. },
  59. },
  60. result: []map[string]interface{}{{
  61. "ts": "2019-09-19T00:56:13.431Z",
  62. }},
  63. },
  64. //Schemaless may return a message without selecting column
  65. {
  66. sql: "SELECT ts FROM test",
  67. data: &xsql.Tuple{
  68. Emitter: "test",
  69. Message: xsql.Message{
  70. "a": "val_a",
  71. "ts2": cast.TimeFromUnixMilli(1568854573431),
  72. },
  73. },
  74. result: []map[string]interface{}{{}},
  75. },
  76. {
  77. sql: "SELECT A FROM test",
  78. data: &xsql.Tuple{
  79. Emitter: "test",
  80. Message: xsql.Message{
  81. "a": "val_a",
  82. },
  83. },
  84. result: []map[string]interface{}{{
  85. "A": "val_a",
  86. }},
  87. },
  88. {
  89. sql: `SELECT "value" FROM test`,
  90. data: &xsql.Tuple{
  91. Emitter: "test",
  92. Message: xsql.Message{},
  93. },
  94. result: []map[string]interface{}{{
  95. "": "value",
  96. }},
  97. },
  98. {
  99. sql: `SELECT 3.4 FROM test`,
  100. data: &xsql.Tuple{
  101. Emitter: "test",
  102. Message: xsql.Message{},
  103. },
  104. result: []map[string]interface{}{{
  105. "": 3.4,
  106. }},
  107. },
  108. {
  109. sql: `SELECT 5 FROM test`,
  110. data: &xsql.Tuple{
  111. Emitter: "test",
  112. Message: xsql.Message{},
  113. },
  114. result: []map[string]interface{}{{
  115. "": 5.0,
  116. }},
  117. },
  118. {
  119. sql: `SELECT a, "value" AS b FROM test`,
  120. data: &xsql.Tuple{
  121. Emitter: "test",
  122. Message: xsql.Message{
  123. "a": "val_a",
  124. },
  125. },
  126. result: []map[string]interface{}{{
  127. "a": "val_a",
  128. "b": "value",
  129. }},
  130. },
  131. {
  132. sql: `SELECT a, "value" AS b, 3.14 as Pi, 0 as Zero FROM test`,
  133. data: &xsql.Tuple{
  134. Emitter: "test",
  135. Message: xsql.Message{
  136. "a": "val_a",
  137. },
  138. },
  139. result: []map[string]interface{}{{
  140. "a": "val_a",
  141. "b": "value",
  142. "Pi": 3.14,
  143. "Zero": 0.0,
  144. }},
  145. },
  146. {
  147. sql: `SELECT a->b AS ab FROM test`,
  148. data: &xsql.Tuple{
  149. Emitter: "test",
  150. Message: xsql.Message{
  151. "a": map[string]interface{}{"b": "hello"},
  152. },
  153. },
  154. result: []map[string]interface{}{{
  155. "ab": "hello",
  156. }},
  157. },
  158. {
  159. sql: `SELECT a->b AS ab FROM test`,
  160. data: &xsql.Tuple{
  161. Emitter: "test",
  162. Message: xsql.Message{
  163. "a": map[string]interface{}(nil),
  164. },
  165. },
  166. result: []map[string]interface{}{{}},
  167. },
  168. {
  169. sql: `SELECT a->b AS ab FROM test`,
  170. data: &xsql.Tuple{
  171. Emitter: "test",
  172. Message: xsql.Message{
  173. "name": "name",
  174. },
  175. },
  176. result: []map[string]interface{}{{}},
  177. },
  178. {
  179. sql: `SELECT a->b AS ab FROM test`,
  180. data: &xsql.Tuple{
  181. Emitter: "test",
  182. Message: xsql.Message{
  183. "a": "commonstring",
  184. },
  185. },
  186. result: []map[string]interface{}{{}},
  187. },
  188. {
  189. sql: `SELECT a[0]->b AS ab FROM test`,
  190. data: &xsql.Tuple{
  191. Emitter: "test",
  192. Message: xsql.Message{
  193. "a": []interface{}{
  194. map[string]interface{}{"b": "hello1"},
  195. map[string]interface{}{"b": "hello2"},
  196. },
  197. },
  198. },
  199. result: []map[string]interface{}{{
  200. "ab": "hello1",
  201. }},
  202. },
  203. {
  204. sql: `SELECT a[0]->b AS ab FROM test`,
  205. data: &xsql.Tuple{
  206. Emitter: "test",
  207. Message: xsql.Message{
  208. "a": []map[string]interface{}{
  209. {"b": "hello1"},
  210. {"b": "hello2"},
  211. },
  212. },
  213. },
  214. result: []map[string]interface{}{{
  215. "ab": "hello1",
  216. }},
  217. },
  218. {
  219. sql: `SELECT a[2:4] AS ab FROM test`,
  220. data: &xsql.Tuple{
  221. Emitter: "test",
  222. Message: xsql.Message{
  223. "a": []map[string]interface{}{
  224. {"b": "hello1"},
  225. {"b": "hello2"},
  226. {"b": "hello3"},
  227. {"b": "hello4"},
  228. {"b": "hello5"},
  229. },
  230. },
  231. },
  232. result: []map[string]interface{}{{
  233. "ab": []interface{}{
  234. map[string]interface{}{"b": "hello3"},
  235. map[string]interface{}{"b": "hello4"},
  236. },
  237. }},
  238. },
  239. {
  240. sql: `SELECT a[2:] AS ab FROM test`,
  241. data: &xsql.Tuple{
  242. Emitter: "test",
  243. Message: xsql.Message{
  244. "a": []map[string]interface{}{
  245. {"b": "hello1"},
  246. {"b": "hello2"},
  247. {"b": "hello3"},
  248. {"b": "hello4"},
  249. {"b": "hello5"},
  250. },
  251. },
  252. },
  253. result: []map[string]interface{}{{
  254. "ab": []interface{}{
  255. map[string]interface{}{"b": "hello3"},
  256. map[string]interface{}{"b": "hello4"},
  257. map[string]interface{}{"b": "hello5"},
  258. },
  259. }},
  260. },
  261. {
  262. sql: `SELECT a[2:] AS ab FROM test`,
  263. data: &xsql.Tuple{
  264. Emitter: "test",
  265. Message: xsql.Message{
  266. "a": []interface{}{
  267. true, false, true, false, true, true,
  268. },
  269. },
  270. },
  271. result: []map[string]interface{}{{
  272. "ab": []interface{}{
  273. true, false, true, true,
  274. },
  275. }},
  276. },
  277. {
  278. sql: `SELECT a[:4] AS ab FROM test`,
  279. data: &xsql.Tuple{
  280. Emitter: "test",
  281. Message: xsql.Message{
  282. "a": []interface{}{
  283. true, false, true, false, true, true,
  284. },
  285. },
  286. },
  287. result: []map[string]interface{}{{
  288. "ab": []interface{}{
  289. true, false, true, false,
  290. },
  291. }},
  292. },
  293. {
  294. sql: `SELECT a[:4] AS ab FROM test`,
  295. data: &xsql.Tuple{
  296. Emitter: "test",
  297. Message: xsql.Message{
  298. "a": []interface{}{
  299. 3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926,
  300. },
  301. },
  302. },
  303. result: []map[string]interface{}{{
  304. "ab": []interface{}{
  305. 3.14, 3.141, 3.1415, 3.14159,
  306. },
  307. }},
  308. },
  309. {
  310. sql: `SELECT a->b[:4] AS ab FROM test`,
  311. data: &xsql.Tuple{
  312. Emitter: "test",
  313. Message: xsql.Message{
  314. "a": map[string]interface{}{
  315. "b": []float64{3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926},
  316. },
  317. },
  318. },
  319. result: []map[string]interface{}{{
  320. "ab": []interface{}{
  321. 3.14, 3.141, 3.1415, 3.14159,
  322. },
  323. }},
  324. },
  325. {
  326. sql: `SELECT a->b[0:1] AS ab FROM test`,
  327. data: &xsql.Tuple{
  328. Emitter: "test",
  329. Message: xsql.Message{
  330. "a": map[string]interface{}{
  331. "b": []float64{3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926},
  332. },
  333. },
  334. },
  335. result: []map[string]interface{}{{
  336. "ab": []interface{}{
  337. 3.14,
  338. },
  339. }},
  340. },
  341. {
  342. sql: `SELECT a->c->d AS f1 FROM test`,
  343. data: &xsql.Tuple{
  344. Emitter: "test",
  345. Message: xsql.Message{
  346. "a": map[string]interface{}{
  347. "b": "hello",
  348. "c": map[string]interface{}{
  349. "d": 35.2,
  350. },
  351. },
  352. },
  353. },
  354. result: []map[string]interface{}{{
  355. "f1": 35.2,
  356. }},
  357. },
  358. {
  359. sql: `SELECT a->c->d AS f1 FROM test`,
  360. data: &xsql.Tuple{
  361. Emitter: "test",
  362. Message: xsql.Message{
  363. "a": map[string]interface{}{
  364. "b": "hello",
  365. "c": map[string]interface{}{
  366. "e": 35.2,
  367. },
  368. },
  369. },
  370. },
  371. result: []map[string]interface{}{{}},
  372. },
  373. {
  374. sql: `SELECT a->c->d AS f1 FROM test`,
  375. data: &xsql.Tuple{
  376. Emitter: "test",
  377. Message: xsql.Message{
  378. "a": map[string]interface{}{
  379. "b": "hello",
  380. },
  381. },
  382. },
  383. result: []map[string]interface{}{{}},
  384. },
  385. //The int type is not supported yet, the json parser returns float64 for int values
  386. {
  387. sql: `SELECT a->c->d AS f1 FROM test`,
  388. data: &xsql.Tuple{
  389. Emitter: "test",
  390. Message: xsql.Message{
  391. "a": map[string]interface{}{
  392. "b": "hello",
  393. "c": map[string]interface{}{
  394. "d": float64(35),
  395. },
  396. },
  397. },
  398. },
  399. result: []map[string]interface{}{{
  400. "f1": float64(35),
  401. }},
  402. },
  403. {
  404. sql: "SELECT a FROM test",
  405. data: &xsql.Tuple{
  406. Emitter: "test",
  407. Message: xsql.Message{},
  408. },
  409. result: []map[string]interface{}{
  410. {},
  411. },
  412. },
  413. {
  414. sql: "SELECT * FROM test",
  415. data: &xsql.Tuple{
  416. Emitter: "test",
  417. Message: xsql.Message{},
  418. },
  419. result: []map[string]interface{}{
  420. {},
  421. },
  422. },
  423. {
  424. sql: `SELECT * FROM test`,
  425. data: &xsql.Tuple{
  426. Emitter: "test",
  427. Message: xsql.Message{
  428. "a": map[string]interface{}{
  429. "b": "hello",
  430. "c": map[string]interface{}{
  431. "d": 35.2,
  432. },
  433. },
  434. },
  435. },
  436. result: []map[string]interface{}{{
  437. "a": map[string]interface{}{
  438. "b": "hello",
  439. "c": map[string]interface{}{
  440. "d": 35.2,
  441. },
  442. },
  443. }},
  444. },
  445. {
  446. sql: `SELECT * FROM test`,
  447. data: &xsql.Tuple{
  448. Emitter: "test",
  449. Message: xsql.Message{
  450. "a": "val1",
  451. "b": 3.14,
  452. },
  453. },
  454. result: []map[string]interface{}{{
  455. "a": "val1",
  456. "b": 3.14,
  457. }},
  458. },
  459. {
  460. sql: `SELECT 3*4 AS f1 FROM test`,
  461. data: &xsql.Tuple{
  462. Emitter: "test",
  463. Message: xsql.Message{},
  464. },
  465. result: []map[string]interface{}{{
  466. "f1": float64(12),
  467. }},
  468. },
  469. {
  470. sql: `SELECT 4.5*2 AS f1 FROM test`,
  471. data: &xsql.Tuple{
  472. Emitter: "test",
  473. Message: xsql.Message{},
  474. },
  475. result: []map[string]interface{}{{
  476. "f1": float64(9),
  477. }},
  478. },
  479. {
  480. sql: "SELECT `a.b.c` FROM test",
  481. data: &xsql.Tuple{
  482. Emitter: "test",
  483. Message: xsql.Message{
  484. "a.b.c": "val_a",
  485. },
  486. },
  487. result: []map[string]interface{}{{
  488. "a.b.c": "val_a",
  489. }},
  490. },
  491. {
  492. sql: `SELECT CASE a WHEN 10 THEN "true" END AS b FROM test`,
  493. data: &xsql.Tuple{
  494. Emitter: "test",
  495. Message: xsql.Message{
  496. "a": int64(10),
  497. },
  498. },
  499. result: []map[string]interface{}{{
  500. "b": "true",
  501. }},
  502. },
  503. }
  504. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  505. contextLogger := conf.Log.WithField("rule", "TestProjectPlan_Apply1")
  506. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  507. for i, tt := range tests {
  508. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  509. if err != nil {
  510. t.Errorf("parse sql error: %s", err)
  511. continue
  512. }
  513. pp := &ProjectOp{Fields: stmt.Fields, SendMeta: true}
  514. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  515. result := pp.Apply(ctx, tt.data, fv, afv)
  516. var mapRes []map[string]interface{}
  517. if v, ok := result.([]byte); ok {
  518. err := json.Unmarshal(v, &mapRes)
  519. if err != nil {
  520. t.Errorf("Failed to parse the input into map.\n")
  521. continue
  522. }
  523. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  524. if !reflect.DeepEqual(tt.result, mapRes) {
  525. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  526. }
  527. } else {
  528. t.Errorf("%d. The returned result %#v is not type of []byte\n", result, i)
  529. }
  530. }
  531. }
  532. func TestProjectPlan_MultiInput(t *testing.T) {
  533. var tests = []struct {
  534. sql string
  535. data interface{}
  536. result []map[string]interface{}
  537. }{
  538. {
  539. sql: "SELECT * FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  540. data: &xsql.Tuple{
  541. Emitter: "tbl",
  542. Message: xsql.Message{
  543. "abc": int64(6),
  544. },
  545. },
  546. result: []map[string]interface{}{{
  547. "abc": float64(6), //json marshall problem
  548. }},
  549. },
  550. {
  551. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  552. data: &xsql.Tuple{
  553. Emitter: "tbl",
  554. Message: xsql.Message{
  555. "abc": int64(34),
  556. "def": "hello",
  557. },
  558. },
  559. result: []map[string]interface{}{{
  560. "abc": float64(34),
  561. }},
  562. },
  563. {
  564. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  565. data: xsql.WindowTuplesSet{
  566. Content: []xsql.WindowTuples{
  567. {
  568. Emitter: "src1",
  569. Tuples: []xsql.Tuple{
  570. {
  571. Emitter: "src1",
  572. Message: xsql.Message{"id1": 1, "f1": "v1"},
  573. }, {
  574. Emitter: "src1",
  575. Message: xsql.Message{"id1": 2, "f1": "v2"},
  576. }, {
  577. Emitter: "src1",
  578. Message: xsql.Message{"id1": 3, "f1": "v1"},
  579. },
  580. },
  581. },
  582. },
  583. },
  584. result: []map[string]interface{}{{
  585. "id1": float64(1),
  586. }, {
  587. "id1": float64(2),
  588. }, {
  589. "id1": float64(3),
  590. }},
  591. },
  592. {
  593. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  594. data: xsql.WindowTuplesSet{
  595. Content: []xsql.WindowTuples{
  596. {
  597. Emitter: "src1",
  598. Tuples: []xsql.Tuple{
  599. {
  600. Emitter: "src1",
  601. Message: xsql.Message{"id1": 1, "f1": "v1"},
  602. }, {
  603. Emitter: "src1",
  604. Message: xsql.Message{"id2": 2, "f1": "v2"},
  605. }, {
  606. Emitter: "src1",
  607. Message: xsql.Message{"id1": 3, "f1": "v1"},
  608. },
  609. },
  610. },
  611. },
  612. },
  613. result: []map[string]interface{}{{
  614. "id1": float64(1),
  615. }, {}, {
  616. "id1": float64(3),
  617. }},
  618. },
  619. {
  620. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  621. data: xsql.WindowTuplesSet{
  622. Content: []xsql.WindowTuples{
  623. {
  624. Emitter: "src1",
  625. Tuples: []xsql.Tuple{
  626. {
  627. Emitter: "src1",
  628. Message: xsql.Message{"id1": 1, "f1": "v1"},
  629. }, {
  630. Emitter: "src1",
  631. Message: xsql.Message{"id1": 2, "f1": "v2"},
  632. }, {
  633. Emitter: "src1",
  634. Message: xsql.Message{"id1": 3, "f1": "v1"},
  635. },
  636. },
  637. },
  638. },
  639. },
  640. result: []map[string]interface{}{{
  641. "id1": float64(1),
  642. "f1": "v1",
  643. }, {
  644. "id1": float64(2),
  645. "f1": "v2",
  646. }, {
  647. "id1": float64(3),
  648. "f1": "v1",
  649. }},
  650. },
  651. {
  652. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  653. data: xsql.WindowTuplesSet{
  654. Content: []xsql.WindowTuples{
  655. {
  656. Emitter: "src1",
  657. Tuples: []xsql.Tuple{
  658. {
  659. Emitter: "src1",
  660. Message: xsql.Message{"id1": 1, "f1": "v1"},
  661. }, {
  662. Emitter: "src1",
  663. Message: xsql.Message{"id2": 2, "f2": "v2"},
  664. }, {
  665. Emitter: "src1",
  666. Message: xsql.Message{"id1": 3, "f1": "v1"},
  667. },
  668. },
  669. },
  670. },
  671. },
  672. result: []map[string]interface{}{{
  673. "id1": float64(1),
  674. "f1": "v1",
  675. }, {
  676. "id2": float64(2),
  677. "f2": "v2",
  678. }, {
  679. "id1": float64(3),
  680. "f1": "v1",
  681. }},
  682. },
  683. {
  684. sql: "SELECT src1.* FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  685. data: xsql.WindowTuplesSet{
  686. Content: []xsql.WindowTuples{
  687. {
  688. Emitter: "src1",
  689. Tuples: []xsql.Tuple{
  690. {
  691. Emitter: "src1",
  692. Message: xsql.Message{"id1": 1, "f1": "v1"},
  693. }, {
  694. Emitter: "src1",
  695. Message: xsql.Message{"id1": 2, "f1": "v2"},
  696. }, {
  697. Emitter: "src1",
  698. Message: xsql.Message{"id1": 3, "f1": "v1"},
  699. },
  700. },
  701. },
  702. },
  703. },
  704. result: []map[string]interface{}{{
  705. "id1": float64(1),
  706. "f1": "v1",
  707. }, {
  708. "id1": float64(2),
  709. "f1": "v2",
  710. }, {
  711. "id1": float64(3),
  712. "f1": "v1",
  713. }},
  714. },
  715. {
  716. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  717. data: &xsql.JoinTupleSets{
  718. Content: []xsql.JoinTuple{
  719. {
  720. Tuples: []xsql.Tuple{
  721. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  722. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  723. },
  724. },
  725. {
  726. Tuples: []xsql.Tuple{
  727. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  728. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  729. },
  730. },
  731. {
  732. Tuples: []xsql.Tuple{
  733. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  734. },
  735. },
  736. },
  737. },
  738. result: []map[string]interface{}{{
  739. "id1": float64(1),
  740. }, {
  741. "id1": float64(2),
  742. }, {
  743. "id1": float64(3),
  744. }},
  745. },
  746. {
  747. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  748. data: &xsql.JoinTupleSets{
  749. Content: []xsql.JoinTuple{
  750. {
  751. Tuples: []xsql.Tuple{
  752. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  753. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  754. },
  755. },
  756. {
  757. Tuples: []xsql.Tuple{
  758. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  759. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  760. },
  761. },
  762. {
  763. Tuples: []xsql.Tuple{
  764. {Emitter: "src1", Message: xsql.Message{"id2": 3, "f1": "v1"}},
  765. },
  766. },
  767. },
  768. },
  769. result: []map[string]interface{}{{
  770. "id1": float64(1),
  771. }, {
  772. "id1": float64(2),
  773. }, {}},
  774. },
  775. {
  776. sql: "SELECT abc FROM tbl group by abc",
  777. data: xsql.GroupedTuplesSet{
  778. {
  779. Content: []xsql.DataValuer{
  780. &xsql.Tuple{
  781. Emitter: "tbl",
  782. Message: xsql.Message{
  783. "abc": int64(6),
  784. "def": "hello",
  785. },
  786. },
  787. },
  788. },
  789. },
  790. result: []map[string]interface{}{{
  791. "abc": float64(6),
  792. }},
  793. },
  794. {
  795. sql: "SELECT abc FROM tbl group by abc",
  796. data: xsql.GroupedTuplesSet{
  797. {
  798. Content: []xsql.DataValuer{
  799. &xsql.Tuple{
  800. Emitter: "tbl",
  801. Message: xsql.Message{
  802. "def": "hello",
  803. },
  804. },
  805. },
  806. },
  807. },
  808. result: []map[string]interface{}{{}},
  809. },
  810. {
  811. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  812. data: xsql.GroupedTuplesSet{
  813. {
  814. Content: []xsql.DataValuer{
  815. &xsql.Tuple{
  816. Emitter: "src1",
  817. Message: xsql.Message{"id1": 1, "f1": "v1"},
  818. },
  819. &xsql.Tuple{
  820. Emitter: "src1",
  821. Message: xsql.Message{"id1": 3, "f1": "v1"},
  822. },
  823. },
  824. },
  825. {
  826. Content: []xsql.DataValuer{
  827. &xsql.Tuple{
  828. Emitter: "src1",
  829. Message: xsql.Message{"id1": 2, "f1": "v2"},
  830. },
  831. },
  832. },
  833. },
  834. result: []map[string]interface{}{{
  835. "id1": float64(1),
  836. }, {
  837. "id1": float64(2),
  838. }},
  839. },
  840. {
  841. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  842. data: xsql.GroupedTuplesSet{
  843. {
  844. Content: []xsql.DataValuer{
  845. &xsql.Tuple{
  846. Emitter: "src1",
  847. Message: xsql.Message{"id1": 1, "f1": "v1"},
  848. },
  849. &xsql.Tuple{
  850. Emitter: "src1",
  851. Message: xsql.Message{"id1": 3, "f1": "v1"},
  852. },
  853. },
  854. },
  855. {
  856. Content: []xsql.DataValuer{
  857. &xsql.Tuple{
  858. Emitter: "src1",
  859. Message: xsql.Message{"id2": 2, "f1": "v2"},
  860. },
  861. },
  862. },
  863. },
  864. result: []map[string]interface{}{{
  865. "id1": float64(1),
  866. }, {}},
  867. },
  868. {
  869. sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  870. data: xsql.GroupedTuplesSet{
  871. {
  872. Content: []xsql.DataValuer{
  873. &xsql.JoinTuple{
  874. Tuples: []xsql.Tuple{
  875. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  876. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  877. },
  878. },
  879. },
  880. },
  881. {
  882. Content: []xsql.DataValuer{
  883. &xsql.JoinTuple{
  884. Tuples: []xsql.Tuple{
  885. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  886. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  887. },
  888. },
  889. },
  890. },
  891. {
  892. Content: []xsql.DataValuer{
  893. &xsql.JoinTuple{
  894. Tuples: []xsql.Tuple{
  895. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  896. },
  897. },
  898. },
  899. },
  900. },
  901. result: []map[string]interface{}{{
  902. "id2": float64(2),
  903. }, {
  904. "id2": float64(4),
  905. }, {}},
  906. },
  907. {
  908. sql: "SELECT src1.*, f2 FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  909. data: &xsql.JoinTupleSets{
  910. Content: []xsql.JoinTuple{
  911. {
  912. Tuples: []xsql.Tuple{
  913. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  914. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  915. },
  916. },
  917. {
  918. Tuples: []xsql.Tuple{
  919. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  920. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  921. },
  922. },
  923. {
  924. Tuples: []xsql.Tuple{
  925. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  926. },
  927. },
  928. },
  929. },
  930. result: []map[string]interface{}{{
  931. "id1": float64(1),
  932. "f1": "v1",
  933. "f2": "w2",
  934. }, {
  935. "id1": float64(2),
  936. "f1": "v2",
  937. "f2": "w3",
  938. }, {
  939. "id1": float64(3),
  940. "f1": "v1",
  941. }},
  942. },
  943. {
  944. sql: "SELECT * FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  945. data: &xsql.JoinTupleSets{
  946. Content: []xsql.JoinTuple{
  947. {
  948. Tuples: []xsql.Tuple{
  949. {Emitter: "src1", Message: xsql.Message{"id": 1, "f1": "v1"}},
  950. {Emitter: "src2", Message: xsql.Message{"id": 2, "f2": "w2"}},
  951. },
  952. },
  953. {
  954. Tuples: []xsql.Tuple{
  955. {Emitter: "src1", Message: xsql.Message{"id": 2, "f1": "v2"}},
  956. {Emitter: "src2", Message: xsql.Message{"id": 4, "f2": "w3"}},
  957. },
  958. },
  959. {
  960. Tuples: []xsql.Tuple{
  961. {Emitter: "src1", Message: xsql.Message{"id": 3, "f1": "v1"}},
  962. },
  963. },
  964. },
  965. },
  966. result: []map[string]interface{}{{
  967. "id": float64(1),
  968. "f1": "v1",
  969. "f2": "w2",
  970. }, {
  971. "id": float64(2),
  972. "f1": "v2",
  973. "f2": "w3",
  974. }, {
  975. "id": float64(3),
  976. "f1": "v1",
  977. }},
  978. },
  979. {
  980. sql: "SELECT src1.* FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  981. data: xsql.GroupedTuplesSet{
  982. {
  983. Content: []xsql.DataValuer{
  984. &xsql.Tuple{
  985. Emitter: "src1",
  986. Message: xsql.Message{"id1": 1, "f1": "v1"},
  987. },
  988. &xsql.Tuple{
  989. Emitter: "src1",
  990. Message: xsql.Message{"id1": 3, "f1": "v1"},
  991. },
  992. },
  993. },
  994. {
  995. Content: []xsql.DataValuer{
  996. &xsql.Tuple{
  997. Emitter: "src1",
  998. Message: xsql.Message{"id1": 2, "f1": "v2"},
  999. },
  1000. },
  1001. },
  1002. },
  1003. result: []map[string]interface{}{{
  1004. "id1": float64(1),
  1005. "f1": "v1",
  1006. }, {
  1007. "id1": float64(2),
  1008. "f1": "v2",
  1009. }},
  1010. },
  1011. {
  1012. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  1013. data: xsql.GroupedTuplesSet{
  1014. {
  1015. Content: []xsql.DataValuer{
  1016. &xsql.JoinTuple{
  1017. Tuples: []xsql.Tuple{
  1018. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1019. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1020. },
  1021. },
  1022. },
  1023. },
  1024. {
  1025. Content: []xsql.DataValuer{
  1026. &xsql.JoinTuple{
  1027. Tuples: []xsql.Tuple{
  1028. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1029. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1030. },
  1031. },
  1032. },
  1033. },
  1034. {
  1035. Content: []xsql.DataValuer{
  1036. &xsql.JoinTuple{
  1037. Tuples: []xsql.Tuple{
  1038. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  1039. },
  1040. },
  1041. },
  1042. },
  1043. },
  1044. result: []map[string]interface{}{{
  1045. "id2": float64(2),
  1046. "id1": float64(1),
  1047. "f1": "v1",
  1048. }, {
  1049. "id2": float64(4),
  1050. "id1": float64(2),
  1051. "f1": "v2",
  1052. }, {
  1053. "id1": float64(3),
  1054. "f1": "v1",
  1055. }},
  1056. },
  1057. {
  1058. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  1059. data: xsql.GroupedTuplesSet{
  1060. {
  1061. Content: []xsql.DataValuer{
  1062. &xsql.JoinTuple{
  1063. Tuples: []xsql.Tuple{
  1064. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1065. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1066. },
  1067. },
  1068. },
  1069. },
  1070. {
  1071. Content: []xsql.DataValuer{
  1072. &xsql.JoinTuple{
  1073. Tuples: []xsql.Tuple{
  1074. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1075. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1076. },
  1077. },
  1078. },
  1079. },
  1080. {
  1081. Content: []xsql.DataValuer{
  1082. &xsql.JoinTuple{
  1083. Tuples: []xsql.Tuple{
  1084. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  1085. },
  1086. },
  1087. },
  1088. },
  1089. },
  1090. result: []map[string]interface{}{{
  1091. "id2": float64(2),
  1092. "id1": float64(1),
  1093. "f1": "v1",
  1094. }, {
  1095. "id2": float64(4),
  1096. "id1": float64(2),
  1097. "f1": "v2",
  1098. }, {
  1099. "id1": float64(3),
  1100. "f1": "v1",
  1101. }},
  1102. },
  1103. }
  1104. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1105. contextLogger := conf.Log.WithField("rule", "TestProjectPlan_MultiInput")
  1106. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1107. for i, tt := range tests {
  1108. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1109. pp := &ProjectOp{Fields: stmt.Fields}
  1110. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1111. result := pp.Apply(ctx, tt.data, fv, afv)
  1112. var mapRes []map[string]interface{}
  1113. if v, ok := result.([]byte); ok {
  1114. err := json.Unmarshal(v, &mapRes)
  1115. if err != nil {
  1116. t.Errorf("Failed to parse the input into map.\n")
  1117. continue
  1118. }
  1119. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  1120. if !reflect.DeepEqual(tt.result, mapRes) {
  1121. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1122. }
  1123. } else {
  1124. t.Errorf("The returned result is not type of []byte\n")
  1125. }
  1126. }
  1127. }
  1128. func TestProjectPlan_Funcs(t *testing.T) {
  1129. var tests = []struct {
  1130. sql string
  1131. data interface{}
  1132. result []map[string]interface{}
  1133. }{
  1134. {
  1135. sql: "SELECT round(a) as r FROM test",
  1136. data: &xsql.Tuple{
  1137. Emitter: "test",
  1138. Message: xsql.Message{
  1139. "a": 47.5,
  1140. },
  1141. },
  1142. result: []map[string]interface{}{{
  1143. "r": float64(48),
  1144. }},
  1145. }, {
  1146. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  1147. data: xsql.WindowTuplesSet{
  1148. Content: []xsql.WindowTuples{
  1149. {
  1150. Emitter: "test",
  1151. Tuples: []xsql.Tuple{
  1152. {
  1153. Emitter: "src1",
  1154. Message: xsql.Message{"a": 53.1},
  1155. }, {
  1156. Emitter: "src1",
  1157. Message: xsql.Message{"a": 27.4},
  1158. }, {
  1159. Emitter: "src1",
  1160. Message: xsql.Message{"a": 123123.7},
  1161. },
  1162. },
  1163. },
  1164. },
  1165. },
  1166. result: []map[string]interface{}{{
  1167. "r": float64(53),
  1168. }, {
  1169. "r": float64(27),
  1170. }, {
  1171. "r": float64(123124),
  1172. }},
  1173. }, {
  1174. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  1175. data: xsql.WindowTuplesSet{
  1176. Content: []xsql.WindowTuples{
  1177. {
  1178. Emitter: "test",
  1179. Tuples: []xsql.Tuple{
  1180. {
  1181. Emitter: "src1",
  1182. Message: xsql.Message{"a": 53.1},
  1183. }, {
  1184. Emitter: "src1",
  1185. Message: xsql.Message{"a": 27.4},
  1186. }, {
  1187. Emitter: "src1",
  1188. Message: xsql.Message{"a": 123123.7},
  1189. },
  1190. },
  1191. },
  1192. },
  1193. },
  1194. result: []map[string]interface{}{{
  1195. "r": float64(53),
  1196. }, {
  1197. "r": float64(27),
  1198. }, {
  1199. "r": float64(123124),
  1200. }},
  1201. }, {
  1202. sql: "SELECT round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1203. data: &xsql.JoinTupleSets{
  1204. Content: []xsql.JoinTuple{
  1205. {
  1206. Tuples: []xsql.Tuple{
  1207. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1208. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1209. },
  1210. },
  1211. {
  1212. Tuples: []xsql.Tuple{
  1213. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1214. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1215. },
  1216. },
  1217. {
  1218. Tuples: []xsql.Tuple{
  1219. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1220. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1221. },
  1222. },
  1223. },
  1224. },
  1225. result: []map[string]interface{}{{
  1226. "r": float64(66),
  1227. }, {
  1228. "r": float64(73),
  1229. }, {
  1230. "r": float64(89),
  1231. }},
  1232. }, {
  1233. sql: "SELECT CONCAT(test.id, test.a, test1.b) as concat FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1234. data: &xsql.JoinTupleSets{
  1235. Content: []xsql.JoinTuple{
  1236. {
  1237. Tuples: []xsql.Tuple{
  1238. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1239. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1240. },
  1241. },
  1242. {
  1243. Tuples: []xsql.Tuple{
  1244. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1245. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1246. },
  1247. },
  1248. {
  1249. Tuples: []xsql.Tuple{
  1250. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1251. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1252. },
  1253. },
  1254. },
  1255. },
  1256. result: []map[string]interface{}{{
  1257. "concat": "165.5512",
  1258. }, {
  1259. "concat": "273.49934",
  1260. }, {
  1261. "concat": "388.886",
  1262. }},
  1263. }, {
  1264. sql: "SELECT count(a) as r FROM test",
  1265. data: &xsql.Tuple{
  1266. Emitter: "test",
  1267. Message: xsql.Message{
  1268. "a": 47.5,
  1269. },
  1270. },
  1271. result: []map[string]interface{}{{
  1272. "r": float64(1),
  1273. }},
  1274. }, {
  1275. sql: "SELECT meta(test.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1276. data: &xsql.JoinTupleSets{
  1277. Content: []xsql.JoinTuple{
  1278. {
  1279. Tuples: []xsql.Tuple{
  1280. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}, Metadata: xsql.Metadata{"device": "devicea"}},
  1281. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1282. },
  1283. },
  1284. {
  1285. Tuples: []xsql.Tuple{
  1286. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1287. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1288. },
  1289. },
  1290. {
  1291. Tuples: []xsql.Tuple{
  1292. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}, Metadata: xsql.Metadata{"device": "devicec"}},
  1293. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1294. },
  1295. },
  1296. },
  1297. },
  1298. result: []map[string]interface{}{{
  1299. "d": "devicea",
  1300. }, {
  1301. "d": "deviceb",
  1302. }, {
  1303. "d": "devicec",
  1304. }},
  1305. },
  1306. }
  1307. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1308. contextLogger := conf.Log.WithField("rule", "TestProjectPlan_Funcs")
  1309. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1310. for i, tt := range tests {
  1311. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1312. if err != nil {
  1313. t.Error(err)
  1314. }
  1315. pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: ast.IsAggStatement(stmt)}
  1316. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1317. result := pp.Apply(ctx, tt.data, fv, afv)
  1318. var mapRes []map[string]interface{}
  1319. if v, ok := result.([]byte); ok {
  1320. err := json.Unmarshal(v, &mapRes)
  1321. if err != nil {
  1322. t.Errorf("Failed to parse the input into map.\n")
  1323. continue
  1324. }
  1325. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  1326. if !reflect.DeepEqual(tt.result, mapRes) {
  1327. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1328. }
  1329. } else {
  1330. t.Errorf("%d. The returned result is not type of []byte\n", i)
  1331. }
  1332. }
  1333. }
  1334. func TestProjectPlan_AggFuncs(t *testing.T) {
  1335. var tests = []struct {
  1336. sql string
  1337. data interface{}
  1338. result []map[string]interface{}
  1339. }{
  1340. {
  1341. sql: "SELECT count(*) as c, round(a) as r, window_start() as ws, window_end() as we FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1342. data: xsql.GroupedTuplesSet{
  1343. {
  1344. Content: []xsql.DataValuer{
  1345. &xsql.JoinTuple{
  1346. Tuples: []xsql.Tuple{
  1347. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1348. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1349. },
  1350. },
  1351. &xsql.JoinTuple{
  1352. Tuples: []xsql.Tuple{
  1353. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1354. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1355. },
  1356. },
  1357. },
  1358. WindowRange: &xsql.WindowRange{
  1359. WindowStart: 1541152486013,
  1360. WindowEnd: 1541152487013,
  1361. },
  1362. },
  1363. {
  1364. Content: []xsql.DataValuer{
  1365. &xsql.JoinTuple{
  1366. Tuples: []xsql.Tuple{
  1367. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1368. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1369. },
  1370. },
  1371. &xsql.JoinTuple{
  1372. Tuples: []xsql.Tuple{
  1373. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1374. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1375. },
  1376. },
  1377. },
  1378. WindowRange: &xsql.WindowRange{
  1379. WindowStart: 1541152486013,
  1380. WindowEnd: 1541152487013,
  1381. },
  1382. },
  1383. },
  1384. result: []map[string]interface{}{{
  1385. "c": float64(2),
  1386. "r": float64(122),
  1387. "ws": float64(1541152486013),
  1388. "we": float64(1541152487013),
  1389. }, {
  1390. "c": float64(2),
  1391. "r": float64(89),
  1392. "ws": float64(1541152486013),
  1393. "we": float64(1541152487013),
  1394. }},
  1395. },
  1396. {
  1397. sql: "SELECT count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1398. data: xsql.GroupedTuplesSet{
  1399. {
  1400. Content: []xsql.DataValuer{
  1401. &xsql.JoinTuple{
  1402. Tuples: []xsql.Tuple{
  1403. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1404. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1405. },
  1406. },
  1407. &xsql.JoinTuple{
  1408. Tuples: []xsql.Tuple{
  1409. {Emitter: "test", Message: xsql.Message{"id": 5}},
  1410. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1411. },
  1412. },
  1413. },
  1414. },
  1415. {
  1416. Content: []xsql.DataValuer{
  1417. &xsql.JoinTuple{
  1418. Tuples: []xsql.Tuple{
  1419. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1420. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1421. },
  1422. },
  1423. &xsql.JoinTuple{
  1424. Tuples: []xsql.Tuple{
  1425. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1426. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1427. },
  1428. },
  1429. },
  1430. },
  1431. },
  1432. result: []map[string]interface{}{{
  1433. "c": float64(1),
  1434. "a": 122.33,
  1435. "s": 122.33,
  1436. "min": 122.33,
  1437. "max": 122.33,
  1438. }, {
  1439. "c": float64(2),
  1440. "s": 103.63,
  1441. "a": 51.815,
  1442. "min": 14.6,
  1443. "max": 89.03,
  1444. }},
  1445. },
  1446. {
  1447. sql: "SELECT avg(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1448. data: xsql.GroupedTuplesSet{
  1449. {
  1450. Content: []xsql.DataValuer{
  1451. &xsql.JoinTuple{
  1452. Tuples: []xsql.Tuple{
  1453. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1454. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1455. },
  1456. },
  1457. &xsql.JoinTuple{
  1458. Tuples: []xsql.Tuple{
  1459. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  1460. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1461. },
  1462. },
  1463. &xsql.JoinTuple{
  1464. Tuples: []xsql.Tuple{
  1465. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 98.31}},
  1466. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  1467. },
  1468. },
  1469. &xsql.JoinTuple{
  1470. Tuples: []xsql.Tuple{
  1471. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  1472. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1473. },
  1474. },
  1475. },
  1476. },
  1477. {
  1478. Content: []xsql.DataValuer{
  1479. &xsql.JoinTuple{
  1480. Tuples: []xsql.Tuple{
  1481. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1482. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1483. },
  1484. },
  1485. &xsql.JoinTuple{
  1486. Tuples: []xsql.Tuple{
  1487. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1488. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1489. },
  1490. },
  1491. },
  1492. },
  1493. },
  1494. result: []map[string]interface{}{{
  1495. "avg": 116.68,
  1496. }, {
  1497. "avg": 51.815,
  1498. }},
  1499. },
  1500. {
  1501. sql: "SELECT max(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1502. data: xsql.GroupedTuplesSet{
  1503. {
  1504. Content: []xsql.DataValuer{
  1505. &xsql.JoinTuple{
  1506. Tuples: []xsql.Tuple{
  1507. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1508. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1509. },
  1510. },
  1511. &xsql.JoinTuple{
  1512. Tuples: []xsql.Tuple{
  1513. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1514. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1515. },
  1516. },
  1517. &xsql.JoinTuple{
  1518. Tuples: []xsql.Tuple{
  1519. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1520. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1521. },
  1522. },
  1523. },
  1524. },
  1525. {
  1526. Content: []xsql.DataValuer{
  1527. &xsql.JoinTuple{
  1528. Tuples: []xsql.Tuple{
  1529. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1530. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1531. },
  1532. },
  1533. &xsql.JoinTuple{
  1534. Tuples: []xsql.Tuple{
  1535. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1536. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1537. },
  1538. },
  1539. },
  1540. },
  1541. },
  1542. result: []map[string]interface{}{{
  1543. "max": 177.51,
  1544. }, {
  1545. "max": 89.03,
  1546. }},
  1547. },
  1548. {
  1549. sql: "SELECT min(a), window_start(), window_end() FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1550. data: &xsql.JoinTupleSets{
  1551. Content: []xsql.JoinTuple{
  1552. {
  1553. Tuples: []xsql.Tuple{
  1554. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1555. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1556. },
  1557. },
  1558. {
  1559. Tuples: []xsql.Tuple{
  1560. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1561. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1562. },
  1563. },
  1564. {
  1565. Tuples: []xsql.Tuple{
  1566. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1567. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1568. },
  1569. },
  1570. },
  1571. WindowRange: &xsql.WindowRange{
  1572. WindowStart: 1541152486013,
  1573. WindowEnd: 1541152487013,
  1574. },
  1575. },
  1576. result: []map[string]interface{}{{
  1577. "min": 68.55,
  1578. "window_start": float64(1541152486013),
  1579. "window_end": float64(1541152487013),
  1580. }},
  1581. }, {
  1582. sql: "SELECT count(*) as all, count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1583. data: &xsql.JoinTupleSets{
  1584. Content: []xsql.JoinTuple{
  1585. {
  1586. Tuples: []xsql.Tuple{
  1587. {Emitter: "test", Message: xsql.Message{"id": 1}},
  1588. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1589. },
  1590. },
  1591. {
  1592. Tuples: []xsql.Tuple{
  1593. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1594. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1595. },
  1596. },
  1597. {
  1598. Tuples: []xsql.Tuple{
  1599. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1600. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1601. },
  1602. },
  1603. },
  1604. },
  1605. result: []map[string]interface{}{{
  1606. "all": float64(3),
  1607. "c": float64(2),
  1608. "a": 123.03,
  1609. "s": 246.06,
  1610. "min": 68.55,
  1611. "max": 177.51,
  1612. }},
  1613. }, {
  1614. sql: "SELECT sum(a), window_start() as ws, window_end() FROM test GROUP BY TumblingWindow(ss, 10)",
  1615. data: xsql.WindowTuplesSet{
  1616. Content: []xsql.WindowTuples{{
  1617. Emitter: "test",
  1618. Tuples: []xsql.Tuple{
  1619. {
  1620. Emitter: "src1",
  1621. Message: xsql.Message{"a": 53},
  1622. }, {
  1623. Emitter: "src1",
  1624. Message: xsql.Message{"a": 27},
  1625. }, {
  1626. Emitter: "src1",
  1627. Message: xsql.Message{"a": 123123},
  1628. },
  1629. },
  1630. },
  1631. },
  1632. WindowRange: &xsql.WindowRange{
  1633. WindowStart: 1541152486013,
  1634. WindowEnd: 1541152487013,
  1635. },
  1636. },
  1637. result: []map[string]interface{}{{
  1638. "sum": float64(123203),
  1639. "ws": float64(1541152486013),
  1640. "window_end": float64(1541152487013),
  1641. }},
  1642. }, {
  1643. sql: "SELECT sum(a) as s FROM test GROUP BY TumblingWindow(ss, 10)",
  1644. data: xsql.WindowTuplesSet{
  1645. Content: []xsql.WindowTuples{{
  1646. Emitter: "test",
  1647. Tuples: []xsql.Tuple{
  1648. {
  1649. Emitter: "src1",
  1650. Message: xsql.Message{"a": 53, "s": 123203},
  1651. }, {
  1652. Emitter: "src1",
  1653. Message: xsql.Message{"a": 27},
  1654. }, {
  1655. Emitter: "src1",
  1656. Message: xsql.Message{"a": 123123},
  1657. },
  1658. },
  1659. },
  1660. },
  1661. },
  1662. result: []map[string]interface{}{{
  1663. "s": float64(123203),
  1664. }},
  1665. }, {
  1666. sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
  1667. data: xsql.WindowTuplesSet{
  1668. Content: []xsql.WindowTuples{{
  1669. Emitter: "test",
  1670. Tuples: []xsql.Tuple{
  1671. {
  1672. Emitter: "src1",
  1673. Message: xsql.Message{"a": 53},
  1674. }, {
  1675. Emitter: "src1",
  1676. Message: xsql.Message{"a": 27},
  1677. }, {
  1678. Emitter: "src1",
  1679. Message: xsql.Message{"a": 123123},
  1680. },
  1681. },
  1682. },
  1683. },
  1684. },
  1685. result: []map[string]interface{}{{
  1686. "sum": float64(123203),
  1687. }},
  1688. }, {
  1689. sql: "SELECT count(*) as all, count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test GROUP BY TumblingWindow(ss, 10)",
  1690. data: xsql.WindowTuplesSet{
  1691. Content: []xsql.WindowTuples{{
  1692. Emitter: "test",
  1693. Tuples: []xsql.Tuple{
  1694. {
  1695. Emitter: "src1",
  1696. Message: xsql.Message{"a": 53},
  1697. }, {
  1698. Emitter: "src1",
  1699. Message: xsql.Message{"a": 27},
  1700. }, {
  1701. Emitter: "src1",
  1702. Message: xsql.Message{"s": 123123},
  1703. },
  1704. },
  1705. },
  1706. },
  1707. },
  1708. result: []map[string]interface{}{{
  1709. "all": float64(3),
  1710. "c": float64(2),
  1711. "a": float64(40),
  1712. "s": float64(80),
  1713. "min": float64(27),
  1714. "max": float64(53),
  1715. }},
  1716. },
  1717. {
  1718. sql: "SELECT count(*), meta(test1.device) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1719. data: xsql.GroupedTuplesSet{
  1720. {
  1721. Content: []xsql.DataValuer{
  1722. &xsql.JoinTuple{
  1723. Tuples: []xsql.Tuple{
  1724. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1725. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1726. },
  1727. },
  1728. &xsql.JoinTuple{
  1729. Tuples: []xsql.Tuple{
  1730. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1731. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1732. },
  1733. },
  1734. },
  1735. },
  1736. {
  1737. Content: []xsql.DataValuer{
  1738. &xsql.JoinTuple{
  1739. Tuples: []xsql.Tuple{
  1740. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1741. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1742. },
  1743. },
  1744. &xsql.JoinTuple{
  1745. Tuples: []xsql.Tuple{
  1746. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1747. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1748. },
  1749. },
  1750. },
  1751. },
  1752. },
  1753. result: []map[string]interface{}{{
  1754. "count": float64(2),
  1755. "meta": "devicea",
  1756. }, {
  1757. "count": float64(2),
  1758. "meta": "devicec",
  1759. }},
  1760. },
  1761. {
  1762. sql: "SELECT count(*) as c, meta(test1.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1763. data: xsql.GroupedTuplesSet{
  1764. {
  1765. Content: []xsql.DataValuer{
  1766. &xsql.JoinTuple{
  1767. Tuples: []xsql.Tuple{
  1768. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "d": "devicea"}},
  1769. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1770. },
  1771. },
  1772. &xsql.JoinTuple{
  1773. Tuples: []xsql.Tuple{
  1774. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1775. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1776. },
  1777. },
  1778. },
  1779. },
  1780. {
  1781. Content: []xsql.DataValuer{
  1782. &xsql.JoinTuple{
  1783. Tuples: []xsql.Tuple{
  1784. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "d": "devicec"}},
  1785. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1786. },
  1787. },
  1788. &xsql.JoinTuple{
  1789. Tuples: []xsql.Tuple{
  1790. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1791. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1792. },
  1793. },
  1794. },
  1795. },
  1796. },
  1797. result: []map[string]interface{}{{
  1798. "c": float64(2),
  1799. "d": "devicea",
  1800. }, {
  1801. "c": float64(2),
  1802. "d": "devicec",
  1803. }},
  1804. }, {
  1805. sql: "SELECT * FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1806. data: xsql.GroupedTuplesSet{
  1807. {
  1808. Content: []xsql.DataValuer{
  1809. &xsql.JoinTuple{
  1810. Tuples: []xsql.Tuple{
  1811. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1812. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1813. },
  1814. },
  1815. &xsql.JoinTuple{
  1816. Tuples: []xsql.Tuple{
  1817. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1818. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1819. },
  1820. },
  1821. },
  1822. WindowRange: &xsql.WindowRange{
  1823. WindowStart: 1541152486013,
  1824. WindowEnd: 1541152487013,
  1825. },
  1826. },
  1827. {
  1828. Content: []xsql.DataValuer{
  1829. &xsql.JoinTuple{
  1830. Tuples: []xsql.Tuple{
  1831. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1832. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1833. },
  1834. },
  1835. &xsql.JoinTuple{
  1836. Tuples: []xsql.Tuple{
  1837. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1838. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1839. },
  1840. },
  1841. },
  1842. WindowRange: &xsql.WindowRange{
  1843. WindowStart: 1541152486013,
  1844. WindowEnd: 1541152487013,
  1845. },
  1846. },
  1847. },
  1848. result: []map[string]interface{}{{
  1849. "a": 122.33,
  1850. "c": float64(2),
  1851. "color": "w2",
  1852. "id": float64(1),
  1853. "r": float64(122),
  1854. }, {
  1855. "a": 89.03,
  1856. "c": float64(2),
  1857. "color": "w1",
  1858. "id": float64(2),
  1859. "r": float64(89),
  1860. }},
  1861. }, {
  1862. sql: "SELECT collect(a) as r1 FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1863. data: xsql.GroupedTuplesSet{
  1864. {
  1865. Content: []xsql.DataValuer{
  1866. &xsql.JoinTuple{
  1867. Tuples: []xsql.Tuple{
  1868. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1869. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1870. },
  1871. },
  1872. &xsql.JoinTuple{
  1873. Tuples: []xsql.Tuple{
  1874. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1875. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1876. },
  1877. },
  1878. },
  1879. },
  1880. {
  1881. Content: []xsql.DataValuer{
  1882. &xsql.JoinTuple{
  1883. Tuples: []xsql.Tuple{
  1884. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1885. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1886. },
  1887. },
  1888. &xsql.JoinTuple{
  1889. Tuples: []xsql.Tuple{
  1890. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1891. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1892. },
  1893. },
  1894. },
  1895. },
  1896. },
  1897. result: []map[string]interface{}{{
  1898. "r1": []interface{}{122.33, 177.51},
  1899. }, {"r1": []interface{}{89.03, 14.6}}},
  1900. }, {
  1901. sql: "SELECT collect(*)[1] as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  1902. data: xsql.WindowTuplesSet{
  1903. Content: []xsql.WindowTuples{
  1904. {
  1905. Emitter: "test",
  1906. Tuples: []xsql.Tuple{
  1907. {
  1908. Emitter: "src1",
  1909. Message: xsql.Message{"a": 53, "s": 123203},
  1910. }, {
  1911. Emitter: "src1",
  1912. Message: xsql.Message{"a": 27},
  1913. }, {
  1914. Emitter: "src1",
  1915. Message: xsql.Message{"a": 123123},
  1916. },
  1917. },
  1918. },
  1919. },
  1920. WindowRange: &xsql.WindowRange{
  1921. WindowStart: 1541152486013,
  1922. WindowEnd: 1541152487013,
  1923. },
  1924. },
  1925. result: []map[string]interface{}{{
  1926. "c1": map[string]interface{}{
  1927. "a": float64(27),
  1928. },
  1929. }},
  1930. }, {
  1931. sql: "SELECT collect(*)[1]->a as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  1932. data: xsql.WindowTuplesSet{
  1933. Content: []xsql.WindowTuples{{
  1934. Emitter: "test",
  1935. Tuples: []xsql.Tuple{
  1936. {
  1937. Emitter: "src1",
  1938. Message: xsql.Message{"a": 53, "s": 123203},
  1939. }, {
  1940. Emitter: "src1",
  1941. Message: xsql.Message{"a": 27},
  1942. }, {
  1943. Emitter: "src1",
  1944. Message: xsql.Message{"a": 123123},
  1945. },
  1946. },
  1947. },
  1948. },
  1949. },
  1950. result: []map[string]interface{}{{
  1951. "c1": float64(27),
  1952. }},
  1953. }, {
  1954. sql: "SELECT collect(*)[1]->sl[0] as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  1955. data: xsql.WindowTuplesSet{
  1956. Content: []xsql.WindowTuples{{
  1957. Emitter: "test",
  1958. Tuples: []xsql.Tuple{
  1959. {
  1960. Emitter: "src1",
  1961. Message: xsql.Message{"a": 53, "sl": []string{"hello", "world"}},
  1962. }, {
  1963. Emitter: "src1",
  1964. Message: xsql.Message{"a": 27, "sl": []string{"new", "horizon"}},
  1965. }, {
  1966. Emitter: "src1",
  1967. Message: xsql.Message{"a": 123123, "sl": []string{"south", "africa"}},
  1968. },
  1969. },
  1970. },
  1971. },
  1972. },
  1973. result: []map[string]interface{}{{
  1974. "c1": "new",
  1975. }},
  1976. }, {
  1977. sql: "SELECT deduplicate(id, true) as r1 FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1978. data: xsql.GroupedTuplesSet{
  1979. {
  1980. Content: []xsql.DataValuer{
  1981. &xsql.JoinTuple{
  1982. Tuples: []xsql.Tuple{
  1983. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1984. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1985. },
  1986. },
  1987. &xsql.JoinTuple{
  1988. Tuples: []xsql.Tuple{
  1989. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1990. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1991. },
  1992. },
  1993. },
  1994. },
  1995. {
  1996. Content: []xsql.DataValuer{
  1997. &xsql.JoinTuple{
  1998. Tuples: []xsql.Tuple{
  1999. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  2000. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  2001. },
  2002. },
  2003. &xsql.JoinTuple{
  2004. Tuples: []xsql.Tuple{
  2005. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  2006. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  2007. },
  2008. },
  2009. },
  2010. },
  2011. },
  2012. result: []map[string]interface{}{
  2013. {
  2014. "r1": []interface{}{
  2015. map[string]interface{}{"a": 122.33, "c": float64(2), "color": "w2", "id": float64(1), "r": float64(122)},
  2016. map[string]interface{}{"a": 177.51, "color": "w2", "id": float64(5)}},
  2017. }, {
  2018. "r1": []interface{}{
  2019. map[string]interface{}{"a": 89.03, "c": float64(2), "color": "w1", "id": float64(2), "r": float64(89)},
  2020. map[string]interface{}{"a": 14.6, "color": "w1", "id": float64(4)}},
  2021. },
  2022. },
  2023. }, {
  2024. sql: "SELECT deduplicate(a, false)->a as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  2025. data: xsql.WindowTuplesSet{
  2026. Content: []xsql.WindowTuples{{
  2027. Emitter: "test",
  2028. Tuples: []xsql.Tuple{
  2029. {
  2030. Emitter: "src1",
  2031. Message: xsql.Message{"a": 53, "s": 123203},
  2032. }, {
  2033. Emitter: "src1",
  2034. Message: xsql.Message{"a": 27},
  2035. }, {
  2036. Emitter: "src1",
  2037. Message: xsql.Message{"a": 123123},
  2038. },
  2039. },
  2040. },
  2041. },
  2042. },
  2043. result: []map[string]interface{}{{
  2044. "c1": float64(123123),
  2045. }},
  2046. }, {
  2047. sql: "SELECT deduplicate(a, false) as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  2048. data: xsql.WindowTuplesSet{
  2049. Content: []xsql.WindowTuples{{
  2050. Emitter: "test",
  2051. Tuples: []xsql.Tuple{
  2052. {
  2053. Emitter: "src1",
  2054. Message: xsql.Message{"a": 53, "s": 123203},
  2055. }, {
  2056. Emitter: "src1",
  2057. Message: xsql.Message{"a": 27},
  2058. }, {
  2059. Emitter: "src1",
  2060. Message: xsql.Message{"a": 53},
  2061. },
  2062. },
  2063. },
  2064. },
  2065. },
  2066. result: []map[string]interface{}{{}},
  2067. }, {
  2068. sql: "SELECT deduplicate(a, false) as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  2069. data: xsql.WindowTuplesSet{
  2070. Content: []xsql.WindowTuples{{
  2071. Emitter: "test",
  2072. Tuples: []xsql.Tuple{
  2073. {
  2074. Emitter: "src1",
  2075. Message: xsql.Message{"a": 53, "s": 123203},
  2076. }, {
  2077. Emitter: "src1",
  2078. Message: xsql.Message{"a": 27},
  2079. }, {
  2080. Emitter: "src1",
  2081. Message: xsql.Message{"a": 53},
  2082. },
  2083. },
  2084. },
  2085. },
  2086. },
  2087. result: []map[string]interface{}{{}},
  2088. },
  2089. }
  2090. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2091. contextLogger := conf.Log.WithField("rule", "TestProjectPlan_AggFuncs")
  2092. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  2093. for i, tt := range tests {
  2094. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2095. if err != nil {
  2096. t.Error(err)
  2097. }
  2098. pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: true}
  2099. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  2100. result := pp.Apply(ctx, tt.data, fv, afv)
  2101. var mapRes []map[string]interface{}
  2102. if v, ok := result.([]byte); ok {
  2103. err := json.Unmarshal(v, &mapRes)
  2104. if err != nil {
  2105. t.Errorf("Failed to parse the input into map.\n")
  2106. continue
  2107. }
  2108. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  2109. if !reflect.DeepEqual(tt.result, mapRes) {
  2110. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  2111. }
  2112. } else {
  2113. t.Errorf("%d. %q\n\nThe returned result is not type of []byte: %#v\n", i, tt.sql, result)
  2114. }
  2115. }
  2116. }
  2117. func TestProjectPlanError(t *testing.T) {
  2118. var tests = []struct {
  2119. sql string
  2120. data interface{}
  2121. result interface{}
  2122. }{
  2123. {
  2124. sql: "SELECT a FROM test",
  2125. data: errors.New("an error from upstream"),
  2126. result: errors.New("an error from upstream"),
  2127. }, {
  2128. sql: "SELECT a * 5 FROM test",
  2129. data: &xsql.Tuple{
  2130. Emitter: "test",
  2131. Message: xsql.Message{
  2132. "a": "val_a",
  2133. },
  2134. },
  2135. result: errors.New("run Select error: invalid operation string(val_a) * int64(5)"),
  2136. }, {
  2137. sql: `SELECT a[0]->b AS ab FROM test`,
  2138. data: &xsql.Tuple{
  2139. Emitter: "test",
  2140. Message: xsql.Message{
  2141. "a": "common string",
  2142. },
  2143. },
  2144. result: errors.New("run Select error: invalid operation string(common string) [] *xsql.BracketEvalResult(&{0 0})"),
  2145. }, {
  2146. sql: `SELECT round(a) as r FROM test`,
  2147. data: &xsql.Tuple{
  2148. Emitter: "test",
  2149. Message: xsql.Message{
  2150. "a": "common string",
  2151. },
  2152. },
  2153. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  2154. }, {
  2155. sql: `SELECT round(a) as r FROM test`,
  2156. data: &xsql.Tuple{
  2157. Emitter: "test",
  2158. Message: xsql.Message{
  2159. "abc": "common string",
  2160. },
  2161. },
  2162. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  2163. }, {
  2164. sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  2165. data: xsql.GroupedTuplesSet{
  2166. {
  2167. Content: []xsql.DataValuer{
  2168. &xsql.JoinTuple{
  2169. Tuples: []xsql.Tuple{
  2170. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  2171. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  2172. },
  2173. },
  2174. &xsql.JoinTuple{
  2175. Tuples: []xsql.Tuple{
  2176. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  2177. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  2178. },
  2179. },
  2180. &xsql.JoinTuple{
  2181. Tuples: []xsql.Tuple{
  2182. {Emitter: "test", Message: xsql.Message{"id": 4, "a": "dde"}},
  2183. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  2184. },
  2185. },
  2186. &xsql.JoinTuple{
  2187. Tuples: []xsql.Tuple{
  2188. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  2189. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  2190. },
  2191. },
  2192. },
  2193. },
  2194. {
  2195. Content: []xsql.DataValuer{
  2196. &xsql.JoinTuple{
  2197. Tuples: []xsql.Tuple{
  2198. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  2199. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  2200. },
  2201. },
  2202. &xsql.JoinTuple{
  2203. Tuples: []xsql.Tuple{
  2204. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  2205. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  2206. },
  2207. },
  2208. },
  2209. },
  2210. },
  2211. result: errors.New("run Select error: call func avg error: requires float64 but found string(dde)"),
  2212. }, {
  2213. sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
  2214. data: xsql.WindowTuplesSet{
  2215. Content: []xsql.WindowTuples{{
  2216. Emitter: "test",
  2217. Tuples: []xsql.Tuple{
  2218. {
  2219. Emitter: "src1",
  2220. Message: xsql.Message{"a": 53},
  2221. }, {
  2222. Emitter: "src1",
  2223. Message: xsql.Message{"a": "ddd"},
  2224. }, {
  2225. Emitter: "src1",
  2226. Message: xsql.Message{"a": 123123},
  2227. },
  2228. },
  2229. },
  2230. },
  2231. },
  2232. result: errors.New("run Select error: call func sum error: requires int but found string(ddd)"),
  2233. }, {
  2234. sql: `SELECT a[0]->b AS ab FROM test`,
  2235. data: &xsql.Tuple{
  2236. Emitter: "test",
  2237. Message: xsql.Message{
  2238. "a": []map[string]interface{}(nil),
  2239. },
  2240. },
  2241. result: errors.New("run Select error: out of index: 0 of 0"),
  2242. },
  2243. }
  2244. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2245. contextLogger := conf.Log.WithField("rule", "TestProjectPlanError")
  2246. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  2247. for i, tt := range tests {
  2248. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2249. pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: ast.IsAggStatement(stmt)}
  2250. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  2251. result := pp.Apply(ctx, tt.data, fv, afv)
  2252. if !reflect.DeepEqual(tt.result, result) {
  2253. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  2254. }
  2255. }
  2256. }