xsql_processor_test.go 79 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994
  1. package processors
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/emqx/kuiper/xstream/nodes"
  10. "github.com/emqx/kuiper/xstream/test"
  11. "path"
  12. "reflect"
  13. "strings"
  14. "testing"
  15. "time"
  16. )
  17. var DbDir = getDbDir()
  18. func getDbDir() string {
  19. dbDir, err := common.GetAndCreateDataLoc("test")
  20. if err != nil {
  21. log.Panic(err)
  22. }
  23. log.Infof("db location is %s", dbDir)
  24. return dbDir
  25. }
  26. func TestStreamCreateProcessor(t *testing.T) {
  27. var tests = []struct {
  28. s string
  29. r []string
  30. err string
  31. }{
  32. {
  33. s: `SHOW STREAMS;`,
  34. r: []string{"No stream definitions are found."},
  35. },
  36. {
  37. s: `EXPLAIN STREAM topic1;`,
  38. err: "Stream topic1 is not found.",
  39. },
  40. {
  41. s: `CREATE STREAM topic1 (
  42. USERID BIGINT,
  43. FIRST_NAME STRING,
  44. LAST_NAME STRING,
  45. NICKNAMES ARRAY(STRING),
  46. Gender BOOLEAN,
  47. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  48. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  49. r: []string{"Stream topic1 is created."},
  50. },
  51. {
  52. s: `CREATE STREAM topic1 (
  53. USERID BIGINT,
  54. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  55. err: "Create stream fails: Item topic1 already exists.",
  56. },
  57. {
  58. s: `EXPLAIN STREAM topic1;`,
  59. r: []string{"TO BE SUPPORTED"},
  60. },
  61. {
  62. s: `DESCRIBE STREAM topic1;`,
  63. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  64. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  65. "DATASOURCE: users\nFORMAT: AVRO\nKEY: USERID\n"},
  66. },
  67. {
  68. s: `SHOW STREAMS;`,
  69. r: []string{"topic1"},
  70. },
  71. {
  72. s: `DROP STREAM topic1;`,
  73. r: []string{"Stream topic1 is dropped."},
  74. },
  75. {
  76. s: `DESCRIBE STREAM topic1;`,
  77. err: "Stream topic1 is not found.",
  78. },
  79. {
  80. s: `DROP STREAM topic1;`,
  81. err: "Drop stream fails: topic1 is not found.",
  82. },
  83. }
  84. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  85. streamDB := path.Join(getDbDir(), "streamTest")
  86. for i, tt := range tests {
  87. results, err := NewStreamProcessor(streamDB).ExecStmt(tt.s)
  88. if !reflect.DeepEqual(tt.err, errstring(err)) {
  89. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  90. } else if tt.err == "" {
  91. if !reflect.DeepEqual(tt.r, results) {
  92. t.Errorf("%d. %q\n\nstmt mismatch:\nexp=%s\ngot=%#v\n\n", i, tt.s, tt.r, results)
  93. }
  94. }
  95. }
  96. }
  97. func createStreams(t *testing.T) {
  98. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  99. demo := `CREATE STREAM demo (
  100. color STRING,
  101. size BIGINT,
  102. ts BIGINT
  103. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  104. _, err := p.ExecStmt(demo)
  105. if err != nil {
  106. t.Log(err)
  107. }
  108. demoE := `CREATE STREAM demoE (
  109. color STRING,
  110. size BIGINT,
  111. ts BIGINT
  112. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts");`
  113. _, err = p.ExecStmt(demoE)
  114. if err != nil {
  115. t.Log(err)
  116. }
  117. demo1 := `CREATE STREAM demo1 (
  118. temp FLOAT,
  119. hum BIGINT,
  120. ts BIGINT
  121. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  122. _, err = p.ExecStmt(demo1)
  123. if err != nil {
  124. t.Log(err)
  125. }
  126. sessionDemo := `CREATE STREAM sessionDemo (
  127. temp FLOAT,
  128. hum BIGINT,
  129. ts BIGINT
  130. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  131. _, err = p.ExecStmt(sessionDemo)
  132. if err != nil {
  133. t.Log(err)
  134. }
  135. }
  136. func dropStreams(t *testing.T) {
  137. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  138. demo := `DROP STREAM demo`
  139. _, err := p.ExecStmt(demo)
  140. if err != nil {
  141. t.Log(err)
  142. }
  143. demoE := `DROP STREAM demoE`
  144. _, err = p.ExecStmt(demoE)
  145. if err != nil {
  146. t.Log(err)
  147. }
  148. demo1 := `DROP STREAM demo1`
  149. _, err = p.ExecStmt(demo1)
  150. if err != nil {
  151. t.Log(err)
  152. }
  153. sessionDemo := `DROP STREAM sessionDemo`
  154. _, err = p.ExecStmt(sessionDemo)
  155. if err != nil {
  156. t.Log(err)
  157. }
  158. }
  159. func createSchemalessStreams(t *testing.T) {
  160. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  161. demo := `CREATE STREAM ldemo (
  162. ) WITH (DATASOURCE="ldemo", FORMAT="json");`
  163. _, err := p.ExecStmt(demo)
  164. if err != nil {
  165. t.Log(err)
  166. }
  167. demo1 := `CREATE STREAM ldemo1 (
  168. ) WITH (DATASOURCE="ldemo1", FORMAT="json");`
  169. _, err = p.ExecStmt(demo1)
  170. if err != nil {
  171. t.Log(err)
  172. }
  173. sessionDemo := `CREATE STREAM lsessionDemo (
  174. ) WITH (DATASOURCE="lsessionDemo", FORMAT="json");`
  175. _, err = p.ExecStmt(sessionDemo)
  176. if err != nil {
  177. t.Log(err)
  178. }
  179. }
  180. func dropSchemalessStreams(t *testing.T) {
  181. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  182. demo := `DROP STREAM ldemo`
  183. _, err := p.ExecStmt(demo)
  184. if err != nil {
  185. t.Log(err)
  186. }
  187. demo1 := `DROP STREAM ldemo1`
  188. _, err = p.ExecStmt(demo1)
  189. if err != nil {
  190. t.Log(err)
  191. }
  192. sessionDemo := `DROP STREAM lsessionDemo`
  193. _, err = p.ExecStmt(sessionDemo)
  194. if err != nil {
  195. t.Log(err)
  196. }
  197. }
  198. func getMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
  199. var data []*xsql.Tuple
  200. switch name {
  201. case "demo":
  202. data = []*xsql.Tuple{
  203. {
  204. Emitter: name,
  205. Message: map[string]interface{}{
  206. "color": "red",
  207. "size": 3,
  208. "ts": 1541152486013,
  209. },
  210. Timestamp: 1541152486013,
  211. },
  212. {
  213. Emitter: name,
  214. Message: map[string]interface{}{
  215. "color": "blue",
  216. "size": 6,
  217. "ts": 1541152486822,
  218. },
  219. Timestamp: 1541152486822,
  220. },
  221. {
  222. Emitter: name,
  223. Message: map[string]interface{}{
  224. "color": "blue",
  225. "size": 2,
  226. "ts": 1541152487632,
  227. },
  228. Timestamp: 1541152487632,
  229. },
  230. {
  231. Emitter: name,
  232. Message: map[string]interface{}{
  233. "color": "yellow",
  234. "size": 4,
  235. "ts": 1541152488442,
  236. },
  237. Timestamp: 1541152488442,
  238. },
  239. {
  240. Emitter: name,
  241. Message: map[string]interface{}{
  242. "color": "red",
  243. "size": 1,
  244. "ts": 1541152489252,
  245. },
  246. Timestamp: 1541152489252,
  247. },
  248. }
  249. case "demoE":
  250. data = []*xsql.Tuple{
  251. {
  252. Emitter: name,
  253. Message: map[string]interface{}{
  254. "color": 3,
  255. "size": "red",
  256. "ts": 1541152486013,
  257. },
  258. Timestamp: 1541152486013,
  259. },
  260. {
  261. Emitter: name,
  262. Message: map[string]interface{}{
  263. "color": "blue",
  264. "size": 6,
  265. "ts": "1541152486822",
  266. },
  267. Timestamp: 1541152486822,
  268. },
  269. {
  270. Emitter: name,
  271. Message: map[string]interface{}{
  272. "color": "blue",
  273. "size": 2,
  274. "ts": 1541152487632,
  275. },
  276. Timestamp: 1541152487632,
  277. },
  278. {
  279. Emitter: name,
  280. Message: map[string]interface{}{
  281. "color": 7,
  282. "size": 4,
  283. "ts": 1541152488442,
  284. },
  285. Timestamp: 1541152488442,
  286. },
  287. {
  288. Emitter: name,
  289. Message: map[string]interface{}{
  290. "color": "red",
  291. "size": "blue",
  292. "ts": 1541152489252,
  293. },
  294. Timestamp: 1541152489252,
  295. },
  296. }
  297. case "demo1":
  298. data = []*xsql.Tuple{
  299. {
  300. Emitter: name,
  301. Message: map[string]interface{}{
  302. "temp": 25.5,
  303. "hum": 65,
  304. "ts": 1541152486013,
  305. },
  306. Timestamp: 1541152486013,
  307. },
  308. {
  309. Emitter: name,
  310. Message: map[string]interface{}{
  311. "temp": 27.5,
  312. "hum": 59,
  313. "ts": 1541152486823,
  314. },
  315. Timestamp: 1541152486823,
  316. },
  317. {
  318. Emitter: name,
  319. Message: map[string]interface{}{
  320. "temp": 28.1,
  321. "hum": 75,
  322. "ts": 1541152487632,
  323. },
  324. Timestamp: 1541152487632,
  325. },
  326. {
  327. Emitter: name,
  328. Message: map[string]interface{}{
  329. "temp": 27.4,
  330. "hum": 80,
  331. "ts": 1541152488442,
  332. },
  333. Timestamp: 1541152488442,
  334. },
  335. {
  336. Emitter: name,
  337. Message: map[string]interface{}{
  338. "temp": 25.5,
  339. "hum": 62,
  340. "ts": 1541152489252,
  341. },
  342. Timestamp: 1541152489252,
  343. },
  344. }
  345. case "sessionDemo":
  346. data = []*xsql.Tuple{
  347. {
  348. Emitter: name,
  349. Message: map[string]interface{}{
  350. "temp": 25.5,
  351. "hum": 65,
  352. "ts": 1541152486013,
  353. },
  354. Timestamp: 1541152486013,
  355. },
  356. {
  357. Emitter: name,
  358. Message: map[string]interface{}{
  359. "temp": 27.5,
  360. "hum": 59,
  361. "ts": 1541152486823,
  362. },
  363. Timestamp: 1541152486823,
  364. },
  365. {
  366. Emitter: name,
  367. Message: map[string]interface{}{
  368. "temp": 28.1,
  369. "hum": 75,
  370. "ts": 1541152487932,
  371. },
  372. Timestamp: 1541152487932,
  373. },
  374. {
  375. Emitter: name,
  376. Message: map[string]interface{}{
  377. "temp": 27.4,
  378. "hum": 80,
  379. "ts": 1541152488442,
  380. },
  381. Timestamp: 1541152488442,
  382. },
  383. {
  384. Emitter: name,
  385. Message: map[string]interface{}{
  386. "temp": 25.5,
  387. "hum": 62,
  388. "ts": 1541152489252,
  389. },
  390. Timestamp: 1541152489252,
  391. },
  392. {
  393. Emitter: name,
  394. Message: map[string]interface{}{
  395. "temp": 26.2,
  396. "hum": 63,
  397. "ts": 1541152490062,
  398. },
  399. Timestamp: 1541152490062,
  400. },
  401. {
  402. Emitter: name,
  403. Message: map[string]interface{}{
  404. "temp": 26.8,
  405. "hum": 71,
  406. "ts": 1541152490872,
  407. },
  408. Timestamp: 1541152490872,
  409. },
  410. {
  411. Emitter: name,
  412. Message: map[string]interface{}{
  413. "temp": 28.9,
  414. "hum": 85,
  415. "ts": 1541152491682,
  416. },
  417. Timestamp: 1541152491682,
  418. },
  419. {
  420. Emitter: name,
  421. Message: map[string]interface{}{
  422. "temp": 29.1,
  423. "hum": 92,
  424. "ts": 1541152492492,
  425. },
  426. Timestamp: 1541152492492,
  427. },
  428. {
  429. Emitter: name,
  430. Message: map[string]interface{}{
  431. "temp": 32.2,
  432. "hum": 99,
  433. "ts": 1541152493202,
  434. },
  435. Timestamp: 1541152493202,
  436. },
  437. {
  438. Emitter: name,
  439. Message: map[string]interface{}{
  440. "temp": 30.9,
  441. "hum": 87,
  442. "ts": 1541152494112,
  443. },
  444. Timestamp: 1541152494112,
  445. },
  446. }
  447. }
  448. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
  449. "DATASOURCE": name,
  450. })
  451. }
  452. func TestSingleSQL(t *testing.T) {
  453. var tests = []struct {
  454. name string
  455. sql string
  456. r [][]map[string]interface{}
  457. s string
  458. m map[string]interface{}
  459. }{
  460. {
  461. name: `rule1`,
  462. sql: `SELECT * FROM demo`,
  463. r: [][]map[string]interface{}{
  464. {{
  465. "color": "red",
  466. "size": float64(3),
  467. "ts": float64(1541152486013),
  468. }},
  469. {{
  470. "color": "blue",
  471. "size": float64(6),
  472. "ts": float64(1541152486822),
  473. }},
  474. {{
  475. "color": "blue",
  476. "size": float64(2),
  477. "ts": float64(1541152487632),
  478. }},
  479. {{
  480. "color": "yellow",
  481. "size": float64(4),
  482. "ts": float64(1541152488442),
  483. }},
  484. {{
  485. "color": "red",
  486. "size": float64(1),
  487. "ts": float64(1541152489252),
  488. }},
  489. },
  490. m: map[string]interface{}{
  491. "op_preprocessor_demo_0_exceptions_total": int64(0),
  492. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  493. "op_preprocessor_demo_0_records_in_total": int64(5),
  494. "op_preprocessor_demo_0_records_out_total": int64(5),
  495. "op_project_0_exceptions_total": int64(0),
  496. "op_project_0_process_latency_ms": int64(0),
  497. "op_project_0_records_in_total": int64(5),
  498. "op_project_0_records_out_total": int64(5),
  499. "sink_mockSink_0_exceptions_total": int64(0),
  500. "sink_mockSink_0_records_in_total": int64(5),
  501. "sink_mockSink_0_records_out_total": int64(5),
  502. "source_demo_0_exceptions_total": int64(0),
  503. "source_demo_0_records_in_total": int64(5),
  504. "source_demo_0_records_out_total": int64(5),
  505. },
  506. s: "sink_mockSink_0_records_out_total",
  507. }, {
  508. name: `rule2`,
  509. sql: `SELECT color, ts FROM demo where size > 3`,
  510. r: [][]map[string]interface{}{
  511. {{
  512. "color": "blue",
  513. "ts": float64(1541152486822),
  514. }},
  515. {{
  516. "color": "yellow",
  517. "ts": float64(1541152488442),
  518. }},
  519. },
  520. s: "op_filter_0_records_in_total",
  521. m: map[string]interface{}{
  522. "op_preprocessor_demo_0_exceptions_total": int64(0),
  523. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  524. "op_preprocessor_demo_0_records_in_total": int64(5),
  525. "op_preprocessor_demo_0_records_out_total": int64(5),
  526. "op_project_0_exceptions_total": int64(0),
  527. "op_project_0_process_latency_ms": int64(0),
  528. "op_project_0_records_in_total": int64(2),
  529. "op_project_0_records_out_total": int64(2),
  530. "sink_mockSink_0_exceptions_total": int64(0),
  531. "sink_mockSink_0_records_in_total": int64(2),
  532. "sink_mockSink_0_records_out_total": int64(2),
  533. "source_demo_0_exceptions_total": int64(0),
  534. "source_demo_0_records_in_total": int64(5),
  535. "source_demo_0_records_out_total": int64(5),
  536. "op_filter_0_exceptions_total": int64(0),
  537. "op_filter_0_process_latency_ms": int64(0),
  538. "op_filter_0_records_in_total": int64(5),
  539. "op_filter_0_records_out_total": int64(2),
  540. },
  541. }, {
  542. name: `rule3`,
  543. sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  544. r: [][]map[string]interface{}{
  545. {{
  546. "Int8": float64(6),
  547. "ts": float64(1541152486822),
  548. }},
  549. {{
  550. "Int8": float64(4),
  551. "ts": float64(1541152488442),
  552. }},
  553. },
  554. s: "op_filter_0_records_in_total",
  555. m: map[string]interface{}{
  556. "op_preprocessor_demo_0_exceptions_total": int64(0),
  557. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  558. "op_preprocessor_demo_0_records_in_total": int64(5),
  559. "op_preprocessor_demo_0_records_out_total": int64(5),
  560. "op_project_0_exceptions_total": int64(0),
  561. "op_project_0_process_latency_ms": int64(0),
  562. "op_project_0_records_in_total": int64(2),
  563. "op_project_0_records_out_total": int64(2),
  564. "sink_mockSink_0_exceptions_total": int64(0),
  565. "sink_mockSink_0_records_in_total": int64(2),
  566. "sink_mockSink_0_records_out_total": int64(2),
  567. "source_demo_0_exceptions_total": int64(0),
  568. "source_demo_0_records_in_total": int64(5),
  569. "source_demo_0_records_out_total": int64(5),
  570. "op_filter_0_exceptions_total": int64(0),
  571. "op_filter_0_process_latency_ms": int64(0),
  572. "op_filter_0_records_in_total": int64(5),
  573. "op_filter_0_records_out_total": int64(2),
  574. },
  575. }, {
  576. name: `rule4`,
  577. sql: `SELECT size as Int8, ts FROM demoE where size > 3`,
  578. r: [][]map[string]interface{}{
  579. {{
  580. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  581. }},
  582. {{
  583. "Int8": float64(6),
  584. "ts": float64(1541152486822),
  585. }},
  586. {{
  587. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  588. }},
  589. {{
  590. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  591. }},
  592. },
  593. s: "op_filter_0_records_in_total",
  594. m: map[string]interface{}{
  595. "op_preprocessor_demoE_0_exceptions_total": int64(3),
  596. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  597. "op_preprocessor_demoE_0_records_in_total": int64(5),
  598. "op_preprocessor_demoE_0_records_out_total": int64(2),
  599. "op_project_0_exceptions_total": int64(3),
  600. "op_project_0_process_latency_ms": int64(0),
  601. "op_project_0_records_in_total": int64(4),
  602. "op_project_0_records_out_total": int64(1),
  603. "sink_mockSink_0_exceptions_total": int64(0),
  604. "sink_mockSink_0_records_in_total": int64(4),
  605. "sink_mockSink_0_records_out_total": int64(4),
  606. "source_demoE_0_exceptions_total": int64(0),
  607. "source_demoE_0_records_in_total": int64(5),
  608. "source_demoE_0_records_out_total": int64(5),
  609. "op_filter_0_exceptions_total": int64(3),
  610. "op_filter_0_process_latency_ms": int64(0),
  611. "op_filter_0_records_in_total": int64(5),
  612. "op_filter_0_records_out_total": int64(1),
  613. },
  614. }, {
  615. name: `rule4`,
  616. sql: `SELECT size as Int8, ts FROM demoE where size > 3`,
  617. r: [][]map[string]interface{}{
  618. {{
  619. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  620. }},
  621. {{
  622. "Int8": float64(6),
  623. "ts": float64(1541152486822),
  624. }},
  625. {{
  626. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  627. }},
  628. {{
  629. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  630. }},
  631. },
  632. s: "op_filter_0_records_in_total",
  633. m: map[string]interface{}{
  634. "op_preprocessor_demoE_0_exceptions_total": int64(3),
  635. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  636. "op_preprocessor_demoE_0_records_in_total": int64(5),
  637. "op_preprocessor_demoE_0_records_out_total": int64(2),
  638. "op_project_0_exceptions_total": int64(3),
  639. "op_project_0_process_latency_ms": int64(0),
  640. "op_project_0_records_in_total": int64(4),
  641. "op_project_0_records_out_total": int64(1),
  642. "sink_mockSink_0_exceptions_total": int64(0),
  643. "sink_mockSink_0_records_in_total": int64(4),
  644. "sink_mockSink_0_records_out_total": int64(4),
  645. "source_demoE_0_exceptions_total": int64(0),
  646. "source_demoE_0_records_in_total": int64(5),
  647. "source_demoE_0_records_out_total": int64(5),
  648. "op_filter_0_exceptions_total": int64(3),
  649. "op_filter_0_process_latency_ms": int64(0),
  650. "op_filter_0_records_in_total": int64(5),
  651. "op_filter_0_records_out_total": int64(1),
  652. },
  653. }, {
  654. name: `rule5`,
  655. sql: `SELECT meta(topic) as m, ts FROM demo`,
  656. r: [][]map[string]interface{}{
  657. {{
  658. "m": "mock",
  659. "ts": float64(1541152486013),
  660. }},
  661. {{
  662. "m": "mock",
  663. "ts": float64(1541152486822),
  664. }},
  665. {{
  666. "m": "mock",
  667. "ts": float64(1541152487632),
  668. }},
  669. {{
  670. "m": "mock",
  671. "ts": float64(1541152488442),
  672. }},
  673. {{
  674. "m": "mock",
  675. "ts": float64(1541152489252),
  676. }},
  677. },
  678. m: map[string]interface{}{
  679. "op_preprocessor_demo_0_exceptions_total": int64(0),
  680. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  681. "op_preprocessor_demo_0_records_in_total": int64(5),
  682. "op_preprocessor_demo_0_records_out_total": int64(5),
  683. "op_project_0_exceptions_total": int64(0),
  684. "op_project_0_process_latency_ms": int64(0),
  685. "op_project_0_records_in_total": int64(5),
  686. "op_project_0_records_out_total": int64(5),
  687. "sink_mockSink_0_exceptions_total": int64(0),
  688. "sink_mockSink_0_records_in_total": int64(5),
  689. "sink_mockSink_0_records_out_total": int64(5),
  690. "source_demo_0_exceptions_total": int64(0),
  691. "source_demo_0_records_in_total": int64(5),
  692. "source_demo_0_records_out_total": int64(5),
  693. },
  694. s: "sink_mockSink_0_records_out_total",
  695. }, {
  696. name: `rule6`,
  697. sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  698. r: [][]map[string]interface{}{
  699. {{
  700. "color": "blue",
  701. "ts": float64(1541152486822),
  702. }},
  703. {{
  704. "color": "yellow",
  705. "ts": float64(1541152488442),
  706. }},
  707. },
  708. s: "op_filter_0_records_in_total",
  709. m: map[string]interface{}{
  710. "op_preprocessor_demo_0_exceptions_total": int64(0),
  711. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  712. "op_preprocessor_demo_0_records_in_total": int64(5),
  713. "op_preprocessor_demo_0_records_out_total": int64(5),
  714. "op_project_0_exceptions_total": int64(0),
  715. "op_project_0_process_latency_ms": int64(0),
  716. "op_project_0_records_in_total": int64(2),
  717. "op_project_0_records_out_total": int64(2),
  718. "sink_mockSink_0_exceptions_total": int64(0),
  719. "sink_mockSink_0_records_in_total": int64(2),
  720. "sink_mockSink_0_records_out_total": int64(2),
  721. "source_demo_0_exceptions_total": int64(0),
  722. "source_demo_0_records_in_total": int64(5),
  723. "source_demo_0_records_out_total": int64(5),
  724. "op_filter_0_exceptions_total": int64(0),
  725. "op_filter_0_process_latency_ms": int64(0),
  726. "op_filter_0_records_in_total": int64(5),
  727. "op_filter_0_records_out_total": int64(2),
  728. },
  729. },
  730. }
  731. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  732. createStreams(t)
  733. defer dropStreams(t)
  734. //defer close(done)
  735. for i, tt := range tests {
  736. test.ResetClock(1541152486000)
  737. p := NewRuleProcessor(DbDir)
  738. parser := xsql.NewParser(strings.NewReader(tt.sql))
  739. var (
  740. sources []*nodes.SourceNode
  741. syncs []chan int
  742. )
  743. if stmt, err := xsql.Language.Parse(parser); err != nil {
  744. t.Errorf("parse sql %s error: %s", tt.sql, err)
  745. } else {
  746. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  747. t.Errorf("sql %s is not a select statement", tt.sql)
  748. } else {
  749. streams := xsql.GetStreams(selectStmt)
  750. for _, stream := range streams {
  751. next := make(chan int)
  752. syncs = append(syncs, next)
  753. source := getMockSource(stream, next, 5)
  754. sources = append(sources, source)
  755. }
  756. }
  757. }
  758. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: map[string]interface{}{
  759. "bufferLength": float64(100),
  760. }}, sources)
  761. if err != nil {
  762. t.Error(err)
  763. }
  764. mockSink := test.NewMockSink()
  765. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
  766. tp.AddSink(inputs, sink)
  767. errCh := tp.Open()
  768. func() {
  769. for i := 0; i < 5; i++ {
  770. syncs[i%len(syncs)] <- i
  771. select {
  772. case err = <-errCh:
  773. t.Log(err)
  774. tp.Cancel()
  775. return
  776. default:
  777. }
  778. }
  779. for retry := 100; retry > 0; retry-- {
  780. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  781. break
  782. }
  783. time.Sleep(time.Duration(retry) * time.Millisecond)
  784. }
  785. }()
  786. results := mockSink.GetResults()
  787. var maps [][]map[string]interface{}
  788. for _, v := range results {
  789. var mapRes []map[string]interface{}
  790. err := json.Unmarshal(v, &mapRes)
  791. if err != nil {
  792. t.Errorf("Failed to parse the input into map")
  793. continue
  794. }
  795. maps = append(maps, mapRes)
  796. }
  797. if !reflect.DeepEqual(tt.r, maps) {
  798. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  799. continue
  800. }
  801. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  802. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  803. }
  804. tp.Cancel()
  805. }
  806. }
  807. func getMockSourceL(name string, done <-chan int, size int) *nodes.SourceNode {
  808. var data []*xsql.Tuple
  809. switch name {
  810. case "ldemo":
  811. data = []*xsql.Tuple{
  812. {
  813. Emitter: name,
  814. Message: map[string]interface{}{
  815. "color": "red",
  816. "size": 3,
  817. "ts": 1541152486013,
  818. },
  819. Timestamp: 1541152486013,
  820. },
  821. {
  822. Emitter: name,
  823. Message: map[string]interface{}{
  824. "color": "blue",
  825. "size": "string",
  826. "ts": 1541152486822,
  827. },
  828. Timestamp: 1541152486822,
  829. },
  830. {
  831. Emitter: name,
  832. Message: map[string]interface{}{
  833. "size": 3,
  834. "ts": 1541152487632,
  835. },
  836. Timestamp: 1541152487632,
  837. },
  838. {
  839. Emitter: name,
  840. Message: map[string]interface{}{
  841. "color": 49,
  842. "size": 2,
  843. "ts": 1541152488442,
  844. },
  845. Timestamp: 1541152488442,
  846. },
  847. {
  848. Emitter: name,
  849. Message: map[string]interface{}{
  850. "color": "red",
  851. "ts": 1541152489252,
  852. },
  853. Timestamp: 1541152489252,
  854. },
  855. }
  856. case "ldemo1":
  857. data = []*xsql.Tuple{
  858. {
  859. Emitter: name,
  860. Message: map[string]interface{}{
  861. "temp": 25.5,
  862. "hum": 65,
  863. "ts": 1541152486013,
  864. },
  865. Timestamp: 1541152486013,
  866. },
  867. {
  868. Emitter: name,
  869. Message: map[string]interface{}{
  870. "temp": 27.5,
  871. "hum": 59,
  872. "ts": 1541152486823,
  873. },
  874. Timestamp: 1541152486823,
  875. },
  876. {
  877. Emitter: name,
  878. Message: map[string]interface{}{
  879. "temp": 28.1,
  880. "hum": 75,
  881. "ts": 1541152487632,
  882. },
  883. Timestamp: 1541152487632,
  884. },
  885. {
  886. Emitter: name,
  887. Message: map[string]interface{}{
  888. "temp": 27.4,
  889. "hum": 80,
  890. "ts": "1541152488442",
  891. },
  892. Timestamp: 1541152488442,
  893. },
  894. {
  895. Emitter: name,
  896. Message: map[string]interface{}{
  897. "temp": 25.5,
  898. "hum": 62,
  899. "ts": 1541152489252,
  900. },
  901. Timestamp: 1541152489252,
  902. },
  903. }
  904. case "lsessionDemo":
  905. data = []*xsql.Tuple{
  906. {
  907. Emitter: name,
  908. Message: map[string]interface{}{
  909. "temp": 25.5,
  910. "hum": 65,
  911. "ts": 1541152486013,
  912. },
  913. Timestamp: 1541152486013,
  914. },
  915. {
  916. Emitter: name,
  917. Message: map[string]interface{}{
  918. "temp": 27.5,
  919. "hum": 59,
  920. "ts": 1541152486823,
  921. },
  922. Timestamp: 1541152486823,
  923. },
  924. {
  925. Emitter: name,
  926. Message: map[string]interface{}{
  927. "temp": 28.1,
  928. "hum": 75,
  929. "ts": 1541152487932,
  930. },
  931. Timestamp: 1541152487932,
  932. },
  933. {
  934. Emitter: name,
  935. Message: map[string]interface{}{
  936. "temp": 27.4,
  937. "hum": 80,
  938. "ts": 1541152488442,
  939. },
  940. Timestamp: 1541152488442,
  941. },
  942. {
  943. Emitter: name,
  944. Message: map[string]interface{}{
  945. "temp": 25.5,
  946. "hum": 62,
  947. "ts": 1541152489252,
  948. },
  949. Timestamp: 1541152489252,
  950. },
  951. {
  952. Emitter: name,
  953. Message: map[string]interface{}{
  954. "temp": 26.2,
  955. "hum": 63,
  956. "ts": 1541152490062,
  957. },
  958. Timestamp: 1541152490062,
  959. },
  960. {
  961. Emitter: name,
  962. Message: map[string]interface{}{
  963. "temp": 26.8,
  964. "hum": 71,
  965. "ts": 1541152490872,
  966. },
  967. Timestamp: 1541152490872,
  968. },
  969. {
  970. Emitter: name,
  971. Message: map[string]interface{}{
  972. "temp": 28.9,
  973. "hum": 85,
  974. "ts": 1541152491682,
  975. },
  976. Timestamp: 1541152491682,
  977. },
  978. {
  979. Emitter: name,
  980. Message: map[string]interface{}{
  981. "temp": 29.1,
  982. "hum": 92,
  983. "ts": 1541152492492,
  984. },
  985. Timestamp: 1541152492492,
  986. },
  987. {
  988. Emitter: name,
  989. Message: map[string]interface{}{
  990. "temp": 2.2,
  991. "hum": 99,
  992. "ts": 1541152493202,
  993. },
  994. Timestamp: 1541152493202,
  995. },
  996. {
  997. Emitter: name,
  998. Message: map[string]interface{}{
  999. "temp": 30.9,
  1000. "hum": 87,
  1001. "ts": 1541152494112,
  1002. },
  1003. Timestamp: 1541152494112,
  1004. },
  1005. }
  1006. }
  1007. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
  1008. "DATASOURCE": name,
  1009. })
  1010. }
  1011. func TestSingleSQLError(t *testing.T) {
  1012. var tests = []struct {
  1013. name string
  1014. sql string
  1015. r [][]map[string]interface{}
  1016. s string
  1017. m map[string]interface{}
  1018. }{
  1019. {
  1020. name: `rule1`,
  1021. sql: `SELECT color, ts FROM ldemo where size >= 3`,
  1022. r: [][]map[string]interface{}{
  1023. {{
  1024. "color": "red",
  1025. "ts": float64(1541152486013),
  1026. }},
  1027. {{
  1028. "error": "run Where error: invalid operation string(string) >= int64(3)",
  1029. }},
  1030. {{
  1031. "ts": float64(1541152487632),
  1032. }},
  1033. },
  1034. s: "op_filter_0_records_in_total",
  1035. m: map[string]interface{}{
  1036. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  1037. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  1038. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  1039. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  1040. "op_project_0_exceptions_total": int64(1),
  1041. "op_project_0_process_latency_ms": int64(0),
  1042. "op_project_0_records_in_total": int64(3),
  1043. "op_project_0_records_out_total": int64(2),
  1044. "sink_mockSink_0_exceptions_total": int64(0),
  1045. "sink_mockSink_0_records_in_total": int64(3),
  1046. "sink_mockSink_0_records_out_total": int64(3),
  1047. "source_ldemo_0_exceptions_total": int64(0),
  1048. "source_ldemo_0_records_in_total": int64(5),
  1049. "source_ldemo_0_records_out_total": int64(5),
  1050. "op_filter_0_exceptions_total": int64(1),
  1051. "op_filter_0_process_latency_ms": int64(0),
  1052. "op_filter_0_records_in_total": int64(5),
  1053. "op_filter_0_records_out_total": int64(2),
  1054. },
  1055. }, {
  1056. name: `rule2`,
  1057. sql: `SELECT size * 5 FROM ldemo`,
  1058. r: [][]map[string]interface{}{
  1059. {{
  1060. "rengine_field_0": float64(15),
  1061. }},
  1062. {{
  1063. "error": "run Select error: invalid operation string(string) * int64(5)",
  1064. }},
  1065. {{
  1066. "rengine_field_0": float64(15),
  1067. }},
  1068. {{
  1069. "rengine_field_0": float64(10),
  1070. }},
  1071. {{}},
  1072. },
  1073. s: "op_filter_0_records_in_total",
  1074. m: map[string]interface{}{
  1075. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  1076. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  1077. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  1078. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  1079. "op_project_0_exceptions_total": int64(1),
  1080. "op_project_0_process_latency_ms": int64(0),
  1081. "op_project_0_records_in_total": int64(5),
  1082. "op_project_0_records_out_total": int64(4),
  1083. "sink_mockSink_0_exceptions_total": int64(0),
  1084. "sink_mockSink_0_records_in_total": int64(5),
  1085. "sink_mockSink_0_records_out_total": int64(5),
  1086. "source_ldemo_0_exceptions_total": int64(0),
  1087. "source_ldemo_0_records_in_total": int64(5),
  1088. "source_ldemo_0_records_out_total": int64(5),
  1089. },
  1090. },
  1091. }
  1092. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1093. createSchemalessStreams(t)
  1094. defer dropSchemalessStreams(t)
  1095. //defer close(done)
  1096. for i, tt := range tests {
  1097. test.ResetClock(1541152486000)
  1098. p := NewRuleProcessor(DbDir)
  1099. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1100. var (
  1101. sources []*nodes.SourceNode
  1102. syncs []chan int
  1103. )
  1104. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1105. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1106. } else {
  1107. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1108. t.Errorf("sql %s is not a select statement", tt.sql)
  1109. } else {
  1110. streams := xsql.GetStreams(selectStmt)
  1111. for _, stream := range streams {
  1112. next := make(chan int)
  1113. syncs = append(syncs, next)
  1114. source := getMockSourceL(stream, next, 5)
  1115. sources = append(sources, source)
  1116. }
  1117. }
  1118. }
  1119. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: map[string]interface{}{
  1120. "bufferLength": float64(100),
  1121. }}, sources)
  1122. if err != nil {
  1123. t.Error(err)
  1124. }
  1125. mockSink := test.NewMockSink()
  1126. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
  1127. tp.AddSink(inputs, sink)
  1128. errCh := tp.Open()
  1129. func() {
  1130. for i := 0; i < 5; i++ {
  1131. syncs[i%len(syncs)] <- i
  1132. select {
  1133. case err = <-errCh:
  1134. t.Log(err)
  1135. tp.Cancel()
  1136. return
  1137. default:
  1138. }
  1139. }
  1140. for retry := 100; retry > 0; retry-- {
  1141. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  1142. break
  1143. }
  1144. time.Sleep(time.Duration(retry) * time.Millisecond)
  1145. }
  1146. }()
  1147. results := mockSink.GetResults()
  1148. var maps [][]map[string]interface{}
  1149. for _, v := range results {
  1150. var mapRes []map[string]interface{}
  1151. err := json.Unmarshal(v, &mapRes)
  1152. if err != nil {
  1153. t.Errorf("Failed to parse the input into map")
  1154. continue
  1155. }
  1156. maps = append(maps, mapRes)
  1157. }
  1158. if !reflect.DeepEqual(tt.r, maps) {
  1159. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1160. continue
  1161. }
  1162. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  1163. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  1164. }
  1165. tp.Cancel()
  1166. }
  1167. }
  1168. func TestWindow(t *testing.T) {
  1169. var tests = []struct {
  1170. name string
  1171. sql string
  1172. size int
  1173. r [][]map[string]interface{}
  1174. m map[string]interface{}
  1175. }{
  1176. {
  1177. name: `rule1`,
  1178. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1179. size: 5,
  1180. r: [][]map[string]interface{}{
  1181. {{
  1182. "color": "red",
  1183. "size": float64(3),
  1184. "ts": float64(1541152486013),
  1185. }, {
  1186. "color": "blue",
  1187. "size": float64(6),
  1188. "ts": float64(1541152486822),
  1189. }},
  1190. {{
  1191. "color": "red",
  1192. "size": float64(3),
  1193. "ts": float64(1541152486013),
  1194. }, {
  1195. "color": "blue",
  1196. "size": float64(6),
  1197. "ts": float64(1541152486822),
  1198. }, {
  1199. "color": "blue",
  1200. "size": float64(2),
  1201. "ts": float64(1541152487632),
  1202. }},
  1203. {{
  1204. "color": "blue",
  1205. "size": float64(2),
  1206. "ts": float64(1541152487632),
  1207. }, {
  1208. "color": "yellow",
  1209. "size": float64(4),
  1210. "ts": float64(1541152488442),
  1211. }},
  1212. },
  1213. m: map[string]interface{}{
  1214. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1215. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1216. "op_preprocessor_demo_0_records_in_total": int64(5),
  1217. "op_preprocessor_demo_0_records_out_total": int64(5),
  1218. "op_project_0_exceptions_total": int64(0),
  1219. "op_project_0_process_latency_ms": int64(0),
  1220. "op_project_0_records_in_total": int64(3),
  1221. "op_project_0_records_out_total": int64(3),
  1222. "sink_mockSink_0_exceptions_total": int64(0),
  1223. "sink_mockSink_0_records_in_total": int64(3),
  1224. "sink_mockSink_0_records_out_total": int64(3),
  1225. "source_demo_0_exceptions_total": int64(0),
  1226. "source_demo_0_records_in_total": int64(5),
  1227. "source_demo_0_records_out_total": int64(5),
  1228. "op_window_0_exceptions_total": int64(0),
  1229. "op_window_0_process_latency_ms": int64(0),
  1230. "op_window_0_records_in_total": int64(5),
  1231. "op_window_0_records_out_total": int64(3),
  1232. },
  1233. }, {
  1234. name: `rule2`,
  1235. sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1236. size: 5,
  1237. r: [][]map[string]interface{}{
  1238. {{
  1239. "color": "red",
  1240. "ts": float64(1541152486013),
  1241. }, {
  1242. "color": "blue",
  1243. "ts": float64(1541152486822),
  1244. }},
  1245. {{
  1246. "color": "yellow",
  1247. "ts": float64(1541152488442),
  1248. }},
  1249. },
  1250. m: map[string]interface{}{
  1251. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1252. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1253. "op_preprocessor_demo_0_records_in_total": int64(5),
  1254. "op_preprocessor_demo_0_records_out_total": int64(5),
  1255. "op_project_0_exceptions_total": int64(0),
  1256. "op_project_0_process_latency_ms": int64(0),
  1257. "op_project_0_records_in_total": int64(2),
  1258. "op_project_0_records_out_total": int64(2),
  1259. "sink_mockSink_0_exceptions_total": int64(0),
  1260. "sink_mockSink_0_records_in_total": int64(2),
  1261. "sink_mockSink_0_records_out_total": int64(2),
  1262. "source_demo_0_exceptions_total": int64(0),
  1263. "source_demo_0_records_in_total": int64(5),
  1264. "source_demo_0_records_out_total": int64(5),
  1265. "op_window_0_exceptions_total": int64(0),
  1266. "op_window_0_process_latency_ms": int64(0),
  1267. "op_window_0_records_in_total": int64(5),
  1268. "op_window_0_records_out_total": int64(3),
  1269. "op_filter_0_exceptions_total": int64(0),
  1270. "op_filter_0_process_latency_ms": int64(0),
  1271. "op_filter_0_records_in_total": int64(3),
  1272. "op_filter_0_records_out_total": int64(2),
  1273. },
  1274. }, {
  1275. name: `rule3`,
  1276. sql: `SELECT color, temp, ts FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  1277. size: 5,
  1278. r: [][]map[string]interface{}{
  1279. {{
  1280. "color": "red",
  1281. "temp": 25.5,
  1282. "ts": float64(1541152486013),
  1283. }}, {{
  1284. "color": "red",
  1285. "temp": 25.5,
  1286. "ts": float64(1541152486013),
  1287. }}, {{
  1288. "color": "red",
  1289. "temp": 25.5,
  1290. "ts": float64(1541152486013),
  1291. }}, {{
  1292. "color": "blue",
  1293. "temp": 28.1,
  1294. "ts": float64(1541152487632),
  1295. }}, {{
  1296. "color": "blue",
  1297. "temp": 28.1,
  1298. "ts": float64(1541152487632),
  1299. }}, {{
  1300. "color": "blue",
  1301. "temp": 28.1,
  1302. "ts": float64(1541152487632),
  1303. }, {
  1304. "color": "yellow",
  1305. "temp": 27.4,
  1306. "ts": float64(1541152488442),
  1307. }}, {{
  1308. "color": "yellow",
  1309. "temp": 27.4,
  1310. "ts": float64(1541152488442),
  1311. }}, {{
  1312. "color": "yellow",
  1313. "temp": 27.4,
  1314. "ts": float64(1541152488442),
  1315. }, {
  1316. "color": "red",
  1317. "temp": 25.5,
  1318. "ts": float64(1541152489252),
  1319. }},
  1320. },
  1321. m: map[string]interface{}{
  1322. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1323. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1324. "op_preprocessor_demo_0_records_in_total": int64(5),
  1325. "op_preprocessor_demo_0_records_out_total": int64(5),
  1326. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  1327. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  1328. "op_preprocessor_demo1_0_records_in_total": int64(5),
  1329. "op_preprocessor_demo1_0_records_out_total": int64(5),
  1330. "op_project_0_exceptions_total": int64(0),
  1331. "op_project_0_process_latency_ms": int64(0),
  1332. "op_project_0_records_in_total": int64(8),
  1333. "op_project_0_records_out_total": int64(8),
  1334. "sink_mockSink_0_exceptions_total": int64(0),
  1335. "sink_mockSink_0_records_in_total": int64(8),
  1336. "sink_mockSink_0_records_out_total": int64(8),
  1337. "source_demo_0_exceptions_total": int64(0),
  1338. "source_demo_0_records_in_total": int64(5),
  1339. "source_demo_0_records_out_total": int64(5),
  1340. "source_demo1_0_exceptions_total": int64(0),
  1341. "source_demo1_0_records_in_total": int64(5),
  1342. "source_demo1_0_records_out_total": int64(5),
  1343. "op_window_0_exceptions_total": int64(0),
  1344. "op_window_0_process_latency_ms": int64(0),
  1345. "op_window_0_records_in_total": int64(10),
  1346. "op_window_0_records_out_total": int64(10),
  1347. "op_join_0_exceptions_total": int64(0),
  1348. "op_join_0_process_latency_ms": int64(0),
  1349. "op_join_0_records_in_total": int64(10),
  1350. "op_join_0_records_out_total": int64(8),
  1351. },
  1352. }, {
  1353. name: `rule4`,
  1354. sql: `SELECT color FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  1355. size: 5,
  1356. r: [][]map[string]interface{}{
  1357. {{
  1358. "color": "red",
  1359. }}, {{
  1360. "color": "blue",
  1361. }, {
  1362. "color": "red",
  1363. }}, {{
  1364. "color": "blue",
  1365. }, {
  1366. "color": "red",
  1367. }}, {{
  1368. "color": "blue",
  1369. }, {
  1370. "color": "yellow",
  1371. }}, {{
  1372. "color": "blue",
  1373. }, {
  1374. "color": "red",
  1375. }, {
  1376. "color": "yellow",
  1377. }},
  1378. },
  1379. m: map[string]interface{}{
  1380. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1381. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1382. "op_preprocessor_demo_0_records_in_total": int64(5),
  1383. "op_preprocessor_demo_0_records_out_total": int64(5),
  1384. "op_project_0_exceptions_total": int64(0),
  1385. "op_project_0_process_latency_ms": int64(0),
  1386. "op_project_0_records_in_total": int64(5),
  1387. "op_project_0_records_out_total": int64(5),
  1388. "sink_mockSink_0_exceptions_total": int64(0),
  1389. "sink_mockSink_0_records_in_total": int64(5),
  1390. "sink_mockSink_0_records_out_total": int64(5),
  1391. "source_demo_0_exceptions_total": int64(0),
  1392. "source_demo_0_records_in_total": int64(5),
  1393. "source_demo_0_records_out_total": int64(5),
  1394. "op_window_0_exceptions_total": int64(0),
  1395. "op_window_0_process_latency_ms": int64(0),
  1396. "op_window_0_records_in_total": int64(5),
  1397. "op_window_0_records_out_total": int64(5),
  1398. "op_aggregate_0_exceptions_total": int64(0),
  1399. "op_aggregate_0_process_latency_ms": int64(0),
  1400. "op_aggregate_0_records_in_total": int64(5),
  1401. "op_aggregate_0_records_out_total": int64(5),
  1402. "op_order_0_exceptions_total": int64(0),
  1403. "op_order_0_process_latency_ms": int64(0),
  1404. "op_order_0_records_in_total": int64(5),
  1405. "op_order_0_records_out_total": int64(5),
  1406. },
  1407. }, {
  1408. name: `rule5`,
  1409. sql: `SELECT temp FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  1410. size: 11,
  1411. r: [][]map[string]interface{}{
  1412. {{
  1413. "temp": 25.5,
  1414. }, {
  1415. "temp": 27.5,
  1416. }}, {{
  1417. "temp": 28.1,
  1418. }, {
  1419. "temp": 27.4,
  1420. }, {
  1421. "temp": 25.5,
  1422. }}, {{
  1423. "temp": 26.2,
  1424. }, {
  1425. "temp": 26.8,
  1426. }, {
  1427. "temp": 28.9,
  1428. }, {
  1429. "temp": 29.1,
  1430. }, {
  1431. "temp": 32.2,
  1432. }},
  1433. },
  1434. m: map[string]interface{}{
  1435. "op_preprocessor_sessionDemo_0_exceptions_total": int64(0),
  1436. "op_preprocessor_sessionDemo_0_process_latency_ms": int64(0),
  1437. "op_preprocessor_sessionDemo_0_records_in_total": int64(11),
  1438. "op_preprocessor_sessionDemo_0_records_out_total": int64(11),
  1439. "op_project_0_exceptions_total": int64(0),
  1440. "op_project_0_process_latency_ms": int64(0),
  1441. "op_project_0_records_in_total": int64(3),
  1442. "op_project_0_records_out_total": int64(3),
  1443. "sink_mockSink_0_exceptions_total": int64(0),
  1444. "sink_mockSink_0_records_in_total": int64(3),
  1445. "sink_mockSink_0_records_out_total": int64(3),
  1446. "source_sessionDemo_0_exceptions_total": int64(0),
  1447. "source_sessionDemo_0_records_in_total": int64(11),
  1448. "source_sessionDemo_0_records_out_total": int64(11),
  1449. "op_window_0_exceptions_total": int64(0),
  1450. "op_window_0_process_latency_ms": int64(0),
  1451. "op_window_0_records_in_total": int64(11),
  1452. "op_window_0_records_out_total": int64(3),
  1453. },
  1454. }, {
  1455. name: `rule6`,
  1456. sql: `SELECT max(temp) as m, count(color) as c FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  1457. size: 5,
  1458. r: [][]map[string]interface{}{
  1459. {{
  1460. "m": 25.5,
  1461. "c": float64(1),
  1462. }}, {{
  1463. "m": 25.5,
  1464. "c": float64(1),
  1465. }}, {{
  1466. "m": 25.5,
  1467. "c": float64(1),
  1468. }}, {{
  1469. "m": 28.1,
  1470. "c": float64(1),
  1471. }}, {{
  1472. "m": 28.1,
  1473. "c": float64(1),
  1474. }}, {{
  1475. "m": 28.1,
  1476. "c": float64(2),
  1477. }}, {{
  1478. "m": 27.4,
  1479. "c": float64(1),
  1480. }}, {{
  1481. "m": 27.4,
  1482. "c": float64(2),
  1483. }},
  1484. },
  1485. m: map[string]interface{}{
  1486. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1487. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1488. "op_preprocessor_demo_0_records_in_total": int64(5),
  1489. "op_preprocessor_demo_0_records_out_total": int64(5),
  1490. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  1491. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  1492. "op_preprocessor_demo1_0_records_in_total": int64(5),
  1493. "op_preprocessor_demo1_0_records_out_total": int64(5),
  1494. "op_project_0_exceptions_total": int64(0),
  1495. "op_project_0_process_latency_ms": int64(0),
  1496. "op_project_0_records_in_total": int64(8),
  1497. "op_project_0_records_out_total": int64(8),
  1498. "sink_mockSink_0_exceptions_total": int64(0),
  1499. "sink_mockSink_0_records_in_total": int64(8),
  1500. "sink_mockSink_0_records_out_total": int64(8),
  1501. "source_demo_0_exceptions_total": int64(0),
  1502. "source_demo_0_records_in_total": int64(5),
  1503. "source_demo_0_records_out_total": int64(5),
  1504. "source_demo1_0_exceptions_total": int64(0),
  1505. "source_demo1_0_records_in_total": int64(5),
  1506. "source_demo1_0_records_out_total": int64(5),
  1507. "op_window_0_exceptions_total": int64(0),
  1508. "op_window_0_process_latency_ms": int64(0),
  1509. "op_window_0_records_in_total": int64(10),
  1510. "op_window_0_records_out_total": int64(10),
  1511. "op_join_0_exceptions_total": int64(0),
  1512. "op_join_0_process_latency_ms": int64(0),
  1513. "op_join_0_records_in_total": int64(10),
  1514. "op_join_0_records_out_total": int64(8),
  1515. },
  1516. }, {
  1517. name: `rule7`,
  1518. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1519. size: 5,
  1520. r: [][]map[string]interface{}{
  1521. {{
  1522. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  1523. }},
  1524. {{
  1525. "color": "blue",
  1526. "size": float64(6),
  1527. "ts": float64(1541152486822),
  1528. }},
  1529. {{
  1530. "color": "blue",
  1531. "size": float64(6),
  1532. "ts": float64(1541152486822),
  1533. }, {
  1534. "color": "blue",
  1535. "size": float64(2),
  1536. "ts": float64(1541152487632),
  1537. }},
  1538. {{
  1539. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  1540. }},
  1541. {{
  1542. "color": "blue",
  1543. "size": float64(2),
  1544. "ts": float64(1541152487632),
  1545. }},
  1546. {{
  1547. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  1548. }},
  1549. },
  1550. m: map[string]interface{}{
  1551. "op_preprocessor_demoE_0_exceptions_total": int64(3),
  1552. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1553. "op_preprocessor_demoE_0_records_in_total": int64(5),
  1554. "op_preprocessor_demoE_0_records_out_total": int64(2),
  1555. "op_project_0_exceptions_total": int64(3),
  1556. "op_project_0_process_latency_ms": int64(0),
  1557. "op_project_0_records_in_total": int64(6),
  1558. "op_project_0_records_out_total": int64(3),
  1559. "sink_mockSink_0_exceptions_total": int64(0),
  1560. "sink_mockSink_0_records_in_total": int64(6),
  1561. "sink_mockSink_0_records_out_total": int64(6),
  1562. "source_demoE_0_exceptions_total": int64(0),
  1563. "source_demoE_0_records_in_total": int64(5),
  1564. "source_demoE_0_records_out_total": int64(5),
  1565. "op_window_0_exceptions_total": int64(3),
  1566. "op_window_0_process_latency_ms": int64(0),
  1567. "op_window_0_records_in_total": int64(5),
  1568. "op_window_0_records_out_total": int64(3),
  1569. },
  1570. },
  1571. }
  1572. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1573. createStreams(t)
  1574. defer dropStreams(t)
  1575. for i, tt := range tests {
  1576. test.ResetClock(1541152486000)
  1577. p := NewRuleProcessor(DbDir)
  1578. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1579. var (
  1580. sources []*nodes.SourceNode
  1581. syncs []chan int
  1582. )
  1583. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1584. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1585. } else {
  1586. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1587. t.Errorf("sql %s is not a select statement", tt.sql)
  1588. } else {
  1589. streams := xsql.GetStreams(selectStmt)
  1590. for _, stream := range streams {
  1591. next := make(chan int)
  1592. syncs = append(syncs, next)
  1593. source := getMockSource(stream, next, tt.size)
  1594. sources = append(sources, source)
  1595. }
  1596. }
  1597. }
  1598. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  1599. if err != nil {
  1600. t.Error(err)
  1601. }
  1602. mockSink := test.NewMockSink()
  1603. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
  1604. tp.AddSink(inputs, sink)
  1605. errCh := tp.Open()
  1606. func() {
  1607. for i := 0; i < tt.size*len(syncs); i++ {
  1608. syncs[i%len(syncs)] <- i
  1609. for {
  1610. time.Sleep(1)
  1611. if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
  1612. break
  1613. }
  1614. }
  1615. select {
  1616. case err = <-errCh:
  1617. t.Log(err)
  1618. tp.Cancel()
  1619. return
  1620. default:
  1621. }
  1622. }
  1623. retry := 100
  1624. for ; retry > 0; retry-- {
  1625. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  1626. break
  1627. }
  1628. t.Logf("wait to try another %d times", retry)
  1629. time.Sleep(time.Duration(retry) * time.Millisecond)
  1630. }
  1631. if retry == 0 {
  1632. err := compareMetrics(tp, tt.m, tt.sql)
  1633. t.Errorf("could not get correct metrics: %v", err)
  1634. }
  1635. }()
  1636. results := mockSink.GetResults()
  1637. var maps [][]map[string]interface{}
  1638. for _, v := range results {
  1639. var mapRes []map[string]interface{}
  1640. err := json.Unmarshal(v, &mapRes)
  1641. if err != nil {
  1642. t.Errorf("Failed to parse the input into map")
  1643. continue
  1644. }
  1645. maps = append(maps, mapRes)
  1646. }
  1647. if !reflect.DeepEqual(tt.r, maps) {
  1648. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1649. }
  1650. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  1651. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  1652. }
  1653. tp.Cancel()
  1654. }
  1655. }
  1656. func TestWindowError(t *testing.T) {
  1657. var tests = []struct {
  1658. name string
  1659. sql string
  1660. size int
  1661. r [][]map[string]interface{}
  1662. m map[string]interface{}
  1663. }{
  1664. {
  1665. name: `rule1`,
  1666. sql: `SELECT size * 3 FROM ldemo GROUP BY TUMBLINGWINDOW(ss, 2)`,
  1667. size: 5,
  1668. r: [][]map[string]interface{}{
  1669. {{
  1670. "error": "run Select error: invalid operation string(string) * int64(3)",
  1671. }},
  1672. },
  1673. m: map[string]interface{}{
  1674. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  1675. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  1676. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  1677. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  1678. "op_project_0_exceptions_total": int64(1),
  1679. "op_project_0_process_latency_ms": int64(0),
  1680. "op_project_0_records_in_total": int64(1),
  1681. "op_project_0_records_out_total": int64(0),
  1682. "sink_mockSink_0_exceptions_total": int64(0),
  1683. "sink_mockSink_0_records_in_total": int64(1),
  1684. "sink_mockSink_0_records_out_total": int64(1),
  1685. "source_ldemo_0_exceptions_total": int64(0),
  1686. "source_ldemo_0_records_in_total": int64(5),
  1687. "source_ldemo_0_records_out_total": int64(5),
  1688. "op_window_0_exceptions_total": int64(0),
  1689. "op_window_0_process_latency_ms": int64(0),
  1690. "op_window_0_records_in_total": int64(5),
  1691. "op_window_0_records_out_total": int64(1),
  1692. },
  1693. }, {
  1694. name: `rule2`,
  1695. sql: `SELECT color, ts FROM ldemo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1696. size: 5,
  1697. r: [][]map[string]interface{}{
  1698. {{
  1699. "error": "run Where error: invalid operation string(string) > int64(2)",
  1700. }}, {{
  1701. "ts": float64(1541152487632),
  1702. }},
  1703. },
  1704. m: map[string]interface{}{
  1705. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  1706. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  1707. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  1708. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  1709. "op_project_0_exceptions_total": int64(1),
  1710. "op_project_0_process_latency_ms": int64(0),
  1711. "op_project_0_records_in_total": int64(2),
  1712. "op_project_0_records_out_total": int64(1),
  1713. "sink_mockSink_0_exceptions_total": int64(0),
  1714. "sink_mockSink_0_records_in_total": int64(2),
  1715. "sink_mockSink_0_records_out_total": int64(2),
  1716. "source_ldemo_0_exceptions_total": int64(0),
  1717. "source_ldemo_0_records_in_total": int64(5),
  1718. "source_ldemo_0_records_out_total": int64(5),
  1719. "op_window_0_exceptions_total": int64(0),
  1720. "op_window_0_process_latency_ms": int64(0),
  1721. "op_window_0_records_in_total": int64(5),
  1722. "op_window_0_records_out_total": int64(3),
  1723. "op_filter_0_exceptions_total": int64(1),
  1724. "op_filter_0_process_latency_ms": int64(0),
  1725. "op_filter_0_records_in_total": int64(3),
  1726. "op_filter_0_records_out_total": int64(1),
  1727. },
  1728. }, {
  1729. name: `rule3`,
  1730. sql: `SELECT color, temp, ts FROM ldemo INNER JOIN ldemo1 ON ldemo.ts = ldemo1.ts GROUP BY SlidingWindow(ss, 1)`,
  1731. size: 5,
  1732. r: [][]map[string]interface{}{
  1733. {{
  1734. "color": "red",
  1735. "temp": 25.5,
  1736. "ts": float64(1541152486013),
  1737. }}, {{
  1738. "color": "red",
  1739. "temp": 25.5,
  1740. "ts": float64(1541152486013),
  1741. }}, {{
  1742. "color": "red",
  1743. "temp": 25.5,
  1744. "ts": float64(1541152486013),
  1745. }}, {{
  1746. "temp": 28.1,
  1747. "ts": float64(1541152487632),
  1748. }}, {{
  1749. "temp": 28.1,
  1750. "ts": float64(1541152487632),
  1751. }}, {{
  1752. "error": "run Join error: invalid operation int64(1541152487632) = string(1541152488442)",
  1753. }}, {{
  1754. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1755. }}, {{
  1756. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  1757. }},
  1758. },
  1759. m: map[string]interface{}{
  1760. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  1761. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  1762. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  1763. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  1764. "op_preprocessor_ldemo1_0_exceptions_total": int64(0),
  1765. "op_preprocessor_ldemo1_0_process_latency_ms": int64(0),
  1766. "op_preprocessor_ldemo1_0_records_in_total": int64(5),
  1767. "op_preprocessor_ldemo1_0_records_out_total": int64(5),
  1768. "op_project_0_exceptions_total": int64(3),
  1769. "op_project_0_process_latency_ms": int64(0),
  1770. "op_project_0_records_in_total": int64(8),
  1771. "op_project_0_records_out_total": int64(5),
  1772. "sink_mockSink_0_exceptions_total": int64(0),
  1773. "sink_mockSink_0_records_in_total": int64(8),
  1774. "sink_mockSink_0_records_out_total": int64(8),
  1775. "source_ldemo_0_exceptions_total": int64(0),
  1776. "source_ldemo_0_records_in_total": int64(5),
  1777. "source_ldemo_0_records_out_total": int64(5),
  1778. "source_ldemo1_0_exceptions_total": int64(0),
  1779. "source_ldemo1_0_records_in_total": int64(5),
  1780. "source_ldemo1_0_records_out_total": int64(5),
  1781. "op_window_0_exceptions_total": int64(0),
  1782. "op_window_0_process_latency_ms": int64(0),
  1783. "op_window_0_records_in_total": int64(10),
  1784. "op_window_0_records_out_total": int64(10),
  1785. "op_join_0_exceptions_total": int64(3),
  1786. "op_join_0_process_latency_ms": int64(0),
  1787. "op_join_0_records_in_total": int64(10),
  1788. "op_join_0_records_out_total": int64(5),
  1789. },
  1790. }, {
  1791. name: `rule4`,
  1792. sql: `SELECT color FROM ldemo GROUP BY SlidingWindow(ss, 2), color having size >= 2 order by color`,
  1793. size: 5,
  1794. r: [][]map[string]interface{}{
  1795. {{
  1796. "color": "red",
  1797. }}, {{
  1798. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1799. }}, {{
  1800. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1801. }}, {{
  1802. "error": "run Having error: invalid operation string(string) >= int64(2)",
  1803. }}, {{
  1804. "color": float64(49),
  1805. }, {}},
  1806. },
  1807. m: map[string]interface{}{
  1808. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  1809. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  1810. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  1811. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  1812. "op_project_0_exceptions_total": int64(3),
  1813. "op_project_0_process_latency_ms": int64(0),
  1814. "op_project_0_records_in_total": int64(5),
  1815. "op_project_0_records_out_total": int64(2),
  1816. "sink_mockSink_0_exceptions_total": int64(0),
  1817. "sink_mockSink_0_records_in_total": int64(5),
  1818. "sink_mockSink_0_records_out_total": int64(5),
  1819. "source_ldemo_0_exceptions_total": int64(0),
  1820. "source_ldemo_0_records_in_total": int64(5),
  1821. "source_ldemo_0_records_out_total": int64(5),
  1822. "op_window_0_exceptions_total": int64(0),
  1823. "op_window_0_process_latency_ms": int64(0),
  1824. "op_window_0_records_in_total": int64(5),
  1825. "op_window_0_records_out_total": int64(5),
  1826. "op_aggregate_0_exceptions_total": int64(0),
  1827. "op_aggregate_0_process_latency_ms": int64(0),
  1828. "op_aggregate_0_records_in_total": int64(5),
  1829. "op_aggregate_0_records_out_total": int64(5),
  1830. "op_having_0_exceptions_total": int64(3),
  1831. "op_having_0_process_latency_ms": int64(0),
  1832. "op_having_0_records_in_total": int64(5),
  1833. "op_having_0_records_out_total": int64(2),
  1834. },
  1835. }, {
  1836. name: `rule5`,
  1837. sql: `SELECT color, size FROM ldemo GROUP BY tumblingwindow(ss, 1) ORDER BY size`,
  1838. size: 5,
  1839. r: [][]map[string]interface{}{
  1840. {{
  1841. "error": "run Order By error: incompatible types for comparison: int and string",
  1842. }}, {{
  1843. "size": float64(3),
  1844. }}, {{
  1845. "color": float64(49),
  1846. "size": float64(2),
  1847. }},
  1848. },
  1849. m: map[string]interface{}{
  1850. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  1851. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  1852. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  1853. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  1854. "op_project_0_exceptions_total": int64(1),
  1855. "op_project_0_process_latency_ms": int64(0),
  1856. "op_project_0_records_in_total": int64(3),
  1857. "op_project_0_records_out_total": int64(2),
  1858. "sink_mockSink_0_exceptions_total": int64(0),
  1859. "sink_mockSink_0_records_in_total": int64(3),
  1860. "sink_mockSink_0_records_out_total": int64(3),
  1861. "source_ldemo_0_exceptions_total": int64(0),
  1862. "source_ldemo_0_records_in_total": int64(5),
  1863. "source_ldemo_0_records_out_total": int64(5),
  1864. "op_window_0_exceptions_total": int64(0),
  1865. "op_window_0_process_latency_ms": int64(0),
  1866. "op_window_0_records_in_total": int64(5),
  1867. "op_window_0_records_out_total": int64(3),
  1868. "op_order_0_exceptions_total": int64(1),
  1869. "op_order_0_process_latency_ms": int64(0),
  1870. "op_order_0_records_in_total": int64(3),
  1871. "op_order_0_records_out_total": int64(2),
  1872. },
  1873. },
  1874. }
  1875. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1876. createSchemalessStreams(t)
  1877. defer dropSchemalessStreams(t)
  1878. for i, tt := range tests {
  1879. test.ResetClock(1541152486000)
  1880. p := NewRuleProcessor(DbDir)
  1881. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1882. var (
  1883. sources []*nodes.SourceNode
  1884. syncs []chan int
  1885. )
  1886. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1887. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1888. } else {
  1889. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1890. t.Errorf("sql %s is not a select statement", tt.sql)
  1891. } else {
  1892. streams := xsql.GetStreams(selectStmt)
  1893. for _, stream := range streams {
  1894. next := make(chan int)
  1895. syncs = append(syncs, next)
  1896. source := getMockSourceL(stream, next, tt.size)
  1897. sources = append(sources, source)
  1898. }
  1899. }
  1900. }
  1901. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  1902. if err != nil {
  1903. t.Error(err)
  1904. }
  1905. mockSink := test.NewMockSink()
  1906. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
  1907. tp.AddSink(inputs, sink)
  1908. errCh := tp.Open()
  1909. func() {
  1910. for i := 0; i < tt.size*len(syncs); i++ {
  1911. syncs[i%len(syncs)] <- i
  1912. for {
  1913. time.Sleep(1)
  1914. if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
  1915. break
  1916. }
  1917. }
  1918. select {
  1919. case err = <-errCh:
  1920. t.Log(err)
  1921. tp.Cancel()
  1922. return
  1923. default:
  1924. }
  1925. }
  1926. retry := 100
  1927. for ; retry > 0; retry-- {
  1928. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  1929. break
  1930. }
  1931. t.Logf("wait to try another %d times", retry)
  1932. time.Sleep(time.Duration(retry) * time.Millisecond)
  1933. }
  1934. if retry == 0 {
  1935. err := compareMetrics(tp, tt.m, tt.sql)
  1936. t.Errorf("could not get correct metrics: %v", err)
  1937. }
  1938. }()
  1939. results := mockSink.GetResults()
  1940. var maps [][]map[string]interface{}
  1941. for _, v := range results {
  1942. var mapRes []map[string]interface{}
  1943. err := json.Unmarshal(v, &mapRes)
  1944. if err != nil {
  1945. t.Errorf("Failed to parse the input into map")
  1946. continue
  1947. }
  1948. maps = append(maps, mapRes)
  1949. }
  1950. if !reflect.DeepEqual(tt.r, maps) {
  1951. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1952. }
  1953. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  1954. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  1955. }
  1956. tp.Cancel()
  1957. }
  1958. }
  1959. func createEventStreams(t *testing.T) {
  1960. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  1961. demo := `CREATE STREAM demoE (
  1962. color STRING,
  1963. size BIGINT,
  1964. ts BIGINT
  1965. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1966. _, err := p.ExecStmt(demo)
  1967. if err != nil {
  1968. t.Log(err)
  1969. }
  1970. demo1 := `CREATE STREAM demo1E (
  1971. temp FLOAT,
  1972. hum BIGINT,
  1973. ts BIGINT
  1974. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1975. _, err = p.ExecStmt(demo1)
  1976. if err != nil {
  1977. t.Log(err)
  1978. }
  1979. sessionDemo := `CREATE STREAM sessionDemoE (
  1980. temp FLOAT,
  1981. hum BIGINT,
  1982. ts BIGINT
  1983. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1984. _, err = p.ExecStmt(sessionDemo)
  1985. if err != nil {
  1986. t.Log(err)
  1987. }
  1988. demoErr := `CREATE STREAM demoErr (
  1989. color STRING,
  1990. size BIGINT,
  1991. ts BIGINT
  1992. ) WITH (DATASOURCE="demoErr", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  1993. _, err = p.ExecStmt(demoErr)
  1994. if err != nil {
  1995. t.Log(err)
  1996. }
  1997. }
  1998. func dropEventStreams(t *testing.T) {
  1999. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  2000. demo := `DROP STREAM demoE`
  2001. _, err := p.ExecStmt(demo)
  2002. if err != nil {
  2003. t.Log(err)
  2004. }
  2005. demo1 := `DROP STREAM demo1E`
  2006. _, err = p.ExecStmt(demo1)
  2007. if err != nil {
  2008. t.Log(err)
  2009. }
  2010. sessionDemo := `DROP STREAM sessionDemoE`
  2011. _, err = p.ExecStmt(sessionDemo)
  2012. if err != nil {
  2013. t.Log(err)
  2014. }
  2015. demoErr := `DROP STREAM demoErr`
  2016. _, err = p.ExecStmt(demoErr)
  2017. if err != nil {
  2018. t.Log(err)
  2019. }
  2020. }
  2021. func getEventMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
  2022. var data []*xsql.Tuple
  2023. switch name {
  2024. case "demoE":
  2025. data = []*xsql.Tuple{
  2026. {
  2027. Emitter: name,
  2028. Message: map[string]interface{}{
  2029. "color": "red",
  2030. "size": 3,
  2031. "ts": 1541152486013,
  2032. },
  2033. Timestamp: 1541152486013,
  2034. },
  2035. {
  2036. Emitter: name,
  2037. Message: map[string]interface{}{
  2038. "color": "blue",
  2039. "size": 2,
  2040. "ts": 1541152487632,
  2041. },
  2042. Timestamp: 1541152487632,
  2043. },
  2044. {
  2045. Emitter: name,
  2046. Message: map[string]interface{}{
  2047. "color": "red",
  2048. "size": 1,
  2049. "ts": 1541152489252,
  2050. },
  2051. Timestamp: 1541152489252,
  2052. },
  2053. { //dropped item
  2054. Emitter: name,
  2055. Message: map[string]interface{}{
  2056. "color": "blue",
  2057. "size": 6,
  2058. "ts": 1541152486822,
  2059. },
  2060. Timestamp: 1541152486822,
  2061. },
  2062. {
  2063. Emitter: name,
  2064. Message: map[string]interface{}{
  2065. "color": "yellow",
  2066. "size": 4,
  2067. "ts": 1541152488442,
  2068. },
  2069. Timestamp: 1541152488442,
  2070. },
  2071. { //To lift the watermark and issue all windows
  2072. Emitter: name,
  2073. Message: map[string]interface{}{
  2074. "color": "yellow",
  2075. "size": 4,
  2076. "ts": 1541152492342,
  2077. },
  2078. Timestamp: 1541152488442,
  2079. },
  2080. }
  2081. case "demo1E":
  2082. data = []*xsql.Tuple{
  2083. {
  2084. Emitter: name,
  2085. Message: map[string]interface{}{
  2086. "temp": 27.5,
  2087. "hum": 59,
  2088. "ts": 1541152486823,
  2089. },
  2090. Timestamp: 1541152486823,
  2091. },
  2092. {
  2093. Emitter: name,
  2094. Message: map[string]interface{}{
  2095. "temp": 25.5,
  2096. "hum": 65,
  2097. "ts": 1541152486013,
  2098. },
  2099. Timestamp: 1541152486013,
  2100. },
  2101. {
  2102. Emitter: name,
  2103. Message: map[string]interface{}{
  2104. "temp": 27.4,
  2105. "hum": 80,
  2106. "ts": 1541152488442,
  2107. },
  2108. Timestamp: 1541152488442,
  2109. },
  2110. {
  2111. Emitter: name,
  2112. Message: map[string]interface{}{
  2113. "temp": 28.1,
  2114. "hum": 75,
  2115. "ts": 1541152487632,
  2116. },
  2117. Timestamp: 1541152487632,
  2118. },
  2119. {
  2120. Emitter: name,
  2121. Message: map[string]interface{}{
  2122. "temp": 25.5,
  2123. "hum": 62,
  2124. "ts": 1541152489252,
  2125. },
  2126. Timestamp: 1541152489252,
  2127. },
  2128. {
  2129. Emitter: name,
  2130. Message: map[string]interface{}{
  2131. "temp": 25.5,
  2132. "hum": 62,
  2133. "ts": 1541152499252,
  2134. },
  2135. Timestamp: 1541152499252,
  2136. },
  2137. }
  2138. case "sessionDemoE":
  2139. data = []*xsql.Tuple{
  2140. {
  2141. Emitter: name,
  2142. Message: map[string]interface{}{
  2143. "temp": 25.5,
  2144. "hum": 65,
  2145. "ts": 1541152486013,
  2146. },
  2147. Timestamp: 1541152486013,
  2148. },
  2149. {
  2150. Emitter: name,
  2151. Message: map[string]interface{}{
  2152. "temp": 28.1,
  2153. "hum": 75,
  2154. "ts": 1541152487932,
  2155. },
  2156. Timestamp: 1541152487932,
  2157. },
  2158. {
  2159. Emitter: name,
  2160. Message: map[string]interface{}{
  2161. "temp": 27.5,
  2162. "hum": 59,
  2163. "ts": 1541152486823,
  2164. },
  2165. Timestamp: 1541152486823,
  2166. },
  2167. {
  2168. Emitter: name,
  2169. Message: map[string]interface{}{
  2170. "temp": 25.5,
  2171. "hum": 62,
  2172. "ts": 1541152489252,
  2173. },
  2174. Timestamp: 1541152489252,
  2175. },
  2176. {
  2177. Emitter: name,
  2178. Message: map[string]interface{}{
  2179. "temp": 27.4,
  2180. "hum": 80,
  2181. "ts": 1541152488442,
  2182. },
  2183. Timestamp: 1541152488442,
  2184. },
  2185. {
  2186. Emitter: name,
  2187. Message: map[string]interface{}{
  2188. "temp": 26.2,
  2189. "hum": 63,
  2190. "ts": 1541152490062,
  2191. },
  2192. Timestamp: 1541152490062,
  2193. },
  2194. {
  2195. Emitter: name,
  2196. Message: map[string]interface{}{
  2197. "temp": 28.9,
  2198. "hum": 85,
  2199. "ts": 1541152491682,
  2200. },
  2201. Timestamp: 1541152491682,
  2202. },
  2203. {
  2204. Emitter: name,
  2205. Message: map[string]interface{}{
  2206. "temp": 26.8,
  2207. "hum": 71,
  2208. "ts": 1541152490872,
  2209. },
  2210. Timestamp: 1541152490872,
  2211. },
  2212. {
  2213. Emitter: name,
  2214. Message: map[string]interface{}{
  2215. "temp": 29.1,
  2216. "hum": 92,
  2217. "ts": 1541152492492,
  2218. },
  2219. Timestamp: 1541152492492,
  2220. },
  2221. {
  2222. Emitter: name,
  2223. Message: map[string]interface{}{
  2224. "temp": 30.9,
  2225. "hum": 87,
  2226. "ts": 1541152494112,
  2227. },
  2228. Timestamp: 1541152494112,
  2229. },
  2230. {
  2231. Emitter: name,
  2232. Message: map[string]interface{}{
  2233. "temp": 32.2,
  2234. "hum": 99,
  2235. "ts": 1541152493202,
  2236. },
  2237. Timestamp: 1541152493202,
  2238. },
  2239. {
  2240. Emitter: name,
  2241. Message: map[string]interface{}{
  2242. "temp": 32.2,
  2243. "hum": 99,
  2244. "ts": 1541152499202,
  2245. },
  2246. Timestamp: 1541152499202,
  2247. },
  2248. }
  2249. case "demoErr":
  2250. data = []*xsql.Tuple{
  2251. {
  2252. Emitter: name,
  2253. Message: map[string]interface{}{
  2254. "color": "red",
  2255. "size": 3,
  2256. "ts": 1541152486013,
  2257. },
  2258. Timestamp: 1541152486013,
  2259. },
  2260. {
  2261. Emitter: name,
  2262. Message: map[string]interface{}{
  2263. "color": 2,
  2264. "size": "blue",
  2265. "ts": 1541152487632,
  2266. },
  2267. Timestamp: 1541152487632,
  2268. },
  2269. {
  2270. Emitter: name,
  2271. Message: map[string]interface{}{
  2272. "color": "red",
  2273. "size": 1,
  2274. "ts": 1541152489252,
  2275. },
  2276. Timestamp: 1541152489252,
  2277. },
  2278. { //dropped item
  2279. Emitter: name,
  2280. Message: map[string]interface{}{
  2281. "color": "blue",
  2282. "size": 6,
  2283. "ts": 1541152486822,
  2284. },
  2285. Timestamp: 1541152486822,
  2286. },
  2287. {
  2288. Emitter: name,
  2289. Message: map[string]interface{}{
  2290. "color": "yellow",
  2291. "size": 4,
  2292. "ts": 1541152488442,
  2293. },
  2294. Timestamp: 1541152488442,
  2295. },
  2296. { //To lift the watermark and issue all windows
  2297. Emitter: name,
  2298. Message: map[string]interface{}{
  2299. "color": "yellow",
  2300. "size": 4,
  2301. "ts": 1541152492342,
  2302. },
  2303. Timestamp: 1541152488442,
  2304. },
  2305. }
  2306. }
  2307. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, true), map[string]string{
  2308. "DATASOURCE": name,
  2309. })
  2310. }
  2311. func TestEventWindow(t *testing.T) {
  2312. var tests = []struct {
  2313. name string
  2314. sql string
  2315. size int
  2316. r [][]map[string]interface{}
  2317. m map[string]interface{}
  2318. }{
  2319. {
  2320. name: `rule1`,
  2321. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  2322. size: 6,
  2323. r: [][]map[string]interface{}{
  2324. {{
  2325. "color": "red",
  2326. "size": float64(3),
  2327. "ts": float64(1541152486013),
  2328. }},
  2329. {{
  2330. "color": "red",
  2331. "size": float64(3),
  2332. "ts": float64(1541152486013),
  2333. }, {
  2334. "color": "blue",
  2335. "size": float64(2),
  2336. "ts": float64(1541152487632),
  2337. }},
  2338. {{
  2339. "color": "blue",
  2340. "size": float64(2),
  2341. "ts": float64(1541152487632),
  2342. }, {
  2343. "color": "yellow",
  2344. "size": float64(4),
  2345. "ts": float64(1541152488442),
  2346. }}, {{
  2347. "color": "yellow",
  2348. "size": float64(4),
  2349. "ts": float64(1541152488442),
  2350. }, {
  2351. "color": "red",
  2352. "size": float64(1),
  2353. "ts": float64(1541152489252),
  2354. }}, {{
  2355. "color": "red",
  2356. "size": float64(1),
  2357. "ts": float64(1541152489252),
  2358. }},
  2359. },
  2360. m: map[string]interface{}{
  2361. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2362. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2363. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2364. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2365. "op_project_0_exceptions_total": int64(0),
  2366. "op_project_0_process_latency_ms": int64(0),
  2367. "op_project_0_records_in_total": int64(5),
  2368. "op_project_0_records_out_total": int64(5),
  2369. "sink_mockSink_0_exceptions_total": int64(0),
  2370. "sink_mockSink_0_records_in_total": int64(5),
  2371. "sink_mockSink_0_records_out_total": int64(5),
  2372. "source_demoE_0_exceptions_total": int64(0),
  2373. "source_demoE_0_records_in_total": int64(6),
  2374. "source_demoE_0_records_out_total": int64(6),
  2375. "op_window_0_exceptions_total": int64(0),
  2376. "op_window_0_process_latency_ms": int64(0),
  2377. "op_window_0_records_in_total": int64(6),
  2378. "op_window_0_records_out_total": int64(5),
  2379. },
  2380. }, {
  2381. name: `rule2`,
  2382. sql: `SELECT color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  2383. size: 6,
  2384. r: [][]map[string]interface{}{
  2385. {{
  2386. "color": "red",
  2387. "ts": float64(1541152486013),
  2388. }},
  2389. {{
  2390. "color": "yellow",
  2391. "ts": float64(1541152488442),
  2392. }},
  2393. },
  2394. m: map[string]interface{}{
  2395. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2396. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2397. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2398. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2399. "op_project_0_exceptions_total": int64(0),
  2400. "op_project_0_process_latency_ms": int64(0),
  2401. "op_project_0_records_in_total": int64(2),
  2402. "op_project_0_records_out_total": int64(2),
  2403. "sink_mockSink_0_exceptions_total": int64(0),
  2404. "sink_mockSink_0_records_in_total": int64(2),
  2405. "sink_mockSink_0_records_out_total": int64(2),
  2406. "source_demoE_0_exceptions_total": int64(0),
  2407. "source_demoE_0_records_in_total": int64(6),
  2408. "source_demoE_0_records_out_total": int64(6),
  2409. "op_window_0_exceptions_total": int64(0),
  2410. "op_window_0_process_latency_ms": int64(0),
  2411. "op_window_0_records_in_total": int64(6),
  2412. "op_window_0_records_out_total": int64(4),
  2413. "op_filter_0_exceptions_total": int64(0),
  2414. "op_filter_0_process_latency_ms": int64(0),
  2415. "op_filter_0_records_in_total": int64(4),
  2416. "op_filter_0_records_out_total": int64(2),
  2417. },
  2418. }, {
  2419. name: `rule3`,
  2420. sql: `SELECT color, temp, ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  2421. size: 6,
  2422. r: [][]map[string]interface{}{
  2423. {{
  2424. "color": "red",
  2425. "temp": 25.5,
  2426. "ts": float64(1541152486013),
  2427. }}, {{
  2428. "color": "red",
  2429. "temp": 25.5,
  2430. "ts": float64(1541152486013),
  2431. }}, {{
  2432. "color": "blue",
  2433. "temp": 28.1,
  2434. "ts": float64(1541152487632),
  2435. }}, {{
  2436. "color": "blue",
  2437. "temp": 28.1,
  2438. "ts": float64(1541152487632),
  2439. }, {
  2440. "color": "yellow",
  2441. "temp": 27.4,
  2442. "ts": float64(1541152488442),
  2443. }}, {{
  2444. "color": "yellow",
  2445. "temp": 27.4,
  2446. "ts": float64(1541152488442),
  2447. }, {
  2448. "color": "red",
  2449. "temp": 25.5,
  2450. "ts": float64(1541152489252),
  2451. }},
  2452. },
  2453. m: map[string]interface{}{
  2454. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2455. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2456. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2457. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2458. "op_preprocessor_demo1E_0_exceptions_total": int64(0),
  2459. "op_preprocessor_demo1E_0_process_latency_ms": int64(0),
  2460. "op_preprocessor_demo1E_0_records_in_total": int64(6),
  2461. "op_preprocessor_demo1E_0_records_out_total": int64(6),
  2462. "op_project_0_exceptions_total": int64(0),
  2463. "op_project_0_process_latency_ms": int64(0),
  2464. "op_project_0_records_in_total": int64(5),
  2465. "op_project_0_records_out_total": int64(5),
  2466. "sink_mockSink_0_exceptions_total": int64(0),
  2467. "sink_mockSink_0_records_in_total": int64(5),
  2468. "sink_mockSink_0_records_out_total": int64(5),
  2469. "source_demoE_0_exceptions_total": int64(0),
  2470. "source_demoE_0_records_in_total": int64(6),
  2471. "source_demoE_0_records_out_total": int64(6),
  2472. "source_demo1E_0_exceptions_total": int64(0),
  2473. "source_demo1E_0_records_in_total": int64(6),
  2474. "source_demo1E_0_records_out_total": int64(6),
  2475. "op_window_0_exceptions_total": int64(0),
  2476. "op_window_0_process_latency_ms": int64(0),
  2477. "op_window_0_records_in_total": int64(12),
  2478. "op_window_0_records_out_total": int64(5),
  2479. "op_join_0_exceptions_total": int64(0),
  2480. "op_join_0_process_latency_ms": int64(0),
  2481. "op_join_0_records_in_total": int64(5),
  2482. "op_join_0_records_out_total": int64(5),
  2483. },
  2484. }, {
  2485. name: `rule4`,
  2486. sql: `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  2487. size: 6,
  2488. r: [][]map[string]interface{}{
  2489. {{
  2490. "color": "red",
  2491. }}, {{
  2492. "color": "blue",
  2493. }, {
  2494. "color": "red",
  2495. }}, {{
  2496. "color": "blue",
  2497. }, {
  2498. "color": "yellow",
  2499. }}, {{
  2500. "color": "blue",
  2501. }, {
  2502. "color": "red",
  2503. }, {
  2504. "color": "yellow",
  2505. }},
  2506. },
  2507. m: map[string]interface{}{
  2508. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2509. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2510. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2511. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2512. "op_project_0_exceptions_total": int64(0),
  2513. "op_project_0_process_latency_ms": int64(0),
  2514. "op_project_0_records_in_total": int64(4),
  2515. "op_project_0_records_out_total": int64(4),
  2516. "sink_mockSink_0_exceptions_total": int64(0),
  2517. "sink_mockSink_0_records_in_total": int64(4),
  2518. "sink_mockSink_0_records_out_total": int64(4),
  2519. "source_demoE_0_exceptions_total": int64(0),
  2520. "source_demoE_0_records_in_total": int64(6),
  2521. "source_demoE_0_records_out_total": int64(6),
  2522. "op_window_0_exceptions_total": int64(0),
  2523. "op_window_0_process_latency_ms": int64(0),
  2524. "op_window_0_records_in_total": int64(6),
  2525. "op_window_0_records_out_total": int64(4),
  2526. "op_aggregate_0_exceptions_total": int64(0),
  2527. "op_aggregate_0_process_latency_ms": int64(0),
  2528. "op_aggregate_0_records_in_total": int64(4),
  2529. "op_aggregate_0_records_out_total": int64(4),
  2530. "op_order_0_exceptions_total": int64(0),
  2531. "op_order_0_process_latency_ms": int64(0),
  2532. "op_order_0_records_in_total": int64(4),
  2533. "op_order_0_records_out_total": int64(4),
  2534. },
  2535. }, {
  2536. name: `rule5`,
  2537. sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  2538. size: 12,
  2539. r: [][]map[string]interface{}{
  2540. {{
  2541. "temp": 25.5,
  2542. }}, {{
  2543. "temp": 28.1,
  2544. }, {
  2545. "temp": 27.4,
  2546. }, {
  2547. "temp": 25.5,
  2548. }}, {{
  2549. "temp": 26.2,
  2550. }, {
  2551. "temp": 26.8,
  2552. }, {
  2553. "temp": 28.9,
  2554. }, {
  2555. "temp": 29.1,
  2556. }, {
  2557. "temp": 32.2,
  2558. }}, {{
  2559. "temp": 30.9,
  2560. }},
  2561. },
  2562. m: map[string]interface{}{
  2563. "op_preprocessor_sessionDemoE_0_exceptions_total": int64(0),
  2564. "op_preprocessor_sessionDemoE_0_process_latency_ms": int64(0),
  2565. "op_preprocessor_sessionDemoE_0_records_in_total": int64(12),
  2566. "op_preprocessor_sessionDemoE_0_records_out_total": int64(12),
  2567. "op_project_0_exceptions_total": int64(0),
  2568. "op_project_0_process_latency_ms": int64(0),
  2569. "op_project_0_records_in_total": int64(4),
  2570. "op_project_0_records_out_total": int64(4),
  2571. "sink_mockSink_0_exceptions_total": int64(0),
  2572. "sink_mockSink_0_records_in_total": int64(4),
  2573. "sink_mockSink_0_records_out_total": int64(4),
  2574. "source_sessionDemoE_0_exceptions_total": int64(0),
  2575. "source_sessionDemoE_0_records_in_total": int64(12),
  2576. "source_sessionDemoE_0_records_out_total": int64(12),
  2577. "op_window_0_exceptions_total": int64(0),
  2578. "op_window_0_process_latency_ms": int64(0),
  2579. "op_window_0_records_in_total": int64(12),
  2580. "op_window_0_records_out_total": int64(4),
  2581. },
  2582. }, {
  2583. name: `rule6`,
  2584. sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  2585. size: 6,
  2586. r: [][]map[string]interface{}{
  2587. {{
  2588. "m": 25.5,
  2589. "c": float64(1),
  2590. }}, {{
  2591. "m": 25.5,
  2592. "c": float64(1),
  2593. }}, {{
  2594. "m": 28.1,
  2595. "c": float64(1),
  2596. }}, {{
  2597. "m": 28.1,
  2598. "c": float64(2),
  2599. }}, {{
  2600. "m": 27.4,
  2601. "c": float64(2),
  2602. }},
  2603. },
  2604. m: map[string]interface{}{
  2605. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2606. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2607. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2608. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2609. "op_preprocessor_demo1E_0_exceptions_total": int64(0),
  2610. "op_preprocessor_demo1E_0_process_latency_ms": int64(0),
  2611. "op_preprocessor_demo1E_0_records_in_total": int64(6),
  2612. "op_preprocessor_demo1E_0_records_out_total": int64(6),
  2613. "op_project_0_exceptions_total": int64(0),
  2614. "op_project_0_process_latency_ms": int64(0),
  2615. "op_project_0_records_in_total": int64(5),
  2616. "op_project_0_records_out_total": int64(5),
  2617. "sink_mockSink_0_exceptions_total": int64(0),
  2618. "sink_mockSink_0_records_in_total": int64(5),
  2619. "sink_mockSink_0_records_out_total": int64(5),
  2620. "source_demoE_0_exceptions_total": int64(0),
  2621. "source_demoE_0_records_in_total": int64(6),
  2622. "source_demoE_0_records_out_total": int64(6),
  2623. "source_demo1E_0_exceptions_total": int64(0),
  2624. "source_demo1E_0_records_in_total": int64(6),
  2625. "source_demo1E_0_records_out_total": int64(6),
  2626. "op_window_0_exceptions_total": int64(0),
  2627. "op_window_0_records_in_total": int64(12),
  2628. "op_window_0_records_out_total": int64(5),
  2629. "op_join_0_exceptions_total": int64(0),
  2630. "op_join_0_process_latency_ms": int64(0),
  2631. "op_join_0_records_in_total": int64(5),
  2632. "op_join_0_records_out_total": int64(5),
  2633. },
  2634. }, {
  2635. name: `rule7`,
  2636. sql: `SELECT * FROM demoErr GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  2637. size: 6,
  2638. r: [][]map[string]interface{}{
  2639. {{
  2640. "error": "error in preprocessor: invalid data type for color, expect string but found int(2)",
  2641. }},
  2642. {{
  2643. "color": "red",
  2644. "size": float64(3),
  2645. "ts": float64(1541152486013),
  2646. }},
  2647. {{
  2648. "color": "red",
  2649. "size": float64(3),
  2650. "ts": float64(1541152486013),
  2651. }},
  2652. {{
  2653. "color": "yellow",
  2654. "size": float64(4),
  2655. "ts": float64(1541152488442),
  2656. }}, {{
  2657. "color": "yellow",
  2658. "size": float64(4),
  2659. "ts": float64(1541152488442),
  2660. }, {
  2661. "color": "red",
  2662. "size": float64(1),
  2663. "ts": float64(1541152489252),
  2664. }}, {{
  2665. "color": "red",
  2666. "size": float64(1),
  2667. "ts": float64(1541152489252),
  2668. }},
  2669. },
  2670. m: map[string]interface{}{
  2671. "op_preprocessor_demoErr_0_exceptions_total": int64(1),
  2672. "op_preprocessor_demoErr_0_process_latency_ms": int64(0),
  2673. "op_preprocessor_demoErr_0_records_in_total": int64(6),
  2674. "op_preprocessor_demoErr_0_records_out_total": int64(5),
  2675. "op_project_0_exceptions_total": int64(1),
  2676. "op_project_0_process_latency_ms": int64(0),
  2677. "op_project_0_records_in_total": int64(6),
  2678. "op_project_0_records_out_total": int64(5),
  2679. "sink_mockSink_0_exceptions_total": int64(0),
  2680. "sink_mockSink_0_records_in_total": int64(6),
  2681. "sink_mockSink_0_records_out_total": int64(6),
  2682. "source_demoErr_0_exceptions_total": int64(0),
  2683. "source_demoErr_0_records_in_total": int64(6),
  2684. "source_demoErr_0_records_out_total": int64(6),
  2685. "op_window_0_exceptions_total": int64(1),
  2686. "op_window_0_process_latency_ms": int64(0),
  2687. "op_window_0_records_in_total": int64(6),
  2688. "op_window_0_records_out_total": int64(5),
  2689. },
  2690. },
  2691. }
  2692. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2693. createEventStreams(t)
  2694. defer dropEventStreams(t)
  2695. for i, tt := range tests {
  2696. test.ResetClock(1541152486000)
  2697. p := NewRuleProcessor(DbDir)
  2698. parser := xsql.NewParser(strings.NewReader(tt.sql))
  2699. var (
  2700. sources []*nodes.SourceNode
  2701. syncs []chan int
  2702. )
  2703. if stmt, err := xsql.Language.Parse(parser); err != nil {
  2704. t.Errorf("parse sql %s error: %s", tt.sql, err)
  2705. } else {
  2706. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  2707. t.Errorf("sql %s is not a select statement", tt.sql)
  2708. } else {
  2709. streams := xsql.GetStreams(selectStmt)
  2710. for _, stream := range streams {
  2711. next := make(chan int)
  2712. syncs = append(syncs, next)
  2713. source := getEventMockSource(stream, next, tt.size)
  2714. sources = append(sources, source)
  2715. }
  2716. }
  2717. }
  2718. tp, inputs, err := p.createTopoWithSources(&api.Rule{
  2719. Id: tt.name, Sql: tt.sql,
  2720. Options: map[string]interface{}{
  2721. "isEventTime": true,
  2722. "lateTolerance": float64(1000),
  2723. },
  2724. }, sources)
  2725. if err != nil {
  2726. t.Error(err)
  2727. }
  2728. mockSink := test.NewMockSink()
  2729. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
  2730. tp.AddSink(inputs, sink)
  2731. errCh := tp.Open()
  2732. func() {
  2733. for i := 0; i < tt.size*len(syncs); i++ {
  2734. syncs[i%len(syncs)] <- i
  2735. for {
  2736. time.Sleep(1)
  2737. if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
  2738. break
  2739. }
  2740. }
  2741. select {
  2742. case err = <-errCh:
  2743. t.Log(err)
  2744. tp.Cancel()
  2745. return
  2746. default:
  2747. }
  2748. }
  2749. mockClock := test.GetMockClock()
  2750. mockClock.Add(1000 * time.Millisecond)
  2751. retry := 100
  2752. for ; retry > 0; retry-- {
  2753. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  2754. break
  2755. }
  2756. t.Logf("wait to try another %d times", retry)
  2757. time.Sleep(time.Duration(retry) * time.Millisecond)
  2758. }
  2759. if retry == 0 {
  2760. err := compareMetrics(tp, tt.m, tt.sql)
  2761. t.Errorf("could not get correct metrics: %v", err)
  2762. }
  2763. }()
  2764. results := mockSink.GetResults()
  2765. var maps [][]map[string]interface{}
  2766. for _, v := range results {
  2767. var mapRes []map[string]interface{}
  2768. err := json.Unmarshal(v, &mapRes)
  2769. if err != nil {
  2770. t.Errorf("Failed to parse the input into map")
  2771. continue
  2772. }
  2773. maps = append(maps, mapRes)
  2774. }
  2775. if !reflect.DeepEqual(tt.r, maps) {
  2776. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  2777. }
  2778. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  2779. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  2780. }
  2781. tp.Cancel()
  2782. }
  2783. }
  2784. func getMetric(tp *xstream.TopologyNew, name string) int {
  2785. keys, values := tp.GetMetrics()
  2786. for index, key := range keys {
  2787. if key == name {
  2788. return int(values[index].(int64))
  2789. }
  2790. }
  2791. fmt.Println("can't find " + name)
  2792. return 0
  2793. }
  2794. func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}, sql string) (err error) {
  2795. keys, values := tp.GetMetrics()
  2796. for i, k := range keys {
  2797. log.Printf("%s:%v", k, values[i])
  2798. }
  2799. for k, v := range m {
  2800. var (
  2801. index int
  2802. key string
  2803. matched bool
  2804. )
  2805. for index, key = range keys {
  2806. if k == key {
  2807. if strings.HasSuffix(k, "process_latency_ms") {
  2808. if values[index].(int64) >= v.(int64) {
  2809. matched = true
  2810. continue
  2811. } else {
  2812. break
  2813. }
  2814. }
  2815. if values[index] == v {
  2816. matched = true
  2817. }
  2818. break
  2819. }
  2820. }
  2821. if matched {
  2822. continue
  2823. }
  2824. //do not find
  2825. if index < len(values) {
  2826. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
  2827. } else {
  2828. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
  2829. }
  2830. }
  2831. return nil
  2832. }
  2833. func errstring(err error) string {
  2834. if err != nil {
  2835. return err.Error()
  2836. }
  2837. return ""
  2838. }