project_test.go 58 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/context"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/ast"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. "reflect"
  25. "strings"
  26. "testing"
  27. )
  28. func TestProjectPlan_Apply1(t *testing.T) {
  29. var tests = []struct {
  30. sql string
  31. data *xsql.Tuple
  32. result []map[string]interface{}
  33. }{
  34. {
  35. sql: "SELECT a FROM test",
  36. data: &xsql.Tuple{
  37. Emitter: "test",
  38. Message: xsql.Message{
  39. "a": "val_a",
  40. },
  41. Metadata: xsql.Metadata{
  42. "id": 45,
  43. "other": "mock",
  44. },
  45. },
  46. result: []map[string]interface{}{{
  47. "a": "val_a",
  48. "__meta": map[string]interface{}{
  49. "id": float64(45),
  50. "other": "mock",
  51. },
  52. }},
  53. },
  54. {
  55. sql: "SELECT b FROM test",
  56. data: &xsql.Tuple{
  57. Emitter: "test",
  58. Message: xsql.Message{
  59. "a": "val_a",
  60. },
  61. },
  62. result: []map[string]interface{}{{}},
  63. },
  64. {
  65. sql: "SELECT ts FROM test",
  66. data: &xsql.Tuple{
  67. Emitter: "test",
  68. Message: xsql.Message{
  69. "a": "val_a",
  70. "ts": cast.TimeFromUnixMilli(1568854573431),
  71. },
  72. },
  73. result: []map[string]interface{}{{
  74. "ts": "2019-09-19T00:56:13.431Z",
  75. }},
  76. },
  77. //Schemaless may return a message without selecting column
  78. {
  79. sql: "SELECT ts FROM test",
  80. data: &xsql.Tuple{
  81. Emitter: "test",
  82. Message: xsql.Message{
  83. "a": "val_a",
  84. "ts2": cast.TimeFromUnixMilli(1568854573431),
  85. },
  86. },
  87. result: []map[string]interface{}{{}},
  88. },
  89. {
  90. sql: "SELECT A FROM test",
  91. data: &xsql.Tuple{
  92. Emitter: "test",
  93. Message: xsql.Message{
  94. "a": "val_a",
  95. },
  96. },
  97. result: []map[string]interface{}{{
  98. "A": "val_a",
  99. }},
  100. },
  101. {
  102. sql: `SELECT "value" FROM test`,
  103. data: &xsql.Tuple{
  104. Emitter: "test",
  105. Message: xsql.Message{},
  106. },
  107. result: []map[string]interface{}{{
  108. "": "value",
  109. }},
  110. },
  111. {
  112. sql: `SELECT 3.4 FROM test`,
  113. data: &xsql.Tuple{
  114. Emitter: "test",
  115. Message: xsql.Message{},
  116. },
  117. result: []map[string]interface{}{{
  118. "": 3.4,
  119. }},
  120. },
  121. {
  122. sql: `SELECT 5 FROM test`,
  123. data: &xsql.Tuple{
  124. Emitter: "test",
  125. Message: xsql.Message{},
  126. },
  127. result: []map[string]interface{}{{
  128. "": 5.0,
  129. }},
  130. },
  131. {
  132. sql: `SELECT a, "value" AS b FROM test`,
  133. data: &xsql.Tuple{
  134. Emitter: "test",
  135. Message: xsql.Message{
  136. "a": "val_a",
  137. },
  138. },
  139. result: []map[string]interface{}{{
  140. "a": "val_a",
  141. "b": "value",
  142. }},
  143. },
  144. {
  145. sql: `SELECT a, "value" AS b, 3.14 as Pi, 0 as Zero FROM test`,
  146. data: &xsql.Tuple{
  147. Emitter: "test",
  148. Message: xsql.Message{
  149. "a": "val_a",
  150. },
  151. },
  152. result: []map[string]interface{}{{
  153. "a": "val_a",
  154. "b": "value",
  155. "Pi": 3.14,
  156. "Zero": 0.0,
  157. }},
  158. },
  159. {
  160. sql: `SELECT a->b AS ab FROM test`,
  161. data: &xsql.Tuple{
  162. Emitter: "test",
  163. Message: xsql.Message{
  164. "a": map[string]interface{}{"b": "hello"},
  165. },
  166. },
  167. result: []map[string]interface{}{{
  168. "ab": "hello",
  169. }},
  170. },
  171. {
  172. sql: `SELECT a->b AS ab FROM test`,
  173. data: &xsql.Tuple{
  174. Emitter: "test",
  175. Message: xsql.Message{
  176. "a": map[string]interface{}(nil),
  177. },
  178. },
  179. result: []map[string]interface{}{{}},
  180. },
  181. {
  182. sql: `SELECT a->b AS ab FROM test`,
  183. data: &xsql.Tuple{
  184. Emitter: "test",
  185. Message: xsql.Message{
  186. "name": "name",
  187. },
  188. },
  189. result: []map[string]interface{}{{}},
  190. },
  191. {
  192. sql: `SELECT a->b AS ab FROM test`,
  193. data: &xsql.Tuple{
  194. Emitter: "test",
  195. Message: xsql.Message{
  196. "a": "commonstring",
  197. },
  198. },
  199. result: []map[string]interface{}{{}},
  200. },
  201. {
  202. sql: `SELECT a[0]->b AS ab FROM test`,
  203. data: &xsql.Tuple{
  204. Emitter: "test",
  205. Message: xsql.Message{
  206. "a": []interface{}{
  207. map[string]interface{}{"b": "hello1"},
  208. map[string]interface{}{"b": "hello2"},
  209. },
  210. },
  211. },
  212. result: []map[string]interface{}{{
  213. "ab": "hello1",
  214. }},
  215. },
  216. {
  217. sql: `SELECT a[0]->b AS ab FROM test`,
  218. data: &xsql.Tuple{
  219. Emitter: "test",
  220. Message: xsql.Message{
  221. "a": []map[string]interface{}{
  222. {"b": "hello1"},
  223. {"b": "hello2"},
  224. },
  225. },
  226. },
  227. result: []map[string]interface{}{{
  228. "ab": "hello1",
  229. }},
  230. },
  231. {
  232. sql: `SELECT a[2:4] AS ab FROM test`,
  233. data: &xsql.Tuple{
  234. Emitter: "test",
  235. Message: xsql.Message{
  236. "a": []map[string]interface{}{
  237. {"b": "hello1"},
  238. {"b": "hello2"},
  239. {"b": "hello3"},
  240. {"b": "hello4"},
  241. {"b": "hello5"},
  242. },
  243. },
  244. },
  245. result: []map[string]interface{}{{
  246. "ab": []interface{}{
  247. map[string]interface{}{"b": "hello3"},
  248. map[string]interface{}{"b": "hello4"},
  249. },
  250. }},
  251. },
  252. {
  253. sql: `SELECT a[2:] AS ab FROM test`,
  254. data: &xsql.Tuple{
  255. Emitter: "test",
  256. Message: xsql.Message{
  257. "a": []map[string]interface{}{
  258. {"b": "hello1"},
  259. {"b": "hello2"},
  260. {"b": "hello3"},
  261. {"b": "hello4"},
  262. {"b": "hello5"},
  263. },
  264. },
  265. },
  266. result: []map[string]interface{}{{
  267. "ab": []interface{}{
  268. map[string]interface{}{"b": "hello3"},
  269. map[string]interface{}{"b": "hello4"},
  270. map[string]interface{}{"b": "hello5"},
  271. },
  272. }},
  273. },
  274. {
  275. sql: `SELECT a[2:] AS ab FROM test`,
  276. data: &xsql.Tuple{
  277. Emitter: "test",
  278. Message: xsql.Message{
  279. "a": []interface{}{
  280. true, false, true, false, true, true,
  281. },
  282. },
  283. },
  284. result: []map[string]interface{}{{
  285. "ab": []interface{}{
  286. true, false, true, true,
  287. },
  288. }},
  289. },
  290. {
  291. sql: `SELECT a[:4] AS ab FROM test`,
  292. data: &xsql.Tuple{
  293. Emitter: "test",
  294. Message: xsql.Message{
  295. "a": []interface{}{
  296. true, false, true, false, true, true,
  297. },
  298. },
  299. },
  300. result: []map[string]interface{}{{
  301. "ab": []interface{}{
  302. true, false, true, false,
  303. },
  304. }},
  305. },
  306. {
  307. sql: `SELECT a[:4] AS ab FROM test`,
  308. data: &xsql.Tuple{
  309. Emitter: "test",
  310. Message: xsql.Message{
  311. "a": []interface{}{
  312. 3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926,
  313. },
  314. },
  315. },
  316. result: []map[string]interface{}{{
  317. "ab": []interface{}{
  318. 3.14, 3.141, 3.1415, 3.14159,
  319. },
  320. }},
  321. },
  322. {
  323. sql: `SELECT a->b[:4] AS ab FROM test`,
  324. data: &xsql.Tuple{
  325. Emitter: "test",
  326. Message: xsql.Message{
  327. "a": map[string]interface{}{
  328. "b": []float64{3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926},
  329. },
  330. },
  331. },
  332. result: []map[string]interface{}{{
  333. "ab": []interface{}{
  334. 3.14, 3.141, 3.1415, 3.14159,
  335. },
  336. }},
  337. },
  338. {
  339. sql: `SELECT a->b[0:1] AS ab FROM test`,
  340. data: &xsql.Tuple{
  341. Emitter: "test",
  342. Message: xsql.Message{
  343. "a": map[string]interface{}{
  344. "b": []float64{3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926},
  345. },
  346. },
  347. },
  348. result: []map[string]interface{}{{
  349. "ab": []interface{}{
  350. 3.14,
  351. },
  352. }},
  353. },
  354. {
  355. sql: `SELECT a->c->d AS f1 FROM test`,
  356. data: &xsql.Tuple{
  357. Emitter: "test",
  358. Message: xsql.Message{
  359. "a": map[string]interface{}{
  360. "b": "hello",
  361. "c": map[string]interface{}{
  362. "d": 35.2,
  363. },
  364. },
  365. },
  366. },
  367. result: []map[string]interface{}{{
  368. "f1": 35.2,
  369. }},
  370. },
  371. {
  372. sql: `SELECT a->c->d AS f1 FROM test`,
  373. data: &xsql.Tuple{
  374. Emitter: "test",
  375. Message: xsql.Message{
  376. "a": map[string]interface{}{
  377. "b": "hello",
  378. "c": map[string]interface{}{
  379. "e": 35.2,
  380. },
  381. },
  382. },
  383. },
  384. result: []map[string]interface{}{{}},
  385. },
  386. {
  387. sql: `SELECT a->c->d AS f1 FROM test`,
  388. data: &xsql.Tuple{
  389. Emitter: "test",
  390. Message: xsql.Message{
  391. "a": map[string]interface{}{
  392. "b": "hello",
  393. },
  394. },
  395. },
  396. result: []map[string]interface{}{{}},
  397. },
  398. //The int type is not supported yet, the json parser returns float64 for int values
  399. {
  400. sql: `SELECT a->c->d AS f1 FROM test`,
  401. data: &xsql.Tuple{
  402. Emitter: "test",
  403. Message: xsql.Message{
  404. "a": map[string]interface{}{
  405. "b": "hello",
  406. "c": map[string]interface{}{
  407. "d": float64(35),
  408. },
  409. },
  410. },
  411. },
  412. result: []map[string]interface{}{{
  413. "f1": float64(35),
  414. }},
  415. },
  416. {
  417. sql: "SELECT a FROM test",
  418. data: &xsql.Tuple{
  419. Emitter: "test",
  420. Message: xsql.Message{},
  421. },
  422. result: []map[string]interface{}{
  423. {},
  424. },
  425. },
  426. {
  427. sql: "SELECT * FROM test",
  428. data: &xsql.Tuple{
  429. Emitter: "test",
  430. Message: xsql.Message{},
  431. },
  432. result: []map[string]interface{}{
  433. {},
  434. },
  435. },
  436. {
  437. sql: `SELECT * FROM test`,
  438. data: &xsql.Tuple{
  439. Emitter: "test",
  440. Message: xsql.Message{
  441. "a": map[string]interface{}{
  442. "b": "hello",
  443. "c": map[string]interface{}{
  444. "d": 35.2,
  445. },
  446. },
  447. },
  448. },
  449. result: []map[string]interface{}{{
  450. "a": map[string]interface{}{
  451. "b": "hello",
  452. "c": map[string]interface{}{
  453. "d": 35.2,
  454. },
  455. },
  456. }},
  457. },
  458. {
  459. sql: `SELECT * FROM test`,
  460. data: &xsql.Tuple{
  461. Emitter: "test",
  462. Message: xsql.Message{
  463. "a": "val1",
  464. "b": 3.14,
  465. },
  466. },
  467. result: []map[string]interface{}{{
  468. "a": "val1",
  469. "b": 3.14,
  470. }},
  471. },
  472. {
  473. sql: `SELECT 3*4 AS f1 FROM test`,
  474. data: &xsql.Tuple{
  475. Emitter: "test",
  476. Message: xsql.Message{},
  477. },
  478. result: []map[string]interface{}{{
  479. "f1": float64(12),
  480. }},
  481. },
  482. {
  483. sql: `SELECT 4.5*2 AS f1 FROM test`,
  484. data: &xsql.Tuple{
  485. Emitter: "test",
  486. Message: xsql.Message{},
  487. },
  488. result: []map[string]interface{}{{
  489. "f1": float64(9),
  490. }},
  491. },
  492. {
  493. sql: "SELECT `a.b.c` FROM test",
  494. data: &xsql.Tuple{
  495. Emitter: "test",
  496. Message: xsql.Message{
  497. "a.b.c": "val_a",
  498. },
  499. },
  500. result: []map[string]interface{}{{
  501. "a.b.c": "val_a",
  502. }},
  503. },
  504. {
  505. sql: `SELECT CASE a WHEN 10 THEN "true" END AS b FROM test`,
  506. data: &xsql.Tuple{
  507. Emitter: "test",
  508. Message: xsql.Message{
  509. "a": int64(10),
  510. },
  511. },
  512. result: []map[string]interface{}{{
  513. "b": "true",
  514. }},
  515. },
  516. }
  517. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  518. contextLogger := conf.Log.WithField("rule", "TestProjectPlan_Apply1")
  519. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  520. for i, tt := range tests {
  521. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  522. if err != nil {
  523. t.Errorf("parse sql error: %s", err)
  524. continue
  525. }
  526. pp := &ProjectOp{Fields: stmt.Fields, SendMeta: true}
  527. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  528. result := pp.Apply(ctx, tt.data, fv, afv)
  529. var mapRes []map[string]interface{}
  530. if v, ok := result.([]byte); ok {
  531. err := json.Unmarshal(v, &mapRes)
  532. if err != nil {
  533. t.Errorf("Failed to parse the input into map.\n")
  534. continue
  535. }
  536. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  537. if !reflect.DeepEqual(tt.result, mapRes) {
  538. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  539. }
  540. } else {
  541. t.Errorf("%d. The returned result %#v is not type of []byte\n", result, i)
  542. }
  543. }
  544. }
  545. func TestProjectPlan_MultiInput(t *testing.T) {
  546. var tests = []struct {
  547. sql string
  548. data interface{}
  549. result []map[string]interface{}
  550. }{
  551. {
  552. sql: "SELECT * FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
  553. data: &xsql.Tuple{
  554. Emitter: "tbl",
  555. Message: xsql.Message{
  556. "abc": int64(6),
  557. },
  558. },
  559. result: []map[string]interface{}{{
  560. "abc": float64(6), //json marshall problem
  561. }},
  562. },
  563. {
  564. sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
  565. data: &xsql.Tuple{
  566. Emitter: "tbl",
  567. Message: xsql.Message{
  568. "abc": int64(34),
  569. "def": "hello",
  570. },
  571. },
  572. result: []map[string]interface{}{{
  573. "abc": float64(34),
  574. }},
  575. },
  576. {
  577. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  578. data: xsql.WindowTuplesSet{
  579. Content: []xsql.WindowTuples{
  580. {
  581. Emitter: "src1",
  582. Tuples: []xsql.Tuple{
  583. {
  584. Emitter: "src1",
  585. Message: xsql.Message{"id1": 1, "f1": "v1"},
  586. }, {
  587. Emitter: "src1",
  588. Message: xsql.Message{"id1": 2, "f1": "v2"},
  589. }, {
  590. Emitter: "src1",
  591. Message: xsql.Message{"id1": 3, "f1": "v1"},
  592. },
  593. },
  594. },
  595. },
  596. },
  597. result: []map[string]interface{}{{
  598. "id1": float64(1),
  599. }, {
  600. "id1": float64(2),
  601. }, {
  602. "id1": float64(3),
  603. }},
  604. },
  605. {
  606. sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  607. data: xsql.WindowTuplesSet{
  608. Content: []xsql.WindowTuples{
  609. {
  610. Emitter: "src1",
  611. Tuples: []xsql.Tuple{
  612. {
  613. Emitter: "src1",
  614. Message: xsql.Message{"id1": 1, "f1": "v1"},
  615. }, {
  616. Emitter: "src1",
  617. Message: xsql.Message{"id2": 2, "f1": "v2"},
  618. }, {
  619. Emitter: "src1",
  620. Message: xsql.Message{"id1": 3, "f1": "v1"},
  621. },
  622. },
  623. },
  624. },
  625. },
  626. result: []map[string]interface{}{{
  627. "id1": float64(1),
  628. }, {}, {
  629. "id1": float64(3),
  630. }},
  631. },
  632. {
  633. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  634. data: xsql.WindowTuplesSet{
  635. Content: []xsql.WindowTuples{
  636. {
  637. Emitter: "src1",
  638. Tuples: []xsql.Tuple{
  639. {
  640. Emitter: "src1",
  641. Message: xsql.Message{"id1": 1, "f1": "v1"},
  642. }, {
  643. Emitter: "src1",
  644. Message: xsql.Message{"id1": 2, "f1": "v2"},
  645. }, {
  646. Emitter: "src1",
  647. Message: xsql.Message{"id1": 3, "f1": "v1"},
  648. },
  649. },
  650. },
  651. },
  652. },
  653. result: []map[string]interface{}{{
  654. "id1": float64(1),
  655. "f1": "v1",
  656. }, {
  657. "id1": float64(2),
  658. "f1": "v2",
  659. }, {
  660. "id1": float64(3),
  661. "f1": "v1",
  662. }},
  663. },
  664. {
  665. sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  666. data: xsql.WindowTuplesSet{
  667. Content: []xsql.WindowTuples{
  668. {
  669. Emitter: "src1",
  670. Tuples: []xsql.Tuple{
  671. {
  672. Emitter: "src1",
  673. Message: xsql.Message{"id1": 1, "f1": "v1"},
  674. }, {
  675. Emitter: "src1",
  676. Message: xsql.Message{"id2": 2, "f2": "v2"},
  677. }, {
  678. Emitter: "src1",
  679. Message: xsql.Message{"id1": 3, "f1": "v1"},
  680. },
  681. },
  682. },
  683. },
  684. },
  685. result: []map[string]interface{}{{
  686. "id1": float64(1),
  687. "f1": "v1",
  688. }, {
  689. "id2": float64(2),
  690. "f2": "v2",
  691. }, {
  692. "id1": float64(3),
  693. "f1": "v1",
  694. }},
  695. },
  696. {
  697. sql: "SELECT src1.* FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  698. data: xsql.WindowTuplesSet{
  699. Content: []xsql.WindowTuples{
  700. {
  701. Emitter: "src1",
  702. Tuples: []xsql.Tuple{
  703. {
  704. Emitter: "src1",
  705. Message: xsql.Message{"id1": 1, "f1": "v1"},
  706. }, {
  707. Emitter: "src1",
  708. Message: xsql.Message{"id1": 2, "f1": "v2"},
  709. }, {
  710. Emitter: "src1",
  711. Message: xsql.Message{"id1": 3, "f1": "v1"},
  712. },
  713. },
  714. },
  715. },
  716. },
  717. result: []map[string]interface{}{{
  718. "id1": float64(1),
  719. "f1": "v1",
  720. }, {
  721. "id1": float64(2),
  722. "f1": "v2",
  723. }, {
  724. "id1": float64(3),
  725. "f1": "v1",
  726. }},
  727. },
  728. {
  729. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  730. data: &xsql.JoinTupleSets{
  731. Content: []xsql.JoinTuple{
  732. {
  733. Tuples: []xsql.Tuple{
  734. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  735. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  736. },
  737. },
  738. {
  739. Tuples: []xsql.Tuple{
  740. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  741. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  742. },
  743. },
  744. {
  745. Tuples: []xsql.Tuple{
  746. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  747. },
  748. },
  749. },
  750. },
  751. result: []map[string]interface{}{{
  752. "id1": float64(1),
  753. }, {
  754. "id1": float64(2),
  755. }, {
  756. "id1": float64(3),
  757. }},
  758. },
  759. {
  760. sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
  761. data: &xsql.JoinTupleSets{
  762. Content: []xsql.JoinTuple{
  763. {
  764. Tuples: []xsql.Tuple{
  765. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  766. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  767. },
  768. },
  769. {
  770. Tuples: []xsql.Tuple{
  771. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  772. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  773. },
  774. },
  775. {
  776. Tuples: []xsql.Tuple{
  777. {Emitter: "src1", Message: xsql.Message{"id2": 3, "f1": "v1"}},
  778. },
  779. },
  780. },
  781. },
  782. result: []map[string]interface{}{{
  783. "id1": float64(1),
  784. }, {
  785. "id1": float64(2),
  786. }, {}},
  787. },
  788. {
  789. sql: "SELECT abc FROM tbl group by abc",
  790. data: xsql.GroupedTuplesSet{
  791. {
  792. Content: []xsql.DataValuer{
  793. &xsql.Tuple{
  794. Emitter: "tbl",
  795. Message: xsql.Message{
  796. "abc": int64(6),
  797. "def": "hello",
  798. },
  799. },
  800. },
  801. },
  802. },
  803. result: []map[string]interface{}{{
  804. "abc": float64(6),
  805. }},
  806. },
  807. {
  808. sql: "SELECT abc FROM tbl group by abc",
  809. data: xsql.GroupedTuplesSet{
  810. {
  811. Content: []xsql.DataValuer{
  812. &xsql.Tuple{
  813. Emitter: "tbl",
  814. Message: xsql.Message{
  815. "def": "hello",
  816. },
  817. },
  818. },
  819. },
  820. },
  821. result: []map[string]interface{}{{}},
  822. },
  823. {
  824. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  825. data: xsql.GroupedTuplesSet{
  826. {
  827. Content: []xsql.DataValuer{
  828. &xsql.Tuple{
  829. Emitter: "src1",
  830. Message: xsql.Message{"id1": 1, "f1": "v1"},
  831. },
  832. &xsql.Tuple{
  833. Emitter: "src1",
  834. Message: xsql.Message{"id1": 3, "f1": "v1"},
  835. },
  836. },
  837. },
  838. {
  839. Content: []xsql.DataValuer{
  840. &xsql.Tuple{
  841. Emitter: "src1",
  842. Message: xsql.Message{"id1": 2, "f1": "v2"},
  843. },
  844. },
  845. },
  846. },
  847. result: []map[string]interface{}{{
  848. "id1": float64(1),
  849. }, {
  850. "id1": float64(2),
  851. }},
  852. },
  853. {
  854. sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  855. data: xsql.GroupedTuplesSet{
  856. {
  857. Content: []xsql.DataValuer{
  858. &xsql.Tuple{
  859. Emitter: "src1",
  860. Message: xsql.Message{"id1": 1, "f1": "v1"},
  861. },
  862. &xsql.Tuple{
  863. Emitter: "src1",
  864. Message: xsql.Message{"id1": 3, "f1": "v1"},
  865. },
  866. },
  867. },
  868. {
  869. Content: []xsql.DataValuer{
  870. &xsql.Tuple{
  871. Emitter: "src1",
  872. Message: xsql.Message{"id2": 2, "f1": "v2"},
  873. },
  874. },
  875. },
  876. },
  877. result: []map[string]interface{}{{
  878. "id1": float64(1),
  879. }, {}},
  880. },
  881. {
  882. sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  883. data: xsql.GroupedTuplesSet{
  884. {
  885. Content: []xsql.DataValuer{
  886. &xsql.JoinTuple{
  887. Tuples: []xsql.Tuple{
  888. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  889. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  890. },
  891. },
  892. },
  893. },
  894. {
  895. Content: []xsql.DataValuer{
  896. &xsql.JoinTuple{
  897. Tuples: []xsql.Tuple{
  898. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  899. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  900. },
  901. },
  902. },
  903. },
  904. {
  905. Content: []xsql.DataValuer{
  906. &xsql.JoinTuple{
  907. Tuples: []xsql.Tuple{
  908. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  909. },
  910. },
  911. },
  912. },
  913. },
  914. result: []map[string]interface{}{{
  915. "id2": float64(2),
  916. }, {
  917. "id2": float64(4),
  918. }, {}},
  919. },
  920. {
  921. sql: "SELECT src1.*, f2 FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  922. data: &xsql.JoinTupleSets{
  923. Content: []xsql.JoinTuple{
  924. {
  925. Tuples: []xsql.Tuple{
  926. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  927. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  928. },
  929. },
  930. {
  931. Tuples: []xsql.Tuple{
  932. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  933. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  934. },
  935. },
  936. {
  937. Tuples: []xsql.Tuple{
  938. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  939. },
  940. },
  941. },
  942. },
  943. result: []map[string]interface{}{{
  944. "id1": float64(1),
  945. "f1": "v1",
  946. "f2": "w2",
  947. }, {
  948. "id1": float64(2),
  949. "f1": "v2",
  950. "f2": "w3",
  951. }, {
  952. "id1": float64(3),
  953. "f1": "v1",
  954. }},
  955. },
  956. {
  957. sql: "SELECT * FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
  958. data: &xsql.JoinTupleSets{
  959. Content: []xsql.JoinTuple{
  960. {
  961. Tuples: []xsql.Tuple{
  962. {Emitter: "src1", Message: xsql.Message{"id": 1, "f1": "v1"}},
  963. {Emitter: "src2", Message: xsql.Message{"id": 2, "f2": "w2"}},
  964. },
  965. },
  966. {
  967. Tuples: []xsql.Tuple{
  968. {Emitter: "src1", Message: xsql.Message{"id": 2, "f1": "v2"}},
  969. {Emitter: "src2", Message: xsql.Message{"id": 4, "f2": "w3"}},
  970. },
  971. },
  972. {
  973. Tuples: []xsql.Tuple{
  974. {Emitter: "src1", Message: xsql.Message{"id": 3, "f1": "v1"}},
  975. },
  976. },
  977. },
  978. },
  979. result: []map[string]interface{}{{
  980. "id": float64(1),
  981. "f1": "v1",
  982. "f2": "w2",
  983. }, {
  984. "id": float64(2),
  985. "f1": "v2",
  986. "f2": "w3",
  987. }, {
  988. "id": float64(3),
  989. "f1": "v1",
  990. }},
  991. },
  992. {
  993. sql: "SELECT src1.* FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
  994. data: xsql.GroupedTuplesSet{
  995. {
  996. Content: []xsql.DataValuer{
  997. &xsql.Tuple{
  998. Emitter: "src1",
  999. Message: xsql.Message{"id1": 1, "f1": "v1"},
  1000. },
  1001. &xsql.Tuple{
  1002. Emitter: "src1",
  1003. Message: xsql.Message{"id1": 3, "f1": "v1"},
  1004. },
  1005. },
  1006. },
  1007. {
  1008. Content: []xsql.DataValuer{
  1009. &xsql.Tuple{
  1010. Emitter: "src1",
  1011. Message: xsql.Message{"id1": 2, "f1": "v2"},
  1012. },
  1013. },
  1014. },
  1015. },
  1016. result: []map[string]interface{}{{
  1017. "id1": float64(1),
  1018. "f1": "v1",
  1019. }, {
  1020. "id1": float64(2),
  1021. "f1": "v2",
  1022. }},
  1023. },
  1024. {
  1025. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  1026. data: xsql.GroupedTuplesSet{
  1027. {
  1028. Content: []xsql.DataValuer{
  1029. &xsql.JoinTuple{
  1030. Tuples: []xsql.Tuple{
  1031. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1032. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1033. },
  1034. },
  1035. },
  1036. },
  1037. {
  1038. Content: []xsql.DataValuer{
  1039. &xsql.JoinTuple{
  1040. Tuples: []xsql.Tuple{
  1041. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1042. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1043. },
  1044. },
  1045. },
  1046. },
  1047. {
  1048. Content: []xsql.DataValuer{
  1049. &xsql.JoinTuple{
  1050. Tuples: []xsql.Tuple{
  1051. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  1052. },
  1053. },
  1054. },
  1055. },
  1056. },
  1057. result: []map[string]interface{}{{
  1058. "id2": float64(2),
  1059. "id1": float64(1),
  1060. "f1": "v1",
  1061. }, {
  1062. "id2": float64(4),
  1063. "id1": float64(2),
  1064. "f1": "v2",
  1065. }, {
  1066. "id1": float64(3),
  1067. "f1": "v1",
  1068. }},
  1069. },
  1070. {
  1071. sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
  1072. data: xsql.GroupedTuplesSet{
  1073. {
  1074. Content: []xsql.DataValuer{
  1075. &xsql.JoinTuple{
  1076. Tuples: []xsql.Tuple{
  1077. {Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
  1078. {Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
  1079. },
  1080. },
  1081. },
  1082. },
  1083. {
  1084. Content: []xsql.DataValuer{
  1085. &xsql.JoinTuple{
  1086. Tuples: []xsql.Tuple{
  1087. {Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
  1088. {Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
  1089. },
  1090. },
  1091. },
  1092. },
  1093. {
  1094. Content: []xsql.DataValuer{
  1095. &xsql.JoinTuple{
  1096. Tuples: []xsql.Tuple{
  1097. {Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
  1098. },
  1099. },
  1100. },
  1101. },
  1102. },
  1103. result: []map[string]interface{}{{
  1104. "id2": float64(2),
  1105. "id1": float64(1),
  1106. "f1": "v1",
  1107. }, {
  1108. "id2": float64(4),
  1109. "id1": float64(2),
  1110. "f1": "v2",
  1111. }, {
  1112. "id1": float64(3),
  1113. "f1": "v1",
  1114. }},
  1115. },
  1116. }
  1117. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1118. contextLogger := conf.Log.WithField("rule", "TestProjectPlan_MultiInput")
  1119. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1120. for i, tt := range tests {
  1121. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1122. pp := &ProjectOp{Fields: stmt.Fields}
  1123. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1124. result := pp.Apply(ctx, tt.data, fv, afv)
  1125. var mapRes []map[string]interface{}
  1126. if v, ok := result.([]byte); ok {
  1127. err := json.Unmarshal(v, &mapRes)
  1128. if err != nil {
  1129. t.Errorf("Failed to parse the input into map.\n")
  1130. continue
  1131. }
  1132. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  1133. if !reflect.DeepEqual(tt.result, mapRes) {
  1134. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1135. }
  1136. } else {
  1137. t.Errorf("The returned result is not type of []byte\n")
  1138. }
  1139. }
  1140. }
  1141. func TestProjectPlan_Funcs(t *testing.T) {
  1142. var tests = []struct {
  1143. sql string
  1144. data interface{}
  1145. result []map[string]interface{}
  1146. }{
  1147. {
  1148. sql: "SELECT round(a) as r FROM test",
  1149. data: &xsql.Tuple{
  1150. Emitter: "test",
  1151. Message: xsql.Message{
  1152. "a": 47.5,
  1153. },
  1154. },
  1155. result: []map[string]interface{}{{
  1156. "r": float64(48),
  1157. }},
  1158. }, {
  1159. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  1160. data: xsql.WindowTuplesSet{
  1161. Content: []xsql.WindowTuples{
  1162. {
  1163. Emitter: "test",
  1164. Tuples: []xsql.Tuple{
  1165. {
  1166. Emitter: "src1",
  1167. Message: xsql.Message{"a": 53.1},
  1168. }, {
  1169. Emitter: "src1",
  1170. Message: xsql.Message{"a": 27.4},
  1171. }, {
  1172. Emitter: "src1",
  1173. Message: xsql.Message{"a": 123123.7},
  1174. },
  1175. },
  1176. },
  1177. },
  1178. },
  1179. result: []map[string]interface{}{{
  1180. "r": float64(53),
  1181. }, {
  1182. "r": float64(27),
  1183. }, {
  1184. "r": float64(123124),
  1185. }},
  1186. }, {
  1187. sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
  1188. data: xsql.WindowTuplesSet{
  1189. Content: []xsql.WindowTuples{
  1190. {
  1191. Emitter: "test",
  1192. Tuples: []xsql.Tuple{
  1193. {
  1194. Emitter: "src1",
  1195. Message: xsql.Message{"a": 53.1},
  1196. }, {
  1197. Emitter: "src1",
  1198. Message: xsql.Message{"a": 27.4},
  1199. }, {
  1200. Emitter: "src1",
  1201. Message: xsql.Message{"a": 123123.7},
  1202. },
  1203. },
  1204. },
  1205. },
  1206. },
  1207. result: []map[string]interface{}{{
  1208. "r": float64(53),
  1209. }, {
  1210. "r": float64(27),
  1211. }, {
  1212. "r": float64(123124),
  1213. }},
  1214. }, {
  1215. sql: "SELECT round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1216. data: &xsql.JoinTupleSets{
  1217. Content: []xsql.JoinTuple{
  1218. {
  1219. Tuples: []xsql.Tuple{
  1220. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1221. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1222. },
  1223. },
  1224. {
  1225. Tuples: []xsql.Tuple{
  1226. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1227. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1228. },
  1229. },
  1230. {
  1231. Tuples: []xsql.Tuple{
  1232. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1233. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1234. },
  1235. },
  1236. },
  1237. },
  1238. result: []map[string]interface{}{{
  1239. "r": float64(66),
  1240. }, {
  1241. "r": float64(73),
  1242. }, {
  1243. "r": float64(89),
  1244. }},
  1245. }, {
  1246. sql: "SELECT CONCAT(test.id, test.a, test1.b) as concat FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1247. data: &xsql.JoinTupleSets{
  1248. Content: []xsql.JoinTuple{
  1249. {
  1250. Tuples: []xsql.Tuple{
  1251. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}},
  1252. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1253. },
  1254. },
  1255. {
  1256. Tuples: []xsql.Tuple{
  1257. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}},
  1258. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1259. },
  1260. },
  1261. {
  1262. Tuples: []xsql.Tuple{
  1263. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}},
  1264. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1265. },
  1266. },
  1267. },
  1268. },
  1269. result: []map[string]interface{}{{
  1270. "concat": "165.5512",
  1271. }, {
  1272. "concat": "273.49934",
  1273. }, {
  1274. "concat": "388.886",
  1275. }},
  1276. }, {
  1277. sql: "SELECT count(a) as r FROM test",
  1278. data: &xsql.Tuple{
  1279. Emitter: "test",
  1280. Message: xsql.Message{
  1281. "a": 47.5,
  1282. },
  1283. },
  1284. result: []map[string]interface{}{{
  1285. "r": float64(1),
  1286. }},
  1287. }, {
  1288. sql: "SELECT meta(test.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1289. data: &xsql.JoinTupleSets{
  1290. Content: []xsql.JoinTuple{
  1291. {
  1292. Tuples: []xsql.Tuple{
  1293. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 65.55}, Metadata: xsql.Metadata{"device": "devicea"}},
  1294. {Emitter: "test1", Message: xsql.Message{"id": 1, "b": 12}},
  1295. },
  1296. },
  1297. {
  1298. Tuples: []xsql.Tuple{
  1299. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 73.499}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1300. {Emitter: "test1", Message: xsql.Message{"id": 2, "b": 34}},
  1301. },
  1302. },
  1303. {
  1304. Tuples: []xsql.Tuple{
  1305. {Emitter: "test", Message: xsql.Message{"id": 3, "a": 88.88}, Metadata: xsql.Metadata{"device": "devicec"}},
  1306. {Emitter: "test1", Message: xsql.Message{"id": 3, "b": 6}},
  1307. },
  1308. },
  1309. },
  1310. },
  1311. result: []map[string]interface{}{{
  1312. "d": "devicea",
  1313. }, {
  1314. "d": "deviceb",
  1315. }, {
  1316. "d": "devicec",
  1317. }},
  1318. },
  1319. }
  1320. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1321. contextLogger := conf.Log.WithField("rule", "TestProjectPlan_Funcs")
  1322. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  1323. for i, tt := range tests {
  1324. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  1325. if err != nil {
  1326. t.Error(err)
  1327. }
  1328. pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: ast.IsAggStatement(stmt)}
  1329. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  1330. result := pp.Apply(ctx, tt.data, fv, afv)
  1331. var mapRes []map[string]interface{}
  1332. if v, ok := result.([]byte); ok {
  1333. err := json.Unmarshal(v, &mapRes)
  1334. if err != nil {
  1335. t.Errorf("Failed to parse the input into map.\n")
  1336. continue
  1337. }
  1338. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  1339. if !reflect.DeepEqual(tt.result, mapRes) {
  1340. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  1341. }
  1342. } else {
  1343. t.Errorf("%d. The returned result is not type of []byte\n", i)
  1344. }
  1345. }
  1346. }
  1347. func TestProjectPlan_AggFuncs(t *testing.T) {
  1348. var tests = []struct {
  1349. sql string
  1350. data interface{}
  1351. result []map[string]interface{}
  1352. }{
  1353. {
  1354. sql: "SELECT count(*) as c, round(a) as r, window_start() as ws, window_end() as we FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1355. data: xsql.GroupedTuplesSet{
  1356. {
  1357. Content: []xsql.DataValuer{
  1358. &xsql.JoinTuple{
  1359. Tuples: []xsql.Tuple{
  1360. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1361. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1362. },
  1363. },
  1364. &xsql.JoinTuple{
  1365. Tuples: []xsql.Tuple{
  1366. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1367. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1368. },
  1369. },
  1370. },
  1371. WindowRange: &xsql.WindowRange{
  1372. WindowStart: 1541152486013,
  1373. WindowEnd: 1541152487013,
  1374. },
  1375. },
  1376. {
  1377. Content: []xsql.DataValuer{
  1378. &xsql.JoinTuple{
  1379. Tuples: []xsql.Tuple{
  1380. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1381. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1382. },
  1383. },
  1384. &xsql.JoinTuple{
  1385. Tuples: []xsql.Tuple{
  1386. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1387. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1388. },
  1389. },
  1390. },
  1391. WindowRange: &xsql.WindowRange{
  1392. WindowStart: 1541152486013,
  1393. WindowEnd: 1541152487013,
  1394. },
  1395. },
  1396. },
  1397. result: []map[string]interface{}{{
  1398. "c": float64(2),
  1399. "r": float64(122),
  1400. "ws": float64(1541152486013),
  1401. "we": float64(1541152487013),
  1402. }, {
  1403. "c": float64(2),
  1404. "r": float64(89),
  1405. "ws": float64(1541152486013),
  1406. "we": float64(1541152487013),
  1407. }},
  1408. },
  1409. {
  1410. sql: "SELECT count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1411. data: xsql.GroupedTuplesSet{
  1412. {
  1413. Content: []xsql.DataValuer{
  1414. &xsql.JoinTuple{
  1415. Tuples: []xsql.Tuple{
  1416. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1417. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1418. },
  1419. },
  1420. &xsql.JoinTuple{
  1421. Tuples: []xsql.Tuple{
  1422. {Emitter: "test", Message: xsql.Message{"id": 5}},
  1423. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1424. },
  1425. },
  1426. },
  1427. },
  1428. {
  1429. Content: []xsql.DataValuer{
  1430. &xsql.JoinTuple{
  1431. Tuples: []xsql.Tuple{
  1432. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1433. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1434. },
  1435. },
  1436. &xsql.JoinTuple{
  1437. Tuples: []xsql.Tuple{
  1438. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1439. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1440. },
  1441. },
  1442. },
  1443. },
  1444. },
  1445. result: []map[string]interface{}{{
  1446. "c": float64(1),
  1447. "a": 122.33,
  1448. "s": 122.33,
  1449. "min": 122.33,
  1450. "max": 122.33,
  1451. }, {
  1452. "c": float64(2),
  1453. "s": 103.63,
  1454. "a": 51.815,
  1455. "min": 14.6,
  1456. "max": 89.03,
  1457. }},
  1458. },
  1459. {
  1460. sql: "SELECT avg(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1461. data: xsql.GroupedTuplesSet{
  1462. {
  1463. Content: []xsql.DataValuer{
  1464. &xsql.JoinTuple{
  1465. Tuples: []xsql.Tuple{
  1466. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1467. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1468. },
  1469. },
  1470. &xsql.JoinTuple{
  1471. Tuples: []xsql.Tuple{
  1472. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  1473. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1474. },
  1475. },
  1476. &xsql.JoinTuple{
  1477. Tuples: []xsql.Tuple{
  1478. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 98.31}},
  1479. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  1480. },
  1481. },
  1482. &xsql.JoinTuple{
  1483. Tuples: []xsql.Tuple{
  1484. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  1485. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1486. },
  1487. },
  1488. },
  1489. },
  1490. {
  1491. Content: []xsql.DataValuer{
  1492. &xsql.JoinTuple{
  1493. Tuples: []xsql.Tuple{
  1494. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1495. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1496. },
  1497. },
  1498. &xsql.JoinTuple{
  1499. Tuples: []xsql.Tuple{
  1500. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1501. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1502. },
  1503. },
  1504. },
  1505. },
  1506. },
  1507. result: []map[string]interface{}{{
  1508. "avg": 116.68,
  1509. }, {
  1510. "avg": 51.815,
  1511. }},
  1512. },
  1513. {
  1514. sql: "SELECT max(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1515. data: xsql.GroupedTuplesSet{
  1516. {
  1517. Content: []xsql.DataValuer{
  1518. &xsql.JoinTuple{
  1519. Tuples: []xsql.Tuple{
  1520. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1521. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1522. },
  1523. },
  1524. &xsql.JoinTuple{
  1525. Tuples: []xsql.Tuple{
  1526. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1527. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1528. },
  1529. },
  1530. &xsql.JoinTuple{
  1531. Tuples: []xsql.Tuple{
  1532. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1533. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1534. },
  1535. },
  1536. },
  1537. },
  1538. {
  1539. Content: []xsql.DataValuer{
  1540. &xsql.JoinTuple{
  1541. Tuples: []xsql.Tuple{
  1542. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1543. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1544. },
  1545. },
  1546. &xsql.JoinTuple{
  1547. Tuples: []xsql.Tuple{
  1548. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1549. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1550. },
  1551. },
  1552. },
  1553. },
  1554. },
  1555. result: []map[string]interface{}{{
  1556. "max": 177.51,
  1557. }, {
  1558. "max": 89.03,
  1559. }},
  1560. },
  1561. {
  1562. sql: "SELECT min(a), window_start(), window_end() FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1563. data: &xsql.JoinTupleSets{
  1564. Content: []xsql.JoinTuple{
  1565. {
  1566. Tuples: []xsql.Tuple{
  1567. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1568. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1569. },
  1570. },
  1571. {
  1572. Tuples: []xsql.Tuple{
  1573. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1574. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1575. },
  1576. },
  1577. {
  1578. Tuples: []xsql.Tuple{
  1579. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1580. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1581. },
  1582. },
  1583. },
  1584. WindowRange: &xsql.WindowRange{
  1585. WindowStart: 1541152486013,
  1586. WindowEnd: 1541152487013,
  1587. },
  1588. },
  1589. result: []map[string]interface{}{{
  1590. "min": 68.55,
  1591. "window_start": float64(1541152486013),
  1592. "window_end": float64(1541152487013),
  1593. }},
  1594. }, {
  1595. sql: "SELECT count(*) as all, count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
  1596. data: &xsql.JoinTupleSets{
  1597. Content: []xsql.JoinTuple{
  1598. {
  1599. Tuples: []xsql.Tuple{
  1600. {Emitter: "test", Message: xsql.Message{"id": 1}},
  1601. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1602. },
  1603. },
  1604. {
  1605. Tuples: []xsql.Tuple{
  1606. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
  1607. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1608. },
  1609. },
  1610. {
  1611. Tuples: []xsql.Tuple{
  1612. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1613. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1614. },
  1615. },
  1616. },
  1617. },
  1618. result: []map[string]interface{}{{
  1619. "all": float64(3),
  1620. "c": float64(2),
  1621. "a": 123.03,
  1622. "s": 246.06,
  1623. "min": 68.55,
  1624. "max": 177.51,
  1625. }},
  1626. }, {
  1627. sql: "SELECT sum(a), window_start() as ws, window_end() FROM test GROUP BY TumblingWindow(ss, 10)",
  1628. data: xsql.WindowTuplesSet{
  1629. Content: []xsql.WindowTuples{{
  1630. Emitter: "test",
  1631. Tuples: []xsql.Tuple{
  1632. {
  1633. Emitter: "src1",
  1634. Message: xsql.Message{"a": 53},
  1635. }, {
  1636. Emitter: "src1",
  1637. Message: xsql.Message{"a": 27},
  1638. }, {
  1639. Emitter: "src1",
  1640. Message: xsql.Message{"a": 123123},
  1641. },
  1642. },
  1643. },
  1644. },
  1645. WindowRange: &xsql.WindowRange{
  1646. WindowStart: 1541152486013,
  1647. WindowEnd: 1541152487013,
  1648. },
  1649. },
  1650. result: []map[string]interface{}{{
  1651. "sum": float64(123203),
  1652. "ws": float64(1541152486013),
  1653. "window_end": float64(1541152487013),
  1654. }},
  1655. }, {
  1656. sql: "SELECT sum(a) as s FROM test GROUP BY TumblingWindow(ss, 10)",
  1657. data: xsql.WindowTuplesSet{
  1658. Content: []xsql.WindowTuples{{
  1659. Emitter: "test",
  1660. Tuples: []xsql.Tuple{
  1661. {
  1662. Emitter: "src1",
  1663. Message: xsql.Message{"a": 53, "s": 123203},
  1664. }, {
  1665. Emitter: "src1",
  1666. Message: xsql.Message{"a": 27},
  1667. }, {
  1668. Emitter: "src1",
  1669. Message: xsql.Message{"a": 123123},
  1670. },
  1671. },
  1672. },
  1673. },
  1674. },
  1675. result: []map[string]interface{}{{
  1676. "s": float64(123203),
  1677. }},
  1678. }, {
  1679. sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
  1680. data: xsql.WindowTuplesSet{
  1681. Content: []xsql.WindowTuples{{
  1682. Emitter: "test",
  1683. Tuples: []xsql.Tuple{
  1684. {
  1685. Emitter: "src1",
  1686. Message: xsql.Message{"a": 53},
  1687. }, {
  1688. Emitter: "src1",
  1689. Message: xsql.Message{"a": 27},
  1690. }, {
  1691. Emitter: "src1",
  1692. Message: xsql.Message{"a": 123123},
  1693. },
  1694. },
  1695. },
  1696. },
  1697. },
  1698. result: []map[string]interface{}{{
  1699. "sum": float64(123203),
  1700. }},
  1701. }, {
  1702. sql: "SELECT count(*) as all, count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test GROUP BY TumblingWindow(ss, 10)",
  1703. data: xsql.WindowTuplesSet{
  1704. Content: []xsql.WindowTuples{{
  1705. Emitter: "test",
  1706. Tuples: []xsql.Tuple{
  1707. {
  1708. Emitter: "src1",
  1709. Message: xsql.Message{"a": 53},
  1710. }, {
  1711. Emitter: "src1",
  1712. Message: xsql.Message{"a": 27},
  1713. }, {
  1714. Emitter: "src1",
  1715. Message: xsql.Message{"s": 123123},
  1716. },
  1717. },
  1718. },
  1719. },
  1720. },
  1721. result: []map[string]interface{}{{
  1722. "all": float64(3),
  1723. "c": float64(2),
  1724. "a": float64(40),
  1725. "s": float64(80),
  1726. "min": float64(27),
  1727. "max": float64(53),
  1728. }},
  1729. },
  1730. {
  1731. sql: "SELECT count(*), meta(test1.device) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1732. data: xsql.GroupedTuplesSet{
  1733. {
  1734. Content: []xsql.DataValuer{
  1735. &xsql.JoinTuple{
  1736. Tuples: []xsql.Tuple{
  1737. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  1738. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1739. },
  1740. },
  1741. &xsql.JoinTuple{
  1742. Tuples: []xsql.Tuple{
  1743. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1744. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1745. },
  1746. },
  1747. },
  1748. },
  1749. {
  1750. Content: []xsql.DataValuer{
  1751. &xsql.JoinTuple{
  1752. Tuples: []xsql.Tuple{
  1753. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  1754. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1755. },
  1756. },
  1757. &xsql.JoinTuple{
  1758. Tuples: []xsql.Tuple{
  1759. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1760. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1761. },
  1762. },
  1763. },
  1764. },
  1765. },
  1766. result: []map[string]interface{}{{
  1767. "count": float64(2),
  1768. "meta": "devicea",
  1769. }, {
  1770. "count": float64(2),
  1771. "meta": "devicec",
  1772. }},
  1773. },
  1774. {
  1775. sql: "SELECT count(*) as c, meta(test1.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1776. data: xsql.GroupedTuplesSet{
  1777. {
  1778. Content: []xsql.DataValuer{
  1779. &xsql.JoinTuple{
  1780. Tuples: []xsql.Tuple{
  1781. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "d": "devicea"}},
  1782. {Emitter: "test1", Message: xsql.Message{"id": 1, "color": "w2"}, Metadata: xsql.Metadata{"device": "devicea"}},
  1783. },
  1784. },
  1785. &xsql.JoinTuple{
  1786. Tuples: []xsql.Tuple{
  1787. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1788. {Emitter: "test1", Message: xsql.Message{"id": 5, "color": "w2"}, Metadata: xsql.Metadata{"device": "deviceb"}},
  1789. },
  1790. },
  1791. },
  1792. },
  1793. {
  1794. Content: []xsql.DataValuer{
  1795. &xsql.JoinTuple{
  1796. Tuples: []xsql.Tuple{
  1797. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "d": "devicec"}},
  1798. {Emitter: "test1", Message: xsql.Message{"id": 2, "color": "w1"}, Metadata: xsql.Metadata{"device": "devicec"}},
  1799. },
  1800. },
  1801. &xsql.JoinTuple{
  1802. Tuples: []xsql.Tuple{
  1803. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1804. {Emitter: "test1", Message: xsql.Message{"id": 4, "color": "w1"}, Metadata: xsql.Metadata{"device": "deviced"}},
  1805. },
  1806. },
  1807. },
  1808. },
  1809. },
  1810. result: []map[string]interface{}{{
  1811. "c": float64(2),
  1812. "d": "devicea",
  1813. }, {
  1814. "c": float64(2),
  1815. "d": "devicec",
  1816. }},
  1817. }, {
  1818. sql: "SELECT * FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1819. data: xsql.GroupedTuplesSet{
  1820. {
  1821. Content: []xsql.DataValuer{
  1822. &xsql.JoinTuple{
  1823. Tuples: []xsql.Tuple{
  1824. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1825. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1826. },
  1827. },
  1828. &xsql.JoinTuple{
  1829. Tuples: []xsql.Tuple{
  1830. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1831. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1832. },
  1833. },
  1834. },
  1835. WindowRange: &xsql.WindowRange{
  1836. WindowStart: 1541152486013,
  1837. WindowEnd: 1541152487013,
  1838. },
  1839. },
  1840. {
  1841. Content: []xsql.DataValuer{
  1842. &xsql.JoinTuple{
  1843. Tuples: []xsql.Tuple{
  1844. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1845. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1846. },
  1847. },
  1848. &xsql.JoinTuple{
  1849. Tuples: []xsql.Tuple{
  1850. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1851. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1852. },
  1853. },
  1854. },
  1855. WindowRange: &xsql.WindowRange{
  1856. WindowStart: 1541152486013,
  1857. WindowEnd: 1541152487013,
  1858. },
  1859. },
  1860. },
  1861. result: []map[string]interface{}{{
  1862. "a": 122.33,
  1863. "c": float64(2),
  1864. "color": "w2",
  1865. "id": float64(1),
  1866. "r": float64(122),
  1867. }, {
  1868. "a": 89.03,
  1869. "c": float64(2),
  1870. "color": "w1",
  1871. "id": float64(2),
  1872. "r": float64(89),
  1873. }},
  1874. }, {
  1875. sql: "SELECT collect(a) as r1 FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1876. data: xsql.GroupedTuplesSet{
  1877. {
  1878. Content: []xsql.DataValuer{
  1879. &xsql.JoinTuple{
  1880. Tuples: []xsql.Tuple{
  1881. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1882. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1883. },
  1884. },
  1885. &xsql.JoinTuple{
  1886. Tuples: []xsql.Tuple{
  1887. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  1888. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  1889. },
  1890. },
  1891. },
  1892. },
  1893. {
  1894. Content: []xsql.DataValuer{
  1895. &xsql.JoinTuple{
  1896. Tuples: []xsql.Tuple{
  1897. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  1898. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  1899. },
  1900. },
  1901. &xsql.JoinTuple{
  1902. Tuples: []xsql.Tuple{
  1903. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  1904. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  1905. },
  1906. },
  1907. },
  1908. },
  1909. },
  1910. result: []map[string]interface{}{{
  1911. "r1": []interface{}{122.33, 177.51},
  1912. }, {"r1": []interface{}{89.03, 14.6}}},
  1913. }, {
  1914. sql: "SELECT collect(*)[1] as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  1915. data: xsql.WindowTuplesSet{
  1916. Content: []xsql.WindowTuples{
  1917. {
  1918. Emitter: "test",
  1919. Tuples: []xsql.Tuple{
  1920. {
  1921. Emitter: "src1",
  1922. Message: xsql.Message{"a": 53, "s": 123203},
  1923. }, {
  1924. Emitter: "src1",
  1925. Message: xsql.Message{"a": 27},
  1926. }, {
  1927. Emitter: "src1",
  1928. Message: xsql.Message{"a": 123123},
  1929. },
  1930. },
  1931. },
  1932. },
  1933. WindowRange: &xsql.WindowRange{
  1934. WindowStart: 1541152486013,
  1935. WindowEnd: 1541152487013,
  1936. },
  1937. },
  1938. result: []map[string]interface{}{{
  1939. "c1": map[string]interface{}{
  1940. "a": float64(27),
  1941. },
  1942. }},
  1943. }, {
  1944. sql: "SELECT collect(*)[1]->a as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  1945. data: xsql.WindowTuplesSet{
  1946. Content: []xsql.WindowTuples{{
  1947. Emitter: "test",
  1948. Tuples: []xsql.Tuple{
  1949. {
  1950. Emitter: "src1",
  1951. Message: xsql.Message{"a": 53, "s": 123203},
  1952. }, {
  1953. Emitter: "src1",
  1954. Message: xsql.Message{"a": 27},
  1955. }, {
  1956. Emitter: "src1",
  1957. Message: xsql.Message{"a": 123123},
  1958. },
  1959. },
  1960. },
  1961. },
  1962. },
  1963. result: []map[string]interface{}{{
  1964. "c1": float64(27),
  1965. }},
  1966. }, {
  1967. sql: "SELECT collect(*)[1]->sl[0] as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  1968. data: xsql.WindowTuplesSet{
  1969. Content: []xsql.WindowTuples{{
  1970. Emitter: "test",
  1971. Tuples: []xsql.Tuple{
  1972. {
  1973. Emitter: "src1",
  1974. Message: xsql.Message{"a": 53, "sl": []string{"hello", "world"}},
  1975. }, {
  1976. Emitter: "src1",
  1977. Message: xsql.Message{"a": 27, "sl": []string{"new", "horizon"}},
  1978. }, {
  1979. Emitter: "src1",
  1980. Message: xsql.Message{"a": 123123, "sl": []string{"south", "africa"}},
  1981. },
  1982. },
  1983. },
  1984. },
  1985. },
  1986. result: []map[string]interface{}{{
  1987. "c1": "new",
  1988. }},
  1989. }, {
  1990. sql: "SELECT deduplicate(id, true) as r1 FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  1991. data: xsql.GroupedTuplesSet{
  1992. {
  1993. Content: []xsql.DataValuer{
  1994. &xsql.JoinTuple{
  1995. Tuples: []xsql.Tuple{
  1996. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33, "c": 2, "r": 122}},
  1997. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  1998. },
  1999. },
  2000. &xsql.JoinTuple{
  2001. Tuples: []xsql.Tuple{
  2002. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
  2003. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  2004. },
  2005. },
  2006. },
  2007. },
  2008. {
  2009. Content: []xsql.DataValuer{
  2010. &xsql.JoinTuple{
  2011. Tuples: []xsql.Tuple{
  2012. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03, "c": 2, "r": 89}},
  2013. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  2014. },
  2015. },
  2016. &xsql.JoinTuple{
  2017. Tuples: []xsql.Tuple{
  2018. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  2019. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  2020. },
  2021. },
  2022. },
  2023. },
  2024. },
  2025. result: []map[string]interface{}{
  2026. {
  2027. "r1": []interface{}{
  2028. map[string]interface{}{"a": 122.33, "c": float64(2), "color": "w2", "id": float64(1), "r": float64(122)},
  2029. map[string]interface{}{"a": 177.51, "color": "w2", "id": float64(5)}},
  2030. }, {
  2031. "r1": []interface{}{
  2032. map[string]interface{}{"a": 89.03, "c": float64(2), "color": "w1", "id": float64(2), "r": float64(89)},
  2033. map[string]interface{}{"a": 14.6, "color": "w1", "id": float64(4)}},
  2034. },
  2035. },
  2036. }, {
  2037. sql: "SELECT deduplicate(a, false)->a as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  2038. data: xsql.WindowTuplesSet{
  2039. Content: []xsql.WindowTuples{{
  2040. Emitter: "test",
  2041. Tuples: []xsql.Tuple{
  2042. {
  2043. Emitter: "src1",
  2044. Message: xsql.Message{"a": 53, "s": 123203},
  2045. }, {
  2046. Emitter: "src1",
  2047. Message: xsql.Message{"a": 27},
  2048. }, {
  2049. Emitter: "src1",
  2050. Message: xsql.Message{"a": 123123},
  2051. },
  2052. },
  2053. },
  2054. },
  2055. },
  2056. result: []map[string]interface{}{{
  2057. "c1": float64(123123),
  2058. }},
  2059. }, {
  2060. sql: "SELECT deduplicate(a, false) as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  2061. data: xsql.WindowTuplesSet{
  2062. Content: []xsql.WindowTuples{{
  2063. Emitter: "test",
  2064. Tuples: []xsql.Tuple{
  2065. {
  2066. Emitter: "src1",
  2067. Message: xsql.Message{"a": 53, "s": 123203},
  2068. }, {
  2069. Emitter: "src1",
  2070. Message: xsql.Message{"a": 27},
  2071. }, {
  2072. Emitter: "src1",
  2073. Message: xsql.Message{"a": 53},
  2074. },
  2075. },
  2076. },
  2077. },
  2078. },
  2079. result: []map[string]interface{}{{}},
  2080. }, {
  2081. sql: "SELECT deduplicate(a, false) as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
  2082. data: xsql.WindowTuplesSet{
  2083. Content: []xsql.WindowTuples{{
  2084. Emitter: "test",
  2085. Tuples: []xsql.Tuple{
  2086. {
  2087. Emitter: "src1",
  2088. Message: xsql.Message{"a": 53, "s": 123203},
  2089. }, {
  2090. Emitter: "src1",
  2091. Message: xsql.Message{"a": 27},
  2092. }, {
  2093. Emitter: "src1",
  2094. Message: xsql.Message{"a": 53},
  2095. },
  2096. },
  2097. },
  2098. },
  2099. },
  2100. result: []map[string]interface{}{{}},
  2101. },
  2102. }
  2103. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2104. contextLogger := conf.Log.WithField("rule", "TestProjectPlan_AggFuncs")
  2105. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  2106. for i, tt := range tests {
  2107. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2108. if err != nil {
  2109. t.Error(err)
  2110. }
  2111. pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: true}
  2112. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  2113. result := pp.Apply(ctx, tt.data, fv, afv)
  2114. var mapRes []map[string]interface{}
  2115. if v, ok := result.([]byte); ok {
  2116. err := json.Unmarshal(v, &mapRes)
  2117. if err != nil {
  2118. t.Errorf("Failed to parse the input into map.\n")
  2119. continue
  2120. }
  2121. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  2122. if !reflect.DeepEqual(tt.result, mapRes) {
  2123. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  2124. }
  2125. } else {
  2126. t.Errorf("%d. %q\n\nThe returned result is not type of []byte: %#v\n", i, tt.sql, result)
  2127. }
  2128. }
  2129. }
  2130. func TestProjectPlanError(t *testing.T) {
  2131. var tests = []struct {
  2132. sql string
  2133. data interface{}
  2134. result interface{}
  2135. }{
  2136. {
  2137. sql: "SELECT a FROM test",
  2138. data: errors.New("an error from upstream"),
  2139. result: errors.New("an error from upstream"),
  2140. }, {
  2141. sql: "SELECT a * 5 FROM test",
  2142. data: &xsql.Tuple{
  2143. Emitter: "test",
  2144. Message: xsql.Message{
  2145. "a": "val_a",
  2146. },
  2147. },
  2148. result: errors.New("run Select error: invalid operation string(val_a) * int64(5)"),
  2149. }, {
  2150. sql: `SELECT a[0]->b AS ab FROM test`,
  2151. data: &xsql.Tuple{
  2152. Emitter: "test",
  2153. Message: xsql.Message{
  2154. "a": "common string",
  2155. },
  2156. },
  2157. result: errors.New("run Select error: invalid operation string(common string) [] *xsql.BracketEvalResult(&{0 0})"),
  2158. }, {
  2159. sql: `SELECT round(a) as r FROM test`,
  2160. data: &xsql.Tuple{
  2161. Emitter: "test",
  2162. Message: xsql.Message{
  2163. "a": "common string",
  2164. },
  2165. },
  2166. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  2167. }, {
  2168. sql: `SELECT round(a) as r FROM test`,
  2169. data: &xsql.Tuple{
  2170. Emitter: "test",
  2171. Message: xsql.Message{
  2172. "abc": "common string",
  2173. },
  2174. },
  2175. result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
  2176. }, {
  2177. sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
  2178. data: xsql.GroupedTuplesSet{
  2179. {
  2180. Content: []xsql.DataValuer{
  2181. &xsql.JoinTuple{
  2182. Tuples: []xsql.Tuple{
  2183. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
  2184. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  2185. },
  2186. },
  2187. &xsql.JoinTuple{
  2188. Tuples: []xsql.Tuple{
  2189. {Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.54}},
  2190. {Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
  2191. },
  2192. },
  2193. &xsql.JoinTuple{
  2194. Tuples: []xsql.Tuple{
  2195. {Emitter: "test", Message: xsql.Message{"id": 4, "a": "dde"}},
  2196. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w2"}},
  2197. },
  2198. },
  2199. &xsql.JoinTuple{
  2200. Tuples: []xsql.Tuple{
  2201. {Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.54}},
  2202. {Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
  2203. },
  2204. },
  2205. },
  2206. },
  2207. {
  2208. Content: []xsql.DataValuer{
  2209. &xsql.JoinTuple{
  2210. Tuples: []xsql.Tuple{
  2211. {Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
  2212. {Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
  2213. },
  2214. },
  2215. &xsql.JoinTuple{
  2216. Tuples: []xsql.Tuple{
  2217. {Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
  2218. {Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
  2219. },
  2220. },
  2221. },
  2222. },
  2223. },
  2224. result: errors.New("run Select error: call func avg error: requires float64 but found string(dde)"),
  2225. }, {
  2226. sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
  2227. data: xsql.WindowTuplesSet{
  2228. Content: []xsql.WindowTuples{{
  2229. Emitter: "test",
  2230. Tuples: []xsql.Tuple{
  2231. {
  2232. Emitter: "src1",
  2233. Message: xsql.Message{"a": 53},
  2234. }, {
  2235. Emitter: "src1",
  2236. Message: xsql.Message{"a": "ddd"},
  2237. }, {
  2238. Emitter: "src1",
  2239. Message: xsql.Message{"a": 123123},
  2240. },
  2241. },
  2242. },
  2243. },
  2244. },
  2245. result: errors.New("run Select error: call func sum error: requires int but found string(ddd)"),
  2246. }, {
  2247. sql: `SELECT a[0]->b AS ab FROM test`,
  2248. data: &xsql.Tuple{
  2249. Emitter: "test",
  2250. Message: xsql.Message{
  2251. "a": []map[string]interface{}(nil),
  2252. },
  2253. },
  2254. result: errors.New("run Select error: out of index: 0 of 0"),
  2255. },
  2256. }
  2257. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2258. contextLogger := conf.Log.WithField("rule", "TestProjectPlanError")
  2259. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  2260. for i, tt := range tests {
  2261. stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  2262. pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: ast.IsAggStatement(stmt)}
  2263. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  2264. result := pp.Apply(ctx, tt.data, fv, afv)
  2265. if !reflect.DeepEqual(tt.result, result) {
  2266. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  2267. }
  2268. }
  2269. }