project_test.go 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147
  1. package operators
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream/contexts"
  9. "reflect"
  10. "strings"
  11. "testing"
  12. )
  13. func TestProjectPlan_Apply1(t *testing.T) {
  14. var tests = []struct {
  15. sql string
  16. data *xsql.Tuple
  17. result []map[string]interface{}
  18. }{
  19. {
  20. sql: "SELECT a FROM test",
  21. data: &xsql.Tuple{
  22. Emitter: "test",
  23. Message: xsql.Message{
  24. "a": "val_a",
  25. },
  26. Metadata: xsql.Metadata{
  27. "id": 45,
  28. "other": "mock",
  29. },
  30. },
  31. result: []map[string]interface{}{{
  32. "a": "val_a",
  33. "__meta": map[string]interface{}{
  34. "id": float64(45),
  35. "other": "mock",
  36. },
  37. }},
  38. },
  39. {
  40. sql: "SELECT b FROM test",
  41. data: &xsql.Tuple{
  42. Emitter: "test",
  43. Message: xsql.Message{
  44. "a": "val_a",
  45. },
  46. },
  47. result: []map[string]interface{}{{}},
  48. },
  49. {
  50. sql: "SELECT ts FROM test",
  51. data: &xsql.Tuple{
  52. Emitter: "test",
  53. Message: xsql.Message{
  54. "a": "val_a",
  55. "ts": common.TimeFromUnixMilli(1568854573431),
  56. },
  57. },
  58. result: []map[string]interface{}{{
  59. "ts": "2019-09-19T00:56:13.431Z",
  60. }},
  61. },
  62. //Schemaless may return a message without selecting column
  63. {
  64. sql: "SELECT ts FROM test",
  65. data: &xsql.Tuple{
  66. Emitter: "test",
  67. Message: xsql.Message{
  68. "a": "val_a",
  69. "ts2": common.TimeFromUnixMilli(1568854573431),
  70. },
  71. },
  72. result: []map[string]interface{}{{}},
  73. },
  74. {
  75. sql: "SELECT A FROM test",
  76. data: &xsql.Tuple{
  77. Emitter: "test",
  78. Message: xsql.Message{
  79. "a": "val_a",
  80. },
  81. },
  82. result: []map[string]interface{}{{
  83. "A": "val_a",
  84. }},
  85. },
  86. {
  87. sql: `SELECT "value" FROM test`,
  88. data: &xsql.Tuple{
  89. Emitter: "test",
  90. Message: xsql.Message{},
  91. },
  92. result: []map[string]interface{}{{
  93. DEFAULT_FIELD_NAME_PREFIX + "0": "value",
  94. }},
  95. },
  96. {
  97. sql: `SELECT 3.4 FROM test`,
  98. data: &xsql.Tuple{
  99. Emitter: "test",
  100. Message: xsql.Message{},
  101. },
  102. result: []map[string]interface{}{{
  103. DEFAULT_FIELD_NAME_PREFIX + "0": 3.4,
  104. }},
  105. },
  106. {
  107. sql: `SELECT 5 FROM test`,
  108. data: &xsql.Tuple{
  109. Emitter: "test",
  110. Message: xsql.Message{},
  111. },
  112. result: []map[string]interface{}{{
  113. DEFAULT_FIELD_NAME_PREFIX + "0": 5.0,
  114. }},
  115. },
  116. {
  117. sql: `SELECT a, "value" AS b FROM test`,
  118. data: &xsql.Tuple{
  119. Emitter: "test",
  120. Message: xsql.Message{
  121. "a": "val_a",
  122. },
  123. },
  124. result: []map[string]interface{}{{
  125. "a": "val_a",
  126. "b": "value",
  127. }},
  128. },
  129. {
  130. sql: `SELECT a, "value" AS b, 3.14 as Pi, 0 as Zero FROM test`,
  131. data: &xsql.Tuple{
  132. Emitter: "test",
  133. Message: xsql.Message{
  134. "a": "val_a",
  135. },
  136. },
  137. result: []map[string]interface{}{{
  138. "a": "val_a",
  139. "b": "value",
  140. "Pi": 3.14,
  141. "Zero": 0.0,
  142. }},
  143. },
  144. {
  145. sql: `SELECT a->b AS ab FROM test`,
  146. data: &xsql.Tuple{
  147. Emitter: "test",
  148. Message: xsql.Message{
  149. "a": map[string]interface{}{"b": "hello"},
  150. },
  151. },
  152. result: []map[string]interface{}{{
  153. "ab": "hello",
  154. }},
  155. },
  156. {
  157. sql: `SELECT a->b AS ab FROM test`,
  158. data: &xsql.Tuple{
  159. Emitter: "test",
  160. Message: xsql.Message{
  161. "a": map[string]interface{}(nil),
  162. },
  163. },
  164. result: []map[string]interface{}{{}},
  165. },
  166. {
  167. sql: `SELECT a->b AS ab FROM test`,
  168. data: &xsql.Tuple{
  169. Emitter: "test",
  170. Message: xsql.Message{
  171. "name": "name",
  172. },
  173. },
  174. result: []map[string]interface{}{{}},
  175. },
  176. {
  177. sql: `SELECT a->b AS ab FROM test`,
  178. data: &xsql.Tuple{
  179. Emitter: "test",
  180. Message: xsql.Message{
  181. "a": "commonstring",
  182. },
  183. },
  184. result: []map[string]interface{}{{}},
  185. },
  186. {
  187. sql: `SELECT a[0]->b AS ab FROM test`,
  188. data: &xsql.Tuple{
  189. Emitter: "test",
  190. Message: xsql.Message{
  191. "a": []interface{}{
  192. map[string]interface{}{"b": "hello1"},
  193. map[string]interface{}{"b": "hello2"},
  194. },
  195. },
  196. },
  197. result: []map[string]interface{}{{
  198. "ab": "hello1",
  199. }},
  200. },
  201. {
  202. sql: `SELECT a[0]->b AS ab FROM test`,
  203. data: &xsql.Tuple{
  204. Emitter: "test",
  205. Message: xsql.Message{
  206. "a": []map[string]interface{}{
  207. {"b": "hello1"},
  208. {"b": "hello2"},
  209. },
  210. },
  211. },
  212. result: []map[string]interface{}{{
  213. "ab": "hello1",
  214. }},
  215. },
  216. {
  217. sql: `SELECT a[2:4] AS ab FROM test`,
  218. data: &xsql.Tuple{
  219. Emitter: "test",
  220. Message: xsql.Message{
  221. "a": []map[string]interface{}{
  222. {"b": "hello1"},
  223. {"b": "hello2"},
  224. {"b": "hello3"},
  225. {"b": "hello4"},
  226. {"b": "hello5"},
  227. },
  228. },
  229. },
  230. result: []map[string]interface{}{{
  231. "ab": []interface{}{
  232. map[string]interface{}{"b": "hello3"},
  233. map[string]interface{}{"b": "hello4"},
  234. },
  235. }},
  236. },
  237. {
  238. sql: `SELECT a[2:] AS ab FROM test`,
  239. data: &xsql.Tuple{
  240. Emitter: "test",
  241. Message: xsql.Message{
  242. "a": []map[string]interface{}{
  243. {"b": "hello1"},
  244. {"b": "hello2"},
  245. {"b": "hello3"},
  246. {"b": "hello4"},
  247. {"b": "hello5"},
  248. },
  249. },
  250. },
  251. result: []map[string]interface{}{{
  252. "ab": []interface{}{
  253. map[string]interface{}{"b": "hello3"},
  254. map[string]interface{}{"b": "hello4"},
  255. map[string]interface{}{"b": "hello5"},
  256. },
  257. }},
  258. },
  259. {
  260. sql: `SELECT a[2:] AS ab FROM test`,
  261. data: &xsql.Tuple{
  262. Emitter: "test",
  263. Message: xsql.Message{
  264. "a": []interface{}{
  265. true, false, true, false, true, true,
  266. },
  267. },
  268. },
  269. result: []map[string]interface{}{{
  270. "ab": []interface{}{
  271. true, false, true, true,
  272. },
  273. }},
  274. },
  275. {
  276. sql: `SELECT a[:4] AS ab FROM test`,
  277. data: &xsql.Tuple{
  278. Emitter: "test",
  279. Message: xsql.Message{
  280. "a": []interface{}{
  281. true, false, true, false, true, true,
  282. },
  283. },
  284. },
  285. result: []map[string]interface{}{{
  286. "ab": []interface{}{
  287. true, false, true, false,
  288. },
  289. }},
  290. },
  291. {
  292. sql: `SELECT a[:4] AS ab FROM test`,
  293. data: &xsql.Tuple{
  294. Emitter: "test",
  295. Message: xsql.Message{
  296. "a": []interface{}{
  297. 3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926,
  298. },
  299. },
  300. },
  301. result: []map[string]interface{}{{
  302. "ab": []interface{}{
  303. 3.14, 3.141, 3.1415, 3.14159,
  304. },
  305. }},
  306. },
  307. {
  308. sql: `SELECT a->b[:4] AS ab FROM test`,
  309. data: &xsql.Tuple{
  310. Emitter: "test",
  311. Message: xsql.Message{
  312. "a": map[string]interface{}{
  313. "b": []float64{3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926},
  314. },
  315. },
  316. },
  317. result: []map[string]interface{}{{
  318. "ab": []interface{}{
  319. 3.14, 3.141, 3.1415, 3.14159,
  320. },
  321. }},
  322. },
  323. {
  324. sql: `SELECT a->b[0:1] AS ab FROM test`,
  325. data: &xsql.Tuple{
  326. Emitter: "test",
  327. Message: xsql.Message{
  328. "a": map[string]interface{}{
  329. "b": []float64{3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926},
  330. },
  331. },
  332. },
  333. result: []map[string]interface{}{{
  334. "ab": []interface{}{
  335. 3.14,
  336. },
  337. }},
  338. },
  339. {
  340. sql: `SELECT a->c->d AS f1 FROM test`,
  341. data: &xsql.Tuple{
  342. Emitter: "test",
  343. Message: xsql.Message{
  344. "a": map[string]interface{}{
  345. "b": "hello",
  346. "c": map[string]interface{}{
  347. "d": 35.2,
  348. },
  349. },
  350. },
  351. },
  352. result: []map[string]interface{}{{
  353. "f1": 35.2,
  354. }},
  355. },
  356. {
  357. sql: `SELECT a->c->d AS f1 FROM test`,
  358. data: &xsql.Tuple{
  359. Emitter: "test",
  360. Message: xsql.Message{
  361. "a": map[string]interface{}{
  362. "b": "hello",
  363. "c": map[string]interface{}{
  364. "e": 35.2,
  365. },
  366. },
  367. },
  368. },
  369. result: []map[string]interface{}{{}},
  370. },
  371. {
  372. sql: `SELECT a->c->d AS f1 FROM test`,
  373. data: &xsql.Tuple{
  374. Emitter: "test",
  375. Message: xsql.Message{
  376. "a": map[string]interface{}{
  377. "b": "hello",
  378. },
  379. },
  380. },
  381. result: []map[string]interface{}{{}},
  382. },
  383. //The int type is not supported yet, the json parser returns float64 for int values
  384. {
  385. sql: `SELECT a->c->d AS f1 FROM test`,
  386. data: &xsql.Tuple{
  387. Emitter: "test",
  388. Message: xsql.Message{
  389. "a": map[string]interface{}{
  390. "b": "hello",
  391. "c": map[string]interface{}{
  392. "d": float64(35),
  393. },
  394. },
  395. },
  396. },
  397. result: []map[string]interface{}{{
  398. "f1": float64(35),
  399. }},
  400. },
  401. {
  402. sql: "SELECT a FROM test",
  403. data: &xsql.Tuple{
  404. Emitter: "test",
  405. Message: xsql.Message{},
  406. },
  407. result: []map[string]interface{}{
  408. {},
  409. },
  410. },
  411. {
  412. sql: "SELECT * FROM test",
  413. data: &xsql.Tuple{
  414. Emitter: "test",
  415. Message: xsql.Message{},
  416. },
  417. result: []map[string]interface{}{
  418. {},
  419. },
  420. },
  421. {
  422. sql: `SELECT * FROM test`,
  423. data: &xsql.Tuple{
  424. Emitter: "test",
  425. Message: xsql.Message{
  426. "a": map[string]interface{}{
  427. "b": "hello",
  428. "c": map[string]interface{}{
  429. "d": 35.2,
  430. },
  431. },
  432. },
  433. },
  434. result: []map[string]interface{}{{
  435. "a": map[string]interface{}{
  436. "b": "hello",
  437. "c": map[string]interface{}{
  438. "d": 35.2,
  439. },
  440. },
  441. }},
  442. },
  443. {
  444. sql: `SELECT * FROM test`,
  445. data: &xsql.Tuple{
  446. Emitter: "test",
  447. Message: xsql.Message{
  448. "a": "val1",
  449. "b": 3.14,
  450. },
  451. },
  452. result: []map[string]interface{}{{
  453. "a": "val1",
  454. "b": 3.14,
  455. }},
  456. },
  457. {
  458. sql: `SELECT 3*4 AS f1 FROM test`,
  459. data: &xsql.Tuple{
  460. Emitter: "test",
  461. Message: xsql.Message{},
  462. },
  463. result: []map[string]interface{}{{
  464. "f1": float64(12),
  465. }},
  466. },
  467. {
  468. sql: `SELECT 4.5*2 AS f1 FROM test`,
  469. data: &xsql.Tuple{
  470. Emitter: "test",
  471. Message: xsql.Message{},
  472. },
  473. result: []map[string]interface{}{{
  474. "f1": float64(9),
  475. }},
  476. },
  477. {
  478. sql: "SELECT `a.b.c` FROM test",
  479. data: &xsql.Tuple{
  480. Emitter: "test",
  481. Message: xsql.Message{
  482. "a.b.c": "val_a",
  483. },
  484. },
  485. result: []map[string]interface{}{{
  486. "a.b.c": "val_a",
  487. }},
  488. },
  489. {
  490. sql: `SELECT CASE a WHEN 10 THEN "true" END AS b FROM test`,
  491. data: &xsql.Tuple{
  492. Emitter: "test",
  493. Message: xsql.Message{
  494. "a": int64(10),
  495. },
  496. },
  497. result: []map[string]interface{}{{
  498. "b": "true",
  499. }},
  500. },
  501. }
  502. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  503. contextLogger := common.Log.WithField("rule", "TestProjectPlan_Apply1")
  504. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  505. for i, tt := range tests {
  506. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  507. if err != nil {
  508. t.Errorf("parse sql error: %s", err)
  509. continue
  510. }
  511. pp := &ProjectOp{Fields: stmt.Fields, SendMeta: true}
  512. pp.isTest = true
  513. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  514. result := pp.Apply(ctx, tt.data, fv, afv)
  515. var mapRes []map[string]interface{}
  516. if v, ok := result.([]byte); ok {
  517. err := json.Unmarshal(v, &mapRes)
  518. if err != nil {
  519. t.Errorf("Failed to parse the input into map.\n")
  520. continue
  521. }
  522. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  523. if !reflect.DeepEqual(tt.result, mapRes) {
  524. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  525. }
  526. } else {
  527. t.Errorf("%d. The returned result %#v is not type of []byte\n", result, i)
  528. }
  529. }
  530. }
  531. func TestProjectPlan_MultiInput(t *testing.T) {
  532. var tests = []struct {
  533. sql string
  534. data interface{}
  535. result []map[string]interface{}
  536. }{
  537. {
  538. sql: "SELECT * FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  539. data: &xsql.Tuple{
  540. Emitter: "tbl",
  541. Message: xsql.Message{
  542. "abc": int64(6),
  543. },
  544. },
  545. result: []map[string]interface{}{{
  546. "abc": float64(6), //json marshall problem
  547. }},
  548. },
  549. {
  550. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  551. data: &xsql.Tuple{
  552. Emitter: "tbl",
  553. Message: xsql.Message{
  554. "abc": int64(34),
  555. "def": "hello",
  556. },
  557. },
  558. result: []map[string]interface{}{{
  559. "abc": float64(34),
  560. }},
  561. },
  562. {
  563. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  564. data: xsql.WindowTuplesSet{
  565. xsql.WindowTuples{
  566. Emitter: "src1",
  567. Tuples: []xsql.Tuple{
  568. {
  569. Emitter: "src1",
  570. Message: xsql.Message{"id1": 1, "f1": "v1"},
  571. }, {
  572. Emitter: "src1",
  573. Message: xsql.Message{"id1": 2, "f1": "v2"},
  574. }, {
  575. Emitter: "src1",
  576. Message: xsql.Message{"id1": 3, "f1": "v1"},
  577. },
  578. },
  579. },
  580. },
  581. result: []map[string]interface{}{{
  582. "id1": float64(1),
  583. }, {
  584. "id1": float64(2),
  585. }, {
  586. "id1": float64(3),
  587. }},
  588. },
  589. {
  590. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  591. data: xsql.WindowTuplesSet{
  592. xsql.WindowTuples{
  593. Emitter: "src1",
  594. Tuples: []xsql.Tuple{
  595. {
  596. Emitter: "src1",
  597. Message: xsql.Message{"id1": 1, "f1": "v1"},
  598. }, {
  599. Emitter: "src1",
  600. Message: xsql.Message{"id2": 2, "f1": "v2"},
  601. }, {
  602. Emitter: "src1",
  603. Message: xsql.Message{"id1": 3, "f1": "v1"},
  604. },
  605. },
  606. },
  607. },
  608. result: []map[string]interface{}{{
  609. "id1": float64(1),
  610. }, {}, {
  611. "id1": float64(3),
  612. }},
  613. },
  614. {
  615. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  616. data: xsql.WindowTuplesSet{
  617. xsql.WindowTuples{
  618. Emitter: "src1",
  619. Tuples: []xsql.Tuple{
  620. {
  621. Emitter: "src1",
  622. Message: xsql.Message{"id1": 1, "f1": "v1"},
  623. }, {
  624. Emitter: "src1",
  625. Message: xsql.Message{"id1": 2, "f1": "v2"},
  626. }, {
  627. Emitter: "src1",
  628. Message: xsql.Message{"id1": 3, "f1": "v1"},
  629. },
  630. },
  631. },
  632. },
  633. result: []map[string]interface{}{{
  634. "id1": float64(1),
  635. "f1": "v1",
  636. }, {
  637. "id1": float64(2),
  638. "f1": "v2",
  639. }, {
  640. "id1": float64(3),
  641. "f1": "v1",
  642. }},
  643. },
  644. {
  645. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  646. data: xsql.WindowTuplesSet{
  647. xsql.WindowTuples{
  648. Emitter: "src1",
  649. Tuples: []xsql.Tuple{
  650. {
  651. Emitter: "src1",
  652. Message: xsql.Message{"id1": 1, "f1": "v1"},
  653. }, {
  654. Emitter: "src1",
  655. Message: xsql.Message{"id2": 2, "f2": "v2"},
  656. }, {
  657. Emitter: "src1",
  658. Message: xsql.Message{"id1": 3, "f1": "v1"},
  659. },
  660. },
  661. },
  662. },
  663. result: []map[string]interface{}{{
  664. "id1": float64(1),
  665. "f1": "v1",
  666. }, {
  667. "id2": float64(2),
  668. "f2": "v2",
  669. }, {
  670. "id1": float64(3),
  671. "f1": "v1",
  672. }},
  673. },
  674. {
  675. sql: "SELECT src1.* FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  676. data: xsql.WindowTuplesSet{
  677. xsql.WindowTuples{
  678. Emitter: "src1",
  679. Tuples: []xsql.Tuple{
  680. {
  681. Emitter: "src1",
  682. Message: xsql.Message{"id1": 1, "f1": "v1"},
  683. }, {
  684. Emitter: "src1",
  685. Message: xsql.Message{"id1": 2, "f1": "v2"},
  686. }, {
  687. Emitter: "src1",
  688. Message: xsql.Message{"id1": 3, "f1": "v1"},
  689. },
  690. },
  691. },
  692. },
  693. result: []map[string]interface{}{{
  694. "id1": float64(1),
  695. "f1": "v1",
  696. }, {
  697. "id1": float64(2),
  698. "f1": "v2",
  699. }, {
  700. "id1": float64(3),
  701. "f1": "v1",
  702. }},
  703. },
  704. {
  705. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  706. data: xsql.JoinTupleSets{
  707. xsql.JoinTuple{
  708. Tuples: []xsql.Tuple{
  709. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  710. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  711. },
  712. },
  713. xsql.JoinTuple{
  714. Tuples: []xsql.Tuple{
  715. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  716. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  717. },
  718. },
  719. xsql.JoinTuple{
  720. Tuples: []xsql.Tuple{
  721. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  722. },
  723. },
  724. },
  725. result: []map[string]interface{}{{
  726. "id1": float64(1),
  727. }, {
  728. "id1": float64(2),
  729. }, {
  730. "id1": float64(3),
  731. }},
  732. },
  733. {
  734. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  735. data: xsql.JoinTupleSets{
  736. xsql.JoinTuple{
  737. Tuples: []xsql.Tuple{
  738. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  739. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  740. },
  741. },
  742. xsql.JoinTuple{
  743. Tuples: []xsql.Tuple{
  744. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  745. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  746. },
  747. },
  748. xsql.JoinTuple{
  749. Tuples: []xsql.Tuple{
  750. {Emitter: "src1", Message: xsql.Message{"id2": 3, "f1": "v1"}},
  751. },
  752. },
  753. },
  754. result: []map[string]interface{}{{
  755. "id1": float64(1),
  756. }, {
  757. "id1": float64(2),
  758. }, {}},
  759. },
  760. {
  761. sql: "SELECT abc FROM tbl group by abc",
  762. data: xsql.GroupedTuplesSet{
  763. {
  764. &xsql.Tuple{
  765. Emitter: "tbl",
  766. Message: xsql.Message{
  767. "abc": int64(6),
  768. "def": "hello",
  769. },
  770. },
  771. },
  772. },
  773. result: []map[string]interface{}{{
  774. "abc": float64(6),
  775. }},
  776. },
  777. {
  778. sql: "SELECT abc FROM tbl group by abc",
  779. data: xsql.GroupedTuplesSet{
  780. {
  781. &xsql.Tuple{
  782. Emitter: "tbl",
  783. Message: xsql.Message{
  784. "def": "hello",
  785. },
  786. },
  787. },
  788. },
  789. result: []map[string]interface{}{{}},
  790. },
  791. {
  792. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  793. data: xsql.GroupedTuplesSet{
  794. {
  795. &xsql.Tuple{
  796. Emitter: "src1",
  797. Message: xsql.Message{"id1": 1, "f1": "v1"},
  798. },
  799. &xsql.Tuple{
  800. Emitter: "src1",
  801. Message: xsql.Message{"id1": 3, "f1": "v1"},
  802. },
  803. },
  804. {
  805. &xsql.Tuple{
  806. Emitter: "src1",
  807. Message: xsql.Message{"id1": 2, "f1": "v2"},
  808. },
  809. },
  810. },
  811. result: []map[string]interface{}{{
  812. "id1": float64(1),
  813. }, {
  814. "id1": float64(2),
  815. }},
  816. },
  817. {
  818. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  819. data: xsql.GroupedTuplesSet{
  820. {
  821. &xsql.Tuple{
  822. Emitter: "src1",
  823. Message: xsql.Message{"id1": 1, "f1": "v1"},
  824. },
  825. &xsql.Tuple{
  826. Emitter: "src1",
  827. Message: xsql.Message{"id1": 3, "f1": "v1"},
  828. },
  829. },
  830. {
  831. &xsql.Tuple{
  832. Emitter: "src1",
  833. Message: xsql.Message{"id2": 2, "f1": "v2"},
  834. },
  835. },
  836. },
  837. result: []map[string]interface{}{{
  838. "id1": float64(1),
  839. }, {}},
  840. },
  841. {
  842. sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  843. data: xsql.GroupedTuplesSet{
  844. {
  845. &xsql.JoinTuple{
  846. Tuples: []xsql.Tuple{
  847. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  848. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  849. },
  850. },
  851. },
  852. {
  853. &xsql.JoinTuple{
  854. Tuples: []xsql.Tuple{
  855. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  856. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  857. },
  858. },
  859. },
  860. {
  861. &xsql.JoinTuple{
  862. Tuples: []xsql.Tuple{
  863. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  864. },
  865. },
  866. },
  867. },
  868. result: []map[string]interface{}{{
  869. "id2": float64(2),
  870. }, {
  871. "id2": float64(4),
  872. }, {}},
  873. },
  874. {
  875. sql: "SELECT src1.*, f2 FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  876. data: xsql.JoinTupleSets{
  877. xsql.JoinTuple{
  878. Tuples: []xsql.Tuple{
  879. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  880. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  881. },
  882. },
  883. xsql.JoinTuple{
  884. Tuples: []xsql.Tuple{
  885. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  886. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  887. },
  888. },
  889. xsql.JoinTuple{
  890. Tuples: []xsql.Tuple{
  891. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  892. },
  893. },
  894. },
  895. result: []map[string]interface{}{{
  896. "id1": float64(1),
  897. "f1": "v1",
  898. "f2": "w2",
  899. }, {
  900. "id1": float64(2),
  901. "f1": "v2",
  902. "f2": "w3",
  903. }, {
  904. "id1": float64(3),
  905. "f1": "v1",
  906. }},
  907. },
  908. {
  909. sql: "SELECT * FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  910. data: xsql.JoinTupleSets{
  911. xsql.JoinTuple{
  912. Tuples: []xsql.Tuple{
  913. {Emitter: "src1", Message: xsql.Message{"id": 1, "f1": "v1"}},
  914. {Emitter: "src2", Message: xsql.Message{"id": 2, "f2": "w2"}},
  915. },
  916. },
  917. xsql.JoinTuple{
  918. Tuples: []xsql.Tuple{
  919. {Emitter: "src1", Message: xsql.Message{"id": 2, "f1": "v2"}},
  920. {Emitter: "src2", Message: xsql.Message{"id": 4, "f2": "w3"}},
  921. },
  922. },
  923. xsql.JoinTuple{
  924. Tuples: []xsql.Tuple{
  925. {Emitter: "src1", Message: xsql.Message{"id": 3, "f1": "v1"}},
  926. },
  927. },
  928. },
  929. result: []map[string]interface{}{{
  930. "id": float64(1),
  931. "f1": "v1",
  932. "f2": "w2",
  933. }, {
  934. "id": float64(2),
  935. "f1": "v2",
  936. "f2": "w3",
  937. }, {
  938. "id": float64(3),
  939. "f1": "v1",
  940. }},
  941. },
  942. {
  943. sql: "SELECT src1.* FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  944. data: xsql.GroupedTuplesSet{
  945. {
  946. &xsql.Tuple{
  947. Emitter: "src1",
  948. Message: xsql.Message{"id1": 1, "f1": "v1"},
  949. },
  950. &xsql.Tuple{
  951. Emitter: "src1",
  952. Message: xsql.Message{"id1": 3, "f1": "v1"},
  953. },
  954. },
  955. {
  956. &xsql.Tuple{
  957. Emitter: "src1",
  958. Message: xsql.Message{"id1": 2, "f1": "v2"},
  959. },
  960. },
  961. },
  962. result: []map[string]interface{}{{
  963. "id1": float64(1),
  964. "f1": "v1",
  965. }, {
  966. "id1": float64(2),
  967. "f1": "v2",
  968. }},
  969. },
  970. {
  971. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  972. data: xsql.GroupedTuplesSet{
  973. {
  974. &xsql.JoinTuple{
  975. Tuples: []xsql.Tuple{
  976. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  977. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  978. },
  979. },
  980. },
  981. {
  982. &xsql.JoinTuple{
  983. Tuples: []xsql.Tuple{
  984. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  985. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  986. },
  987. },
  988. },
  989. {
  990. &xsql.JoinTuple{
  991. Tuples: []xsql.Tuple{
  992. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  993. },
  994. },
  995. },
  996. },
  997. result: []map[string]interface{}{{
  998. "id2": float64(2),
  999. "id1": float64(1),
  1000. "f1": "v1",
  1001. }, {
  1002. "id2": float64(4),
  1003. "id1": float64(2),
  1004. "f1": "v2",
  1005. }, {
  1006. "id1": float64(3),
  1007. "f1": "v1",
  1008. }},
  1009. },
  1010. {
  1011. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  1012. data: xsql.GroupedTuplesSet{
  1013. {
  1014. &xsql.JoinTuple{
  1015. Tuples: []xsql.Tuple{
  1016. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1017. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1018. },
  1019. },
  1020. },
  1021. {
  1022. &xsql.JoinTuple{
  1023. Tuples: []xsql.Tuple{
  1024. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1025. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1026. },
  1027. },
  1028. },
  1029. {
  1030. &xsql.JoinTuple{
  1031. Tuples: []xsql.Tuple{
  1032. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  1033. },
  1034. },
  1035. },
  1036. },
  1037. result: []map[string]interface{}{{
  1038. "id2": float64(2),
  1039. "id1": float64(1),
  1040. "f1": "v1",
  1041. }, {
  1042. "id2": float64(4),
  1043. "id1": float64(2),
  1044. "f1": "v2",
  1045. }, {
  1046. "id1": float64(3),
  1047. "f1": "v1",
  1048. }},
  1049. },
  1050. }
  1051. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1052. contextLogger := common.Log.WithField("rule", "TestProjectPlan_MultiInput")
  1053. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1054. for i, tt := range tests {
  1055. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1056. pp := &ProjectOp{Fields: stmt.Fields}
  1057. pp.isTest = true
  1058. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1059. result := pp.Apply(ctx, tt.data, fv, afv)
  1060. var mapRes []map[string]interface{}
  1061. if v, ok := result.([]byte); ok {
  1062. err := json.Unmarshal(v, &mapRes)
  1063. if err != nil {
  1064. t.Errorf("Failed to parse the input into map.\n")
  1065. continue
  1066. }
  1067. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  1068. if !reflect.DeepEqual(tt.result, mapRes) {
  1069. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1070. }
  1071. } else {
  1072. t.Errorf("The returned result is not type of []byte\n")
  1073. }
  1074. }
  1075. }
  1076. func TestProjectPlan_Funcs(t *testing.T) {
  1077. var tests = []struct {
  1078. sql string
  1079. data interface{}
  1080. result []map[string]interface{}
  1081. }{
  1082. {
  1083. sql: "SELECT round(a) as r FROM test",
  1084. data: &xsql.Tuple{
  1085. Emitter: "test",
  1086. Message: xsql.Message{
  1087. "a": 47.5,
  1088. },
  1089. },
  1090. result: []map[string]interface{}{{
  1091. "r": float64(48),
  1092. }},
  1093. }, {
  1094. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  1095. data: xsql.WindowTuplesSet{
  1096. xsql.WindowTuples{
  1097. Emitter: "test",
  1098. Tuples: []xsql.Tuple{
  1099. {
  1100. Emitter: "src1",
  1101. Message: xsql.Message{"a": 53.1},
  1102. }, {
  1103. Emitter: "src1",
  1104. Message: xsql.Message{"a": 27.4},
  1105. }, {
  1106. Emitter: "src1",
  1107. Message: xsql.Message{"a": 123123.7},
  1108. },
  1109. },
  1110. },
  1111. },
  1112. result: []map[string]interface{}{{
  1113. "r": float64(53),
  1114. }, {
  1115. "r": float64(27),
  1116. }, {
  1117. "r": float64(123124),
  1118. }},
  1119. }, {
  1120. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  1121. data: xsql.WindowTuplesSet{
  1122. xsql.WindowTuples{
  1123. Emitter: "test",
  1124. Tuples: []xsql.Tuple{
  1125. {
  1126. Emitter: "src1",
  1127. Message: xsql.Message{"a": 53.1},
  1128. }, {
  1129. Emitter: "src1",
  1130. Message: xsql.Message{"a": 27.4},
  1131. }, {
  1132. Emitter: "src1",
  1133. Message: xsql.Message{"a": 123123.7},
  1134. },
  1135. },
  1136. },
  1137. },
  1138. result: []map[string]interface{}{{
  1139. "r": float64(53),
  1140. }, {
  1141. "r": float64(27),
  1142. }, {
  1143. "r": float64(123124),
  1144. }},
  1145. }, {
  1146. sql: "SELECT round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1147. data: xsql.JoinTupleSets{
  1148. xsql.JoinTuple{
  1149. Tuples: []xsql.Tuple{
  1150. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1151. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1152. },
  1153. },
  1154. xsql.JoinTuple{
  1155. Tuples: []xsql.Tuple{
  1156. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1157. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1158. },
  1159. },
  1160. xsql.JoinTuple{
  1161. Tuples: []xsql.Tuple{
  1162. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1163. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1164. },
  1165. },
  1166. },
  1167. result: []map[string]interface{}{{
  1168. "r": float64(66),
  1169. }, {
  1170. "r": float64(73),
  1171. }, {
  1172. "r": float64(89),
  1173. }},
  1174. }, {
  1175. sql: "SELECT CONCAT(test.id, test.a, test1.b) as concat FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1176. data: xsql.JoinTupleSets{
  1177. xsql.JoinTuple{
  1178. Tuples: []xsql.Tuple{
  1179. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1180. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1181. },
  1182. },
  1183. xsql.JoinTuple{
  1184. Tuples: []xsql.Tuple{
  1185. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1186. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1187. },
  1188. },
  1189. xsql.JoinTuple{
  1190. Tuples: []xsql.Tuple{
  1191. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1192. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1193. },
  1194. },
  1195. },
  1196. result: []map[string]interface{}{{
  1197. "concat": "165.5512",
  1198. }, {
  1199. "concat": "273.49934",
  1200. }, {
  1201. "concat": "388.886",
  1202. }},
  1203. }, {
  1204. sql: "SELECT count(a) as r FROM test",
  1205. data: &xsql.Tuple{
  1206. Emitter: "test",
  1207. Message: xsql.Message{
  1208. "a": 47.5,
  1209. },
  1210. },
  1211. result: []map[string]interface{}{{
  1212. "r": float64(1),
  1213. }},
  1214. }, {
  1215. sql: "SELECT meta(test.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1216. data: xsql.JoinTupleSets{
  1217. xsql.JoinTuple{
  1218. Tuples: []xsql.Tuple{
  1219. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}, Metadata: xsql.Metadata{"device": "devicea"}},
  1220. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1221. },
  1222. },
  1223. xsql.JoinTuple{
  1224. Tuples: []xsql.Tuple{
  1225. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1226. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1227. },
  1228. },
  1229. xsql.JoinTuple{
  1230. Tuples: []xsql.Tuple{
  1231. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}, Metadata: xsql.Metadata{"device": "devicec"}},
  1232. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1233. },
  1234. },
  1235. },
  1236. result: []map[string]interface{}{{
  1237. "d": "devicea",
  1238. }, {
  1239. "d": "deviceb",
  1240. }, {
  1241. "d": "devicec",
  1242. }},
  1243. },
  1244. }
  1245. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1246. contextLogger := common.Log.WithField("rule", "TestProjectPlan_Funcs")
  1247. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1248. for i, tt := range tests {
  1249. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1250. if err != nil {
  1251. t.Error(err)
  1252. }
  1253. pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
  1254. pp.isTest = true
  1255. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1256. result := pp.Apply(ctx, tt.data, fv, afv)
  1257. var mapRes []map[string]interface{}
  1258. if v, ok := result.([]byte); ok {
  1259. err := json.Unmarshal(v, &mapRes)
  1260. if err != nil {
  1261. t.Errorf("Failed to parse the input into map.\n")
  1262. continue
  1263. }
  1264. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  1265. if !reflect.DeepEqual(tt.result, mapRes) {
  1266. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1267. }
  1268. } else {
  1269. t.Errorf("%d. The returned result is not type of []byte\n", i)
  1270. }
  1271. }
  1272. }
  1273. func TestProjectPlan_AggFuncs(t *testing.T) {
  1274. var tests = []struct {
  1275. sql string
  1276. data interface{}
  1277. result []map[string]interface{}
  1278. }{
  1279. {
  1280. sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1281. data: xsql.GroupedTuplesSet{
  1282. {
  1283. &xsql.JoinTuple{
  1284. Tuples: []xsql.Tuple{
  1285. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1286. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1287. },
  1288. },
  1289. &xsql.JoinTuple{
  1290. Tuples: []xsql.Tuple{
  1291. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1292. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1293. },
  1294. },
  1295. },
  1296. {
  1297. &xsql.JoinTuple{
  1298. Tuples: []xsql.Tuple{
  1299. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1300. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1301. },
  1302. },
  1303. &xsql.JoinTuple{
  1304. Tuples: []xsql.Tuple{
  1305. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1306. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1307. },
  1308. },
  1309. },
  1310. },
  1311. result: []map[string]interface{}{{
  1312. "c": float64(2),
  1313. "r": float64(122),
  1314. }, {
  1315. "c": float64(2),
  1316. "r": float64(89),
  1317. }},
  1318. },
  1319. {
  1320. sql: "SELECT count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1321. data: xsql.GroupedTuplesSet{
  1322. {
  1323. &xsql.JoinTuple{
  1324. Tuples: []xsql.Tuple{
  1325. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1326. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1327. },
  1328. },
  1329. &xsql.JoinTuple{
  1330. Tuples: []xsql.Tuple{
  1331. {Emitter: "test", Message: xsql.Message{"id": 5}},
  1332. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1333. },
  1334. },
  1335. },
  1336. {
  1337. &xsql.JoinTuple{
  1338. Tuples: []xsql.Tuple{
  1339. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1340. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1341. },
  1342. },
  1343. &xsql.JoinTuple{
  1344. Tuples: []xsql.Tuple{
  1345. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1346. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1347. },
  1348. },
  1349. },
  1350. },
  1351. result: []map[string]interface{}{{
  1352. "c": float64(1),
  1353. "a": 122.33,
  1354. "s": 122.33,
  1355. "min": 122.33,
  1356. "max": 122.33,
  1357. }, {
  1358. "c": float64(2),
  1359. "s": 103.63,
  1360. "a": 51.815,
  1361. "min": 14.6,
  1362. "max": 89.03,
  1363. }},
  1364. },
  1365. {
  1366. sql: "SELECT avg(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1367. data: xsql.GroupedTuplesSet{
  1368. {
  1369. &xsql.JoinTuple{
  1370. Tuples: []xsql.Tuple{
  1371. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1372. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1373. },
  1374. },
  1375. &xsql.JoinTuple{
  1376. Tuples: []xsql.Tuple{
  1377. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  1378. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1379. },
  1380. },
  1381. &xsql.JoinTuple{
  1382. Tuples: []xsql.Tuple{
  1383. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 98.31}},
  1384. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  1385. },
  1386. },
  1387. &xsql.JoinTuple{
  1388. Tuples: []xsql.Tuple{
  1389. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  1390. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1391. },
  1392. },
  1393. },
  1394. {
  1395. &xsql.JoinTuple{
  1396. Tuples: []xsql.Tuple{
  1397. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1398. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1399. },
  1400. },
  1401. &xsql.JoinTuple{
  1402. Tuples: []xsql.Tuple{
  1403. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1404. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1405. },
  1406. },
  1407. },
  1408. },
  1409. result: []map[string]interface{}{{
  1410. "avg": 116.68,
  1411. }, {
  1412. "avg": 51.815,
  1413. }},
  1414. },
  1415. {
  1416. sql: "SELECT max(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1417. data: xsql.GroupedTuplesSet{
  1418. {
  1419. &xsql.JoinTuple{
  1420. Tuples: []xsql.Tuple{
  1421. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1422. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1423. },
  1424. },
  1425. &xsql.JoinTuple{
  1426. Tuples: []xsql.Tuple{
  1427. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1428. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1429. },
  1430. },
  1431. &xsql.JoinTuple{
  1432. Tuples: []xsql.Tuple{
  1433. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1434. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1435. },
  1436. },
  1437. },
  1438. {
  1439. &xsql.JoinTuple{
  1440. Tuples: []xsql.Tuple{
  1441. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1442. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1443. },
  1444. },
  1445. &xsql.JoinTuple{
  1446. Tuples: []xsql.Tuple{
  1447. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1448. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1449. },
  1450. },
  1451. },
  1452. },
  1453. result: []map[string]interface{}{{
  1454. "max": 177.51,
  1455. }, {
  1456. "max": 89.03,
  1457. }},
  1458. },
  1459. {
  1460. sql: "SELECT min(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1461. data: xsql.JoinTupleSets{
  1462. xsql.JoinTuple{
  1463. Tuples: []xsql.Tuple{
  1464. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1465. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1466. },
  1467. },
  1468. xsql.JoinTuple{
  1469. Tuples: []xsql.Tuple{
  1470. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1471. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1472. },
  1473. },
  1474. xsql.JoinTuple{
  1475. Tuples: []xsql.Tuple{
  1476. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1477. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1478. },
  1479. },
  1480. },
  1481. result: []map[string]interface{}{{
  1482. "min": 68.55,
  1483. }},
  1484. }, {
  1485. sql: "SELECT count(*) as all, count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1486. data: xsql.JoinTupleSets{
  1487. xsql.JoinTuple{
  1488. Tuples: []xsql.Tuple{
  1489. {Emitter: "test", Message: xsql.Message{"id": 1}},
  1490. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1491. },
  1492. },
  1493. xsql.JoinTuple{
  1494. Tuples: []xsql.Tuple{
  1495. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1496. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1497. },
  1498. },
  1499. xsql.JoinTuple{
  1500. Tuples: []xsql.Tuple{
  1501. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1502. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1503. },
  1504. },
  1505. },
  1506. result: []map[string]interface{}{{
  1507. "all": float64(3),
  1508. "c": float64(2),
  1509. "a": 123.03,
  1510. "s": 246.06,
  1511. "min": 68.55,
  1512. "max": 177.51,
  1513. }},
  1514. }, {
  1515. sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
  1516. data: xsql.WindowTuplesSet{
  1517. xsql.WindowTuples{
  1518. Emitter: "test",
  1519. Tuples: []xsql.Tuple{
  1520. {
  1521. Emitter: "src1",
  1522. Message: xsql.Message{"a": 53},
  1523. }, {
  1524. Emitter: "src1",
  1525. Message: xsql.Message{"a": 27},
  1526. }, {
  1527. Emitter: "src1",
  1528. Message: xsql.Message{"a": 123123},
  1529. },
  1530. },
  1531. },
  1532. },
  1533. result: []map[string]interface{}{{
  1534. "sum": float64(123203),
  1535. }},
  1536. }, {
  1537. sql: "SELECT sum(a) as s FROM test GROUP BY TumblingWindow(ss, 10)",
  1538. data: xsql.WindowTuplesSet{
  1539. xsql.WindowTuples{
  1540. Emitter: "test",
  1541. Tuples: []xsql.Tuple{
  1542. {
  1543. Emitter: "src1",
  1544. Message: xsql.Message{"a": 53, "s": 123203},
  1545. }, {
  1546. Emitter: "src1",
  1547. Message: xsql.Message{"a": 27},
  1548. }, {
  1549. Emitter: "src1",
  1550. Message: xsql.Message{"a": 123123},
  1551. },
  1552. },
  1553. },
  1554. },
  1555. result: []map[string]interface{}{{
  1556. "s": float64(123203),
  1557. }},
  1558. }, {
  1559. sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
  1560. data: xsql.WindowTuplesSet{
  1561. xsql.WindowTuples{
  1562. Emitter: "test",
  1563. Tuples: []xsql.Tuple{
  1564. {
  1565. Emitter: "src1",
  1566. Message: xsql.Message{"a": 53},
  1567. }, {
  1568. Emitter: "src1",
  1569. Message: xsql.Message{"a": 27},
  1570. }, {
  1571. Emitter: "src1",
  1572. Message: xsql.Message{"a": 123123},
  1573. },
  1574. },
  1575. },
  1576. },
  1577. result: []map[string]interface{}{{
  1578. "sum": float64(123203),
  1579. }},
  1580. }, {
  1581. sql: "SELECT count(*) as all, count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test GROUP BY TumblingWindow(ss, 10)",
  1582. data: xsql.WindowTuplesSet{
  1583. xsql.WindowTuples{
  1584. Emitter: "test",
  1585. Tuples: []xsql.Tuple{
  1586. {
  1587. Emitter: "src1",
  1588. Message: xsql.Message{"a": 53},
  1589. }, {
  1590. Emitter: "src1",
  1591. Message: xsql.Message{"a": 27},
  1592. }, {
  1593. Emitter: "src1",
  1594. Message: xsql.Message{"s": 123123},
  1595. },
  1596. },
  1597. },
  1598. },
  1599. result: []map[string]interface{}{{
  1600. "all": float64(3),
  1601. "c": float64(2),
  1602. "a": float64(40),
  1603. "s": float64(80),
  1604. "min": float64(27),
  1605. "max": float64(53),
  1606. }},
  1607. },
  1608. {
  1609. sql: "SELECT count(*), meta(test1.device) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1610. data: xsql.GroupedTuplesSet{
  1611. {
  1612. &xsql.JoinTuple{
  1613. Tuples: []xsql.Tuple{
  1614. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1615. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1616. },
  1617. },
  1618. &xsql.JoinTuple{
  1619. Tuples: []xsql.Tuple{
  1620. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1621. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1622. },
  1623. },
  1624. },
  1625. {
  1626. &xsql.JoinTuple{
  1627. Tuples: []xsql.Tuple{
  1628. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1629. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1630. },
  1631. },
  1632. &xsql.JoinTuple{
  1633. Tuples: []xsql.Tuple{
  1634. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1635. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1636. },
  1637. },
  1638. },
  1639. },
  1640. result: []map[string]interface{}{{
  1641. "count": float64(2),
  1642. "meta": "devicea",
  1643. }, {
  1644. "count": float64(2),
  1645. "meta": "devicec",
  1646. }},
  1647. },
  1648. {
  1649. sql: "SELECT count(*) as c, meta(test1.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1650. data: xsql.GroupedTuplesSet{
  1651. {
  1652. &xsql.JoinTuple{
  1653. Tuples: []xsql.Tuple{
  1654. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "d": "devicea"}},
  1655. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1656. },
  1657. },
  1658. &xsql.JoinTuple{
  1659. Tuples: []xsql.Tuple{
  1660. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1661. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1662. },
  1663. },
  1664. },
  1665. {
  1666. &xsql.JoinTuple{
  1667. Tuples: []xsql.Tuple{
  1668. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "d": "devicec"}},
  1669. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1670. },
  1671. },
  1672. &xsql.JoinTuple{
  1673. Tuples: []xsql.Tuple{
  1674. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1675. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1676. },
  1677. },
  1678. },
  1679. },
  1680. result: []map[string]interface{}{{
  1681. "c": float64(2),
  1682. "d": "devicea",
  1683. }, {
  1684. "c": float64(2),
  1685. "d": "devicec",
  1686. }},
  1687. }, {
  1688. sql: "SELECT * FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1689. data: xsql.GroupedTuplesSet{
  1690. {
  1691. &xsql.JoinTuple{
  1692. Tuples: []xsql.Tuple{
  1693. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1694. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1695. },
  1696. },
  1697. &xsql.JoinTuple{
  1698. Tuples: []xsql.Tuple{
  1699. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1700. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1701. },
  1702. },
  1703. },
  1704. {
  1705. &xsql.JoinTuple{
  1706. Tuples: []xsql.Tuple{
  1707. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1708. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1709. },
  1710. },
  1711. &xsql.JoinTuple{
  1712. Tuples: []xsql.Tuple{
  1713. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1714. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1715. },
  1716. },
  1717. },
  1718. },
  1719. result: []map[string]interface{}{{
  1720. "a": float64(122.33),
  1721. "c": float64(2),
  1722. "color": "w2",
  1723. "id": float64(1),
  1724. "r": float64(122),
  1725. }, {
  1726. "a": float64(89.03),
  1727. "c": float64(2),
  1728. "color": "w1",
  1729. "id": float64(2),
  1730. "r": float64(89),
  1731. }},
  1732. }, {
  1733. sql: "SELECT collect(a) as r1 FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1734. data: xsql.GroupedTuplesSet{
  1735. {
  1736. &xsql.JoinTuple{
  1737. Tuples: []xsql.Tuple{
  1738. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1739. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1740. },
  1741. },
  1742. &xsql.JoinTuple{
  1743. Tuples: []xsql.Tuple{
  1744. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1745. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1746. },
  1747. },
  1748. },
  1749. {
  1750. &xsql.JoinTuple{
  1751. Tuples: []xsql.Tuple{
  1752. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1753. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1754. },
  1755. },
  1756. &xsql.JoinTuple{
  1757. Tuples: []xsql.Tuple{
  1758. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1759. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1760. },
  1761. },
  1762. },
  1763. },
  1764. result: []map[string]interface{}{{
  1765. "r1": []interface{}{122.33, 177.51},
  1766. }, {"r1": []interface{}{89.03, 14.6}}},
  1767. }, {
  1768. sql: "SELECT collect(*)[1] as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  1769. data: xsql.WindowTuplesSet{
  1770. xsql.WindowTuples{
  1771. Emitter: "test",
  1772. Tuples: []xsql.Tuple{
  1773. {
  1774. Emitter: "src1",
  1775. Message: xsql.Message{"a": 53, "s": 123203},
  1776. }, {
  1777. Emitter: "src1",
  1778. Message: xsql.Message{"a": 27},
  1779. }, {
  1780. Emitter: "src1",
  1781. Message: xsql.Message{"a": 123123},
  1782. },
  1783. },
  1784. },
  1785. },
  1786. result: []map[string]interface{}{{
  1787. "c1": map[string]interface{}{
  1788. "a": float64(27),
  1789. },
  1790. }},
  1791. }, {
  1792. sql: "SELECT collect(*)[1]->a as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  1793. data: xsql.WindowTuplesSet{
  1794. xsql.WindowTuples{
  1795. Emitter: "test",
  1796. Tuples: []xsql.Tuple{
  1797. {
  1798. Emitter: "src1",
  1799. Message: xsql.Message{"a": 53, "s": 123203},
  1800. }, {
  1801. Emitter: "src1",
  1802. Message: xsql.Message{"a": 27},
  1803. }, {
  1804. Emitter: "src1",
  1805. Message: xsql.Message{"a": 123123},
  1806. },
  1807. },
  1808. },
  1809. },
  1810. result: []map[string]interface{}{{
  1811. "c1": float64(27),
  1812. }},
  1813. }, {
  1814. sql: "SELECT collect(*)[1]->sl[0] as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  1815. data: xsql.WindowTuplesSet{
  1816. xsql.WindowTuples{
  1817. Emitter: "test",
  1818. Tuples: []xsql.Tuple{
  1819. {
  1820. Emitter: "src1",
  1821. Message: xsql.Message{"a": 53, "sl": []string{"hello", "world"}},
  1822. }, {
  1823. Emitter: "src1",
  1824. Message: xsql.Message{"a": 27, "sl": []string{"new", "horizon"}},
  1825. }, {
  1826. Emitter: "src1",
  1827. Message: xsql.Message{"a": 123123, "sl": []string{"south", "africa"}},
  1828. },
  1829. },
  1830. },
  1831. },
  1832. result: []map[string]interface{}{{
  1833. "c1": "new",
  1834. }},
  1835. }, {
  1836. sql: "SELECT deduplicate(id, true) as r1 FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1837. data: xsql.GroupedTuplesSet{
  1838. {
  1839. &xsql.JoinTuple{
  1840. Tuples: []xsql.Tuple{
  1841. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1842. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1843. },
  1844. },
  1845. &xsql.JoinTuple{
  1846. Tuples: []xsql.Tuple{
  1847. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1848. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1849. },
  1850. },
  1851. },
  1852. {
  1853. &xsql.JoinTuple{
  1854. Tuples: []xsql.Tuple{
  1855. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1856. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1857. },
  1858. },
  1859. &xsql.JoinTuple{
  1860. Tuples: []xsql.Tuple{
  1861. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1862. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1863. },
  1864. },
  1865. },
  1866. },
  1867. result: []map[string]interface{}{
  1868. {
  1869. "r1": []interface{}{
  1870. map[string]interface{}{"a": float64(122.33), "c": float64(2), "color": "w2", "id": float64(1), "r": float64(122)},
  1871. map[string]interface{}{"a": float64(177.51), "color": "w2", "id": float64(5)}},
  1872. }, {
  1873. "r1": []interface{}{
  1874. map[string]interface{}{"a": float64(89.03), "c": float64(2), "color": "w1", "id": float64(2), "r": float64(89)},
  1875. map[string]interface{}{"a": float64(14.6), "color": "w1", "id": float64(4)}},
  1876. },
  1877. },
  1878. }, {
  1879. sql: "SELECT deduplicate(a, false)->a as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  1880. data: xsql.WindowTuplesSet{
  1881. xsql.WindowTuples{
  1882. Emitter: "test",
  1883. Tuples: []xsql.Tuple{
  1884. {
  1885. Emitter: "src1",
  1886. Message: xsql.Message{"a": 53, "s": 123203},
  1887. }, {
  1888. Emitter: "src1",
  1889. Message: xsql.Message{"a": 27},
  1890. }, {
  1891. Emitter: "src1",
  1892. Message: xsql.Message{"a": 123123},
  1893. },
  1894. },
  1895. },
  1896. },
  1897. result: []map[string]interface{}{{
  1898. "c1": float64(123123),
  1899. }},
  1900. }, {
  1901. sql: "SELECT deduplicate(a, false) as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  1902. data: xsql.WindowTuplesSet{
  1903. xsql.WindowTuples{
  1904. Emitter: "test",
  1905. Tuples: []xsql.Tuple{
  1906. {
  1907. Emitter: "src1",
  1908. Message: xsql.Message{"a": 53, "s": 123203},
  1909. }, {
  1910. Emitter: "src1",
  1911. Message: xsql.Message{"a": 27},
  1912. }, {
  1913. Emitter: "src1",
  1914. Message: xsql.Message{"a": 53},
  1915. },
  1916. },
  1917. },
  1918. },
  1919. result: []map[string]interface{}{{}},
  1920. }, {
  1921. sql: "SELECT deduplicate(a, false) as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  1922. data: xsql.WindowTuplesSet{
  1923. xsql.WindowTuples{
  1924. Emitter: "test",
  1925. Tuples: []xsql.Tuple{
  1926. {
  1927. Emitter: "src1",
  1928. Message: xsql.Message{"a": 53, "s": 123203},
  1929. }, {
  1930. Emitter: "src1",
  1931. Message: xsql.Message{"a": 27},
  1932. }, {
  1933. Emitter: "src1",
  1934. Message: xsql.Message{"a": 53},
  1935. },
  1936. },
  1937. },
  1938. },
  1939. result: []map[string]interface{}{{}},
  1940. },
  1941. }
  1942. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1943. contextLogger := common.Log.WithField("rule", "TestProjectPlan_AggFuncs")
  1944. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  1945. for i, tt := range tests {
  1946. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1947. if err != nil {
  1948. t.Error(err)
  1949. }
  1950. pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: true, isTest: true}
  1951. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1952. result := pp.Apply(ctx, tt.data, fv, afv)
  1953. var mapRes []map[string]interface{}
  1954. if v, ok := result.([]byte); ok {
  1955. err := json.Unmarshal(v, &mapRes)
  1956. if err != nil {
  1957. t.Errorf("Failed to parse the input into map.\n")
  1958. continue
  1959. }
  1960. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  1961. if !reflect.DeepEqual(tt.result, mapRes) {
  1962. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1963. }
  1964. } else {
  1965. t.Errorf("%d. %q\n\nThe returned result is not type of []byte: %#v\n", i, tt.sql, result)
  1966. }
  1967. }
  1968. }
  1969. func TestProjectPlanError(t *testing.T) {
  1970. var tests = []struct {
  1971. sql string
  1972. data interface{}
  1973. result interface{}
  1974. }{
  1975. {
  1976. sql: "SELECT a FROM test",
  1977. data: errors.New("an error from upstream"),
  1978. result: errors.New("an error from upstream"),
  1979. }, {
  1980. sql: "SELECT a * 5 FROM test",
  1981. data: &xsql.Tuple{
  1982. Emitter: "test",
  1983. Message: xsql.Message{
  1984. "a": "val_a",
  1985. },
  1986. },
  1987. result: errors.New("run Select error: invalid operation string(val_a) * int64(5)"),
  1988. }, {
  1989. sql: `SELECT a[0]->b AS ab FROM test`,
  1990. data: &xsql.Tuple{
  1991. Emitter: "test",
  1992. Message: xsql.Message{
  1993. "a": "common string",
  1994. },
  1995. },
  1996. result: errors.New("run Select error: invalid operation string(common string) [] *xsql.BracketEvalResult(&{0 0})"),
  1997. }, {
  1998. sql: `SELECT round(a) as r FROM test`,
  1999. data: &xsql.Tuple{
  2000. Emitter: "test",
  2001. Message: xsql.Message{
  2002. "a": "common string",
  2003. },
  2004. },
  2005. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  2006. }, {
  2007. sql: `SELECT round(a) as r FROM test`,
  2008. data: &xsql.Tuple{
  2009. Emitter: "test",
  2010. Message: xsql.Message{
  2011. "abc": "common string",
  2012. },
  2013. },
  2014. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  2015. }, {
  2016. sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  2017. data: xsql.GroupedTuplesSet{
  2018. {
  2019. &xsql.JoinTuple{
  2020. Tuples: []xsql.Tuple{
  2021. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  2022. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  2023. },
  2024. },
  2025. &xsql.JoinTuple{
  2026. Tuples: []xsql.Tuple{
  2027. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  2028. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  2029. },
  2030. },
  2031. &xsql.JoinTuple{
  2032. Tuples: []xsql.Tuple{
  2033. {Emitter: "test", Message: xsql.Message{"id": 4, "a": "dde"}},
  2034. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  2035. },
  2036. },
  2037. &xsql.JoinTuple{
  2038. Tuples: []xsql.Tuple{
  2039. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  2040. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  2041. },
  2042. },
  2043. },
  2044. {
  2045. &xsql.JoinTuple{
  2046. Tuples: []xsql.Tuple{
  2047. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  2048. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  2049. },
  2050. },
  2051. &xsql.JoinTuple{
  2052. Tuples: []xsql.Tuple{
  2053. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  2054. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  2055. },
  2056. },
  2057. },
  2058. },
  2059. result: errors.New("run Select error: call func avg error: requires float64 but found string(dde)"),
  2060. }, {
  2061. sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
  2062. data: xsql.WindowTuplesSet{
  2063. xsql.WindowTuples{
  2064. Emitter: "test",
  2065. Tuples: []xsql.Tuple{
  2066. {
  2067. Emitter: "src1",
  2068. Message: xsql.Message{"a": 53},
  2069. }, {
  2070. Emitter: "src1",
  2071. Message: xsql.Message{"a": "ddd"},
  2072. }, {
  2073. Emitter: "src1",
  2074. Message: xsql.Message{"a": 123123},
  2075. },
  2076. },
  2077. },
  2078. },
  2079. result: errors.New("run Select error: call func sum error: requires int but found string(ddd)"),
  2080. }, {
  2081. sql: `SELECT a[0]->b AS ab FROM test`,
  2082. data: &xsql.Tuple{
  2083. Emitter: "test",
  2084. Message: xsql.Message{
  2085. "a": []map[string]interface{}(nil),
  2086. },
  2087. },
  2088. result: errors.New("run Select error: out of index: 0 of 0"),
  2089. },
  2090. }
  2091. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2092. contextLogger := common.Log.WithField("rule", "TestProjectPlanError")
  2093. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  2094. for i, tt := range tests {
  2095. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2096. pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
  2097. pp.isTest = true
  2098. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  2099. result := pp.Apply(ctx, tt.data, fv, afv)
  2100. if !reflect.DeepEqual(tt.result, result) {
  2101. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  2102. }
  2103. }
  2104. }