xsql_processor_test.go 91 KB


  1. package processors
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/emqx/kuiper/xstream/nodes"
  10. "github.com/emqx/kuiper/xstream/test"
  11. "path"
  12. "reflect"
  13. "strings"
  14. "testing"
  15. "time"
  16. )
  17. var DbDir = getDbDir()
  18. func getDbDir() string {
  19. dbDir, err := common.GetAndCreateDataLoc("test")
  20. if err != nil {
  21. log.Panic(err)
  22. }
  23. log.Infof("db location is %s", dbDir)
  24. return dbDir
  25. }
  26. func TestStreamCreateProcessor(t *testing.T) {
  27. var tests = []struct {
  28. s string
  29. r []string
  30. err string
  31. }{
  32. {
  33. s: `SHOW STREAMS;`,
  34. r: []string{"No stream definitions are found."},
  35. },
  36. {
  37. s: `EXPLAIN STREAM topic1;`,
  38. err: "Stream topic1 is not found.",
  39. },
  40. {
  41. s: `CREATE STREAM topic1 (
  42. USERID BIGINT,
  43. FIRST_NAME STRING,
  44. LAST_NAME STRING,
  45. NICKNAMES ARRAY(STRING),
  46. Gender BOOLEAN,
  47. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  48. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  49. r: []string{"Stream topic1 is created."},
  50. },
  51. {
  52. s: `CREATE STREAM ` + "`stream`" + ` (
  53. USERID BIGINT,
  54. FIRST_NAME STRING,
  55. LAST_NAME STRING,
  56. NICKNAMES ARRAY(STRING),
  57. Gender BOOLEAN,
  58. ` + "`地址`" + ` STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  59. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  60. r: []string{"Stream stream is created."},
  61. },
  62. {
  63. s: `CREATE STREAM topic1 (
  64. USERID BIGINT,
  65. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  66. err: "Create stream fails: Item topic1 already exists.",
  67. },
  68. {
  69. s: `EXPLAIN STREAM topic1;`,
  70. r: []string{"TO BE SUPPORTED"},
  71. },
  72. {
  73. s: `DESCRIBE STREAM topic1;`,
  74. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  75. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  76. "DATASOURCE: users\nFORMAT: AVRO\nKEY: USERID\n"},
  77. },
  78. {
  79. s: `DROP STREAM topic1;`,
  80. r: []string{"Stream topic1 is dropped."},
  81. },
  82. {
  83. s: `SHOW STREAMS;`,
  84. r: []string{"stream"},
  85. },
  86. {
  87. s: `DESCRIBE STREAM topic1;`,
  88. err: "Stream topic1 is not found.",
  89. },
  90. {
  91. s: `DROP STREAM topic1;`,
  92. err: "Drop stream fails: topic1 is not found.",
  93. },
  94. {
  95. s: "DROP STREAM `stream`;",
  96. r: []string{"Stream stream is dropped."},
  97. },
  98. }
  99. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  100. streamDB := path.Join(getDbDir(), "streamTest")
  101. for i, tt := range tests {
  102. results, err := NewStreamProcessor(streamDB).ExecStmt(tt.s)
  103. if !reflect.DeepEqual(tt.err, errstring(err)) {
  104. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  105. } else if tt.err == "" {
  106. if !reflect.DeepEqual(tt.r, results) {
  107. t.Errorf("%d. %q\n\nstmt mismatch:\nexp=%s\ngot=%#v\n\n", i, tt.s, tt.r, results)
  108. }
  109. }
  110. }
  111. }
  112. func createStreams(t *testing.T) {
  113. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  114. demo := `CREATE STREAM demo (
  115. color STRING,
  116. size BIGINT,
  117. ts BIGINT
  118. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  119. _, err := p.ExecStmt(demo)
  120. if err != nil {
  121. t.Log(err)
  122. }
  123. demoE := `CREATE STREAM demoE (
  124. color STRING,
  125. size BIGINT,
  126. ts BIGINT
  127. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts");`
  128. _, err = p.ExecStmt(demoE)
  129. if err != nil {
  130. t.Log(err)
  131. }
  132. demo1 := `CREATE STREAM demo1 (
  133. temp FLOAT,
  134. hum BIGINT,` +
  135. "`from`" + ` STRING,
  136. ts BIGINT
  137. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  138. _, err = p.ExecStmt(demo1)
  139. if err != nil {
  140. t.Log(err)
  141. }
  142. sessionDemo := `CREATE STREAM sessionDemo (
  143. temp FLOAT,
  144. hum BIGINT,
  145. ts BIGINT
  146. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  147. _, err = p.ExecStmt(sessionDemo)
  148. if err != nil {
  149. t.Log(err)
  150. }
  151. }
  152. func dropStreams(t *testing.T) {
  153. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  154. demo := `DROP STREAM demo`
  155. _, err := p.ExecStmt(demo)
  156. if err != nil {
  157. t.Log(err)
  158. }
  159. demoE := `DROP STREAM demoE`
  160. _, err = p.ExecStmt(demoE)
  161. if err != nil {
  162. t.Log(err)
  163. }
  164. demo1 := `DROP STREAM demo1`
  165. _, err = p.ExecStmt(demo1)
  166. if err != nil {
  167. t.Log(err)
  168. }
  169. sessionDemo := `DROP STREAM sessionDemo`
  170. _, err = p.ExecStmt(sessionDemo)
  171. if err != nil {
  172. t.Log(err)
  173. }
  174. }
  175. func createSchemalessStreams(t *testing.T) {
  176. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  177. demo := `CREATE STREAM ldemo (
  178. ) WITH (DATASOURCE="ldemo", FORMAT="json");`
  179. _, err := p.ExecStmt(demo)
  180. if err != nil {
  181. t.Log(err)
  182. }
  183. demo1 := `CREATE STREAM ldemo1 (
  184. ) WITH (DATASOURCE="ldemo1", FORMAT="json");`
  185. _, err = p.ExecStmt(demo1)
  186. if err != nil {
  187. t.Log(err)
  188. }
  189. sessionDemo := `CREATE STREAM lsessionDemo (
  190. ) WITH (DATASOURCE="lsessionDemo", FORMAT="json");`
  191. _, err = p.ExecStmt(sessionDemo)
  192. if err != nil {
  193. t.Log(err)
  194. }
  195. }
  196. func dropSchemalessStreams(t *testing.T) {
  197. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  198. demo := `DROP STREAM ldemo`
  199. _, err := p.ExecStmt(demo)
  200. if err != nil {
  201. t.Log(err)
  202. }
  203. demo1 := `DROP STREAM ldemo1`
  204. _, err = p.ExecStmt(demo1)
  205. if err != nil {
  206. t.Log(err)
  207. }
  208. sessionDemo := `DROP STREAM lsessionDemo`
  209. _, err = p.ExecStmt(sessionDemo)
  210. if err != nil {
  211. t.Log(err)
  212. }
  213. }
  214. func getMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
  215. var data []*xsql.Tuple
  216. switch name {
  217. case "demo":
  218. data = []*xsql.Tuple{
  219. {
  220. Emitter: name,
  221. Message: map[string]interface{}{
  222. "color": "red",
  223. "size": 3,
  224. "ts": 1541152486013,
  225. },
  226. Timestamp: 1541152486013,
  227. },
  228. {
  229. Emitter: name,
  230. Message: map[string]interface{}{
  231. "color": "blue",
  232. "size": 6,
  233. "ts": 1541152486822,
  234. },
  235. Timestamp: 1541152486822,
  236. },
  237. {
  238. Emitter: name,
  239. Message: map[string]interface{}{
  240. "color": "blue",
  241. "size": 2,
  242. "ts": 1541152487632,
  243. },
  244. Timestamp: 1541152487632,
  245. },
  246. {
  247. Emitter: name,
  248. Message: map[string]interface{}{
  249. "color": "yellow",
  250. "size": 4,
  251. "ts": 1541152488442,
  252. },
  253. Timestamp: 1541152488442,
  254. },
  255. {
  256. Emitter: name,
  257. Message: map[string]interface{}{
  258. "color": "red",
  259. "size": 1,
  260. "ts": 1541152489252,
  261. },
  262. Timestamp: 1541152489252,
  263. },
  264. }
  265. case "demoE":
  266. data = []*xsql.Tuple{
  267. {
  268. Emitter: name,
  269. Message: map[string]interface{}{
  270. "color": 3,
  271. "size": "red",
  272. "ts": 1541152486013,
  273. },
  274. Timestamp: 1541152486013,
  275. },
  276. {
  277. Emitter: name,
  278. Message: map[string]interface{}{
  279. "color": "blue",
  280. "size": 6,
  281. "ts": "1541152486822",
  282. },
  283. Timestamp: 1541152486822,
  284. },
  285. {
  286. Emitter: name,
  287. Message: map[string]interface{}{
  288. "color": "blue",
  289. "size": 2,
  290. "ts": 1541152487632,
  291. },
  292. Timestamp: 1541152487632,
  293. },
  294. {
  295. Emitter: name,
  296. Message: map[string]interface{}{
  297. "color": 7,
  298. "size": 4,
  299. "ts": 1541152488442,
  300. },
  301. Timestamp: 1541152488442,
  302. },
  303. {
  304. Emitter: name,
  305. Message: map[string]interface{}{
  306. "color": "red",
  307. "size": "blue",
  308. "ts": 1541152489252,
  309. },
  310. Timestamp: 1541152489252,
  311. },
  312. }
  313. case "demo1":
  314. data = []*xsql.Tuple{
  315. {
  316. Emitter: name,
  317. Message: map[string]interface{}{
  318. "temp": 25.5,
  319. "hum": 65,
  320. "from": "device1",
  321. "ts": 1541152486013,
  322. },
  323. Timestamp: 1541152486013,
  324. },
  325. {
  326. Emitter: name,
  327. Message: map[string]interface{}{
  328. "temp": 27.5,
  329. "hum": 59,
  330. "from": "device2",
  331. "ts": 1541152486823,
  332. },
  333. Timestamp: 1541152486823,
  334. },
  335. {
  336. Emitter: name,
  337. Message: map[string]interface{}{
  338. "temp": 28.1,
  339. "hum": 75,
  340. "from": "device3",
  341. "ts": 1541152487632,
  342. },
  343. Timestamp: 1541152487632,
  344. },
  345. {
  346. Emitter: name,
  347. Message: map[string]interface{}{
  348. "temp": 27.4,
  349. "hum": 80,
  350. "from": "device1",
  351. "ts": 1541152488442,
  352. },
  353. Timestamp: 1541152488442,
  354. },
  355. {
  356. Emitter: name,
  357. Message: map[string]interface{}{
  358. "temp": 25.5,
  359. "hum": 62,
  360. "from": "device3",
  361. "ts": 1541152489252,
  362. },
  363. Timestamp: 1541152489252,
  364. },
  365. }
  366. case "sessionDemo":
  367. data = []*xsql.Tuple{
  368. {
  369. Emitter: name,
  370. Message: map[string]interface{}{
  371. "temp": 25.5,
  372. "hum": 65,
  373. "ts": 1541152486013,
  374. },
  375. Timestamp: 1541152486013,
  376. },
  377. {
  378. Emitter: name,
  379. Message: map[string]interface{}{
  380. "temp": 27.5,
  381. "hum": 59,
  382. "ts": 1541152486823,
  383. },
  384. Timestamp: 1541152486823,
  385. },
  386. {
  387. Emitter: name,
  388. Message: map[string]interface{}{
  389. "temp": 28.1,
  390. "hum": 75,
  391. "ts": 1541152487932,
  392. },
  393. Timestamp: 1541152487932,
  394. },
  395. {
  396. Emitter: name,
  397. Message: map[string]interface{}{
  398. "temp": 27.4,
  399. "hum": 80,
  400. "ts": 1541152488442,
  401. },
  402. Timestamp: 1541152488442,
  403. },
  404. {
  405. Emitter: name,
  406. Message: map[string]interface{}{
  407. "temp": 25.5,
  408. "hum": 62,
  409. "ts": 1541152489252,
  410. },
  411. Timestamp: 1541152489252,
  412. },
  413. {
  414. Emitter: name,
  415. Message: map[string]interface{}{
  416. "temp": 26.2,
  417. "hum": 63,
  418. "ts": 1541152490062,
  419. },
  420. Timestamp: 1541152490062,
  421. },
  422. {
  423. Emitter: name,
  424. Message: map[string]interface{}{
  425. "temp": 26.8,
  426. "hum": 71,
  427. "ts": 1541152490872,
  428. },
  429. Timestamp: 1541152490872,
  430. },
  431. {
  432. Emitter: name,
  433. Message: map[string]interface{}{
  434. "temp": 28.9,
  435. "hum": 85,
  436. "ts": 1541152491682,
  437. },
  438. Timestamp: 1541152491682,
  439. },
  440. {
  441. Emitter: name,
  442. Message: map[string]interface{}{
  443. "temp": 29.1,
  444. "hum": 92,
  445. "ts": 1541152492492,
  446. },
  447. Timestamp: 1541152492492,
  448. },
  449. {
  450. Emitter: name,
  451. Message: map[string]interface{}{
  452. "temp": 32.2,
  453. "hum": 99,
  454. "ts": 1541152493202,
  455. },
  456. Timestamp: 1541152493202,
  457. },
  458. {
  459. Emitter: name,
  460. Message: map[string]interface{}{
  461. "temp": 30.9,
  462. "hum": 87,
  463. "ts": 1541152494112,
  464. },
  465. Timestamp: 1541152494112,
  466. },
  467. }
  468. }
  469. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
  470. "DATASOURCE": name,
  471. })
  472. }
  473. func TestSingleSQL(t *testing.T) {
  474. var tests = []struct {
  475. name string
  476. sql string
  477. r [][]map[string]interface{}
  478. s string
  479. m map[string]interface{}
  480. }{
  481. {
  482. name: `rule1`,
  483. sql: `SELECT * FROM demo`,
  484. r: [][]map[string]interface{}{
  485. {{
  486. "color": "red",
  487. "size": float64(3),
  488. "ts": float64(1541152486013),
  489. }},
  490. {{
  491. "color": "blue",
  492. "size": float64(6),
  493. "ts": float64(1541152486822),
  494. }},
  495. {{
  496. "color": "blue",
  497. "size": float64(2),
  498. "ts": float64(1541152487632),
  499. }},
  500. {{
  501. "color": "yellow",
  502. "size": float64(4),
  503. "ts": float64(1541152488442),
  504. }},
  505. {{
  506. "color": "red",
  507. "size": float64(1),
  508. "ts": float64(1541152489252),
  509. }},
  510. },
  511. m: map[string]interface{}{
  512. "op_preprocessor_demo_0_exceptions_total": int64(0),
  513. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  514. "op_preprocessor_demo_0_records_in_total": int64(5),
  515. "op_preprocessor_demo_0_records_out_total": int64(5),
  516. "op_project_0_exceptions_total": int64(0),
  517. "op_project_0_process_latency_ms": int64(0),
  518. "op_project_0_records_in_total": int64(5),
  519. "op_project_0_records_out_total": int64(5),
  520. "sink_mockSink_0_exceptions_total": int64(0),
  521. "sink_mockSink_0_records_in_total": int64(5),
  522. "sink_mockSink_0_records_out_total": int64(5),
  523. "source_demo_0_exceptions_total": int64(0),
  524. "source_demo_0_records_in_total": int64(5),
  525. "source_demo_0_records_out_total": int64(5),
  526. },
  527. s: "sink_mockSink_0_records_out_total",
  528. }, {
  529. name: `rule2`,
  530. sql: `SELECT color, ts FROM demo where size > 3`,
  531. r: [][]map[string]interface{}{
  532. {{
  533. "color": "blue",
  534. "ts": float64(1541152486822),
  535. }},
  536. {{
  537. "color": "yellow",
  538. "ts": float64(1541152488442),
  539. }},
  540. },
  541. s: "op_filter_0_records_in_total",
  542. m: map[string]interface{}{
  543. "op_preprocessor_demo_0_exceptions_total": int64(0),
  544. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  545. "op_preprocessor_demo_0_records_in_total": int64(5),
  546. "op_preprocessor_demo_0_records_out_total": int64(5),
  547. "op_project_0_exceptions_total": int64(0),
  548. "op_project_0_process_latency_ms": int64(0),
  549. "op_project_0_records_in_total": int64(2),
  550. "op_project_0_records_out_total": int64(2),
  551. "sink_mockSink_0_exceptions_total": int64(0),
  552. "sink_mockSink_0_records_in_total": int64(2),
  553. "sink_mockSink_0_records_out_total": int64(2),
  554. "source_demo_0_exceptions_total": int64(0),
  555. "source_demo_0_records_in_total": int64(5),
  556. "source_demo_0_records_out_total": int64(5),
  557. "op_filter_0_exceptions_total": int64(0),
  558. "op_filter_0_process_latency_ms": int64(0),
  559. "op_filter_0_records_in_total": int64(5),
  560. "op_filter_0_records_out_total": int64(2),
  561. },
  562. }, {
  563. name: `rule3`,
  564. sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  565. r: [][]map[string]interface{}{
  566. {{
  567. "Int8": float64(6),
  568. "ts": float64(1541152486822),
  569. }},
  570. {{
  571. "Int8": float64(4),
  572. "ts": float64(1541152488442),
  573. }},
  574. },
  575. s: "op_filter_0_records_in_total",
  576. m: map[string]interface{}{
  577. "op_preprocessor_demo_0_exceptions_total": int64(0),
  578. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  579. "op_preprocessor_demo_0_records_in_total": int64(5),
  580. "op_preprocessor_demo_0_records_out_total": int64(5),
  581. "op_project_0_exceptions_total": int64(0),
  582. "op_project_0_process_latency_ms": int64(0),
  583. "op_project_0_records_in_total": int64(2),
  584. "op_project_0_records_out_total": int64(2),
  585. "sink_mockSink_0_exceptions_total": int64(0),
  586. "sink_mockSink_0_records_in_total": int64(2),
  587. "sink_mockSink_0_records_out_total": int64(2),
  588. "source_demo_0_exceptions_total": int64(0),
  589. "source_demo_0_records_in_total": int64(5),
  590. "source_demo_0_records_out_total": int64(5),
  591. "op_filter_0_exceptions_total": int64(0),
  592. "op_filter_0_process_latency_ms": int64(0),
  593. "op_filter_0_records_in_total": int64(5),
  594. "op_filter_0_records_out_total": int64(2),
  595. },
  596. }, {
  597. name: `rule4`,
  598. sql: `SELECT size as Int8, ts FROM demoE where size > 3`,
  599. r: [][]map[string]interface{}{
  600. {{
  601. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  602. }},
  603. {{
  604. "Int8": float64(6),
  605. "ts": float64(1541152486822),
  606. }},
  607. {{
  608. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  609. }},
  610. {{
  611. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  612. }},
  613. },
  614. s: "op_filter_0_records_in_total",
  615. m: map[string]interface{}{
  616. "op_preprocessor_demoE_0_exceptions_total": int64(3),
  617. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  618. "op_preprocessor_demoE_0_records_in_total": int64(5),
  619. "op_preprocessor_demoE_0_records_out_total": int64(2),
  620. "op_project_0_exceptions_total": int64(3),
  621. "op_project_0_process_latency_ms": int64(0),
  622. "op_project_0_records_in_total": int64(4),
  623. "op_project_0_records_out_total": int64(1),
  624. "sink_mockSink_0_exceptions_total": int64(0),
  625. "sink_mockSink_0_records_in_total": int64(4),
  626. "sink_mockSink_0_records_out_total": int64(4),
  627. "source_demoE_0_exceptions_total": int64(0),
  628. "source_demoE_0_records_in_total": int64(5),
  629. "source_demoE_0_records_out_total": int64(5),
  630. "op_filter_0_exceptions_total": int64(3),
  631. "op_filter_0_process_latency_ms": int64(0),
  632. "op_filter_0_records_in_total": int64(5),
  633. "op_filter_0_records_out_total": int64(1),
  634. },
  635. }, {
  636. name: `rule4`,
  637. sql: `SELECT size as Int8, ts FROM demoE where size > 3`,
  638. r: [][]map[string]interface{}{
  639. {{
  640. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  641. }},
  642. {{
  643. "Int8": float64(6),
  644. "ts": float64(1541152486822),
  645. }},
  646. {{
  647. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  648. }},
  649. {{
  650. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  651. }},
  652. },
  653. s: "op_filter_0_records_in_total",
  654. m: map[string]interface{}{
  655. "op_preprocessor_demoE_0_exceptions_total": int64(3),
  656. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  657. "op_preprocessor_demoE_0_records_in_total": int64(5),
  658. "op_preprocessor_demoE_0_records_out_total": int64(2),
  659. "op_project_0_exceptions_total": int64(3),
  660. "op_project_0_process_latency_ms": int64(0),
  661. "op_project_0_records_in_total": int64(4),
  662. "op_project_0_records_out_total": int64(1),
  663. "sink_mockSink_0_exceptions_total": int64(0),
  664. "sink_mockSink_0_records_in_total": int64(4),
  665. "sink_mockSink_0_records_out_total": int64(4),
  666. "source_demoE_0_exceptions_total": int64(0),
  667. "source_demoE_0_records_in_total": int64(5),
  668. "source_demoE_0_records_out_total": int64(5),
  669. "op_filter_0_exceptions_total": int64(3),
  670. "op_filter_0_process_latency_ms": int64(0),
  671. "op_filter_0_records_in_total": int64(5),
  672. "op_filter_0_records_out_total": int64(1),
  673. },
  674. }, {
  675. name: `rule5`,
  676. sql: `SELECT meta(topic) as m, ts FROM demo`,
  677. r: [][]map[string]interface{}{
  678. {{
  679. "m": "mock",
  680. "ts": float64(1541152486013),
  681. }},
  682. {{
  683. "m": "mock",
  684. "ts": float64(1541152486822),
  685. }},
  686. {{
  687. "m": "mock",
  688. "ts": float64(1541152487632),
  689. }},
  690. {{
  691. "m": "mock",
  692. "ts": float64(1541152488442),
  693. }},
  694. {{
  695. "m": "mock",
  696. "ts": float64(1541152489252),
  697. }},
  698. },
  699. m: map[string]interface{}{
  700. "op_preprocessor_demo_0_exceptions_total": int64(0),
  701. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  702. "op_preprocessor_demo_0_records_in_total": int64(5),
  703. "op_preprocessor_demo_0_records_out_total": int64(5),
  704. "op_project_0_exceptions_total": int64(0),
  705. "op_project_0_process_latency_ms": int64(0),
  706. "op_project_0_records_in_total": int64(5),
  707. "op_project_0_records_out_total": int64(5),
  708. "sink_mockSink_0_exceptions_total": int64(0),
  709. "sink_mockSink_0_records_in_total": int64(5),
  710. "sink_mockSink_0_records_out_total": int64(5),
  711. "source_demo_0_exceptions_total": int64(0),
  712. "source_demo_0_records_in_total": int64(5),
  713. "source_demo_0_records_out_total": int64(5),
  714. },
  715. s: "sink_mockSink_0_records_out_total",
  716. }, {
  717. name: `rule6`,
  718. sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  719. r: [][]map[string]interface{}{
  720. {{
  721. "color": "blue",
  722. "ts": float64(1541152486822),
  723. }},
  724. {{
  725. "color": "yellow",
  726. "ts": float64(1541152488442),
  727. }},
  728. },
  729. s: "op_filter_0_records_in_total",
  730. m: map[string]interface{}{
  731. "op_preprocessor_demo_0_exceptions_total": int64(0),
  732. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  733. "op_preprocessor_demo_0_records_in_total": int64(5),
  734. "op_preprocessor_demo_0_records_out_total": int64(5),
  735. "op_project_0_exceptions_total": int64(0),
  736. "op_project_0_process_latency_ms": int64(0),
  737. "op_project_0_records_in_total": int64(2),
  738. "op_project_0_records_out_total": int64(2),
  739. "sink_mockSink_0_exceptions_total": int64(0),
  740. "sink_mockSink_0_records_in_total": int64(2),
  741. "sink_mockSink_0_records_out_total": int64(2),
  742. "source_demo_0_exceptions_total": int64(0),
  743. "source_demo_0_records_in_total": int64(5),
  744. "source_demo_0_records_out_total": int64(5),
  745. "op_filter_0_exceptions_total": int64(0),
  746. "op_filter_0_process_latency_ms": int64(0),
  747. "op_filter_0_records_in_total": int64(5),
  748. "op_filter_0_records_out_total": int64(2),
  749. },
  750. }, {
  751. name: `rule1`,
  752. sql: "SELECT `from` FROM demo1",
  753. r: [][]map[string]interface{}{
  754. {{
  755. "from": "device1",
  756. }},
  757. {{
  758. "from": "device2",
  759. }},
  760. {{
  761. "from": "device3",
  762. }},
  763. {{
  764. "from": "device1",
  765. }},
  766. {{
  767. "from": "device3",
  768. }},
  769. },
  770. m: map[string]interface{}{
  771. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  772. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  773. "op_preprocessor_demo1_0_records_in_total": int64(5),
  774. "op_preprocessor_demo1_0_records_out_total": int64(5),
  775. "op_project_0_exceptions_total": int64(0),
  776. "op_project_0_process_latency_ms": int64(0),
  777. "op_project_0_records_in_total": int64(5),
  778. "op_project_0_records_out_total": int64(5),
  779. "sink_mockSink_0_exceptions_total": int64(0),
  780. "sink_mockSink_0_records_in_total": int64(5),
  781. "sink_mockSink_0_records_out_total": int64(5),
  782. "source_demo1_0_exceptions_total": int64(0),
  783. "source_demo1_0_records_in_total": int64(5),
  784. "source_demo1_0_records_out_total": int64(5),
  785. },
  786. s: "sink_mockSink_0_records_out_total",
  787. }, {
  788. name: `rule1`,
  789. sql: "SELECT * FROM demo1 where `from`=\"device1\"",
  790. r: [][]map[string]interface{}{
  791. {{
  792. "temp": float64(25.5),
  793. "hum": float64(65),
  794. "from": "device1",
  795. "ts": float64(1541152486013),
  796. }},
  797. {{
  798. "temp": float64(27.4),
  799. "hum": float64(80),
  800. "from": "device1",
  801. "ts": float64(1541152488442),
  802. }},
  803. },
  804. m: map[string]interface{}{
  805. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  806. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  807. "op_preprocessor_demo1_0_records_in_total": int64(5),
  808. "op_preprocessor_demo1_0_records_out_total": int64(5),
  809. "op_project_0_exceptions_total": int64(0),
  810. "op_project_0_process_latency_ms": int64(0),
  811. "op_project_0_records_in_total": int64(2),
  812. "op_project_0_records_out_total": int64(2),
  813. "op_filter_0_exceptions_total": int64(0),
  814. "op_filter_0_process_latency_ms": int64(0),
  815. "op_filter_0_records_in_total": int64(5),
  816. "op_filter_0_records_out_total": int64(2),
  817. "sink_mockSink_0_exceptions_total": int64(0),
  818. "sink_mockSink_0_records_in_total": int64(2),
  819. "sink_mockSink_0_records_out_total": int64(2),
  820. "source_demo1_0_exceptions_total": int64(0),
  821. "source_demo1_0_records_in_total": int64(5),
  822. "source_demo1_0_records_out_total": int64(5),
  823. },
  824. s: "sink_mockSink_0_records_out_total",
  825. },
  826. }
  827. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  828. createStreams(t)
  829. defer dropStreams(t)
  830. //defer close(done)
  831. for i, tt := range tests {
  832. test.ResetClock(1541152486000)
  833. p := NewRuleProcessor(DbDir)
  834. parser := xsql.NewParser(strings.NewReader(tt.sql))
  835. var (
  836. sources []*nodes.SourceNode
  837. syncs []chan int
  838. )
  839. if stmt, err := xsql.Language.Parse(parser); err != nil {
  840. t.Errorf("parse sql %s error: %s", tt.sql, err)
  841. } else {
  842. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  843. t.Errorf("sql %s is not a select statement", tt.sql)
  844. } else {
  845. streams := xsql.GetStreams(selectStmt)
  846. for _, stream := range streams {
  847. next := make(chan int)
  848. syncs = append(syncs, next)
  849. source := getMockSource(stream, next, 5)
  850. sources = append(sources, source)
  851. }
  852. }
  853. }
  854. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: map[string]interface{}{
  855. "bufferLength": float64(100),
  856. }}, sources)
  857. if err != nil {
  858. t.Error(err)
  859. }
  860. mockSink := test.NewMockSink()
  861. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  862. tp.AddSink(inputs, sink)
  863. errCh := tp.Open()
  864. func() {
  865. for i := 0; i < 5; i++ {
  866. syncs[i%len(syncs)] <- i
  867. select {
  868. case err = <-errCh:
  869. t.Log(err)
  870. tp.Cancel()
  871. return
  872. default:
  873. }
  874. }
  875. for retry := 100; retry > 0; retry-- {
  876. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  877. break
  878. }
  879. time.Sleep(time.Duration(retry) * time.Millisecond)
  880. }
  881. }()
  882. results := mockSink.GetResults()
  883. var maps [][]map[string]interface{}
  884. for _, v := range results {
  885. var mapRes []map[string]interface{}
  886. err := json.Unmarshal(v, &mapRes)
  887. if err != nil {
  888. t.Errorf("Failed to parse the input into map")
  889. continue
  890. }
  891. maps = append(maps, mapRes)
  892. }
  893. if !reflect.DeepEqual(tt.r, maps) {
  894. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  895. continue
  896. }
  897. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  898. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  899. }
  900. tp.Cancel()
  901. }
  902. }
  903. func TestSingleSQLTemplate(t *testing.T) {
  904. var tests = []struct {
  905. name string
  906. sql string
  907. r []map[string]interface{}
  908. s string
  909. m map[string]interface{}
  910. }{
  911. {
  912. name: `rule1`,
  913. sql: `SELECT * FROM demo`,
  914. r: []map[string]interface{}{
  915. {
  916. "c": "red",
  917. "wrapper": "w1",
  918. },
  919. {
  920. "c": "blue",
  921. "wrapper": "w1",
  922. },
  923. {
  924. "c": "blue",
  925. "wrapper": "w1",
  926. },
  927. {
  928. "c": "yellow",
  929. "wrapper": "w1",
  930. },
  931. {
  932. "c": "red",
  933. "wrapper": "w1",
  934. },
  935. },
  936. m: map[string]interface{}{
  937. "op_preprocessor_demo_0_exceptions_total": int64(0),
  938. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  939. "op_preprocessor_demo_0_records_in_total": int64(5),
  940. "op_preprocessor_demo_0_records_out_total": int64(5),
  941. "op_project_0_exceptions_total": int64(0),
  942. "op_project_0_process_latency_ms": int64(0),
  943. "op_project_0_records_in_total": int64(5),
  944. "op_project_0_records_out_total": int64(5),
  945. "sink_mockSink_0_exceptions_total": int64(0),
  946. "sink_mockSink_0_records_in_total": int64(5),
  947. "sink_mockSink_0_records_out_total": int64(5),
  948. "source_demo_0_exceptions_total": int64(0),
  949. "source_demo_0_records_in_total": int64(5),
  950. "source_demo_0_records_out_total": int64(5),
  951. },
  952. s: "sink_mockSink_0_records_out_total",
  953. },
  954. }
  955. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  956. createStreams(t)
  957. defer dropStreams(t)
  958. //defer close(done)
  959. for i, tt := range tests {
  960. test.ResetClock(1541152486000)
  961. p := NewRuleProcessor(DbDir)
  962. parser := xsql.NewParser(strings.NewReader(tt.sql))
  963. var (
  964. sources []*nodes.SourceNode
  965. syncs []chan int
  966. )
  967. if stmt, err := xsql.Language.Parse(parser); err != nil {
  968. t.Errorf("parse sql %s error: %s", tt.sql, err)
  969. } else {
  970. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  971. t.Errorf("sql %s is not a select statement", tt.sql)
  972. } else {
  973. streams := xsql.GetStreams(selectStmt)
  974. for _, stream := range streams {
  975. next := make(chan int)
  976. syncs = append(syncs, next)
  977. source := getMockSource(stream, next, 5)
  978. sources = append(sources, source)
  979. }
  980. }
  981. }
  982. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: map[string]interface{}{
  983. "bufferLength": float64(100),
  984. }}, sources)
  985. if err != nil {
  986. t.Error(err)
  987. }
  988. mockSink := test.NewMockSink()
  989. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, map[string]interface{}{
  990. "dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
  991. "sendSingle": true,
  992. })
  993. tp.AddSink(inputs, sink)
  994. errCh := tp.Open()
  995. func() {
  996. for i := 0; i < 5; i++ {
  997. syncs[i%len(syncs)] <- i
  998. select {
  999. case err = <-errCh:
  1000. t.Log(err)
  1001. tp.Cancel()
  1002. return
  1003. default:
  1004. }
  1005. }
  1006. for retry := 100; retry > 0; retry-- {
  1007. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  1008. break
  1009. }
  1010. time.Sleep(time.Duration(retry) * time.Millisecond)
  1011. }
  1012. }()
  1013. results := mockSink.GetResults()
  1014. var maps []map[string]interface{}
  1015. for _, v := range results {
  1016. var mapRes map[string]interface{}
  1017. err := json.Unmarshal(v, &mapRes)
  1018. if err != nil {
  1019. t.Errorf("Failed to parse the input into map")
  1020. continue
  1021. }
  1022. maps = append(maps, mapRes)
  1023. }
  1024. if !reflect.DeepEqual(tt.r, maps) {
  1025. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1026. continue
  1027. }
  1028. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  1029. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  1030. }
  1031. tp.Cancel()
  1032. }
  1033. }
  1034. func TestNoneSingleSQLTemplate(t *testing.T) {
  1035. var tests = []struct {
  1036. name string
  1037. sql string
  1038. r [][]byte
  1039. s string
  1040. m map[string]interface{}
  1041. }{
  1042. {
  1043. name: `rule1`,
  1044. sql: `SELECT * FROM demo`,
  1045. r: [][]byte{
  1046. []byte("<div>results</div><ul><li>red - 3</li></ul>"),
  1047. []byte("<div>results</div><ul><li>blue - 6</li></ul>"),
  1048. []byte("<div>results</div><ul><li>blue - 2</li></ul>"),
  1049. []byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
  1050. []byte("<div>results</div><ul><li>red - 1</li></ul>"),
  1051. },
  1052. m: map[string]interface{}{
  1053. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1054. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1055. "op_preprocessor_demo_0_records_in_total": int64(5),
  1056. "op_preprocessor_demo_0_records_out_total": int64(5),
  1057. "op_project_0_exceptions_total": int64(0),
  1058. "op_project_0_process_latency_ms": int64(0),
  1059. "op_project_0_records_in_total": int64(5),
  1060. "op_project_0_records_out_total": int64(5),
  1061. "sink_mockSink_0_exceptions_total": int64(0),
  1062. "sink_mockSink_0_records_in_total": int64(5),
  1063. "sink_mockSink_0_records_out_total": int64(5),
  1064. "source_demo_0_exceptions_total": int64(0),
  1065. "source_demo_0_records_in_total": int64(5),
  1066. "source_demo_0_records_out_total": int64(5),
  1067. },
  1068. s: "sink_mockSink_0_records_out_total",
  1069. },
  1070. }
  1071. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1072. createStreams(t)
  1073. defer dropStreams(t)
  1074. //defer close(done)
  1075. for i, tt := range tests {
  1076. test.ResetClock(1541152486000)
  1077. p := NewRuleProcessor(DbDir)
  1078. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1079. var (
  1080. sources []*nodes.SourceNode
  1081. syncs []chan int
  1082. )
  1083. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1084. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1085. } else {
  1086. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1087. t.Errorf("sql %s is not a select statement", tt.sql)
  1088. } else {
  1089. streams := xsql.GetStreams(selectStmt)
  1090. for _, stream := range streams {
  1091. next := make(chan int)
  1092. syncs = append(syncs, next)
  1093. source := getMockSource(stream, next, 5)
  1094. sources = append(sources, source)
  1095. }
  1096. }
  1097. }
  1098. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: map[string]interface{}{
  1099. "bufferLength": float64(100),
  1100. }}, sources)
  1101. if err != nil {
  1102. t.Error(err)
  1103. }
  1104. mockSink := test.NewMockSink()
  1105. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, map[string]interface{}{
  1106. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
  1107. })
  1108. tp.AddSink(inputs, sink)
  1109. errCh := tp.Open()
  1110. func() {
  1111. for i := 0; i < 5; i++ {
  1112. syncs[i%len(syncs)] <- i
  1113. select {
  1114. case err = <-errCh:
  1115. t.Log(err)
  1116. tp.Cancel()
  1117. return
  1118. default:
  1119. }
  1120. }
  1121. for retry := 100; retry > 0; retry-- {
  1122. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  1123. break
  1124. }
  1125. time.Sleep(time.Duration(retry) * time.Millisecond)
  1126. }
  1127. }()
  1128. results := mockSink.GetResults()
  1129. if !reflect.DeepEqual(tt.r, results) {
  1130. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, results)
  1131. continue
  1132. }
  1133. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  1134. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  1135. }
  1136. tp.Cancel()
  1137. }
  1138. }
  1139. func getMockSourceL(name string, done <-chan int, size int) *nodes.SourceNode {
  1140. var data []*xsql.Tuple
  1141. switch name {
  1142. case "ldemo":
  1143. data = []*xsql.Tuple{
  1144. {
  1145. Emitter: name,
  1146. Message: map[string]interface{}{
  1147. "color": "red",
  1148. "size": 3,
  1149. "ts": 1541152486013,
  1150. },
  1151. Timestamp: 1541152486013,
  1152. },
  1153. {
  1154. Emitter: name,
  1155. Message: map[string]interface{}{
  1156. "color": "blue",
  1157. "size": "string",
  1158. "ts": 1541152486822,
  1159. },
  1160. Timestamp: 1541152486822,
  1161. },
  1162. {
  1163. Emitter: name,
  1164. Message: map[string]interface{}{
  1165. "size": 3,
  1166. "ts": 1541152487632,
  1167. },
  1168. Timestamp: 1541152487632,
  1169. },
  1170. {
  1171. Emitter: name,
  1172. Message: map[string]interface{}{
  1173. "color": 49,
  1174. "size": 2,
  1175. "ts": 1541152488442,
  1176. },
  1177. Timestamp: 1541152488442,
  1178. },
  1179. {
  1180. Emitter: name,
  1181. Message: map[string]interface{}{
  1182. "color": "red",
  1183. "ts": 1541152489252,
  1184. },
  1185. Timestamp: 1541152489252,
  1186. },
  1187. }
  1188. case "ldemo1":
  1189. data = []*xsql.Tuple{
  1190. {
  1191. Emitter: name,
  1192. Message: map[string]interface{}{
  1193. "temp": 25.5,
  1194. "hum": 65,
  1195. "ts": 1541152486013,
  1196. },
  1197. Timestamp: 1541152486013,
  1198. },
  1199. {
  1200. Emitter: name,
  1201. Message: map[string]interface{}{
  1202. "temp": 27.5,
  1203. "hum": 59,
  1204. "ts": 1541152486823,
  1205. },
  1206. Timestamp: 1541152486823,
  1207. },
  1208. {
  1209. Emitter: name,
  1210. Message: map[string]interface{}{
  1211. "temp": 28.1,
  1212. "hum": 75,
  1213. "ts": 1541152487632,
  1214. },
  1215. Timestamp: 1541152487632,
  1216. },
  1217. {
  1218. Emitter: name,
  1219. Message: map[string]interface{}{
  1220. "temp": 27.4,
  1221. "hum": 80,
  1222. "ts": "1541152488442",
  1223. },
  1224. Timestamp: 1541152488442,
  1225. },
  1226. {
  1227. Emitter: name,
  1228. Message: map[string]interface{}{
  1229. "temp": 25.5,
  1230. "hum": 62,
  1231. "ts": 1541152489252,
  1232. },
  1233. Timestamp: 1541152489252,
  1234. },
  1235. }
  1236. case "lsessionDemo":
  1237. data = []*xsql.Tuple{
  1238. {
  1239. Emitter: name,
  1240. Message: map[string]interface{}{
  1241. "temp": 25.5,
  1242. "hum": 65,
  1243. "ts": 1541152486013,
  1244. },
  1245. Timestamp: 1541152486013,
  1246. },
  1247. {
  1248. Emitter: name,
  1249. Message: map[string]interface{}{
  1250. "temp": 27.5,
  1251. "hum": 59,
  1252. "ts": 1541152486823,
  1253. },
  1254. Timestamp: 1541152486823,
  1255. },
  1256. {
  1257. Emitter: name,
  1258. Message: map[string]interface{}{
  1259. "temp": 28.1,
  1260. "hum": 75,
  1261. "ts": 1541152487932,
  1262. },
  1263. Timestamp: 1541152487932,
  1264. },
  1265. {
  1266. Emitter: name,
  1267. Message: map[string]interface{}{
  1268. "temp": 27.4,
  1269. "hum": 80,
  1270. "ts": 1541152488442,
  1271. },
  1272. Timestamp: 1541152488442,
  1273. },
  1274. {
  1275. Emitter: name,
  1276. Message: map[string]interface{}{
  1277. "temp": 25.5,
  1278. "hum": 62,
  1279. "ts": 1541152489252,
  1280. },
  1281. Timestamp: 1541152489252,
  1282. },
  1283. {
  1284. Emitter: name,
  1285. Message: map[string]interface{}{
  1286. "temp": 26.2,
  1287. "hum": 63,
  1288. "ts": 1541152490062,
  1289. },
  1290. Timestamp: 1541152490062,
  1291. },
  1292. {
  1293. Emitter: name,
  1294. Message: map[string]interface{}{
  1295. "temp": 26.8,
  1296. "hum": 71,
  1297. "ts": 1541152490872,
  1298. },
  1299. Timestamp: 1541152490872,
  1300. },
  1301. {
  1302. Emitter: name,
  1303. Message: map[string]interface{}{
  1304. "temp": 28.9,
  1305. "hum": 85,
  1306. "ts": 1541152491682,
  1307. },
  1308. Timestamp: 1541152491682,
  1309. },
  1310. {
  1311. Emitter: name,
  1312. Message: map[string]interface{}{
  1313. "temp": 29.1,
  1314. "hum": 92,
  1315. "ts": 1541152492492,
  1316. },
  1317. Timestamp: 1541152492492,
  1318. },
  1319. {
  1320. Emitter: name,
  1321. Message: map[string]interface{}{
  1322. "temp": 2.2,
  1323. "hum": 99,
  1324. "ts": 1541152493202,
  1325. },
  1326. Timestamp: 1541152493202,
  1327. },
  1328. {
  1329. Emitter: name,
  1330. Message: map[string]interface{}{
  1331. "temp": 30.9,
  1332. "hum": 87,
  1333. "ts": 1541152494112,
  1334. },
  1335. Timestamp: 1541152494112,
  1336. },
  1337. }
  1338. }
  1339. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
  1340. "DATASOURCE": name,
  1341. })
  1342. }
  1343. func TestSingleSQLError(t *testing.T) {
  1344. var tests = []struct {
  1345. name string
  1346. sql string
  1347. r [][]map[string]interface{}
  1348. s string
  1349. m map[string]interface{}
  1350. }{
  1351. {
  1352. name: `rule1`,
  1353. sql: `SELECT color, ts FROM ldemo where size >= 3`,
  1354. r: [][]map[string]interface{}{
  1355. {{
  1356. "color": "red",
  1357. "ts": float64(1541152486013),
  1358. }},
  1359. {{
  1360. "error": "run Where error: invalid operation string(string) >= int64(3)",
  1361. }},
  1362. {{
  1363. "ts": float64(1541152487632),
  1364. }},
  1365. },
  1366. s: "op_filter_0_records_in_total",
  1367. m: map[string]interface{}{
  1368. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  1369. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  1370. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  1371. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  1372. "op_project_0_exceptions_total": int64(1),
  1373. "op_project_0_process_latency_ms": int64(0),
  1374. "op_project_0_records_in_total": int64(3),
  1375. "op_project_0_records_out_total": int64(2),
  1376. "sink_mockSink_0_exceptions_total": int64(0),
  1377. "sink_mockSink_0_records_in_total": int64(3),
  1378. "sink_mockSink_0_records_out_total": int64(3),
  1379. "source_ldemo_0_exceptions_total": int64(0),
  1380. "source_ldemo_0_records_in_total": int64(5),
  1381. "source_ldemo_0_records_out_total": int64(5),
  1382. "op_filter_0_exceptions_total": int64(1),
  1383. "op_filter_0_process_latency_ms": int64(0),
  1384. "op_filter_0_records_in_total": int64(5),
  1385. "op_filter_0_records_out_total": int64(2),
  1386. },
  1387. }, {
  1388. name: `rule2`,
  1389. sql: `SELECT size * 5 FROM ldemo`,
  1390. r: [][]map[string]interface{}{
  1391. {{
  1392. "rengine_field_0": float64(15),
  1393. }},
  1394. {{
  1395. "error": "run Select error: invalid operation string(string) * int64(5)",
  1396. }},
  1397. {{
  1398. "rengine_field_0": float64(15),
  1399. }},
  1400. {{
  1401. "rengine_field_0": float64(10),
  1402. }},
  1403. {{}},
  1404. },
  1405. s: "op_filter_0_records_in_total",
  1406. m: map[string]interface{}{
  1407. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  1408. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  1409. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  1410. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  1411. "op_project_0_exceptions_total": int64(1),
  1412. "op_project_0_process_latency_ms": int64(0),
  1413. "op_project_0_records_in_total": int64(5),
  1414. "op_project_0_records_out_total": int64(4),
  1415. "sink_mockSink_0_exceptions_total": int64(0),
  1416. "sink_mockSink_0_records_in_total": int64(5),
  1417. "sink_mockSink_0_records_out_total": int64(5),
  1418. "source_ldemo_0_exceptions_total": int64(0),
  1419. "source_ldemo_0_records_in_total": int64(5),
  1420. "source_ldemo_0_records_out_total": int64(5),
  1421. },
  1422. },
  1423. }
  1424. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1425. createSchemalessStreams(t)
  1426. defer dropSchemalessStreams(t)
  1427. //defer close(done)
  1428. for i, tt := range tests {
  1429. test.ResetClock(1541152486000)
  1430. p := NewRuleProcessor(DbDir)
  1431. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1432. var (
  1433. sources []*nodes.SourceNode
  1434. syncs []chan int
  1435. )
  1436. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1437. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1438. } else {
  1439. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1440. t.Errorf("sql %s is not a select statement", tt.sql)
  1441. } else {
  1442. streams := xsql.GetStreams(selectStmt)
  1443. for _, stream := range streams {
  1444. next := make(chan int)
  1445. syncs = append(syncs, next)
  1446. source := getMockSourceL(stream, next, 5)
  1447. sources = append(sources, source)
  1448. }
  1449. }
  1450. }
  1451. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: map[string]interface{}{
  1452. "bufferLength": float64(100),
  1453. }}, sources)
  1454. if err != nil {
  1455. t.Error(err)
  1456. }
  1457. mockSink := test.NewMockSink()
  1458. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  1459. tp.AddSink(inputs, sink)
  1460. errCh := tp.Open()
  1461. func() {
  1462. for i := 0; i < 5; i++ {
  1463. syncs[i%len(syncs)] <- i
  1464. select {
  1465. case err = <-errCh:
  1466. t.Log(err)
  1467. tp.Cancel()
  1468. return
  1469. default:
  1470. }
  1471. }
  1472. for retry := 100; retry > 0; retry-- {
  1473. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  1474. break
  1475. }
  1476. time.Sleep(time.Duration(retry) * time.Millisecond)
  1477. }
  1478. }()
  1479. results := mockSink.GetResults()
  1480. var maps [][]map[string]interface{}
  1481. for _, v := range results {
  1482. var mapRes []map[string]interface{}
  1483. err := json.Unmarshal(v, &mapRes)
  1484. if err != nil {
  1485. t.Errorf("Failed to parse the input into map")
  1486. continue
  1487. }
  1488. maps = append(maps, mapRes)
  1489. }
  1490. if !reflect.DeepEqual(tt.r, maps) {
  1491. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1492. continue
  1493. }
  1494. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  1495. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  1496. }
  1497. tp.Cancel()
  1498. }
  1499. }
  1500. func TestWindow(t *testing.T) {
  1501. var tests = []struct {
  1502. name string
  1503. sql string
  1504. size int
  1505. r [][]map[string]interface{}
  1506. m map[string]interface{}
  1507. }{
  1508. {
  1509. name: `rule1`,
  1510. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1511. size: 5,
  1512. r: [][]map[string]interface{}{
  1513. {{
  1514. "color": "red",
  1515. "size": float64(3),
  1516. "ts": float64(1541152486013),
  1517. }, {
  1518. "color": "blue",
  1519. "size": float64(6),
  1520. "ts": float64(1541152486822),
  1521. }},
  1522. {{
  1523. "color": "red",
  1524. "size": float64(3),
  1525. "ts": float64(1541152486013),
  1526. }, {
  1527. "color": "blue",
  1528. "size": float64(6),
  1529. "ts": float64(1541152486822),
  1530. }, {
  1531. "color": "blue",
  1532. "size": float64(2),
  1533. "ts": float64(1541152487632),
  1534. }},
  1535. {{
  1536. "color": "blue",
  1537. "size": float64(2),
  1538. "ts": float64(1541152487632),
  1539. }, {
  1540. "color": "yellow",
  1541. "size": float64(4),
  1542. "ts": float64(1541152488442),
  1543. }},
  1544. },
  1545. m: map[string]interface{}{
  1546. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1547. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1548. "op_preprocessor_demo_0_records_in_total": int64(5),
  1549. "op_preprocessor_demo_0_records_out_total": int64(5),
  1550. "op_project_0_exceptions_total": int64(0),
  1551. "op_project_0_process_latency_ms": int64(0),
  1552. "op_project_0_records_in_total": int64(3),
  1553. "op_project_0_records_out_total": int64(3),
  1554. "sink_mockSink_0_exceptions_total": int64(0),
  1555. "sink_mockSink_0_records_in_total": int64(3),
  1556. "sink_mockSink_0_records_out_total": int64(3),
  1557. "source_demo_0_exceptions_total": int64(0),
  1558. "source_demo_0_records_in_total": int64(5),
  1559. "source_demo_0_records_out_total": int64(5),
  1560. "op_window_0_exceptions_total": int64(0),
  1561. "op_window_0_process_latency_ms": int64(0),
  1562. "op_window_0_records_in_total": int64(5),
  1563. "op_window_0_records_out_total": int64(3),
  1564. },
  1565. }, {
  1566. name: `rule2`,
  1567. sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1568. size: 5,
  1569. r: [][]map[string]interface{}{
  1570. {{
  1571. "color": "red",
  1572. "ts": float64(1541152486013),
  1573. }, {
  1574. "color": "blue",
  1575. "ts": float64(1541152486822),
  1576. }},
  1577. {{
  1578. "color": "yellow",
  1579. "ts": float64(1541152488442),
  1580. }},
  1581. },
  1582. m: map[string]interface{}{
  1583. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1584. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1585. "op_preprocessor_demo_0_records_in_total": int64(5),
  1586. "op_preprocessor_demo_0_records_out_total": int64(5),
  1587. "op_project_0_exceptions_total": int64(0),
  1588. "op_project_0_process_latency_ms": int64(0),
  1589. "op_project_0_records_in_total": int64(2),
  1590. "op_project_0_records_out_total": int64(2),
  1591. "sink_mockSink_0_exceptions_total": int64(0),
  1592. "sink_mockSink_0_records_in_total": int64(2),
  1593. "sink_mockSink_0_records_out_total": int64(2),
  1594. "source_demo_0_exceptions_total": int64(0),
  1595. "source_demo_0_records_in_total": int64(5),
  1596. "source_demo_0_records_out_total": int64(5),
  1597. "op_window_0_exceptions_total": int64(0),
  1598. "op_window_0_process_latency_ms": int64(0),
  1599. "op_window_0_records_in_total": int64(5),
  1600. "op_window_0_records_out_total": int64(3),
  1601. "op_filter_0_exceptions_total": int64(0),
  1602. "op_filter_0_process_latency_ms": int64(0),
  1603. "op_filter_0_records_in_total": int64(3),
  1604. "op_filter_0_records_out_total": int64(2),
  1605. },
  1606. }, {
  1607. name: `rule3`,
  1608. sql: `SELECT color, temp, ts FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  1609. size: 5,
  1610. r: [][]map[string]interface{}{
  1611. {{
  1612. "color": "red",
  1613. "temp": 25.5,
  1614. "ts": float64(1541152486013),
  1615. }}, {{
  1616. "color": "red",
  1617. "temp": 25.5,
  1618. "ts": float64(1541152486013),
  1619. }}, {{
  1620. "color": "red",
  1621. "temp": 25.5,
  1622. "ts": float64(1541152486013),
  1623. }}, {{
  1624. "color": "blue",
  1625. "temp": 28.1,
  1626. "ts": float64(1541152487632),
  1627. }}, {{
  1628. "color": "blue",
  1629. "temp": 28.1,
  1630. "ts": float64(1541152487632),
  1631. }}, {{
  1632. "color": "blue",
  1633. "temp": 28.1,
  1634. "ts": float64(1541152487632),
  1635. }, {
  1636. "color": "yellow",
  1637. "temp": 27.4,
  1638. "ts": float64(1541152488442),
  1639. }}, {{
  1640. "color": "yellow",
  1641. "temp": 27.4,
  1642. "ts": float64(1541152488442),
  1643. }}, {{
  1644. "color": "yellow",
  1645. "temp": 27.4,
  1646. "ts": float64(1541152488442),
  1647. }, {
  1648. "color": "red",
  1649. "temp": 25.5,
  1650. "ts": float64(1541152489252),
  1651. }},
  1652. },
  1653. m: map[string]interface{}{
  1654. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1655. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1656. "op_preprocessor_demo_0_records_in_total": int64(5),
  1657. "op_preprocessor_demo_0_records_out_total": int64(5),
  1658. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  1659. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  1660. "op_preprocessor_demo1_0_records_in_total": int64(5),
  1661. "op_preprocessor_demo1_0_records_out_total": int64(5),
  1662. "op_project_0_exceptions_total": int64(0),
  1663. "op_project_0_process_latency_ms": int64(0),
  1664. "op_project_0_records_in_total": int64(8),
  1665. "op_project_0_records_out_total": int64(8),
  1666. "sink_mockSink_0_exceptions_total": int64(0),
  1667. "sink_mockSink_0_records_in_total": int64(8),
  1668. "sink_mockSink_0_records_out_total": int64(8),
  1669. "source_demo_0_exceptions_total": int64(0),
  1670. "source_demo_0_records_in_total": int64(5),
  1671. "source_demo_0_records_out_total": int64(5),
  1672. "source_demo1_0_exceptions_total": int64(0),
  1673. "source_demo1_0_records_in_total": int64(5),
  1674. "source_demo1_0_records_out_total": int64(5),
  1675. "op_window_0_exceptions_total": int64(0),
  1676. "op_window_0_process_latency_ms": int64(0),
  1677. "op_window_0_records_in_total": int64(10),
  1678. "op_window_0_records_out_total": int64(10),
  1679. "op_join_0_exceptions_total": int64(0),
  1680. "op_join_0_process_latency_ms": int64(0),
  1681. "op_join_0_records_in_total": int64(10),
  1682. "op_join_0_records_out_total": int64(8),
  1683. },
  1684. }, {
  1685. name: `rule4`,
  1686. sql: `SELECT color FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  1687. size: 5,
  1688. r: [][]map[string]interface{}{
  1689. {{
  1690. "color": "red",
  1691. }}, {{
  1692. "color": "blue",
  1693. }, {
  1694. "color": "red",
  1695. }}, {{
  1696. "color": "blue",
  1697. }, {
  1698. "color": "red",
  1699. }}, {{
  1700. "color": "blue",
  1701. }, {
  1702. "color": "yellow",
  1703. }}, {{
  1704. "color": "blue",
  1705. }, {
  1706. "color": "red",
  1707. }, {
  1708. "color": "yellow",
  1709. }},
  1710. },
  1711. m: map[string]interface{}{
  1712. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1713. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1714. "op_preprocessor_demo_0_records_in_total": int64(5),
  1715. "op_preprocessor_demo_0_records_out_total": int64(5),
  1716. "op_project_0_exceptions_total": int64(0),
  1717. "op_project_0_process_latency_ms": int64(0),
  1718. "op_project_0_records_in_total": int64(5),
  1719. "op_project_0_records_out_total": int64(5),
  1720. "sink_mockSink_0_exceptions_total": int64(0),
  1721. "sink_mockSink_0_records_in_total": int64(5),
  1722. "sink_mockSink_0_records_out_total": int64(5),
  1723. "source_demo_0_exceptions_total": int64(0),
  1724. "source_demo_0_records_in_total": int64(5),
  1725. "source_demo_0_records_out_total": int64(5),
  1726. "op_window_0_exceptions_total": int64(0),
  1727. "op_window_0_process_latency_ms": int64(0),
  1728. "op_window_0_records_in_total": int64(5),
  1729. "op_window_0_records_out_total": int64(5),
  1730. "op_aggregate_0_exceptions_total": int64(0),
  1731. "op_aggregate_0_process_latency_ms": int64(0),
  1732. "op_aggregate_0_records_in_total": int64(5),
  1733. "op_aggregate_0_records_out_total": int64(5),
  1734. "op_order_0_exceptions_total": int64(0),
  1735. "op_order_0_process_latency_ms": int64(0),
  1736. "op_order_0_records_in_total": int64(5),
  1737. "op_order_0_records_out_total": int64(5),
  1738. },
  1739. }, {
  1740. name: `rule5`,
  1741. sql: `SELECT temp FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  1742. size: 11,
  1743. r: [][]map[string]interface{}{
  1744. {{
  1745. "temp": 25.5,
  1746. }, {
  1747. "temp": 27.5,
  1748. }}, {{
  1749. "temp": 28.1,
  1750. }, {
  1751. "temp": 27.4,
  1752. }, {
  1753. "temp": 25.5,
  1754. }}, {{
  1755. "temp": 26.2,
  1756. }, {
  1757. "temp": 26.8,
  1758. }, {
  1759. "temp": 28.9,
  1760. }, {
  1761. "temp": 29.1,
  1762. }, {
  1763. "temp": 32.2,
  1764. }},
  1765. },
  1766. m: map[string]interface{}{
  1767. "op_preprocessor_sessionDemo_0_exceptions_total": int64(0),
  1768. "op_preprocessor_sessionDemo_0_process_latency_ms": int64(0),
  1769. "op_preprocessor_sessionDemo_0_records_in_total": int64(11),
  1770. "op_preprocessor_sessionDemo_0_records_out_total": int64(11),
  1771. "op_project_0_exceptions_total": int64(0),
  1772. "op_project_0_process_latency_ms": int64(0),
  1773. "op_project_0_records_in_total": int64(3),
  1774. "op_project_0_records_out_total": int64(3),
  1775. "sink_mockSink_0_exceptions_total": int64(0),
  1776. "sink_mockSink_0_records_in_total": int64(3),
  1777. "sink_mockSink_0_records_out_total": int64(3),
  1778. "source_sessionDemo_0_exceptions_total": int64(0),
  1779. "source_sessionDemo_0_records_in_total": int64(11),
  1780. "source_sessionDemo_0_records_out_total": int64(11),
  1781. "op_window_0_exceptions_total": int64(0),
  1782. "op_window_0_process_latency_ms": int64(0),
  1783. "op_window_0_records_in_total": int64(11),
  1784. "op_window_0_records_out_total": int64(3),
  1785. },
  1786. }, {
  1787. name: `rule6`,
  1788. sql: `SELECT max(temp) as m, count(color) as c FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  1789. size: 5,
  1790. r: [][]map[string]interface{}{
  1791. {{
  1792. "m": 25.5,
  1793. "c": float64(1),
  1794. }}, {{
  1795. "m": 25.5,
  1796. "c": float64(1),
  1797. }}, {{
  1798. "m": 25.5,
  1799. "c": float64(1),
  1800. }}, {{
  1801. "m": 28.1,
  1802. "c": float64(1),
  1803. }}, {{
  1804. "m": 28.1,
  1805. "c": float64(1),
  1806. }}, {{
  1807. "m": 28.1,
  1808. "c": float64(2),
  1809. }}, {{
  1810. "m": 27.4,
  1811. "c": float64(1),
  1812. }}, {{
  1813. "m": 27.4,
  1814. "c": float64(2),
  1815. }},
  1816. },
  1817. m: map[string]interface{}{
  1818. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1819. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1820. "op_preprocessor_demo_0_records_in_total": int64(5),
  1821. "op_preprocessor_demo_0_records_out_total": int64(5),
  1822. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  1823. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  1824. "op_preprocessor_demo1_0_records_in_total": int64(5),
  1825. "op_preprocessor_demo1_0_records_out_total": int64(5),
  1826. "op_project_0_exceptions_total": int64(0),
  1827. "op_project_0_process_latency_ms": int64(0),
  1828. "op_project_0_records_in_total": int64(8),
  1829. "op_project_0_records_out_total": int64(8),
  1830. "sink_mockSink_0_exceptions_total": int64(0),
  1831. "sink_mockSink_0_records_in_total": int64(8),
  1832. "sink_mockSink_0_records_out_total": int64(8),
  1833. "source_demo_0_exceptions_total": int64(0),
  1834. "source_demo_0_records_in_total": int64(5),
  1835. "source_demo_0_records_out_total": int64(5),
  1836. "source_demo1_0_exceptions_total": int64(0),
  1837. "source_demo1_0_records_in_total": int64(5),
  1838. "source_demo1_0_records_out_total": int64(5),
  1839. "op_window_0_exceptions_total": int64(0),
  1840. "op_window_0_process_latency_ms": int64(0),
  1841. "op_window_0_records_in_total": int64(10),
  1842. "op_window_0_records_out_total": int64(10),
  1843. "op_join_0_exceptions_total": int64(0),
  1844. "op_join_0_process_latency_ms": int64(0),
  1845. "op_join_0_records_in_total": int64(10),
  1846. "op_join_0_records_out_total": int64(8),
  1847. },
  1848. }, {
  1849. name: `rule7`,
  1850. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1851. size: 5,
  1852. r: [][]map[string]interface{}{
  1853. {{
  1854. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  1855. }},
  1856. {{
  1857. "color": "blue",
  1858. "size": float64(6),
  1859. "ts": float64(1541152486822),
  1860. }},
  1861. {{
  1862. "color": "blue",
  1863. "size": float64(6),
  1864. "ts": float64(1541152486822),
  1865. }, {
  1866. "color": "blue",
  1867. "size": float64(2),
  1868. "ts": float64(1541152487632),
  1869. }},
  1870. {{
  1871. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  1872. }},
  1873. {{
  1874. "color": "blue",
  1875. "size": float64(2),
  1876. "ts": float64(1541152487632),
  1877. }},
  1878. {{
  1879. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  1880. }},
  1881. },
  1882. m: map[string]interface{}{
  1883. "op_preprocessor_demoE_0_exceptions_total": int64(3),
  1884. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1885. "op_preprocessor_demoE_0_records_in_total": int64(5),
  1886. "op_preprocessor_demoE_0_records_out_total": int64(2),
  1887. "op_project_0_exceptions_total": int64(3),
  1888. "op_project_0_process_latency_ms": int64(0),
  1889. "op_project_0_records_in_total": int64(6),
  1890. "op_project_0_records_out_total": int64(3),
  1891. "sink_mockSink_0_exceptions_total": int64(0),
  1892. "sink_mockSink_0_records_in_total": int64(6),
  1893. "sink_mockSink_0_records_out_total": int64(6),
  1894. "source_demoE_0_exceptions_total": int64(0),
  1895. "source_demoE_0_records_in_total": int64(5),
  1896. "source_demoE_0_records_out_total": int64(5),
  1897. "op_window_0_exceptions_total": int64(3),
  1898. "op_window_0_process_latency_ms": int64(0),
  1899. "op_window_0_records_in_total": int64(5),
  1900. "op_window_0_records_out_total": int64(3),
  1901. },
  1902. }, {
  1903. name: `rule8`,
  1904. sql: `SELECT color, ts, count(*) as c FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1) having c > 1`,
  1905. size: 5,
  1906. r: [][]map[string]interface{}{
  1907. {{
  1908. "color": "red",
  1909. "ts": float64(1541152486013),
  1910. "c": float64(2),
  1911. }},
  1912. },
  1913. m: map[string]interface{}{
  1914. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1915. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1916. "op_preprocessor_demo_0_records_in_total": int64(5),
  1917. "op_preprocessor_demo_0_records_out_total": int64(5),
  1918. "op_project_0_exceptions_total": int64(0),
  1919. "op_project_0_process_latency_ms": int64(0),
  1920. "op_project_0_records_in_total": int64(1),
  1921. "op_project_0_records_out_total": int64(1),
  1922. "sink_mockSink_0_exceptions_total": int64(0),
  1923. "sink_mockSink_0_records_in_total": int64(1),
  1924. "sink_mockSink_0_records_out_total": int64(1),
  1925. "source_demo_0_exceptions_total": int64(0),
  1926. "source_demo_0_records_in_total": int64(5),
  1927. "source_demo_0_records_out_total": int64(5),
  1928. "op_window_0_exceptions_total": int64(0),
  1929. "op_window_0_process_latency_ms": int64(0),
  1930. "op_window_0_records_in_total": int64(5),
  1931. "op_window_0_records_out_total": int64(3),
  1932. "op_filter_0_exceptions_total": int64(0),
  1933. "op_filter_0_process_latency_ms": int64(0),
  1934. "op_filter_0_records_in_total": int64(3),
  1935. "op_filter_0_records_out_total": int64(2),
  1936. "op_aggregate_0_exceptions_total": int64(0),
  1937. "op_aggregate_0_process_latency_ms": int64(0),
  1938. "op_aggregate_0_records_in_total": int64(2),
  1939. "op_aggregate_0_records_out_total": int64(2),
  1940. "op_having_0_exceptions_total": int64(0),
  1941. "op_having_0_process_latency_ms": int64(0),
  1942. "op_having_0_records_in_total": int64(2),
  1943. "op_having_0_records_out_total": int64(1),
  1944. },
  1945. },
  1946. }
  1947. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1948. createStreams(t)
  1949. defer dropStreams(t)
  1950. for i, tt := range tests {
  1951. test.ResetClock(1541152486000)
  1952. p := NewRuleProcessor(DbDir)
  1953. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1954. var (
  1955. sources []*nodes.SourceNode
  1956. syncs []chan int
  1957. )
  1958. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1959. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1960. } else {
  1961. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1962. t.Errorf("sql %s is not a select statement", tt.sql)
  1963. } else {
  1964. streams := xsql.GetStreams(selectStmt)
  1965. for _, stream := range streams {
  1966. next := make(chan int)
  1967. syncs = append(syncs, next)
  1968. source := getMockSource(stream, next, tt.size)
  1969. sources = append(sources, source)
  1970. }
  1971. }
  1972. }
  1973. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  1974. if err != nil {
  1975. t.Error(err)
  1976. }
  1977. mockSink := test.NewMockSink()
  1978. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  1979. tp.AddSink(inputs, sink)
  1980. errCh := tp.Open()
  1981. func() {
  1982. for i := 0; i < tt.size*len(syncs); i++ {
  1983. syncs[i%len(syncs)] <- i
  1984. for {
  1985. time.Sleep(1)
  1986. if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
  1987. break
  1988. }
  1989. }
  1990. select {
  1991. case err = <-errCh:
  1992. t.Log(err)
  1993. tp.Cancel()
  1994. return
  1995. default:
  1996. }
  1997. }
  1998. retry := 100
  1999. for ; retry > 0; retry-- {
  2000. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  2001. break
  2002. }
  2003. t.Logf("wait to try another %d times", retry)
  2004. time.Sleep(time.Duration(retry) * time.Millisecond)
  2005. }
  2006. if retry == 0 {
  2007. err := compareMetrics(tp, tt.m, tt.sql)
  2008. t.Errorf("could not get correct metrics: %v", err)
  2009. }
  2010. }()
  2011. results := mockSink.GetResults()
  2012. var maps [][]map[string]interface{}
  2013. for _, v := range results {
  2014. var mapRes []map[string]interface{}
  2015. err := json.Unmarshal(v, &mapRes)
  2016. if err != nil {
  2017. t.Errorf("Failed to parse the input into map")
  2018. continue
  2019. }
  2020. maps = append(maps, mapRes)
  2021. }
  2022. if !reflect.DeepEqual(tt.r, maps) {
  2023. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  2024. }
  2025. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  2026. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  2027. }
  2028. tp.Cancel()
  2029. }
  2030. }
  2031. func TestWindowError(t *testing.T) {
  2032. var tests = []struct {
  2033. name string
  2034. sql string
  2035. size int
  2036. r [][]map[string]interface{}
  2037. m map[string]interface{}
  2038. }{
  2039. {
  2040. name: `rule1`,
  2041. sql: `SELECT size * 3 FROM ldemo GROUP BY TUMBLINGWINDOW(ss, 2)`,
  2042. size: 5,
  2043. r: [][]map[string]interface{}{
  2044. {{
  2045. "error": "run Select error: invalid operation string(string) * int64(3)",
  2046. }},
  2047. },
  2048. m: map[string]interface{}{
  2049. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  2050. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  2051. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  2052. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  2053. "op_project_0_exceptions_total": int64(1),
  2054. "op_project_0_process_latency_ms": int64(0),
  2055. "op_project_0_records_in_total": int64(1),
  2056. "op_project_0_records_out_total": int64(0),
  2057. "sink_mockSink_0_exceptions_total": int64(0),
  2058. "sink_mockSink_0_records_in_total": int64(1),
  2059. "sink_mockSink_0_records_out_total": int64(1),
  2060. "source_ldemo_0_exceptions_total": int64(0),
  2061. "source_ldemo_0_records_in_total": int64(5),
  2062. "source_ldemo_0_records_out_total": int64(5),
  2063. "op_window_0_exceptions_total": int64(0),
  2064. "op_window_0_process_latency_ms": int64(0),
  2065. "op_window_0_records_in_total": int64(5),
  2066. "op_window_0_records_out_total": int64(1),
  2067. },
  2068. }, {
  2069. name: `rule2`,
  2070. sql: `SELECT color, ts FROM ldemo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  2071. size: 5,
  2072. r: [][]map[string]interface{}{
  2073. {{
  2074. "error": "run Where error: invalid operation string(string) > int64(2)",
  2075. }}, {{
  2076. "ts": float64(1541152487632),
  2077. }},
  2078. },
  2079. m: map[string]interface{}{
  2080. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  2081. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  2082. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  2083. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  2084. "op_project_0_exceptions_total": int64(1),
  2085. "op_project_0_process_latency_ms": int64(0),
  2086. "op_project_0_records_in_total": int64(2),
  2087. "op_project_0_records_out_total": int64(1),
  2088. "sink_mockSink_0_exceptions_total": int64(0),
  2089. "sink_mockSink_0_records_in_total": int64(2),
  2090. "sink_mockSink_0_records_out_total": int64(2),
  2091. "source_ldemo_0_exceptions_total": int64(0),
  2092. "source_ldemo_0_records_in_total": int64(5),
  2093. "source_ldemo_0_records_out_total": int64(5),
  2094. "op_window_0_exceptions_total": int64(0),
  2095. "op_window_0_process_latency_ms": int64(0),
  2096. "op_window_0_records_in_total": int64(5),
  2097. "op_window_0_records_out_total": int64(3),
  2098. "op_filter_0_exceptions_total": int64(1),
  2099. "op_filter_0_process_latency_ms": int64(0),
  2100. "op_filter_0_records_in_total": int64(3),
  2101. "op_filter_0_records_out_total": int64(1),
  2102. },
  2103. }, {
  2104. name: `rule3`,
  2105. sql: `SELECT color, temp, ts FROM ldemo INNER JOIN ldemo1 ON ldemo.ts = ldemo1.ts GROUP BY SlidingWindow(ss, 1)`,
  2106. size: 5,
  2107. r: [][]map[string]interface{}{
  2108. {{
  2109. "color": "red",
  2110. "temp": 25.5,
  2111. "ts": float64(1541152486013),
  2112. }}, {{
  2113. "color": "red",
  2114. "temp": 25.5,
  2115. "ts": float64(1541152486013),
  2116. }}, {{
  2117. "color": "red",
  2118. "temp": 25.5,
  2119. "ts": float64(1541152486013),
  2120. }}, {{
  2121. "temp": 28.1,
  2122. "ts": float64(1541152487632),
  2123. }}, {{
  2124. "temp": 28.1,
  2125. "ts": float64(1541152487632),
  2126. }}, {{
  2127. "error": "run Join error: invalid operation int64(1541152487632) = string(1541152488442)",
  2128. }}, {{
  2129. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  2130. }}, {{
  2131. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  2132. }},
  2133. },
  2134. m: map[string]interface{}{
  2135. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  2136. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  2137. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  2138. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  2139. "op_preprocessor_ldemo1_0_exceptions_total": int64(0),
  2140. "op_preprocessor_ldemo1_0_process_latency_ms": int64(0),
  2141. "op_preprocessor_ldemo1_0_records_in_total": int64(5),
  2142. "op_preprocessor_ldemo1_0_records_out_total": int64(5),
  2143. "op_project_0_exceptions_total": int64(3),
  2144. "op_project_0_process_latency_ms": int64(0),
  2145. "op_project_0_records_in_total": int64(8),
  2146. "op_project_0_records_out_total": int64(5),
  2147. "sink_mockSink_0_exceptions_total": int64(0),
  2148. "sink_mockSink_0_records_in_total": int64(8),
  2149. "sink_mockSink_0_records_out_total": int64(8),
  2150. "source_ldemo_0_exceptions_total": int64(0),
  2151. "source_ldemo_0_records_in_total": int64(5),
  2152. "source_ldemo_0_records_out_total": int64(5),
  2153. "source_ldemo1_0_exceptions_total": int64(0),
  2154. "source_ldemo1_0_records_in_total": int64(5),
  2155. "source_ldemo1_0_records_out_total": int64(5),
  2156. "op_window_0_exceptions_total": int64(0),
  2157. "op_window_0_process_latency_ms": int64(0),
  2158. "op_window_0_records_in_total": int64(10),
  2159. "op_window_0_records_out_total": int64(10),
  2160. "op_join_0_exceptions_total": int64(3),
  2161. "op_join_0_process_latency_ms": int64(0),
  2162. "op_join_0_records_in_total": int64(10),
  2163. "op_join_0_records_out_total": int64(5),
  2164. },
  2165. }, {
  2166. name: `rule4`,
  2167. sql: `SELECT color FROM ldemo GROUP BY SlidingWindow(ss, 2), color having size >= 2 order by color`,
  2168. size: 5,
  2169. r: [][]map[string]interface{}{
  2170. {{
  2171. "color": "red",
  2172. }}, {{
  2173. "error": "run Having error: invalid operation string(string) >= int64(2)",
  2174. }}, {{
  2175. "error": "run Having error: invalid operation string(string) >= int64(2)",
  2176. }}, {{
  2177. "error": "run Having error: invalid operation string(string) >= int64(2)",
  2178. }}, {{
  2179. "color": float64(49),
  2180. }, {}},
  2181. },
  2182. m: map[string]interface{}{
  2183. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  2184. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  2185. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  2186. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  2187. "op_project_0_exceptions_total": int64(3),
  2188. "op_project_0_process_latency_ms": int64(0),
  2189. "op_project_0_records_in_total": int64(5),
  2190. "op_project_0_records_out_total": int64(2),
  2191. "sink_mockSink_0_exceptions_total": int64(0),
  2192. "sink_mockSink_0_records_in_total": int64(5),
  2193. "sink_mockSink_0_records_out_total": int64(5),
  2194. "source_ldemo_0_exceptions_total": int64(0),
  2195. "source_ldemo_0_records_in_total": int64(5),
  2196. "source_ldemo_0_records_out_total": int64(5),
  2197. "op_window_0_exceptions_total": int64(0),
  2198. "op_window_0_process_latency_ms": int64(0),
  2199. "op_window_0_records_in_total": int64(5),
  2200. "op_window_0_records_out_total": int64(5),
  2201. "op_aggregate_0_exceptions_total": int64(0),
  2202. "op_aggregate_0_process_latency_ms": int64(0),
  2203. "op_aggregate_0_records_in_total": int64(5),
  2204. "op_aggregate_0_records_out_total": int64(5),
  2205. "op_having_0_exceptions_total": int64(3),
  2206. "op_having_0_process_latency_ms": int64(0),
  2207. "op_having_0_records_in_total": int64(5),
  2208. "op_having_0_records_out_total": int64(2),
  2209. },
  2210. }, {
  2211. name: `rule5`,
  2212. sql: `SELECT color, size FROM ldemo GROUP BY tumblingwindow(ss, 1) ORDER BY size`,
  2213. size: 5,
  2214. r: [][]map[string]interface{}{
  2215. {{
  2216. "error": "run Order By error: incompatible types for comparison: int and string",
  2217. }}, {{
  2218. "size": float64(3),
  2219. }}, {{
  2220. "color": float64(49),
  2221. "size": float64(2),
  2222. }},
  2223. },
  2224. m: map[string]interface{}{
  2225. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  2226. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  2227. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  2228. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  2229. "op_project_0_exceptions_total": int64(1),
  2230. "op_project_0_process_latency_ms": int64(0),
  2231. "op_project_0_records_in_total": int64(3),
  2232. "op_project_0_records_out_total": int64(2),
  2233. "sink_mockSink_0_exceptions_total": int64(0),
  2234. "sink_mockSink_0_records_in_total": int64(3),
  2235. "sink_mockSink_0_records_out_total": int64(3),
  2236. "source_ldemo_0_exceptions_total": int64(0),
  2237. "source_ldemo_0_records_in_total": int64(5),
  2238. "source_ldemo_0_records_out_total": int64(5),
  2239. "op_window_0_exceptions_total": int64(0),
  2240. "op_window_0_process_latency_ms": int64(0),
  2241. "op_window_0_records_in_total": int64(5),
  2242. "op_window_0_records_out_total": int64(3),
  2243. "op_order_0_exceptions_total": int64(1),
  2244. "op_order_0_process_latency_ms": int64(0),
  2245. "op_order_0_records_in_total": int64(3),
  2246. "op_order_0_records_out_total": int64(2),
  2247. },
  2248. },
  2249. }
  2250. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2251. createSchemalessStreams(t)
  2252. defer dropSchemalessStreams(t)
  2253. for i, tt := range tests {
  2254. test.ResetClock(1541152486000)
  2255. p := NewRuleProcessor(DbDir)
  2256. parser := xsql.NewParser(strings.NewReader(tt.sql))
  2257. var (
  2258. sources []*nodes.SourceNode
  2259. syncs []chan int
  2260. )
  2261. if stmt, err := xsql.Language.Parse(parser); err != nil {
  2262. t.Errorf("parse sql %s error: %s", tt.sql, err)
  2263. } else {
  2264. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  2265. t.Errorf("sql %s is not a select statement", tt.sql)
  2266. } else {
  2267. streams := xsql.GetStreams(selectStmt)
  2268. for _, stream := range streams {
  2269. next := make(chan int)
  2270. syncs = append(syncs, next)
  2271. source := getMockSourceL(stream, next, tt.size)
  2272. sources = append(sources, source)
  2273. }
  2274. }
  2275. }
  2276. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
  2277. if err != nil {
  2278. t.Error(err)
  2279. }
  2280. mockSink := test.NewMockSink()
  2281. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  2282. tp.AddSink(inputs, sink)
  2283. errCh := tp.Open()
  2284. func() {
  2285. for i := 0; i < tt.size*len(syncs); i++ {
  2286. syncs[i%len(syncs)] <- i
  2287. for {
  2288. time.Sleep(1)
  2289. if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
  2290. break
  2291. }
  2292. }
  2293. select {
  2294. case err = <-errCh:
  2295. t.Log(err)
  2296. tp.Cancel()
  2297. return
  2298. default:
  2299. }
  2300. }
  2301. retry := 100
  2302. for ; retry > 0; retry-- {
  2303. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  2304. break
  2305. }
  2306. t.Logf("wait to try another %d times", retry)
  2307. time.Sleep(time.Duration(retry) * time.Millisecond)
  2308. }
  2309. if retry == 0 {
  2310. err := compareMetrics(tp, tt.m, tt.sql)
  2311. t.Errorf("could not get correct metrics: %v", err)
  2312. }
  2313. }()
  2314. results := mockSink.GetResults()
  2315. var maps [][]map[string]interface{}
  2316. for _, v := range results {
  2317. var mapRes []map[string]interface{}
  2318. err := json.Unmarshal(v, &mapRes)
  2319. if err != nil {
  2320. t.Errorf("Failed to parse the input into map")
  2321. continue
  2322. }
  2323. maps = append(maps, mapRes)
  2324. }
  2325. if !reflect.DeepEqual(tt.r, maps) {
  2326. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  2327. }
  2328. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  2329. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  2330. }
  2331. tp.Cancel()
  2332. }
  2333. }
  2334. func createEventStreams(t *testing.T) {
  2335. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  2336. demo := `CREATE STREAM demoE (
  2337. color STRING,
  2338. size BIGINT,
  2339. ts BIGINT
  2340. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  2341. _, err := p.ExecStmt(demo)
  2342. if err != nil {
  2343. t.Log(err)
  2344. }
  2345. demo1 := `CREATE STREAM demo1E (
  2346. temp FLOAT,
  2347. hum BIGINT,
  2348. ts BIGINT
  2349. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  2350. _, err = p.ExecStmt(demo1)
  2351. if err != nil {
  2352. t.Log(err)
  2353. }
  2354. sessionDemo := `CREATE STREAM sessionDemoE (
  2355. temp FLOAT,
  2356. hum BIGINT,
  2357. ts BIGINT
  2358. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  2359. _, err = p.ExecStmt(sessionDemo)
  2360. if err != nil {
  2361. t.Log(err)
  2362. }
  2363. demoErr := `CREATE STREAM demoErr (
  2364. color STRING,
  2365. size BIGINT,
  2366. ts BIGINT
  2367. ) WITH (DATASOURCE="demoErr", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  2368. _, err = p.ExecStmt(demoErr)
  2369. if err != nil {
  2370. t.Log(err)
  2371. }
  2372. }
  2373. func dropEventStreams(t *testing.T) {
  2374. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  2375. demo := `DROP STREAM demoE`
  2376. _, err := p.ExecStmt(demo)
  2377. if err != nil {
  2378. t.Log(err)
  2379. }
  2380. demo1 := `DROP STREAM demo1E`
  2381. _, err = p.ExecStmt(demo1)
  2382. if err != nil {
  2383. t.Log(err)
  2384. }
  2385. sessionDemo := `DROP STREAM sessionDemoE`
  2386. _, err = p.ExecStmt(sessionDemo)
  2387. if err != nil {
  2388. t.Log(err)
  2389. }
  2390. demoErr := `DROP STREAM demoErr`
  2391. _, err = p.ExecStmt(demoErr)
  2392. if err != nil {
  2393. t.Log(err)
  2394. }
  2395. }
  2396. func getEventMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
  2397. var data []*xsql.Tuple
  2398. switch name {
  2399. case "demoE":
  2400. data = []*xsql.Tuple{
  2401. {
  2402. Emitter: name,
  2403. Message: map[string]interface{}{
  2404. "color": "red",
  2405. "size": 3,
  2406. "ts": 1541152486013,
  2407. },
  2408. Timestamp: 1541152486013,
  2409. },
  2410. {
  2411. Emitter: name,
  2412. Message: map[string]interface{}{
  2413. "color": "blue",
  2414. "size": 2,
  2415. "ts": 1541152487632,
  2416. },
  2417. Timestamp: 1541152487632,
  2418. },
  2419. {
  2420. Emitter: name,
  2421. Message: map[string]interface{}{
  2422. "color": "red",
  2423. "size": 1,
  2424. "ts": 1541152489252,
  2425. },
  2426. Timestamp: 1541152489252,
  2427. },
  2428. { //dropped item
  2429. Emitter: name,
  2430. Message: map[string]interface{}{
  2431. "color": "blue",
  2432. "size": 6,
  2433. "ts": 1541152486822,
  2434. },
  2435. Timestamp: 1541152486822,
  2436. },
  2437. {
  2438. Emitter: name,
  2439. Message: map[string]interface{}{
  2440. "color": "yellow",
  2441. "size": 4,
  2442. "ts": 1541152488442,
  2443. },
  2444. Timestamp: 1541152488442,
  2445. },
  2446. { //To lift the watermark and issue all windows
  2447. Emitter: name,
  2448. Message: map[string]interface{}{
  2449. "color": "yellow",
  2450. "size": 4,
  2451. "ts": 1541152492342,
  2452. },
  2453. Timestamp: 1541152488442,
  2454. },
  2455. }
  2456. case "demo1E":
  2457. data = []*xsql.Tuple{
  2458. {
  2459. Emitter: name,
  2460. Message: map[string]interface{}{
  2461. "temp": 27.5,
  2462. "hum": 59,
  2463. "ts": 1541152486823,
  2464. },
  2465. Timestamp: 1541152486823,
  2466. },
  2467. {
  2468. Emitter: name,
  2469. Message: map[string]interface{}{
  2470. "temp": 25.5,
  2471. "hum": 65,
  2472. "ts": 1541152486013,
  2473. },
  2474. Timestamp: 1541152486013,
  2475. },
  2476. {
  2477. Emitter: name,
  2478. Message: map[string]interface{}{
  2479. "temp": 27.4,
  2480. "hum": 80,
  2481. "ts": 1541152488442,
  2482. },
  2483. Timestamp: 1541152488442,
  2484. },
  2485. {
  2486. Emitter: name,
  2487. Message: map[string]interface{}{
  2488. "temp": 28.1,
  2489. "hum": 75,
  2490. "ts": 1541152487632,
  2491. },
  2492. Timestamp: 1541152487632,
  2493. },
  2494. {
  2495. Emitter: name,
  2496. Message: map[string]interface{}{
  2497. "temp": 25.5,
  2498. "hum": 62,
  2499. "ts": 1541152489252,
  2500. },
  2501. Timestamp: 1541152489252,
  2502. },
  2503. {
  2504. Emitter: name,
  2505. Message: map[string]interface{}{
  2506. "temp": 25.5,
  2507. "hum": 62,
  2508. "ts": 1541152499252,
  2509. },
  2510. Timestamp: 1541152499252,
  2511. },
  2512. }
  2513. case "sessionDemoE":
  2514. data = []*xsql.Tuple{
  2515. {
  2516. Emitter: name,
  2517. Message: map[string]interface{}{
  2518. "temp": 25.5,
  2519. "hum": 65,
  2520. "ts": 1541152486013,
  2521. },
  2522. Timestamp: 1541152486013,
  2523. },
  2524. {
  2525. Emitter: name,
  2526. Message: map[string]interface{}{
  2527. "temp": 28.1,
  2528. "hum": 75,
  2529. "ts": 1541152487932,
  2530. },
  2531. Timestamp: 1541152487932,
  2532. },
  2533. {
  2534. Emitter: name,
  2535. Message: map[string]interface{}{
  2536. "temp": 27.5,
  2537. "hum": 59,
  2538. "ts": 1541152486823,
  2539. },
  2540. Timestamp: 1541152486823,
  2541. },
  2542. {
  2543. Emitter: name,
  2544. Message: map[string]interface{}{
  2545. "temp": 25.5,
  2546. "hum": 62,
  2547. "ts": 1541152489252,
  2548. },
  2549. Timestamp: 1541152489252,
  2550. },
  2551. {
  2552. Emitter: name,
  2553. Message: map[string]interface{}{
  2554. "temp": 27.4,
  2555. "hum": 80,
  2556. "ts": 1541152488442,
  2557. },
  2558. Timestamp: 1541152488442,
  2559. },
  2560. {
  2561. Emitter: name,
  2562. Message: map[string]interface{}{
  2563. "temp": 26.2,
  2564. "hum": 63,
  2565. "ts": 1541152490062,
  2566. },
  2567. Timestamp: 1541152490062,
  2568. },
  2569. {
  2570. Emitter: name,
  2571. Message: map[string]interface{}{
  2572. "temp": 28.9,
  2573. "hum": 85,
  2574. "ts": 1541152491682,
  2575. },
  2576. Timestamp: 1541152491682,
  2577. },
  2578. {
  2579. Emitter: name,
  2580. Message: map[string]interface{}{
  2581. "temp": 26.8,
  2582. "hum": 71,
  2583. "ts": 1541152490872,
  2584. },
  2585. Timestamp: 1541152490872,
  2586. },
  2587. {
  2588. Emitter: name,
  2589. Message: map[string]interface{}{
  2590. "temp": 29.1,
  2591. "hum": 92,
  2592. "ts": 1541152492492,
  2593. },
  2594. Timestamp: 1541152492492,
  2595. },
  2596. {
  2597. Emitter: name,
  2598. Message: map[string]interface{}{
  2599. "temp": 30.9,
  2600. "hum": 87,
  2601. "ts": 1541152494112,
  2602. },
  2603. Timestamp: 1541152494112,
  2604. },
  2605. {
  2606. Emitter: name,
  2607. Message: map[string]interface{}{
  2608. "temp": 32.2,
  2609. "hum": 99,
  2610. "ts": 1541152493202,
  2611. },
  2612. Timestamp: 1541152493202,
  2613. },
  2614. {
  2615. Emitter: name,
  2616. Message: map[string]interface{}{
  2617. "temp": 32.2,
  2618. "hum": 99,
  2619. "ts": 1541152499202,
  2620. },
  2621. Timestamp: 1541152499202,
  2622. },
  2623. }
  2624. case "demoErr":
  2625. data = []*xsql.Tuple{
  2626. {
  2627. Emitter: name,
  2628. Message: map[string]interface{}{
  2629. "color": "red",
  2630. "size": 3,
  2631. "ts": 1541152486013,
  2632. },
  2633. Timestamp: 1541152486013,
  2634. },
  2635. {
  2636. Emitter: name,
  2637. Message: map[string]interface{}{
  2638. "color": 2,
  2639. "size": "blue",
  2640. "ts": 1541152487632,
  2641. },
  2642. Timestamp: 1541152487632,
  2643. },
  2644. {
  2645. Emitter: name,
  2646. Message: map[string]interface{}{
  2647. "color": "red",
  2648. "size": 1,
  2649. "ts": 1541152489252,
  2650. },
  2651. Timestamp: 1541152489252,
  2652. },
  2653. { //dropped item
  2654. Emitter: name,
  2655. Message: map[string]interface{}{
  2656. "color": "blue",
  2657. "size": 6,
  2658. "ts": 1541152486822,
  2659. },
  2660. Timestamp: 1541152486822,
  2661. },
  2662. {
  2663. Emitter: name,
  2664. Message: map[string]interface{}{
  2665. "color": "yellow",
  2666. "size": 4,
  2667. "ts": 1541152488442,
  2668. },
  2669. Timestamp: 1541152488442,
  2670. },
  2671. { //To lift the watermark and issue all windows
  2672. Emitter: name,
  2673. Message: map[string]interface{}{
  2674. "color": "yellow",
  2675. "size": 4,
  2676. "ts": 1541152492342,
  2677. },
  2678. Timestamp: 1541152488442,
  2679. },
  2680. }
  2681. }
  2682. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, true), map[string]string{
  2683. "DATASOURCE": name,
  2684. })
  2685. }
  2686. func TestEventWindow(t *testing.T) {
  2687. var tests = []struct {
  2688. name string
  2689. sql string
  2690. size int
  2691. r [][]map[string]interface{}
  2692. m map[string]interface{}
  2693. }{
  2694. {
  2695. name: `rule1`,
  2696. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  2697. size: 6,
  2698. r: [][]map[string]interface{}{
  2699. {{
  2700. "color": "red",
  2701. "size": float64(3),
  2702. "ts": float64(1541152486013),
  2703. }},
  2704. {{
  2705. "color": "red",
  2706. "size": float64(3),
  2707. "ts": float64(1541152486013),
  2708. }, {
  2709. "color": "blue",
  2710. "size": float64(2),
  2711. "ts": float64(1541152487632),
  2712. }},
  2713. {{
  2714. "color": "blue",
  2715. "size": float64(2),
  2716. "ts": float64(1541152487632),
  2717. }, {
  2718. "color": "yellow",
  2719. "size": float64(4),
  2720. "ts": float64(1541152488442),
  2721. }}, {{
  2722. "color": "yellow",
  2723. "size": float64(4),
  2724. "ts": float64(1541152488442),
  2725. }, {
  2726. "color": "red",
  2727. "size": float64(1),
  2728. "ts": float64(1541152489252),
  2729. }}, {{
  2730. "color": "red",
  2731. "size": float64(1),
  2732. "ts": float64(1541152489252),
  2733. }},
  2734. },
  2735. m: map[string]interface{}{
  2736. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2737. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2738. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2739. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2740. "op_project_0_exceptions_total": int64(0),
  2741. "op_project_0_process_latency_ms": int64(0),
  2742. "op_project_0_records_in_total": int64(5),
  2743. "op_project_0_records_out_total": int64(5),
  2744. "sink_mockSink_0_exceptions_total": int64(0),
  2745. "sink_mockSink_0_records_in_total": int64(5),
  2746. "sink_mockSink_0_records_out_total": int64(5),
  2747. "source_demoE_0_exceptions_total": int64(0),
  2748. "source_demoE_0_records_in_total": int64(6),
  2749. "source_demoE_0_records_out_total": int64(6),
  2750. "op_window_0_exceptions_total": int64(0),
  2751. "op_window_0_process_latency_ms": int64(0),
  2752. "op_window_0_records_in_total": int64(6),
  2753. "op_window_0_records_out_total": int64(5),
  2754. },
  2755. }, {
  2756. name: `rule2`,
  2757. sql: `SELECT color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  2758. size: 6,
  2759. r: [][]map[string]interface{}{
  2760. {{
  2761. "color": "red",
  2762. "ts": float64(1541152486013),
  2763. }},
  2764. {{
  2765. "color": "yellow",
  2766. "ts": float64(1541152488442),
  2767. }},
  2768. },
  2769. m: map[string]interface{}{
  2770. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2771. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2772. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2773. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2774. "op_project_0_exceptions_total": int64(0),
  2775. "op_project_0_process_latency_ms": int64(0),
  2776. "op_project_0_records_in_total": int64(2),
  2777. "op_project_0_records_out_total": int64(2),
  2778. "sink_mockSink_0_exceptions_total": int64(0),
  2779. "sink_mockSink_0_records_in_total": int64(2),
  2780. "sink_mockSink_0_records_out_total": int64(2),
  2781. "source_demoE_0_exceptions_total": int64(0),
  2782. "source_demoE_0_records_in_total": int64(6),
  2783. "source_demoE_0_records_out_total": int64(6),
  2784. "op_window_0_exceptions_total": int64(0),
  2785. "op_window_0_process_latency_ms": int64(0),
  2786. "op_window_0_records_in_total": int64(6),
  2787. "op_window_0_records_out_total": int64(4),
  2788. "op_filter_0_exceptions_total": int64(0),
  2789. "op_filter_0_process_latency_ms": int64(0),
  2790. "op_filter_0_records_in_total": int64(4),
  2791. "op_filter_0_records_out_total": int64(2),
  2792. },
  2793. }, {
  2794. name: `rule3`,
  2795. sql: `SELECT color, temp, ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  2796. size: 6,
  2797. r: [][]map[string]interface{}{
  2798. {{
  2799. "color": "red",
  2800. "temp": 25.5,
  2801. "ts": float64(1541152486013),
  2802. }}, {{
  2803. "color": "red",
  2804. "temp": 25.5,
  2805. "ts": float64(1541152486013),
  2806. }}, {{
  2807. "color": "blue",
  2808. "temp": 28.1,
  2809. "ts": float64(1541152487632),
  2810. }}, {{
  2811. "color": "blue",
  2812. "temp": 28.1,
  2813. "ts": float64(1541152487632),
  2814. }, {
  2815. "color": "yellow",
  2816. "temp": 27.4,
  2817. "ts": float64(1541152488442),
  2818. }}, {{
  2819. "color": "yellow",
  2820. "temp": 27.4,
  2821. "ts": float64(1541152488442),
  2822. }, {
  2823. "color": "red",
  2824. "temp": 25.5,
  2825. "ts": float64(1541152489252),
  2826. }},
  2827. },
  2828. m: map[string]interface{}{
  2829. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2830. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2831. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2832. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2833. "op_preprocessor_demo1E_0_exceptions_total": int64(0),
  2834. "op_preprocessor_demo1E_0_process_latency_ms": int64(0),
  2835. "op_preprocessor_demo1E_0_records_in_total": int64(6),
  2836. "op_preprocessor_demo1E_0_records_out_total": int64(6),
  2837. "op_project_0_exceptions_total": int64(0),
  2838. "op_project_0_process_latency_ms": int64(0),
  2839. "op_project_0_records_in_total": int64(5),
  2840. "op_project_0_records_out_total": int64(5),
  2841. "sink_mockSink_0_exceptions_total": int64(0),
  2842. "sink_mockSink_0_records_in_total": int64(5),
  2843. "sink_mockSink_0_records_out_total": int64(5),
  2844. "source_demoE_0_exceptions_total": int64(0),
  2845. "source_demoE_0_records_in_total": int64(6),
  2846. "source_demoE_0_records_out_total": int64(6),
  2847. "source_demo1E_0_exceptions_total": int64(0),
  2848. "source_demo1E_0_records_in_total": int64(6),
  2849. "source_demo1E_0_records_out_total": int64(6),
  2850. "op_window_0_exceptions_total": int64(0),
  2851. "op_window_0_process_latency_ms": int64(0),
  2852. "op_window_0_records_in_total": int64(12),
  2853. "op_window_0_records_out_total": int64(5),
  2854. "op_join_0_exceptions_total": int64(0),
  2855. "op_join_0_process_latency_ms": int64(0),
  2856. "op_join_0_records_in_total": int64(5),
  2857. "op_join_0_records_out_total": int64(5),
  2858. },
  2859. }, {
  2860. name: `rule4`,
  2861. sql: `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  2862. size: 6,
  2863. r: [][]map[string]interface{}{
  2864. {{
  2865. "color": "red",
  2866. }}, {{
  2867. "color": "blue",
  2868. }, {
  2869. "color": "red",
  2870. }}, {{
  2871. "color": "blue",
  2872. }, {
  2873. "color": "yellow",
  2874. }}, {{
  2875. "color": "blue",
  2876. }, {
  2877. "color": "red",
  2878. }, {
  2879. "color": "yellow",
  2880. }},
  2881. },
  2882. m: map[string]interface{}{
  2883. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2884. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2885. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2886. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2887. "op_project_0_exceptions_total": int64(0),
  2888. "op_project_0_process_latency_ms": int64(0),
  2889. "op_project_0_records_in_total": int64(4),
  2890. "op_project_0_records_out_total": int64(4),
  2891. "sink_mockSink_0_exceptions_total": int64(0),
  2892. "sink_mockSink_0_records_in_total": int64(4),
  2893. "sink_mockSink_0_records_out_total": int64(4),
  2894. "source_demoE_0_exceptions_total": int64(0),
  2895. "source_demoE_0_records_in_total": int64(6),
  2896. "source_demoE_0_records_out_total": int64(6),
  2897. "op_window_0_exceptions_total": int64(0),
  2898. "op_window_0_process_latency_ms": int64(0),
  2899. "op_window_0_records_in_total": int64(6),
  2900. "op_window_0_records_out_total": int64(4),
  2901. "op_aggregate_0_exceptions_total": int64(0),
  2902. "op_aggregate_0_process_latency_ms": int64(0),
  2903. "op_aggregate_0_records_in_total": int64(4),
  2904. "op_aggregate_0_records_out_total": int64(4),
  2905. "op_order_0_exceptions_total": int64(0),
  2906. "op_order_0_process_latency_ms": int64(0),
  2907. "op_order_0_records_in_total": int64(4),
  2908. "op_order_0_records_out_total": int64(4),
  2909. },
  2910. }, {
  2911. name: `rule5`,
  2912. sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  2913. size: 12,
  2914. r: [][]map[string]interface{}{
  2915. {{
  2916. "temp": 25.5,
  2917. }}, {{
  2918. "temp": 28.1,
  2919. }, {
  2920. "temp": 27.4,
  2921. }, {
  2922. "temp": 25.5,
  2923. }}, {{
  2924. "temp": 26.2,
  2925. }, {
  2926. "temp": 26.8,
  2927. }, {
  2928. "temp": 28.9,
  2929. }, {
  2930. "temp": 29.1,
  2931. }, {
  2932. "temp": 32.2,
  2933. }}, {{
  2934. "temp": 30.9,
  2935. }},
  2936. },
  2937. m: map[string]interface{}{
  2938. "op_preprocessor_sessionDemoE_0_exceptions_total": int64(0),
  2939. "op_preprocessor_sessionDemoE_0_process_latency_ms": int64(0),
  2940. "op_preprocessor_sessionDemoE_0_records_in_total": int64(12),
  2941. "op_preprocessor_sessionDemoE_0_records_out_total": int64(12),
  2942. "op_project_0_exceptions_total": int64(0),
  2943. "op_project_0_process_latency_ms": int64(0),
  2944. "op_project_0_records_in_total": int64(4),
  2945. "op_project_0_records_out_total": int64(4),
  2946. "sink_mockSink_0_exceptions_total": int64(0),
  2947. "sink_mockSink_0_records_in_total": int64(4),
  2948. "sink_mockSink_0_records_out_total": int64(4),
  2949. "source_sessionDemoE_0_exceptions_total": int64(0),
  2950. "source_sessionDemoE_0_records_in_total": int64(12),
  2951. "source_sessionDemoE_0_records_out_total": int64(12),
  2952. "op_window_0_exceptions_total": int64(0),
  2953. "op_window_0_process_latency_ms": int64(0),
  2954. "op_window_0_records_in_total": int64(12),
  2955. "op_window_0_records_out_total": int64(4),
  2956. },
  2957. }, {
  2958. name: `rule6`,
  2959. sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  2960. size: 6,
  2961. r: [][]map[string]interface{}{
  2962. {{
  2963. "m": 25.5,
  2964. "c": float64(1),
  2965. }}, {{
  2966. "m": 25.5,
  2967. "c": float64(1),
  2968. }}, {{
  2969. "m": 28.1,
  2970. "c": float64(1),
  2971. }}, {{
  2972. "m": 28.1,
  2973. "c": float64(2),
  2974. }}, {{
  2975. "m": 27.4,
  2976. "c": float64(2),
  2977. }},
  2978. },
  2979. m: map[string]interface{}{
  2980. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2981. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2982. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2983. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2984. "op_preprocessor_demo1E_0_exceptions_total": int64(0),
  2985. "op_preprocessor_demo1E_0_process_latency_ms": int64(0),
  2986. "op_preprocessor_demo1E_0_records_in_total": int64(6),
  2987. "op_preprocessor_demo1E_0_records_out_total": int64(6),
  2988. "op_project_0_exceptions_total": int64(0),
  2989. "op_project_0_process_latency_ms": int64(0),
  2990. "op_project_0_records_in_total": int64(5),
  2991. "op_project_0_records_out_total": int64(5),
  2992. "sink_mockSink_0_exceptions_total": int64(0),
  2993. "sink_mockSink_0_records_in_total": int64(5),
  2994. "sink_mockSink_0_records_out_total": int64(5),
  2995. "source_demoE_0_exceptions_total": int64(0),
  2996. "source_demoE_0_records_in_total": int64(6),
  2997. "source_demoE_0_records_out_total": int64(6),
  2998. "source_demo1E_0_exceptions_total": int64(0),
  2999. "source_demo1E_0_records_in_total": int64(6),
  3000. "source_demo1E_0_records_out_total": int64(6),
  3001. "op_window_0_exceptions_total": int64(0),
  3002. "op_window_0_records_in_total": int64(12),
  3003. "op_window_0_records_out_total": int64(5),
  3004. "op_join_0_exceptions_total": int64(0),
  3005. "op_join_0_process_latency_ms": int64(0),
  3006. "op_join_0_records_in_total": int64(5),
  3007. "op_join_0_records_out_total": int64(5),
  3008. },
  3009. }, {
  3010. name: `rule7`,
  3011. sql: `SELECT * FROM demoErr GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  3012. size: 6,
  3013. r: [][]map[string]interface{}{
  3014. {{
  3015. "error": "error in preprocessor: invalid data type for color, expect string but found int(2)",
  3016. }},
  3017. {{
  3018. "color": "red",
  3019. "size": float64(3),
  3020. "ts": float64(1541152486013),
  3021. }},
  3022. {{
  3023. "color": "red",
  3024. "size": float64(3),
  3025. "ts": float64(1541152486013),
  3026. }},
  3027. {{
  3028. "color": "yellow",
  3029. "size": float64(4),
  3030. "ts": float64(1541152488442),
  3031. }}, {{
  3032. "color": "yellow",
  3033. "size": float64(4),
  3034. "ts": float64(1541152488442),
  3035. }, {
  3036. "color": "red",
  3037. "size": float64(1),
  3038. "ts": float64(1541152489252),
  3039. }}, {{
  3040. "color": "red",
  3041. "size": float64(1),
  3042. "ts": float64(1541152489252),
  3043. }},
  3044. },
  3045. m: map[string]interface{}{
  3046. "op_preprocessor_demoErr_0_exceptions_total": int64(1),
  3047. "op_preprocessor_demoErr_0_process_latency_ms": int64(0),
  3048. "op_preprocessor_demoErr_0_records_in_total": int64(6),
  3049. "op_preprocessor_demoErr_0_records_out_total": int64(5),
  3050. "op_project_0_exceptions_total": int64(1),
  3051. "op_project_0_process_latency_ms": int64(0),
  3052. "op_project_0_records_in_total": int64(6),
  3053. "op_project_0_records_out_total": int64(5),
  3054. "sink_mockSink_0_exceptions_total": int64(0),
  3055. "sink_mockSink_0_records_in_total": int64(6),
  3056. "sink_mockSink_0_records_out_total": int64(6),
  3057. "source_demoErr_0_exceptions_total": int64(0),
  3058. "source_demoErr_0_records_in_total": int64(6),
  3059. "source_demoErr_0_records_out_total": int64(6),
  3060. "op_window_0_exceptions_total": int64(1),
  3061. "op_window_0_process_latency_ms": int64(0),
  3062. "op_window_0_records_in_total": int64(6),
  3063. "op_window_0_records_out_total": int64(5),
  3064. },
  3065. },
  3066. }
  3067. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  3068. createEventStreams(t)
  3069. defer dropEventStreams(t)
  3070. for i, tt := range tests {
  3071. test.ResetClock(1541152486000)
  3072. p := NewRuleProcessor(DbDir)
  3073. parser := xsql.NewParser(strings.NewReader(tt.sql))
  3074. var (
  3075. sources []*nodes.SourceNode
  3076. syncs []chan int
  3077. )
  3078. if stmt, err := xsql.Language.Parse(parser); err != nil {
  3079. t.Errorf("parse sql %s error: %s", tt.sql, err)
  3080. } else {
  3081. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  3082. t.Errorf("sql %s is not a select statement", tt.sql)
  3083. } else {
  3084. streams := xsql.GetStreams(selectStmt)
  3085. for _, stream := range streams {
  3086. next := make(chan int)
  3087. syncs = append(syncs, next)
  3088. source := getEventMockSource(stream, next, tt.size)
  3089. sources = append(sources, source)
  3090. }
  3091. }
  3092. }
  3093. tp, inputs, err := p.createTopoWithSources(&api.Rule{
  3094. Id: tt.name, Sql: tt.sql,
  3095. Options: map[string]interface{}{
  3096. "isEventTime": true,
  3097. "lateTolerance": float64(1000),
  3098. },
  3099. }, sources)
  3100. if err != nil {
  3101. t.Error(err)
  3102. }
  3103. mockSink := test.NewMockSink()
  3104. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  3105. tp.AddSink(inputs, sink)
  3106. errCh := tp.Open()
  3107. func() {
  3108. for i := 0; i < tt.size*len(syncs); i++ {
  3109. syncs[i%len(syncs)] <- i
  3110. for {
  3111. time.Sleep(1)
  3112. if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
  3113. break
  3114. }
  3115. }
  3116. select {
  3117. case err = <-errCh:
  3118. t.Log(err)
  3119. tp.Cancel()
  3120. return
  3121. default:
  3122. }
  3123. }
  3124. mockClock := test.GetMockClock()
  3125. mockClock.Add(1000 * time.Millisecond)
  3126. retry := 100
  3127. for ; retry > 0; retry-- {
  3128. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  3129. break
  3130. }
  3131. t.Logf("wait to try another %d times", retry)
  3132. time.Sleep(time.Duration(retry) * time.Millisecond)
  3133. }
  3134. if retry == 0 {
  3135. err := compareMetrics(tp, tt.m, tt.sql)
  3136. t.Errorf("could not get correct metrics: %v", err)
  3137. }
  3138. }()
  3139. results := mockSink.GetResults()
  3140. var maps [][]map[string]interface{}
  3141. for _, v := range results {
  3142. var mapRes []map[string]interface{}
  3143. err := json.Unmarshal(v, &mapRes)
  3144. if err != nil {
  3145. t.Errorf("Failed to parse the input into map")
  3146. continue
  3147. }
  3148. maps = append(maps, mapRes)
  3149. }
  3150. if !reflect.DeepEqual(tt.r, maps) {
  3151. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  3152. }
  3153. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  3154. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  3155. }
  3156. tp.Cancel()
  3157. }
  3158. }
  3159. func getMetric(tp *xstream.TopologyNew, name string) int {
  3160. keys, values := tp.GetMetrics()
  3161. for index, key := range keys {
  3162. if key == name {
  3163. return int(values[index].(int64))
  3164. }
  3165. }
  3166. fmt.Println("can't find " + name)
  3167. return 0
  3168. }
  3169. func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}, sql string) (err error) {
  3170. keys, values := tp.GetMetrics()
  3171. //for i, k := range keys {
  3172. // log.Printf("%s:%v", k, values[i])
  3173. //}
  3174. for k, v := range m {
  3175. var (
  3176. index int
  3177. key string
  3178. matched bool
  3179. )
  3180. for index, key = range keys {
  3181. if k == key {
  3182. if strings.HasSuffix(k, "process_latency_ms") {
  3183. if values[index].(int64) >= v.(int64) {
  3184. matched = true
  3185. continue
  3186. } else {
  3187. break
  3188. }
  3189. }
  3190. if values[index] == v {
  3191. matched = true
  3192. }
  3193. break
  3194. }
  3195. }
  3196. if matched {
  3197. continue
  3198. }
  3199. //do not find
  3200. if index < len(values) {
  3201. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
  3202. } else {
  3203. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
  3204. }
  3205. }
  3206. return nil
  3207. }
  3208. func errstring(err error) string {
  3209. if err != nil {
  3210. return err.Error()
  3211. }
  3212. return ""
  3213. }