xsql_processor_test.go 93 KB


  1. package processors
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/emqx/kuiper/xstream/nodes"
  10. "github.com/emqx/kuiper/xstream/test"
  11. "os"
  12. "path"
  13. "reflect"
  14. "strings"
  15. "testing"
  16. "time"
  17. )
  18. var DbDir = getDbDir()
  19. func getDbDir() string {
  20. common.InitConf()
  21. dbDir, err := common.GetDataLoc()
  22. if err != nil {
  23. log.Panic(err)
  24. }
  25. log.Infof("db location is %s", dbDir)
  26. return dbDir
  27. }
  28. func cleanStateData() {
  29. dbDir, err := common.GetDataLoc()
  30. if err != nil {
  31. log.Panic(err)
  32. }
  33. c := path.Join(dbDir, "checkpoints")
  34. err = os.RemoveAll(c)
  35. if err != nil {
  36. log.Errorf("%s", err)
  37. }
  38. s := path.Join(dbDir, "sink", "cache")
  39. err = os.RemoveAll(s)
  40. if err != nil {
  41. log.Errorf("%s", err)
  42. }
  43. }
  44. func TestStreamCreateProcessor(t *testing.T) {
  45. var tests = []struct {
  46. s string
  47. r []string
  48. err string
  49. }{
  50. {
  51. s: `SHOW STREAMS;`,
  52. r: []string{"No stream definitions are found."},
  53. },
  54. {
  55. s: `EXPLAIN STREAM topic1;`,
  56. err: "Stream topic1 is not found.",
  57. },
  58. {
  59. s: `CREATE STREAM topic1 (
  60. USERID BIGINT,
  61. FIRST_NAME STRING,
  62. LAST_NAME STRING,
  63. NICKNAMES ARRAY(STRING),
  64. Gender BOOLEAN,
  65. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  66. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  67. r: []string{"Stream topic1 is created."},
  68. },
  69. {
  70. s: `CREATE STREAM ` + "`stream`" + ` (
  71. USERID BIGINT,
  72. FIRST_NAME STRING,
  73. LAST_NAME STRING,
  74. NICKNAMES ARRAY(STRING),
  75. Gender BOOLEAN,
  76. ` + "`地址`" + ` STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  77. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  78. r: []string{"Stream stream is created."},
  79. },
  80. {
  81. s: `CREATE STREAM topic1 (
  82. USERID BIGINT,
  83. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  84. err: "Create stream fails: Item topic1 already exists.",
  85. },
  86. {
  87. s: `EXPLAIN STREAM topic1;`,
  88. r: []string{"TO BE SUPPORTED"},
  89. },
  90. {
  91. s: `DESCRIBE STREAM topic1;`,
  92. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  93. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  94. "DATASOURCE: users\nFORMAT: AVRO\nKEY: USERID\n"},
  95. },
  96. {
  97. s: `DROP STREAM topic1;`,
  98. r: []string{"Stream topic1 is dropped."},
  99. },
  100. {
  101. s: `SHOW STREAMS;`,
  102. r: []string{"stream"},
  103. },
  104. {
  105. s: `DESCRIBE STREAM topic1;`,
  106. err: "Stream topic1 is not found.",
  107. },
  108. {
  109. s: `DROP STREAM topic1;`,
  110. err: "Drop stream fails: topic1 is not found.",
  111. },
  112. {
  113. s: "DROP STREAM `stream`;",
  114. r: []string{"Stream stream is dropped."},
  115. },
  116. }
  117. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  118. streamDB := path.Join(getDbDir(), "streamTest")
  119. for i, tt := range tests {
  120. results, err := NewStreamProcessor(streamDB).ExecStmt(tt.s)
  121. if !reflect.DeepEqual(tt.err, errstring(err)) {
  122. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  123. } else if tt.err == "" {
  124. if !reflect.DeepEqual(tt.r, results) {
  125. t.Errorf("%d. %q\n\nstmt mismatch:\nexp=%s\ngot=%#v\n\n", i, tt.s, tt.r, results)
  126. }
  127. }
  128. }
  129. }
  130. func createStreams(t *testing.T) {
  131. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  132. demo := `CREATE STREAM demo (
  133. color STRING,
  134. size BIGINT,
  135. ts BIGINT
  136. ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
  137. _, err := p.ExecStmt(demo)
  138. if err != nil {
  139. t.Log(err)
  140. }
  141. demoE := `CREATE STREAM demoE (
  142. color STRING,
  143. size BIGINT,
  144. ts BIGINT
  145. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts");`
  146. _, err = p.ExecStmt(demoE)
  147. if err != nil {
  148. t.Log(err)
  149. }
  150. demo1 := `CREATE STREAM demo1 (
  151. temp FLOAT,
  152. hum BIGINT,` +
  153. "`from`" + ` STRING,
  154. ts BIGINT
  155. ) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
  156. _, err = p.ExecStmt(demo1)
  157. if err != nil {
  158. t.Log(err)
  159. }
  160. sessionDemo := `CREATE STREAM sessionDemo (
  161. temp FLOAT,
  162. hum BIGINT,
  163. ts BIGINT
  164. ) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
  165. _, err = p.ExecStmt(sessionDemo)
  166. if err != nil {
  167. t.Log(err)
  168. }
  169. }
  170. func dropStreams(t *testing.T) {
  171. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  172. demo := `DROP STREAM demo`
  173. _, err := p.ExecStmt(demo)
  174. if err != nil {
  175. t.Log(err)
  176. }
  177. demoE := `DROP STREAM demoE`
  178. _, err = p.ExecStmt(demoE)
  179. if err != nil {
  180. t.Log(err)
  181. }
  182. demo1 := `DROP STREAM demo1`
  183. _, err = p.ExecStmt(demo1)
  184. if err != nil {
  185. t.Log(err)
  186. }
  187. sessionDemo := `DROP STREAM sessionDemo`
  188. _, err = p.ExecStmt(sessionDemo)
  189. if err != nil {
  190. t.Log(err)
  191. }
  192. }
  193. func createSchemalessStreams(t *testing.T) {
  194. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  195. demo := `CREATE STREAM ldemo (
  196. ) WITH (DATASOURCE="ldemo", FORMAT="json");`
  197. _, err := p.ExecStmt(demo)
  198. if err != nil {
  199. t.Log(err)
  200. }
  201. demo1 := `CREATE STREAM ldemo1 (
  202. ) WITH (DATASOURCE="ldemo1", FORMAT="json");`
  203. _, err = p.ExecStmt(demo1)
  204. if err != nil {
  205. t.Log(err)
  206. }
  207. sessionDemo := `CREATE STREAM lsessionDemo (
  208. ) WITH (DATASOURCE="lsessionDemo", FORMAT="json");`
  209. _, err = p.ExecStmt(sessionDemo)
  210. if err != nil {
  211. t.Log(err)
  212. }
  213. }
  214. func dropSchemalessStreams(t *testing.T) {
  215. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  216. demo := `DROP STREAM ldemo`
  217. _, err := p.ExecStmt(demo)
  218. if err != nil {
  219. t.Log(err)
  220. }
  221. demo1 := `DROP STREAM ldemo1`
  222. _, err = p.ExecStmt(demo1)
  223. if err != nil {
  224. t.Log(err)
  225. }
  226. sessionDemo := `DROP STREAM lsessionDemo`
  227. _, err = p.ExecStmt(sessionDemo)
  228. if err != nil {
  229. t.Log(err)
  230. }
  231. }
  232. func getMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
  233. var data []*xsql.Tuple
  234. switch name {
  235. case "demo":
  236. data = []*xsql.Tuple{
  237. {
  238. Emitter: name,
  239. Message: map[string]interface{}{
  240. "color": "red",
  241. "size": 3,
  242. "ts": 1541152486013,
  243. },
  244. Timestamp: 1541152486013,
  245. },
  246. {
  247. Emitter: name,
  248. Message: map[string]interface{}{
  249. "color": "blue",
  250. "size": 6,
  251. "ts": 1541152486822,
  252. },
  253. Timestamp: 1541152486822,
  254. },
  255. {
  256. Emitter: name,
  257. Message: map[string]interface{}{
  258. "color": "blue",
  259. "size": 2,
  260. "ts": 1541152487632,
  261. },
  262. Timestamp: 1541152487632,
  263. },
  264. {
  265. Emitter: name,
  266. Message: map[string]interface{}{
  267. "color": "yellow",
  268. "size": 4,
  269. "ts": 1541152488442,
  270. },
  271. Timestamp: 1541152488442,
  272. },
  273. {
  274. Emitter: name,
  275. Message: map[string]interface{}{
  276. "color": "red",
  277. "size": 1,
  278. "ts": 1541152489252,
  279. },
  280. Timestamp: 1541152489252,
  281. },
  282. }
  283. case "demoE":
  284. data = []*xsql.Tuple{
  285. {
  286. Emitter: name,
  287. Message: map[string]interface{}{
  288. "color": 3,
  289. "size": "red",
  290. "ts": 1541152486013,
  291. },
  292. Timestamp: 1541152486013,
  293. },
  294. {
  295. Emitter: name,
  296. Message: map[string]interface{}{
  297. "color": "blue",
  298. "size": 6,
  299. "ts": "1541152486822",
  300. },
  301. Timestamp: 1541152486822,
  302. },
  303. {
  304. Emitter: name,
  305. Message: map[string]interface{}{
  306. "color": "blue",
  307. "size": 2,
  308. "ts": 1541152487632,
  309. },
  310. Timestamp: 1541152487632,
  311. },
  312. {
  313. Emitter: name,
  314. Message: map[string]interface{}{
  315. "color": 7,
  316. "size": 4,
  317. "ts": 1541152488442,
  318. },
  319. Timestamp: 1541152488442,
  320. },
  321. {
  322. Emitter: name,
  323. Message: map[string]interface{}{
  324. "color": "red",
  325. "size": "blue",
  326. "ts": 1541152489252,
  327. },
  328. Timestamp: 1541152489252,
  329. },
  330. }
  331. case "demo1":
  332. data = []*xsql.Tuple{
  333. {
  334. Emitter: name,
  335. Message: map[string]interface{}{
  336. "temp": 25.5,
  337. "hum": 65,
  338. "from": "device1",
  339. "ts": 1541152486013,
  340. },
  341. Timestamp: 1541152486013,
  342. },
  343. {
  344. Emitter: name,
  345. Message: map[string]interface{}{
  346. "temp": 27.5,
  347. "hum": 59,
  348. "from": "device2",
  349. "ts": 1541152486823,
  350. },
  351. Timestamp: 1541152486823,
  352. },
  353. {
  354. Emitter: name,
  355. Message: map[string]interface{}{
  356. "temp": 28.1,
  357. "hum": 75,
  358. "from": "device3",
  359. "ts": 1541152487632,
  360. },
  361. Timestamp: 1541152487632,
  362. },
  363. {
  364. Emitter: name,
  365. Message: map[string]interface{}{
  366. "temp": 27.4,
  367. "hum": 80,
  368. "from": "device1",
  369. "ts": 1541152488442,
  370. },
  371. Timestamp: 1541152488442,
  372. },
  373. {
  374. Emitter: name,
  375. Message: map[string]interface{}{
  376. "temp": 25.5,
  377. "hum": 62,
  378. "from": "device3",
  379. "ts": 1541152489252,
  380. },
  381. Timestamp: 1541152489252,
  382. },
  383. }
  384. case "sessionDemo":
  385. data = []*xsql.Tuple{
  386. {
  387. Emitter: name,
  388. Message: map[string]interface{}{
  389. "temp": 25.5,
  390. "hum": 65,
  391. "ts": 1541152486013,
  392. },
  393. Timestamp: 1541152486013,
  394. },
  395. {
  396. Emitter: name,
  397. Message: map[string]interface{}{
  398. "temp": 27.5,
  399. "hum": 59,
  400. "ts": 1541152486823,
  401. },
  402. Timestamp: 1541152486823,
  403. },
  404. {
  405. Emitter: name,
  406. Message: map[string]interface{}{
  407. "temp": 28.1,
  408. "hum": 75,
  409. "ts": 1541152487932,
  410. },
  411. Timestamp: 1541152487932,
  412. },
  413. {
  414. Emitter: name,
  415. Message: map[string]interface{}{
  416. "temp": 27.4,
  417. "hum": 80,
  418. "ts": 1541152488442,
  419. },
  420. Timestamp: 1541152488442,
  421. },
  422. {
  423. Emitter: name,
  424. Message: map[string]interface{}{
  425. "temp": 25.5,
  426. "hum": 62,
  427. "ts": 1541152489252,
  428. },
  429. Timestamp: 1541152489252,
  430. },
  431. {
  432. Emitter: name,
  433. Message: map[string]interface{}{
  434. "temp": 26.2,
  435. "hum": 63,
  436. "ts": 1541152490062,
  437. },
  438. Timestamp: 1541152490062,
  439. },
  440. {
  441. Emitter: name,
  442. Message: map[string]interface{}{
  443. "temp": 26.8,
  444. "hum": 71,
  445. "ts": 1541152490872,
  446. },
  447. Timestamp: 1541152490872,
  448. },
  449. {
  450. Emitter: name,
  451. Message: map[string]interface{}{
  452. "temp": 28.9,
  453. "hum": 85,
  454. "ts": 1541152491682,
  455. },
  456. Timestamp: 1541152491682,
  457. },
  458. {
  459. Emitter: name,
  460. Message: map[string]interface{}{
  461. "temp": 29.1,
  462. "hum": 92,
  463. "ts": 1541152492492,
  464. },
  465. Timestamp: 1541152492492,
  466. },
  467. {
  468. Emitter: name,
  469. Message: map[string]interface{}{
  470. "temp": 32.2,
  471. "hum": 99,
  472. "ts": 1541152493202,
  473. },
  474. Timestamp: 1541152493202,
  475. },
  476. {
  477. Emitter: name,
  478. Message: map[string]interface{}{
  479. "temp": 30.9,
  480. "hum": 87,
  481. "ts": 1541152494112,
  482. },
  483. Timestamp: 1541152494112,
  484. },
  485. }
  486. }
  487. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
  488. "DATASOURCE": name,
  489. })
  490. }
  491. func TestSingleSQL(t *testing.T) {
  492. var tests = []struct {
  493. name string
  494. sql string
  495. r [][]map[string]interface{}
  496. s string
  497. m map[string]interface{}
  498. }{
  499. {
  500. name: `rule1`,
  501. sql: `SELECT * FROM demo`,
  502. r: [][]map[string]interface{}{
  503. {{
  504. "color": "red",
  505. "size": float64(3),
  506. "ts": float64(1541152486013),
  507. }},
  508. {{
  509. "color": "blue",
  510. "size": float64(6),
  511. "ts": float64(1541152486822),
  512. }},
  513. {{
  514. "color": "blue",
  515. "size": float64(2),
  516. "ts": float64(1541152487632),
  517. }},
  518. {{
  519. "color": "yellow",
  520. "size": float64(4),
  521. "ts": float64(1541152488442),
  522. }},
  523. {{
  524. "color": "red",
  525. "size": float64(1),
  526. "ts": float64(1541152489252),
  527. }},
  528. },
  529. m: map[string]interface{}{
  530. "op_preprocessor_demo_0_exceptions_total": int64(0),
  531. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  532. "op_preprocessor_demo_0_records_in_total": int64(5),
  533. "op_preprocessor_demo_0_records_out_total": int64(5),
  534. "op_project_0_exceptions_total": int64(0),
  535. "op_project_0_process_latency_ms": int64(0),
  536. "op_project_0_records_in_total": int64(5),
  537. "op_project_0_records_out_total": int64(5),
  538. "sink_mockSink_0_exceptions_total": int64(0),
  539. "sink_mockSink_0_records_in_total": int64(5),
  540. "sink_mockSink_0_records_out_total": int64(5),
  541. "source_demo_0_exceptions_total": int64(0),
  542. "source_demo_0_records_in_total": int64(5),
  543. "source_demo_0_records_out_total": int64(5),
  544. },
  545. s: "sink_mockSink_0_records_out_total",
  546. }, {
  547. name: `rule2`,
  548. sql: `SELECT color, ts FROM demo where size > 3`,
  549. r: [][]map[string]interface{}{
  550. {{
  551. "color": "blue",
  552. "ts": float64(1541152486822),
  553. }},
  554. {{
  555. "color": "yellow",
  556. "ts": float64(1541152488442),
  557. }},
  558. },
  559. s: "op_filter_0_records_in_total",
  560. m: map[string]interface{}{
  561. "op_preprocessor_demo_0_exceptions_total": int64(0),
  562. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  563. "op_preprocessor_demo_0_records_in_total": int64(5),
  564. "op_preprocessor_demo_0_records_out_total": int64(5),
  565. "op_project_0_exceptions_total": int64(0),
  566. "op_project_0_process_latency_ms": int64(0),
  567. "op_project_0_records_in_total": int64(2),
  568. "op_project_0_records_out_total": int64(2),
  569. "sink_mockSink_0_exceptions_total": int64(0),
  570. "sink_mockSink_0_records_in_total": int64(2),
  571. "sink_mockSink_0_records_out_total": int64(2),
  572. "source_demo_0_exceptions_total": int64(0),
  573. "source_demo_0_records_in_total": int64(5),
  574. "source_demo_0_records_out_total": int64(5),
  575. "op_filter_0_exceptions_total": int64(0),
  576. "op_filter_0_process_latency_ms": int64(0),
  577. "op_filter_0_records_in_total": int64(5),
  578. "op_filter_0_records_out_total": int64(2),
  579. },
  580. }, {
  581. name: `rule3`,
  582. sql: `SELECT size as Int8, ts FROM demo where size > 3`,
  583. r: [][]map[string]interface{}{
  584. {{
  585. "Int8": float64(6),
  586. "ts": float64(1541152486822),
  587. }},
  588. {{
  589. "Int8": float64(4),
  590. "ts": float64(1541152488442),
  591. }},
  592. },
  593. s: "op_filter_0_records_in_total",
  594. m: map[string]interface{}{
  595. "op_preprocessor_demo_0_exceptions_total": int64(0),
  596. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  597. "op_preprocessor_demo_0_records_in_total": int64(5),
  598. "op_preprocessor_demo_0_records_out_total": int64(5),
  599. "op_project_0_exceptions_total": int64(0),
  600. "op_project_0_process_latency_ms": int64(0),
  601. "op_project_0_records_in_total": int64(2),
  602. "op_project_0_records_out_total": int64(2),
  603. "sink_mockSink_0_exceptions_total": int64(0),
  604. "sink_mockSink_0_records_in_total": int64(2),
  605. "sink_mockSink_0_records_out_total": int64(2),
  606. "source_demo_0_exceptions_total": int64(0),
  607. "source_demo_0_records_in_total": int64(5),
  608. "source_demo_0_records_out_total": int64(5),
  609. "op_filter_0_exceptions_total": int64(0),
  610. "op_filter_0_process_latency_ms": int64(0),
  611. "op_filter_0_records_in_total": int64(5),
  612. "op_filter_0_records_out_total": int64(2),
  613. },
  614. }, {
  615. name: `rule4`,
  616. sql: `SELECT size as Int8, ts FROM demoE where size > 3`,
  617. r: [][]map[string]interface{}{
  618. {{
  619. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  620. }},
  621. {{
  622. "Int8": float64(6),
  623. "ts": float64(1541152486822),
  624. }},
  625. {{
  626. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  627. }},
  628. {{
  629. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  630. }},
  631. },
  632. s: "op_filter_0_records_in_total",
  633. m: map[string]interface{}{
  634. "op_preprocessor_demoE_0_exceptions_total": int64(3),
  635. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  636. "op_preprocessor_demoE_0_records_in_total": int64(5),
  637. "op_preprocessor_demoE_0_records_out_total": int64(2),
  638. "op_project_0_exceptions_total": int64(3),
  639. "op_project_0_process_latency_ms": int64(0),
  640. "op_project_0_records_in_total": int64(4),
  641. "op_project_0_records_out_total": int64(1),
  642. "sink_mockSink_0_exceptions_total": int64(0),
  643. "sink_mockSink_0_records_in_total": int64(4),
  644. "sink_mockSink_0_records_out_total": int64(4),
  645. "source_demoE_0_exceptions_total": int64(0),
  646. "source_demoE_0_records_in_total": int64(5),
  647. "source_demoE_0_records_out_total": int64(5),
  648. "op_filter_0_exceptions_total": int64(3),
  649. "op_filter_0_process_latency_ms": int64(0),
  650. "op_filter_0_records_in_total": int64(5),
  651. "op_filter_0_records_out_total": int64(1),
  652. },
  653. }, {
  654. name: `rule4`,
  655. sql: `SELECT size as Int8, ts FROM demoE where size > 3`,
  656. r: [][]map[string]interface{}{
  657. {{
  658. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  659. }},
  660. {{
  661. "Int8": float64(6),
  662. "ts": float64(1541152486822),
  663. }},
  664. {{
  665. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  666. }},
  667. {{
  668. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  669. }},
  670. },
  671. s: "op_filter_0_records_in_total",
  672. m: map[string]interface{}{
  673. "op_preprocessor_demoE_0_exceptions_total": int64(3),
  674. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  675. "op_preprocessor_demoE_0_records_in_total": int64(5),
  676. "op_preprocessor_demoE_0_records_out_total": int64(2),
  677. "op_project_0_exceptions_total": int64(3),
  678. "op_project_0_process_latency_ms": int64(0),
  679. "op_project_0_records_in_total": int64(4),
  680. "op_project_0_records_out_total": int64(1),
  681. "sink_mockSink_0_exceptions_total": int64(0),
  682. "sink_mockSink_0_records_in_total": int64(4),
  683. "sink_mockSink_0_records_out_total": int64(4),
  684. "source_demoE_0_exceptions_total": int64(0),
  685. "source_demoE_0_records_in_total": int64(5),
  686. "source_demoE_0_records_out_total": int64(5),
  687. "op_filter_0_exceptions_total": int64(3),
  688. "op_filter_0_process_latency_ms": int64(0),
  689. "op_filter_0_records_in_total": int64(5),
  690. "op_filter_0_records_out_total": int64(1),
  691. },
  692. }, {
  693. name: `rule5`,
  694. sql: `SELECT meta(topic) as m, ts FROM demo`,
  695. r: [][]map[string]interface{}{
  696. {{
  697. "m": "mock",
  698. "ts": float64(1541152486013),
  699. }},
  700. {{
  701. "m": "mock",
  702. "ts": float64(1541152486822),
  703. }},
  704. {{
  705. "m": "mock",
  706. "ts": float64(1541152487632),
  707. }},
  708. {{
  709. "m": "mock",
  710. "ts": float64(1541152488442),
  711. }},
  712. {{
  713. "m": "mock",
  714. "ts": float64(1541152489252),
  715. }},
  716. },
  717. m: map[string]interface{}{
  718. "op_preprocessor_demo_0_exceptions_total": int64(0),
  719. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  720. "op_preprocessor_demo_0_records_in_total": int64(5),
  721. "op_preprocessor_demo_0_records_out_total": int64(5),
  722. "op_project_0_exceptions_total": int64(0),
  723. "op_project_0_process_latency_ms": int64(0),
  724. "op_project_0_records_in_total": int64(5),
  725. "op_project_0_records_out_total": int64(5),
  726. "sink_mockSink_0_exceptions_total": int64(0),
  727. "sink_mockSink_0_records_in_total": int64(5),
  728. "sink_mockSink_0_records_out_total": int64(5),
  729. "source_demo_0_exceptions_total": int64(0),
  730. "source_demo_0_records_in_total": int64(5),
  731. "source_demo_0_records_out_total": int64(5),
  732. },
  733. s: "sink_mockSink_0_records_out_total",
  734. }, {
  735. name: `rule6`,
  736. sql: `SELECT color, ts FROM demo where size > 3 and meta(topic)="mock"`,
  737. r: [][]map[string]interface{}{
  738. {{
  739. "color": "blue",
  740. "ts": float64(1541152486822),
  741. }},
  742. {{
  743. "color": "yellow",
  744. "ts": float64(1541152488442),
  745. }},
  746. },
  747. s: "op_filter_0_records_in_total",
  748. m: map[string]interface{}{
  749. "op_preprocessor_demo_0_exceptions_total": int64(0),
  750. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  751. "op_preprocessor_demo_0_records_in_total": int64(5),
  752. "op_preprocessor_demo_0_records_out_total": int64(5),
  753. "op_project_0_exceptions_total": int64(0),
  754. "op_project_0_process_latency_ms": int64(0),
  755. "op_project_0_records_in_total": int64(2),
  756. "op_project_0_records_out_total": int64(2),
  757. "sink_mockSink_0_exceptions_total": int64(0),
  758. "sink_mockSink_0_records_in_total": int64(2),
  759. "sink_mockSink_0_records_out_total": int64(2),
  760. "source_demo_0_exceptions_total": int64(0),
  761. "source_demo_0_records_in_total": int64(5),
  762. "source_demo_0_records_out_total": int64(5),
  763. "op_filter_0_exceptions_total": int64(0),
  764. "op_filter_0_process_latency_ms": int64(0),
  765. "op_filter_0_records_in_total": int64(5),
  766. "op_filter_0_records_out_total": int64(2),
  767. },
  768. }, {
  769. name: `rule1`,
  770. sql: "SELECT `from` FROM demo1",
  771. r: [][]map[string]interface{}{
  772. {{
  773. "from": "device1",
  774. }},
  775. {{
  776. "from": "device2",
  777. }},
  778. {{
  779. "from": "device3",
  780. }},
  781. {{
  782. "from": "device1",
  783. }},
  784. {{
  785. "from": "device3",
  786. }},
  787. },
  788. m: map[string]interface{}{
  789. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  790. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  791. "op_preprocessor_demo1_0_records_in_total": int64(5),
  792. "op_preprocessor_demo1_0_records_out_total": int64(5),
  793. "op_project_0_exceptions_total": int64(0),
  794. "op_project_0_process_latency_ms": int64(0),
  795. "op_project_0_records_in_total": int64(5),
  796. "op_project_0_records_out_total": int64(5),
  797. "sink_mockSink_0_exceptions_total": int64(0),
  798. "sink_mockSink_0_records_in_total": int64(5),
  799. "sink_mockSink_0_records_out_total": int64(5),
  800. "source_demo1_0_exceptions_total": int64(0),
  801. "source_demo1_0_records_in_total": int64(5),
  802. "source_demo1_0_records_out_total": int64(5),
  803. },
  804. s: "sink_mockSink_0_records_out_total",
  805. }, {
  806. name: `rule1`,
  807. sql: "SELECT * FROM demo1 where `from`=\"device1\"",
  808. r: [][]map[string]interface{}{
  809. {{
  810. "temp": float64(25.5),
  811. "hum": float64(65),
  812. "from": "device1",
  813. "ts": float64(1541152486013),
  814. }},
  815. {{
  816. "temp": float64(27.4),
  817. "hum": float64(80),
  818. "from": "device1",
  819. "ts": float64(1541152488442),
  820. }},
  821. },
  822. m: map[string]interface{}{
  823. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  824. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  825. "op_preprocessor_demo1_0_records_in_total": int64(5),
  826. "op_preprocessor_demo1_0_records_out_total": int64(5),
  827. "op_project_0_exceptions_total": int64(0),
  828. "op_project_0_process_latency_ms": int64(0),
  829. "op_project_0_records_in_total": int64(2),
  830. "op_project_0_records_out_total": int64(2),
  831. "op_filter_0_exceptions_total": int64(0),
  832. "op_filter_0_process_latency_ms": int64(0),
  833. "op_filter_0_records_in_total": int64(5),
  834. "op_filter_0_records_out_total": int64(2),
  835. "sink_mockSink_0_exceptions_total": int64(0),
  836. "sink_mockSink_0_records_in_total": int64(2),
  837. "sink_mockSink_0_records_out_total": int64(2),
  838. "source_demo1_0_exceptions_total": int64(0),
  839. "source_demo1_0_records_in_total": int64(5),
  840. "source_demo1_0_records_out_total": int64(5),
  841. },
  842. s: "sink_mockSink_0_records_out_total",
  843. },
  844. }
  845. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  846. createStreams(t)
  847. defer dropStreams(t)
  848. //defer close(done)
  849. options := []*api.RuleOption{
  850. {
  851. BufferLength: 100,
  852. }, {
  853. BufferLength: 100,
  854. Qos: api.AtLeastOnce,
  855. CheckpointInterval: 5000,
  856. }, {
  857. BufferLength: 100,
  858. Qos: api.ExactlyOnce,
  859. CheckpointInterval: 5000,
  860. },
  861. }
  862. for j, opt := range options {
  863. for i, tt := range tests {
  864. test.ResetClock(1541152486000)
  865. p := NewRuleProcessor(DbDir)
  866. parser := xsql.NewParser(strings.NewReader(tt.sql))
  867. var (
  868. sources []*nodes.SourceNode
  869. syncs []chan int
  870. )
  871. if stmt, err := xsql.Language.Parse(parser); err != nil {
  872. t.Errorf("parse sql %s error: %s", tt.sql, err)
  873. } else {
  874. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  875. t.Errorf("sql %s is not a select statement", tt.sql)
  876. } else {
  877. streams := xsql.GetStreams(selectStmt)
  878. for _, stream := range streams {
  879. next := make(chan int)
  880. syncs = append(syncs, next)
  881. source := getMockSource(stream, next, 5)
  882. sources = append(sources, source)
  883. }
  884. }
  885. }
  886. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.name, j), Sql: tt.sql, Options: opt}, sources)
  887. if err != nil {
  888. t.Error(err)
  889. }
  890. mockSink := test.NewMockSink()
  891. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  892. tp.AddSink(inputs, sink)
  893. errCh := tp.Open()
  894. func() {
  895. for i := 0; i < 5; i++ {
  896. syncs[i%len(syncs)] <- i
  897. select {
  898. case err = <-errCh:
  899. t.Log(err)
  900. tp.Cancel()
  901. return
  902. default:
  903. }
  904. }
  905. for retry := 100; retry > 0; retry-- {
  906. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  907. break
  908. }
  909. time.Sleep(time.Duration(retry) * time.Millisecond)
  910. }
  911. }()
  912. results := mockSink.GetResults()
  913. var maps [][]map[string]interface{}
  914. for _, v := range results {
  915. var mapRes []map[string]interface{}
  916. err := json.Unmarshal(v, &mapRes)
  917. if err != nil {
  918. t.Errorf("Failed to parse the input into map")
  919. continue
  920. }
  921. maps = append(maps, mapRes)
  922. }
  923. if !reflect.DeepEqual(tt.r, maps) {
  924. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  925. continue
  926. }
  927. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  928. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  929. }
  930. tp.Cancel()
  931. }
  932. cleanStateData()
  933. }
  934. }
  935. func TestSingleSQLTemplate(t *testing.T) {
  936. var tests = []struct {
  937. name string
  938. sql string
  939. r []map[string]interface{}
  940. s string
  941. m map[string]interface{}
  942. }{
  943. {
  944. name: `rule1`,
  945. sql: `SELECT * FROM demo`,
  946. r: []map[string]interface{}{
  947. {
  948. "c": "red",
  949. "wrapper": "w1",
  950. },
  951. {
  952. "c": "blue",
  953. "wrapper": "w1",
  954. },
  955. {
  956. "c": "blue",
  957. "wrapper": "w1",
  958. },
  959. {
  960. "c": "yellow",
  961. "wrapper": "w1",
  962. },
  963. {
  964. "c": "red",
  965. "wrapper": "w1",
  966. },
  967. },
  968. m: map[string]interface{}{
  969. "op_preprocessor_demo_0_exceptions_total": int64(0),
  970. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  971. "op_preprocessor_demo_0_records_in_total": int64(5),
  972. "op_preprocessor_demo_0_records_out_total": int64(5),
  973. "op_project_0_exceptions_total": int64(0),
  974. "op_project_0_process_latency_ms": int64(0),
  975. "op_project_0_records_in_total": int64(5),
  976. "op_project_0_records_out_total": int64(5),
  977. "sink_mockSink_0_exceptions_total": int64(0),
  978. "sink_mockSink_0_records_in_total": int64(5),
  979. "sink_mockSink_0_records_out_total": int64(5),
  980. "source_demo_0_exceptions_total": int64(0),
  981. "source_demo_0_records_in_total": int64(5),
  982. "source_demo_0_records_out_total": int64(5),
  983. },
  984. s: "sink_mockSink_0_records_out_total",
  985. },
  986. }
  987. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  988. createStreams(t)
  989. defer dropStreams(t)
  990. //defer close(done)
  991. for i, tt := range tests {
  992. test.ResetClock(1541152486000)
  993. p := NewRuleProcessor(DbDir)
  994. parser := xsql.NewParser(strings.NewReader(tt.sql))
  995. var (
  996. sources []*nodes.SourceNode
  997. syncs []chan int
  998. )
  999. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1000. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1001. } else {
  1002. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1003. t.Errorf("sql %s is not a select statement", tt.sql)
  1004. } else {
  1005. streams := xsql.GetStreams(selectStmt)
  1006. for _, stream := range streams {
  1007. next := make(chan int)
  1008. syncs = append(syncs, next)
  1009. source := getMockSource(stream, next, 5)
  1010. sources = append(sources, source)
  1011. }
  1012. }
  1013. }
  1014. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: &api.RuleOption{
  1015. BufferLength: 100,
  1016. }}, sources)
  1017. if err != nil {
  1018. t.Error(err)
  1019. }
  1020. mockSink := test.NewMockSink()
  1021. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, map[string]interface{}{
  1022. "dataTemplate": `{"wrapper":"w1", "c":"{{.color}}"}`,
  1023. "sendSingle": true,
  1024. })
  1025. tp.AddSink(inputs, sink)
  1026. errCh := tp.Open()
  1027. func() {
  1028. for i := 0; i < 5; i++ {
  1029. syncs[i%len(syncs)] <- i
  1030. select {
  1031. case err = <-errCh:
  1032. t.Log(err)
  1033. tp.Cancel()
  1034. return
  1035. default:
  1036. }
  1037. }
  1038. for retry := 100; retry > 0; retry-- {
  1039. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  1040. break
  1041. }
  1042. time.Sleep(time.Duration(retry) * time.Millisecond)
  1043. }
  1044. }()
  1045. results := mockSink.GetResults()
  1046. var maps []map[string]interface{}
  1047. for _, v := range results {
  1048. var mapRes map[string]interface{}
  1049. err := json.Unmarshal(v, &mapRes)
  1050. if err != nil {
  1051. t.Errorf("Failed to parse the input into map")
  1052. continue
  1053. }
  1054. maps = append(maps, mapRes)
  1055. }
  1056. if !reflect.DeepEqual(tt.r, maps) {
  1057. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1058. continue
  1059. }
  1060. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  1061. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  1062. }
  1063. tp.Cancel()
  1064. }
  1065. cleanStateData()
  1066. }
  1067. func TestNoneSingleSQLTemplate(t *testing.T) {
  1068. var tests = []struct {
  1069. name string
  1070. sql string
  1071. r [][]byte
  1072. s string
  1073. m map[string]interface{}
  1074. }{
  1075. {
  1076. name: `rule1`,
  1077. sql: `SELECT * FROM demo`,
  1078. r: [][]byte{
  1079. []byte("<div>results</div><ul><li>red - 3</li></ul>"),
  1080. []byte("<div>results</div><ul><li>blue - 6</li></ul>"),
  1081. []byte("<div>results</div><ul><li>blue - 2</li></ul>"),
  1082. []byte("<div>results</div><ul><li>yellow - 4</li></ul>"),
  1083. []byte("<div>results</div><ul><li>red - 1</li></ul>"),
  1084. },
  1085. m: map[string]interface{}{
  1086. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1087. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1088. "op_preprocessor_demo_0_records_in_total": int64(5),
  1089. "op_preprocessor_demo_0_records_out_total": int64(5),
  1090. "op_project_0_exceptions_total": int64(0),
  1091. "op_project_0_process_latency_ms": int64(0),
  1092. "op_project_0_records_in_total": int64(5),
  1093. "op_project_0_records_out_total": int64(5),
  1094. "sink_mockSink_0_exceptions_total": int64(0),
  1095. "sink_mockSink_0_records_in_total": int64(5),
  1096. "sink_mockSink_0_records_out_total": int64(5),
  1097. "source_demo_0_exceptions_total": int64(0),
  1098. "source_demo_0_records_in_total": int64(5),
  1099. "source_demo_0_records_out_total": int64(5),
  1100. },
  1101. s: "sink_mockSink_0_records_out_total",
  1102. },
  1103. }
  1104. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1105. createStreams(t)
  1106. defer dropStreams(t)
  1107. //defer close(done)
  1108. for i, tt := range tests {
  1109. test.ResetClock(1541152486000)
  1110. p := NewRuleProcessor(DbDir)
  1111. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1112. var (
  1113. sources []*nodes.SourceNode
  1114. syncs []chan int
  1115. )
  1116. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1117. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1118. } else {
  1119. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1120. t.Errorf("sql %s is not a select statement", tt.sql)
  1121. } else {
  1122. streams := xsql.GetStreams(selectStmt)
  1123. for _, stream := range streams {
  1124. next := make(chan int)
  1125. syncs = append(syncs, next)
  1126. source := getMockSource(stream, next, 5)
  1127. sources = append(sources, source)
  1128. }
  1129. }
  1130. }
  1131. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: &api.RuleOption{
  1132. BufferLength: 100,
  1133. }}, sources)
  1134. if err != nil {
  1135. t.Error(err)
  1136. }
  1137. mockSink := test.NewMockSink()
  1138. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, map[string]interface{}{
  1139. "dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.color}} - {{.size}}</li>{{end}}</ul>`,
  1140. })
  1141. tp.AddSink(inputs, sink)
  1142. errCh := tp.Open()
  1143. func() {
  1144. for i := 0; i < 5; i++ {
  1145. syncs[i%len(syncs)] <- i
  1146. select {
  1147. case err = <-errCh:
  1148. t.Log(err)
  1149. tp.Cancel()
  1150. return
  1151. default:
  1152. }
  1153. }
  1154. for retry := 100; retry > 0; retry-- {
  1155. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  1156. break
  1157. }
  1158. time.Sleep(time.Duration(retry) * time.Millisecond)
  1159. }
  1160. }()
  1161. results := mockSink.GetResults()
  1162. if !reflect.DeepEqual(tt.r, results) {
  1163. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, results)
  1164. continue
  1165. }
  1166. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  1167. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  1168. }
  1169. tp.Cancel()
  1170. }
  1171. cleanStateData()
  1172. }
  1173. func getMockSourceL(name string, done <-chan int, size int) *nodes.SourceNode {
  1174. var data []*xsql.Tuple
  1175. switch name {
  1176. case "ldemo":
  1177. data = []*xsql.Tuple{
  1178. {
  1179. Emitter: name,
  1180. Message: map[string]interface{}{
  1181. "color": "red",
  1182. "size": 3,
  1183. "ts": 1541152486013,
  1184. },
  1185. Timestamp: 1541152486013,
  1186. },
  1187. {
  1188. Emitter: name,
  1189. Message: map[string]interface{}{
  1190. "color": "blue",
  1191. "size": "string",
  1192. "ts": 1541152486822,
  1193. },
  1194. Timestamp: 1541152486822,
  1195. },
  1196. {
  1197. Emitter: name,
  1198. Message: map[string]interface{}{
  1199. "size": 3,
  1200. "ts": 1541152487632,
  1201. },
  1202. Timestamp: 1541152487632,
  1203. },
  1204. {
  1205. Emitter: name,
  1206. Message: map[string]interface{}{
  1207. "color": 49,
  1208. "size": 2,
  1209. "ts": 1541152488442,
  1210. },
  1211. Timestamp: 1541152488442,
  1212. },
  1213. {
  1214. Emitter: name,
  1215. Message: map[string]interface{}{
  1216. "color": "red",
  1217. "ts": 1541152489252,
  1218. },
  1219. Timestamp: 1541152489252,
  1220. },
  1221. }
  1222. case "ldemo1":
  1223. data = []*xsql.Tuple{
  1224. {
  1225. Emitter: name,
  1226. Message: map[string]interface{}{
  1227. "temp": 25.5,
  1228. "hum": 65,
  1229. "ts": 1541152486013,
  1230. },
  1231. Timestamp: 1541152486013,
  1232. },
  1233. {
  1234. Emitter: name,
  1235. Message: map[string]interface{}{
  1236. "temp": 27.5,
  1237. "hum": 59,
  1238. "ts": 1541152486823,
  1239. },
  1240. Timestamp: 1541152486823,
  1241. },
  1242. {
  1243. Emitter: name,
  1244. Message: map[string]interface{}{
  1245. "temp": 28.1,
  1246. "hum": 75,
  1247. "ts": 1541152487632,
  1248. },
  1249. Timestamp: 1541152487632,
  1250. },
  1251. {
  1252. Emitter: name,
  1253. Message: map[string]interface{}{
  1254. "temp": 27.4,
  1255. "hum": 80,
  1256. "ts": "1541152488442",
  1257. },
  1258. Timestamp: 1541152488442,
  1259. },
  1260. {
  1261. Emitter: name,
  1262. Message: map[string]interface{}{
  1263. "temp": 25.5,
  1264. "hum": 62,
  1265. "ts": 1541152489252,
  1266. },
  1267. Timestamp: 1541152489252,
  1268. },
  1269. }
  1270. case "lsessionDemo":
  1271. data = []*xsql.Tuple{
  1272. {
  1273. Emitter: name,
  1274. Message: map[string]interface{}{
  1275. "temp": 25.5,
  1276. "hum": 65,
  1277. "ts": 1541152486013,
  1278. },
  1279. Timestamp: 1541152486013,
  1280. },
  1281. {
  1282. Emitter: name,
  1283. Message: map[string]interface{}{
  1284. "temp": 27.5,
  1285. "hum": 59,
  1286. "ts": 1541152486823,
  1287. },
  1288. Timestamp: 1541152486823,
  1289. },
  1290. {
  1291. Emitter: name,
  1292. Message: map[string]interface{}{
  1293. "temp": 28.1,
  1294. "hum": 75,
  1295. "ts": 1541152487932,
  1296. },
  1297. Timestamp: 1541152487932,
  1298. },
  1299. {
  1300. Emitter: name,
  1301. Message: map[string]interface{}{
  1302. "temp": 27.4,
  1303. "hum": 80,
  1304. "ts": 1541152488442,
  1305. },
  1306. Timestamp: 1541152488442,
  1307. },
  1308. {
  1309. Emitter: name,
  1310. Message: map[string]interface{}{
  1311. "temp": 25.5,
  1312. "hum": 62,
  1313. "ts": 1541152489252,
  1314. },
  1315. Timestamp: 1541152489252,
  1316. },
  1317. {
  1318. Emitter: name,
  1319. Message: map[string]interface{}{
  1320. "temp": 26.2,
  1321. "hum": 63,
  1322. "ts": 1541152490062,
  1323. },
  1324. Timestamp: 1541152490062,
  1325. },
  1326. {
  1327. Emitter: name,
  1328. Message: map[string]interface{}{
  1329. "temp": 26.8,
  1330. "hum": 71,
  1331. "ts": 1541152490872,
  1332. },
  1333. Timestamp: 1541152490872,
  1334. },
  1335. {
  1336. Emitter: name,
  1337. Message: map[string]interface{}{
  1338. "temp": 28.9,
  1339. "hum": 85,
  1340. "ts": 1541152491682,
  1341. },
  1342. Timestamp: 1541152491682,
  1343. },
  1344. {
  1345. Emitter: name,
  1346. Message: map[string]interface{}{
  1347. "temp": 29.1,
  1348. "hum": 92,
  1349. "ts": 1541152492492,
  1350. },
  1351. Timestamp: 1541152492492,
  1352. },
  1353. {
  1354. Emitter: name,
  1355. Message: map[string]interface{}{
  1356. "temp": 2.2,
  1357. "hum": 99,
  1358. "ts": 1541152493202,
  1359. },
  1360. Timestamp: 1541152493202,
  1361. },
  1362. {
  1363. Emitter: name,
  1364. Message: map[string]interface{}{
  1365. "temp": 30.9,
  1366. "hum": 87,
  1367. "ts": 1541152494112,
  1368. },
  1369. Timestamp: 1541152494112,
  1370. },
  1371. }
  1372. }
  1373. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
  1374. "DATASOURCE": name,
  1375. })
  1376. }
  1377. func TestSingleSQLError(t *testing.T) {
  1378. var tests = []struct {
  1379. name string
  1380. sql string
  1381. r [][]map[string]interface{}
  1382. s string
  1383. m map[string]interface{}
  1384. }{
  1385. {
  1386. name: `rule1`,
  1387. sql: `SELECT color, ts FROM ldemo where size >= 3`,
  1388. r: [][]map[string]interface{}{
  1389. {{
  1390. "color": "red",
  1391. "ts": float64(1541152486013),
  1392. }},
  1393. {{
  1394. "error": "run Where error: invalid operation string(string) >= int64(3)",
  1395. }},
  1396. {{
  1397. "ts": float64(1541152487632),
  1398. }},
  1399. },
  1400. s: "op_filter_0_records_in_total",
  1401. m: map[string]interface{}{
  1402. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  1403. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  1404. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  1405. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  1406. "op_project_0_exceptions_total": int64(1),
  1407. "op_project_0_process_latency_ms": int64(0),
  1408. "op_project_0_records_in_total": int64(3),
  1409. "op_project_0_records_out_total": int64(2),
  1410. "sink_mockSink_0_exceptions_total": int64(0),
  1411. "sink_mockSink_0_records_in_total": int64(3),
  1412. "sink_mockSink_0_records_out_total": int64(3),
  1413. "source_ldemo_0_exceptions_total": int64(0),
  1414. "source_ldemo_0_records_in_total": int64(5),
  1415. "source_ldemo_0_records_out_total": int64(5),
  1416. "op_filter_0_exceptions_total": int64(1),
  1417. "op_filter_0_process_latency_ms": int64(0),
  1418. "op_filter_0_records_in_total": int64(5),
  1419. "op_filter_0_records_out_total": int64(2),
  1420. },
  1421. }, {
  1422. name: `rule2`,
  1423. sql: `SELECT size * 5 FROM ldemo`,
  1424. r: [][]map[string]interface{}{
  1425. {{
  1426. "rengine_field_0": float64(15),
  1427. }},
  1428. {{
  1429. "error": "run Select error: invalid operation string(string) * int64(5)",
  1430. }},
  1431. {{
  1432. "rengine_field_0": float64(15),
  1433. }},
  1434. {{
  1435. "rengine_field_0": float64(10),
  1436. }},
  1437. {{}},
  1438. },
  1439. s: "op_filter_0_records_in_total",
  1440. m: map[string]interface{}{
  1441. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  1442. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  1443. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  1444. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  1445. "op_project_0_exceptions_total": int64(1),
  1446. "op_project_0_process_latency_ms": int64(0),
  1447. "op_project_0_records_in_total": int64(5),
  1448. "op_project_0_records_out_total": int64(4),
  1449. "sink_mockSink_0_exceptions_total": int64(0),
  1450. "sink_mockSink_0_records_in_total": int64(5),
  1451. "sink_mockSink_0_records_out_total": int64(5),
  1452. "source_ldemo_0_exceptions_total": int64(0),
  1453. "source_ldemo_0_records_in_total": int64(5),
  1454. "source_ldemo_0_records_out_total": int64(5),
  1455. },
  1456. },
  1457. }
  1458. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1459. createSchemalessStreams(t)
  1460. defer dropSchemalessStreams(t)
  1461. //defer close(done)
  1462. for i, tt := range tests {
  1463. test.ResetClock(1541152486000)
  1464. p := NewRuleProcessor(DbDir)
  1465. parser := xsql.NewParser(strings.NewReader(tt.sql))
  1466. var (
  1467. sources []*nodes.SourceNode
  1468. syncs []chan int
  1469. )
  1470. if stmt, err := xsql.Language.Parse(parser); err != nil {
  1471. t.Errorf("parse sql %s error: %s", tt.sql, err)
  1472. } else {
  1473. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  1474. t.Errorf("sql %s is not a select statement", tt.sql)
  1475. } else {
  1476. streams := xsql.GetStreams(selectStmt)
  1477. for _, stream := range streams {
  1478. next := make(chan int)
  1479. syncs = append(syncs, next)
  1480. source := getMockSourceL(stream, next, 5)
  1481. sources = append(sources, source)
  1482. }
  1483. }
  1484. }
  1485. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: &api.RuleOption{
  1486. BufferLength: 100,
  1487. }}, sources)
  1488. if err != nil {
  1489. t.Error(err)
  1490. }
  1491. mockSink := test.NewMockSink()
  1492. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  1493. tp.AddSink(inputs, sink)
  1494. errCh := tp.Open()
  1495. func() {
  1496. for i := 0; i < 5; i++ {
  1497. syncs[i%len(syncs)] <- i
  1498. select {
  1499. case err = <-errCh:
  1500. t.Log(err)
  1501. tp.Cancel()
  1502. return
  1503. default:
  1504. }
  1505. }
  1506. for retry := 100; retry > 0; retry-- {
  1507. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  1508. break
  1509. }
  1510. time.Sleep(time.Duration(retry) * time.Millisecond)
  1511. }
  1512. }()
  1513. results := mockSink.GetResults()
  1514. var maps [][]map[string]interface{}
  1515. for _, v := range results {
  1516. var mapRes []map[string]interface{}
  1517. err := json.Unmarshal(v, &mapRes)
  1518. if err != nil {
  1519. t.Errorf("Failed to parse the input into map")
  1520. continue
  1521. }
  1522. maps = append(maps, mapRes)
  1523. }
  1524. if !reflect.DeepEqual(tt.r, maps) {
  1525. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  1526. continue
  1527. }
  1528. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  1529. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  1530. }
  1531. tp.Cancel()
  1532. }
  1533. cleanStateData()
  1534. }
  1535. func TestWindow(t *testing.T) {
  1536. var tests = []struct {
  1537. name string
  1538. sql string
  1539. size int
  1540. r [][]map[string]interface{}
  1541. m map[string]interface{}
  1542. }{
  1543. {
  1544. name: `rule1`,
  1545. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1546. size: 5,
  1547. r: [][]map[string]interface{}{
  1548. {{
  1549. "color": "red",
  1550. "size": float64(3),
  1551. "ts": float64(1541152486013),
  1552. }, {
  1553. "color": "blue",
  1554. "size": float64(6),
  1555. "ts": float64(1541152486822),
  1556. }},
  1557. {{
  1558. "color": "red",
  1559. "size": float64(3),
  1560. "ts": float64(1541152486013),
  1561. }, {
  1562. "color": "blue",
  1563. "size": float64(6),
  1564. "ts": float64(1541152486822),
  1565. }, {
  1566. "color": "blue",
  1567. "size": float64(2),
  1568. "ts": float64(1541152487632),
  1569. }},
  1570. {{
  1571. "color": "blue",
  1572. "size": float64(2),
  1573. "ts": float64(1541152487632),
  1574. }, {
  1575. "color": "yellow",
  1576. "size": float64(4),
  1577. "ts": float64(1541152488442),
  1578. }},
  1579. },
  1580. m: map[string]interface{}{
  1581. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1582. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1583. "op_preprocessor_demo_0_records_in_total": int64(5),
  1584. "op_preprocessor_demo_0_records_out_total": int64(5),
  1585. "op_project_0_exceptions_total": int64(0),
  1586. "op_project_0_process_latency_ms": int64(0),
  1587. "op_project_0_records_in_total": int64(3),
  1588. "op_project_0_records_out_total": int64(3),
  1589. "sink_mockSink_0_exceptions_total": int64(0),
  1590. "sink_mockSink_0_records_in_total": int64(3),
  1591. "sink_mockSink_0_records_out_total": int64(3),
  1592. "source_demo_0_exceptions_total": int64(0),
  1593. "source_demo_0_records_in_total": int64(5),
  1594. "source_demo_0_records_out_total": int64(5),
  1595. "op_window_0_exceptions_total": int64(0),
  1596. "op_window_0_process_latency_ms": int64(0),
  1597. "op_window_0_records_in_total": int64(5),
  1598. "op_window_0_records_out_total": int64(3),
  1599. },
  1600. }, {
  1601. name: `rule2`,
  1602. sql: `SELECT color, ts FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  1603. size: 5,
  1604. r: [][]map[string]interface{}{
  1605. {{
  1606. "color": "red",
  1607. "ts": float64(1541152486013),
  1608. }, {
  1609. "color": "blue",
  1610. "ts": float64(1541152486822),
  1611. }},
  1612. {{
  1613. "color": "yellow",
  1614. "ts": float64(1541152488442),
  1615. }},
  1616. },
  1617. m: map[string]interface{}{
  1618. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1619. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1620. "op_preprocessor_demo_0_records_in_total": int64(5),
  1621. "op_preprocessor_demo_0_records_out_total": int64(5),
  1622. "op_project_0_exceptions_total": int64(0),
  1623. "op_project_0_process_latency_ms": int64(0),
  1624. "op_project_0_records_in_total": int64(2),
  1625. "op_project_0_records_out_total": int64(2),
  1626. "sink_mockSink_0_exceptions_total": int64(0),
  1627. "sink_mockSink_0_records_in_total": int64(2),
  1628. "sink_mockSink_0_records_out_total": int64(2),
  1629. "source_demo_0_exceptions_total": int64(0),
  1630. "source_demo_0_records_in_total": int64(5),
  1631. "source_demo_0_records_out_total": int64(5),
  1632. "op_window_0_exceptions_total": int64(0),
  1633. "op_window_0_process_latency_ms": int64(0),
  1634. "op_window_0_records_in_total": int64(5),
  1635. "op_window_0_records_out_total": int64(3),
  1636. "op_filter_0_exceptions_total": int64(0),
  1637. "op_filter_0_process_latency_ms": int64(0),
  1638. "op_filter_0_records_in_total": int64(3),
  1639. "op_filter_0_records_out_total": int64(2),
  1640. },
  1641. }, {
  1642. name: `rule3`,
  1643. sql: `SELECT color, temp, ts FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  1644. size: 5,
  1645. r: [][]map[string]interface{}{
  1646. {{
  1647. "color": "red",
  1648. "temp": 25.5,
  1649. "ts": float64(1541152486013),
  1650. }}, {{
  1651. "color": "red",
  1652. "temp": 25.5,
  1653. "ts": float64(1541152486013),
  1654. }}, {{
  1655. "color": "red",
  1656. "temp": 25.5,
  1657. "ts": float64(1541152486013),
  1658. }}, {{
  1659. "color": "blue",
  1660. "temp": 28.1,
  1661. "ts": float64(1541152487632),
  1662. }}, {{
  1663. "color": "blue",
  1664. "temp": 28.1,
  1665. "ts": float64(1541152487632),
  1666. }}, {{
  1667. "color": "blue",
  1668. "temp": 28.1,
  1669. "ts": float64(1541152487632),
  1670. }, {
  1671. "color": "yellow",
  1672. "temp": 27.4,
  1673. "ts": float64(1541152488442),
  1674. }}, {{
  1675. "color": "yellow",
  1676. "temp": 27.4,
  1677. "ts": float64(1541152488442),
  1678. }}, {{
  1679. "color": "yellow",
  1680. "temp": 27.4,
  1681. "ts": float64(1541152488442),
  1682. }, {
  1683. "color": "red",
  1684. "temp": 25.5,
  1685. "ts": float64(1541152489252),
  1686. }},
  1687. },
  1688. m: map[string]interface{}{
  1689. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1690. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1691. "op_preprocessor_demo_0_records_in_total": int64(5),
  1692. "op_preprocessor_demo_0_records_out_total": int64(5),
  1693. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  1694. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  1695. "op_preprocessor_demo1_0_records_in_total": int64(5),
  1696. "op_preprocessor_demo1_0_records_out_total": int64(5),
  1697. "op_project_0_exceptions_total": int64(0),
  1698. "op_project_0_process_latency_ms": int64(0),
  1699. "op_project_0_records_in_total": int64(8),
  1700. "op_project_0_records_out_total": int64(8),
  1701. "sink_mockSink_0_exceptions_total": int64(0),
  1702. "sink_mockSink_0_records_in_total": int64(8),
  1703. "sink_mockSink_0_records_out_total": int64(8),
  1704. "source_demo_0_exceptions_total": int64(0),
  1705. "source_demo_0_records_in_total": int64(5),
  1706. "source_demo_0_records_out_total": int64(5),
  1707. "source_demo1_0_exceptions_total": int64(0),
  1708. "source_demo1_0_records_in_total": int64(5),
  1709. "source_demo1_0_records_out_total": int64(5),
  1710. "op_window_0_exceptions_total": int64(0),
  1711. "op_window_0_process_latency_ms": int64(0),
  1712. "op_window_0_records_in_total": int64(10),
  1713. "op_window_0_records_out_total": int64(10),
  1714. "op_join_0_exceptions_total": int64(0),
  1715. "op_join_0_process_latency_ms": int64(0),
  1716. "op_join_0_records_in_total": int64(10),
  1717. "op_join_0_records_out_total": int64(8),
  1718. },
  1719. }, {
  1720. name: `rule4`,
  1721. sql: `SELECT color FROM demo GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  1722. size: 5,
  1723. r: [][]map[string]interface{}{
  1724. {{
  1725. "color": "red",
  1726. }}, {{
  1727. "color": "blue",
  1728. }, {
  1729. "color": "red",
  1730. }}, {{
  1731. "color": "blue",
  1732. }, {
  1733. "color": "red",
  1734. }}, {{
  1735. "color": "blue",
  1736. }, {
  1737. "color": "yellow",
  1738. }}, {{
  1739. "color": "blue",
  1740. }, {
  1741. "color": "red",
  1742. }, {
  1743. "color": "yellow",
  1744. }},
  1745. },
  1746. m: map[string]interface{}{
  1747. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1748. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1749. "op_preprocessor_demo_0_records_in_total": int64(5),
  1750. "op_preprocessor_demo_0_records_out_total": int64(5),
  1751. "op_project_0_exceptions_total": int64(0),
  1752. "op_project_0_process_latency_ms": int64(0),
  1753. "op_project_0_records_in_total": int64(5),
  1754. "op_project_0_records_out_total": int64(5),
  1755. "sink_mockSink_0_exceptions_total": int64(0),
  1756. "sink_mockSink_0_records_in_total": int64(5),
  1757. "sink_mockSink_0_records_out_total": int64(5),
  1758. "source_demo_0_exceptions_total": int64(0),
  1759. "source_demo_0_records_in_total": int64(5),
  1760. "source_demo_0_records_out_total": int64(5),
  1761. "op_window_0_exceptions_total": int64(0),
  1762. "op_window_0_process_latency_ms": int64(0),
  1763. "op_window_0_records_in_total": int64(5),
  1764. "op_window_0_records_out_total": int64(5),
  1765. "op_aggregate_0_exceptions_total": int64(0),
  1766. "op_aggregate_0_process_latency_ms": int64(0),
  1767. "op_aggregate_0_records_in_total": int64(5),
  1768. "op_aggregate_0_records_out_total": int64(5),
  1769. "op_order_0_exceptions_total": int64(0),
  1770. "op_order_0_process_latency_ms": int64(0),
  1771. "op_order_0_records_in_total": int64(5),
  1772. "op_order_0_records_out_total": int64(5),
  1773. },
  1774. }, {
  1775. name: `rule5`,
  1776. sql: `SELECT temp FROM sessionDemo GROUP BY SessionWindow(ss, 2, 1) `,
  1777. size: 11,
  1778. r: [][]map[string]interface{}{
  1779. {{
  1780. "temp": 25.5,
  1781. }, {
  1782. "temp": 27.5,
  1783. }}, {{
  1784. "temp": 28.1,
  1785. }, {
  1786. "temp": 27.4,
  1787. }, {
  1788. "temp": 25.5,
  1789. }}, {{
  1790. "temp": 26.2,
  1791. }, {
  1792. "temp": 26.8,
  1793. }, {
  1794. "temp": 28.9,
  1795. }, {
  1796. "temp": 29.1,
  1797. }, {
  1798. "temp": 32.2,
  1799. }},
  1800. },
  1801. m: map[string]interface{}{
  1802. "op_preprocessor_sessionDemo_0_exceptions_total": int64(0),
  1803. "op_preprocessor_sessionDemo_0_process_latency_ms": int64(0),
  1804. "op_preprocessor_sessionDemo_0_records_in_total": int64(11),
  1805. "op_preprocessor_sessionDemo_0_records_out_total": int64(11),
  1806. "op_project_0_exceptions_total": int64(0),
  1807. "op_project_0_process_latency_ms": int64(0),
  1808. "op_project_0_records_in_total": int64(3),
  1809. "op_project_0_records_out_total": int64(3),
  1810. "sink_mockSink_0_exceptions_total": int64(0),
  1811. "sink_mockSink_0_records_in_total": int64(3),
  1812. "sink_mockSink_0_records_out_total": int64(3),
  1813. "source_sessionDemo_0_exceptions_total": int64(0),
  1814. "source_sessionDemo_0_records_in_total": int64(11),
  1815. "source_sessionDemo_0_records_out_total": int64(11),
  1816. "op_window_0_exceptions_total": int64(0),
  1817. "op_window_0_process_latency_ms": int64(0),
  1818. "op_window_0_records_in_total": int64(11),
  1819. "op_window_0_records_out_total": int64(3),
  1820. },
  1821. }, {
  1822. name: `rule6`,
  1823. sql: `SELECT max(temp) as m, count(color) as c FROM demo INNER JOIN demo1 ON demo.ts = demo1.ts GROUP BY SlidingWindow(ss, 1)`,
  1824. size: 5,
  1825. r: [][]map[string]interface{}{
  1826. {{
  1827. "m": 25.5,
  1828. "c": float64(1),
  1829. }}, {{
  1830. "m": 25.5,
  1831. "c": float64(1),
  1832. }}, {{
  1833. "m": 25.5,
  1834. "c": float64(1),
  1835. }}, {{
  1836. "m": 28.1,
  1837. "c": float64(1),
  1838. }}, {{
  1839. "m": 28.1,
  1840. "c": float64(1),
  1841. }}, {{
  1842. "m": 28.1,
  1843. "c": float64(2),
  1844. }}, {{
  1845. "m": 27.4,
  1846. "c": float64(1),
  1847. }}, {{
  1848. "m": 27.4,
  1849. "c": float64(2),
  1850. }},
  1851. },
  1852. m: map[string]interface{}{
  1853. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1854. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1855. "op_preprocessor_demo_0_records_in_total": int64(5),
  1856. "op_preprocessor_demo_0_records_out_total": int64(5),
  1857. "op_preprocessor_demo1_0_exceptions_total": int64(0),
  1858. "op_preprocessor_demo1_0_process_latency_ms": int64(0),
  1859. "op_preprocessor_demo1_0_records_in_total": int64(5),
  1860. "op_preprocessor_demo1_0_records_out_total": int64(5),
  1861. "op_project_0_exceptions_total": int64(0),
  1862. "op_project_0_process_latency_ms": int64(0),
  1863. "op_project_0_records_in_total": int64(8),
  1864. "op_project_0_records_out_total": int64(8),
  1865. "sink_mockSink_0_exceptions_total": int64(0),
  1866. "sink_mockSink_0_records_in_total": int64(8),
  1867. "sink_mockSink_0_records_out_total": int64(8),
  1868. "source_demo_0_exceptions_total": int64(0),
  1869. "source_demo_0_records_in_total": int64(5),
  1870. "source_demo_0_records_out_total": int64(5),
  1871. "source_demo1_0_exceptions_total": int64(0),
  1872. "source_demo1_0_records_in_total": int64(5),
  1873. "source_demo1_0_records_out_total": int64(5),
  1874. "op_window_0_exceptions_total": int64(0),
  1875. "op_window_0_process_latency_ms": int64(0),
  1876. "op_window_0_records_in_total": int64(10),
  1877. "op_window_0_records_out_total": int64(10),
  1878. "op_join_0_exceptions_total": int64(0),
  1879. "op_join_0_process_latency_ms": int64(0),
  1880. "op_join_0_records_in_total": int64(10),
  1881. "op_join_0_records_out_total": int64(8),
  1882. },
  1883. }, {
  1884. name: `rule7`,
  1885. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  1886. size: 5,
  1887. r: [][]map[string]interface{}{
  1888. {{
  1889. "error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
  1890. }},
  1891. {{
  1892. "color": "blue",
  1893. "size": float64(6),
  1894. "ts": float64(1541152486822),
  1895. }},
  1896. {{
  1897. "color": "blue",
  1898. "size": float64(6),
  1899. "ts": float64(1541152486822),
  1900. }, {
  1901. "color": "blue",
  1902. "size": float64(2),
  1903. "ts": float64(1541152487632),
  1904. }},
  1905. {{
  1906. "error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
  1907. }},
  1908. {{
  1909. "color": "blue",
  1910. "size": float64(2),
  1911. "ts": float64(1541152487632),
  1912. }},
  1913. {{
  1914. "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
  1915. }},
  1916. },
  1917. m: map[string]interface{}{
  1918. "op_preprocessor_demoE_0_exceptions_total": int64(3),
  1919. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  1920. "op_preprocessor_demoE_0_records_in_total": int64(5),
  1921. "op_preprocessor_demoE_0_records_out_total": int64(2),
  1922. "op_project_0_exceptions_total": int64(3),
  1923. "op_project_0_process_latency_ms": int64(0),
  1924. "op_project_0_records_in_total": int64(6),
  1925. "op_project_0_records_out_total": int64(3),
  1926. "sink_mockSink_0_exceptions_total": int64(0),
  1927. "sink_mockSink_0_records_in_total": int64(6),
  1928. "sink_mockSink_0_records_out_total": int64(6),
  1929. "source_demoE_0_exceptions_total": int64(0),
  1930. "source_demoE_0_records_in_total": int64(5),
  1931. "source_demoE_0_records_out_total": int64(5),
  1932. "op_window_0_exceptions_total": int64(3),
  1933. "op_window_0_process_latency_ms": int64(0),
  1934. "op_window_0_records_in_total": int64(5),
  1935. "op_window_0_records_out_total": int64(3),
  1936. },
  1937. }, {
  1938. name: `rule8`,
  1939. sql: `SELECT color, ts, count(*) as c FROM demo where size > 2 GROUP BY tumblingwindow(ss, 1) having c > 1`,
  1940. size: 5,
  1941. r: [][]map[string]interface{}{
  1942. {{
  1943. "color": "red",
  1944. "ts": float64(1541152486013),
  1945. "c": float64(2),
  1946. }},
  1947. },
  1948. m: map[string]interface{}{
  1949. "op_preprocessor_demo_0_exceptions_total": int64(0),
  1950. "op_preprocessor_demo_0_process_latency_ms": int64(0),
  1951. "op_preprocessor_demo_0_records_in_total": int64(5),
  1952. "op_preprocessor_demo_0_records_out_total": int64(5),
  1953. "op_project_0_exceptions_total": int64(0),
  1954. "op_project_0_process_latency_ms": int64(0),
  1955. "op_project_0_records_in_total": int64(1),
  1956. "op_project_0_records_out_total": int64(1),
  1957. "sink_mockSink_0_exceptions_total": int64(0),
  1958. "sink_mockSink_0_records_in_total": int64(1),
  1959. "sink_mockSink_0_records_out_total": int64(1),
  1960. "source_demo_0_exceptions_total": int64(0),
  1961. "source_demo_0_records_in_total": int64(5),
  1962. "source_demo_0_records_out_total": int64(5),
  1963. "op_window_0_exceptions_total": int64(0),
  1964. "op_window_0_process_latency_ms": int64(0),
  1965. "op_window_0_records_in_total": int64(5),
  1966. "op_window_0_records_out_total": int64(3),
  1967. "op_filter_0_exceptions_total": int64(0),
  1968. "op_filter_0_process_latency_ms": int64(0),
  1969. "op_filter_0_records_in_total": int64(3),
  1970. "op_filter_0_records_out_total": int64(2),
  1971. "op_aggregate_0_exceptions_total": int64(0),
  1972. "op_aggregate_0_process_latency_ms": int64(0),
  1973. "op_aggregate_0_records_in_total": int64(2),
  1974. "op_aggregate_0_records_out_total": int64(2),
  1975. "op_having_0_exceptions_total": int64(0),
  1976. "op_having_0_process_latency_ms": int64(0),
  1977. "op_having_0_records_in_total": int64(2),
  1978. "op_having_0_records_out_total": int64(1),
  1979. },
  1980. },
  1981. }
  1982. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  1983. createStreams(t)
  1984. defer dropStreams(t)
  1985. options := []*api.RuleOption{
  1986. {
  1987. BufferLength: 100,
  1988. }, {
  1989. BufferLength: 100,
  1990. Qos: api.AtLeastOnce,
  1991. CheckpointInterval: 5000,
  1992. }, {
  1993. BufferLength: 100,
  1994. Qos: api.ExactlyOnce,
  1995. CheckpointInterval: 5000,
  1996. },
  1997. }
  1998. for j, opt := range options {
  1999. for i, tt := range tests {
  2000. test.ResetClock(1541152486000)
  2001. p := NewRuleProcessor(DbDir)
  2002. parser := xsql.NewParser(strings.NewReader(tt.sql))
  2003. var (
  2004. sources []*nodes.SourceNode
  2005. syncs []chan int
  2006. )
  2007. if stmt, err := xsql.Language.Parse(parser); err != nil {
  2008. t.Errorf("parse sql %s error: %s", tt.sql, err)
  2009. } else {
  2010. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  2011. t.Errorf("sql %s is not a select statement", tt.sql)
  2012. } else {
  2013. streams := xsql.GetStreams(selectStmt)
  2014. for _, stream := range streams {
  2015. next := make(chan int)
  2016. syncs = append(syncs, next)
  2017. source := getMockSource(stream, next, tt.size)
  2018. sources = append(sources, source)
  2019. }
  2020. }
  2021. }
  2022. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.name, j), Sql: tt.sql, Options: opt}, sources)
  2023. if err != nil {
  2024. t.Error(err)
  2025. }
  2026. mockSink := test.NewMockSink()
  2027. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  2028. tp.AddSink(inputs, sink)
  2029. errCh := tp.Open()
  2030. func() {
  2031. for i := 0; i < tt.size*len(syncs); i++ {
  2032. syncs[i%len(syncs)] <- i
  2033. for {
  2034. time.Sleep(1)
  2035. if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
  2036. break
  2037. }
  2038. }
  2039. select {
  2040. case err = <-errCh:
  2041. t.Log(err)
  2042. tp.Cancel()
  2043. return
  2044. default:
  2045. }
  2046. }
  2047. retry := 100
  2048. for ; retry > 0; retry-- {
  2049. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  2050. break
  2051. }
  2052. t.Logf("wait to try another %d times", retry)
  2053. time.Sleep(time.Duration(retry) * time.Millisecond)
  2054. }
  2055. if retry == 0 {
  2056. err := compareMetrics(tp, tt.m, tt.sql)
  2057. t.Errorf("could not get correct metrics: %v", err)
  2058. }
  2059. }()
  2060. results := mockSink.GetResults()
  2061. var maps [][]map[string]interface{}
  2062. for _, v := range results {
  2063. var mapRes []map[string]interface{}
  2064. err := json.Unmarshal(v, &mapRes)
  2065. if err != nil {
  2066. t.Errorf("Failed to parse the input into map")
  2067. continue
  2068. }
  2069. maps = append(maps, mapRes)
  2070. }
  2071. if !reflect.DeepEqual(tt.r, maps) {
  2072. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  2073. }
  2074. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  2075. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  2076. }
  2077. tp.Cancel()
  2078. }
  2079. cleanStateData()
  2080. }
  2081. }
  2082. func TestWindowError(t *testing.T) {
  2083. var tests = []struct {
  2084. name string
  2085. sql string
  2086. size int
  2087. r [][]map[string]interface{}
  2088. m map[string]interface{}
  2089. }{
  2090. {
  2091. name: `rule1`,
  2092. sql: `SELECT size * 3 FROM ldemo GROUP BY TUMBLINGWINDOW(ss, 2)`,
  2093. size: 5,
  2094. r: [][]map[string]interface{}{
  2095. {{
  2096. "error": "run Select error: invalid operation string(string) * int64(3)",
  2097. }},
  2098. },
  2099. m: map[string]interface{}{
  2100. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  2101. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  2102. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  2103. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  2104. "op_project_0_exceptions_total": int64(1),
  2105. "op_project_0_process_latency_ms": int64(0),
  2106. "op_project_0_records_in_total": int64(1),
  2107. "op_project_0_records_out_total": int64(0),
  2108. "sink_mockSink_0_exceptions_total": int64(0),
  2109. "sink_mockSink_0_records_in_total": int64(1),
  2110. "sink_mockSink_0_records_out_total": int64(1),
  2111. "source_ldemo_0_exceptions_total": int64(0),
  2112. "source_ldemo_0_records_in_total": int64(5),
  2113. "source_ldemo_0_records_out_total": int64(5),
  2114. "op_window_0_exceptions_total": int64(0),
  2115. "op_window_0_process_latency_ms": int64(0),
  2116. "op_window_0_records_in_total": int64(5),
  2117. "op_window_0_records_out_total": int64(1),
  2118. },
  2119. }, {
  2120. name: `rule2`,
  2121. sql: `SELECT color, ts FROM ldemo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  2122. size: 5,
  2123. r: [][]map[string]interface{}{
  2124. {{
  2125. "error": "run Where error: invalid operation string(string) > int64(2)",
  2126. }}, {{
  2127. "ts": float64(1541152487632),
  2128. }},
  2129. },
  2130. m: map[string]interface{}{
  2131. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  2132. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  2133. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  2134. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  2135. "op_project_0_exceptions_total": int64(1),
  2136. "op_project_0_process_latency_ms": int64(0),
  2137. "op_project_0_records_in_total": int64(2),
  2138. "op_project_0_records_out_total": int64(1),
  2139. "sink_mockSink_0_exceptions_total": int64(0),
  2140. "sink_mockSink_0_records_in_total": int64(2),
  2141. "sink_mockSink_0_records_out_total": int64(2),
  2142. "source_ldemo_0_exceptions_total": int64(0),
  2143. "source_ldemo_0_records_in_total": int64(5),
  2144. "source_ldemo_0_records_out_total": int64(5),
  2145. "op_window_0_exceptions_total": int64(0),
  2146. "op_window_0_process_latency_ms": int64(0),
  2147. "op_window_0_records_in_total": int64(5),
  2148. "op_window_0_records_out_total": int64(3),
  2149. "op_filter_0_exceptions_total": int64(1),
  2150. "op_filter_0_process_latency_ms": int64(0),
  2151. "op_filter_0_records_in_total": int64(3),
  2152. "op_filter_0_records_out_total": int64(1),
  2153. },
  2154. }, {
  2155. name: `rule3`,
  2156. sql: `SELECT color, temp, ts FROM ldemo INNER JOIN ldemo1 ON ldemo.ts = ldemo1.ts GROUP BY SlidingWindow(ss, 1)`,
  2157. size: 5,
  2158. r: [][]map[string]interface{}{
  2159. {{
  2160. "color": "red",
  2161. "temp": 25.5,
  2162. "ts": float64(1541152486013),
  2163. }}, {{
  2164. "color": "red",
  2165. "temp": 25.5,
  2166. "ts": float64(1541152486013),
  2167. }}, {{
  2168. "color": "red",
  2169. "temp": 25.5,
  2170. "ts": float64(1541152486013),
  2171. }}, {{
  2172. "temp": 28.1,
  2173. "ts": float64(1541152487632),
  2174. }}, {{
  2175. "temp": 28.1,
  2176. "ts": float64(1541152487632),
  2177. }}, {{
  2178. "error": "run Join error: invalid operation int64(1541152487632) = string(1541152488442)",
  2179. }}, {{
  2180. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  2181. }}, {{
  2182. "error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
  2183. }},
  2184. },
  2185. m: map[string]interface{}{
  2186. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  2187. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  2188. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  2189. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  2190. "op_preprocessor_ldemo1_0_exceptions_total": int64(0),
  2191. "op_preprocessor_ldemo1_0_process_latency_ms": int64(0),
  2192. "op_preprocessor_ldemo1_0_records_in_total": int64(5),
  2193. "op_preprocessor_ldemo1_0_records_out_total": int64(5),
  2194. "op_project_0_exceptions_total": int64(3),
  2195. "op_project_0_process_latency_ms": int64(0),
  2196. "op_project_0_records_in_total": int64(8),
  2197. "op_project_0_records_out_total": int64(5),
  2198. "sink_mockSink_0_exceptions_total": int64(0),
  2199. "sink_mockSink_0_records_in_total": int64(8),
  2200. "sink_mockSink_0_records_out_total": int64(8),
  2201. "source_ldemo_0_exceptions_total": int64(0),
  2202. "source_ldemo_0_records_in_total": int64(5),
  2203. "source_ldemo_0_records_out_total": int64(5),
  2204. "source_ldemo1_0_exceptions_total": int64(0),
  2205. "source_ldemo1_0_records_in_total": int64(5),
  2206. "source_ldemo1_0_records_out_total": int64(5),
  2207. "op_window_0_exceptions_total": int64(0),
  2208. "op_window_0_process_latency_ms": int64(0),
  2209. "op_window_0_records_in_total": int64(10),
  2210. "op_window_0_records_out_total": int64(10),
  2211. "op_join_0_exceptions_total": int64(3),
  2212. "op_join_0_process_latency_ms": int64(0),
  2213. "op_join_0_records_in_total": int64(10),
  2214. "op_join_0_records_out_total": int64(5),
  2215. },
  2216. }, {
  2217. name: `rule4`,
  2218. sql: `SELECT color FROM ldemo GROUP BY SlidingWindow(ss, 2), color having size >= 2 order by color`,
  2219. size: 5,
  2220. r: [][]map[string]interface{}{
  2221. {{
  2222. "color": "red",
  2223. }}, {{
  2224. "error": "run Having error: invalid operation string(string) >= int64(2)",
  2225. }}, {{
  2226. "error": "run Having error: invalid operation string(string) >= int64(2)",
  2227. }}, {{
  2228. "error": "run Having error: invalid operation string(string) >= int64(2)",
  2229. }}, {{
  2230. "color": float64(49),
  2231. }, {}},
  2232. },
  2233. m: map[string]interface{}{
  2234. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  2235. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  2236. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  2237. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  2238. "op_project_0_exceptions_total": int64(3),
  2239. "op_project_0_process_latency_ms": int64(0),
  2240. "op_project_0_records_in_total": int64(5),
  2241. "op_project_0_records_out_total": int64(2),
  2242. "sink_mockSink_0_exceptions_total": int64(0),
  2243. "sink_mockSink_0_records_in_total": int64(5),
  2244. "sink_mockSink_0_records_out_total": int64(5),
  2245. "source_ldemo_0_exceptions_total": int64(0),
  2246. "source_ldemo_0_records_in_total": int64(5),
  2247. "source_ldemo_0_records_out_total": int64(5),
  2248. "op_window_0_exceptions_total": int64(0),
  2249. "op_window_0_process_latency_ms": int64(0),
  2250. "op_window_0_records_in_total": int64(5),
  2251. "op_window_0_records_out_total": int64(5),
  2252. "op_aggregate_0_exceptions_total": int64(0),
  2253. "op_aggregate_0_process_latency_ms": int64(0),
  2254. "op_aggregate_0_records_in_total": int64(5),
  2255. "op_aggregate_0_records_out_total": int64(5),
  2256. "op_having_0_exceptions_total": int64(3),
  2257. "op_having_0_process_latency_ms": int64(0),
  2258. "op_having_0_records_in_total": int64(5),
  2259. "op_having_0_records_out_total": int64(2),
  2260. },
  2261. }, {
  2262. name: `rule5`,
  2263. sql: `SELECT color, size FROM ldemo GROUP BY tumblingwindow(ss, 1) ORDER BY size`,
  2264. size: 5,
  2265. r: [][]map[string]interface{}{
  2266. {{
  2267. "error": "run Order By error: incompatible types for comparison: int and string",
  2268. }}, {{
  2269. "size": float64(3),
  2270. }}, {{
  2271. "color": float64(49),
  2272. "size": float64(2),
  2273. }},
  2274. },
  2275. m: map[string]interface{}{
  2276. "op_preprocessor_ldemo_0_exceptions_total": int64(0),
  2277. "op_preprocessor_ldemo_0_process_latency_ms": int64(0),
  2278. "op_preprocessor_ldemo_0_records_in_total": int64(5),
  2279. "op_preprocessor_ldemo_0_records_out_total": int64(5),
  2280. "op_project_0_exceptions_total": int64(1),
  2281. "op_project_0_process_latency_ms": int64(0),
  2282. "op_project_0_records_in_total": int64(3),
  2283. "op_project_0_records_out_total": int64(2),
  2284. "sink_mockSink_0_exceptions_total": int64(0),
  2285. "sink_mockSink_0_records_in_total": int64(3),
  2286. "sink_mockSink_0_records_out_total": int64(3),
  2287. "source_ldemo_0_exceptions_total": int64(0),
  2288. "source_ldemo_0_records_in_total": int64(5),
  2289. "source_ldemo_0_records_out_total": int64(5),
  2290. "op_window_0_exceptions_total": int64(0),
  2291. "op_window_0_process_latency_ms": int64(0),
  2292. "op_window_0_records_in_total": int64(5),
  2293. "op_window_0_records_out_total": int64(3),
  2294. "op_order_0_exceptions_total": int64(1),
  2295. "op_order_0_process_latency_ms": int64(0),
  2296. "op_order_0_records_in_total": int64(3),
  2297. "op_order_0_records_out_total": int64(2),
  2298. },
  2299. },
  2300. }
  2301. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  2302. createSchemalessStreams(t)
  2303. defer dropSchemalessStreams(t)
  2304. for i, tt := range tests {
  2305. test.ResetClock(1541152486000)
  2306. p := NewRuleProcessor(DbDir)
  2307. parser := xsql.NewParser(strings.NewReader(tt.sql))
  2308. var (
  2309. sources []*nodes.SourceNode
  2310. syncs []chan int
  2311. )
  2312. if stmt, err := xsql.Language.Parse(parser); err != nil {
  2313. t.Errorf("parse sql %s error: %s", tt.sql, err)
  2314. } else {
  2315. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  2316. t.Errorf("sql %s is not a select statement", tt.sql)
  2317. } else {
  2318. streams := xsql.GetStreams(selectStmt)
  2319. for _, stream := range streams {
  2320. next := make(chan int)
  2321. syncs = append(syncs, next)
  2322. source := getMockSourceL(stream, next, tt.size)
  2323. sources = append(sources, source)
  2324. }
  2325. }
  2326. }
  2327. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: &api.RuleOption{
  2328. BufferLength: 100,
  2329. }}, sources)
  2330. if err != nil {
  2331. t.Error(err)
  2332. }
  2333. mockSink := test.NewMockSink()
  2334. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  2335. tp.AddSink(inputs, sink)
  2336. errCh := tp.Open()
  2337. func() {
  2338. for i := 0; i < tt.size*len(syncs); i++ {
  2339. syncs[i%len(syncs)] <- i
  2340. for {
  2341. time.Sleep(1)
  2342. if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
  2343. break
  2344. }
  2345. }
  2346. select {
  2347. case err = <-errCh:
  2348. t.Log(err)
  2349. tp.Cancel()
  2350. return
  2351. default:
  2352. }
  2353. }
  2354. retry := 100
  2355. for ; retry > 0; retry-- {
  2356. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  2357. break
  2358. }
  2359. t.Logf("wait to try another %d times", retry)
  2360. time.Sleep(time.Duration(retry) * time.Millisecond)
  2361. }
  2362. if retry == 0 {
  2363. err := compareMetrics(tp, tt.m, tt.sql)
  2364. t.Errorf("could not get correct metrics: %v", err)
  2365. }
  2366. }()
  2367. results := mockSink.GetResults()
  2368. var maps [][]map[string]interface{}
  2369. for _, v := range results {
  2370. var mapRes []map[string]interface{}
  2371. err := json.Unmarshal(v, &mapRes)
  2372. if err != nil {
  2373. t.Errorf("Failed to parse the input into map")
  2374. continue
  2375. }
  2376. maps = append(maps, mapRes)
  2377. }
  2378. if !reflect.DeepEqual(tt.r, maps) {
  2379. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  2380. }
  2381. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  2382. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  2383. }
  2384. tp.Cancel()
  2385. }
  2386. cleanStateData()
  2387. }
  2388. func createEventStreams(t *testing.T) {
  2389. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  2390. demo := `CREATE STREAM demoE (
  2391. color STRING,
  2392. size BIGINT,
  2393. ts BIGINT
  2394. ) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  2395. _, err := p.ExecStmt(demo)
  2396. if err != nil {
  2397. t.Log(err)
  2398. }
  2399. demo1 := `CREATE STREAM demo1E (
  2400. temp FLOAT,
  2401. hum BIGINT,
  2402. ts BIGINT
  2403. ) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  2404. _, err = p.ExecStmt(demo1)
  2405. if err != nil {
  2406. t.Log(err)
  2407. }
  2408. sessionDemo := `CREATE STREAM sessionDemoE (
  2409. temp FLOAT,
  2410. hum BIGINT,
  2411. ts BIGINT
  2412. ) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  2413. _, err = p.ExecStmt(sessionDemo)
  2414. if err != nil {
  2415. t.Log(err)
  2416. }
  2417. demoErr := `CREATE STREAM demoErr (
  2418. color STRING,
  2419. size BIGINT,
  2420. ts BIGINT
  2421. ) WITH (DATASOURCE="demoErr", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
  2422. _, err = p.ExecStmt(demoErr)
  2423. if err != nil {
  2424. t.Log(err)
  2425. }
  2426. }
  2427. func dropEventStreams(t *testing.T) {
  2428. p := NewStreamProcessor(path.Join(DbDir, "stream"))
  2429. demo := `DROP STREAM demoE`
  2430. _, err := p.ExecStmt(demo)
  2431. if err != nil {
  2432. t.Log(err)
  2433. }
  2434. demo1 := `DROP STREAM demo1E`
  2435. _, err = p.ExecStmt(demo1)
  2436. if err != nil {
  2437. t.Log(err)
  2438. }
  2439. sessionDemo := `DROP STREAM sessionDemoE`
  2440. _, err = p.ExecStmt(sessionDemo)
  2441. if err != nil {
  2442. t.Log(err)
  2443. }
  2444. demoErr := `DROP STREAM demoErr`
  2445. _, err = p.ExecStmt(demoErr)
  2446. if err != nil {
  2447. t.Log(err)
  2448. }
  2449. }
  2450. func getEventMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
  2451. var data []*xsql.Tuple
  2452. switch name {
  2453. case "demoE":
  2454. data = []*xsql.Tuple{
  2455. {
  2456. Emitter: name,
  2457. Message: map[string]interface{}{
  2458. "color": "red",
  2459. "size": 3,
  2460. "ts": 1541152486013,
  2461. },
  2462. Timestamp: 1541152486013,
  2463. },
  2464. {
  2465. Emitter: name,
  2466. Message: map[string]interface{}{
  2467. "color": "blue",
  2468. "size": 2,
  2469. "ts": 1541152487632,
  2470. },
  2471. Timestamp: 1541152487632,
  2472. },
  2473. {
  2474. Emitter: name,
  2475. Message: map[string]interface{}{
  2476. "color": "red",
  2477. "size": 1,
  2478. "ts": 1541152489252,
  2479. },
  2480. Timestamp: 1541152489252,
  2481. },
  2482. { //dropped item
  2483. Emitter: name,
  2484. Message: map[string]interface{}{
  2485. "color": "blue",
  2486. "size": 6,
  2487. "ts": 1541152486822,
  2488. },
  2489. Timestamp: 1541152486822,
  2490. },
  2491. {
  2492. Emitter: name,
  2493. Message: map[string]interface{}{
  2494. "color": "yellow",
  2495. "size": 4,
  2496. "ts": 1541152488442,
  2497. },
  2498. Timestamp: 1541152488442,
  2499. },
  2500. { //To lift the watermark and issue all windows
  2501. Emitter: name,
  2502. Message: map[string]interface{}{
  2503. "color": "yellow",
  2504. "size": 4,
  2505. "ts": 1541152492342,
  2506. },
  2507. Timestamp: 1541152488442,
  2508. },
  2509. }
  2510. case "demo1E":
  2511. data = []*xsql.Tuple{
  2512. {
  2513. Emitter: name,
  2514. Message: map[string]interface{}{
  2515. "temp": 27.5,
  2516. "hum": 59,
  2517. "ts": 1541152486823,
  2518. },
  2519. Timestamp: 1541152486823,
  2520. },
  2521. {
  2522. Emitter: name,
  2523. Message: map[string]interface{}{
  2524. "temp": 25.5,
  2525. "hum": 65,
  2526. "ts": 1541152486013,
  2527. },
  2528. Timestamp: 1541152486013,
  2529. },
  2530. {
  2531. Emitter: name,
  2532. Message: map[string]interface{}{
  2533. "temp": 27.4,
  2534. "hum": 80,
  2535. "ts": 1541152488442,
  2536. },
  2537. Timestamp: 1541152488442,
  2538. },
  2539. {
  2540. Emitter: name,
  2541. Message: map[string]interface{}{
  2542. "temp": 28.1,
  2543. "hum": 75,
  2544. "ts": 1541152487632,
  2545. },
  2546. Timestamp: 1541152487632,
  2547. },
  2548. {
  2549. Emitter: name,
  2550. Message: map[string]interface{}{
  2551. "temp": 25.5,
  2552. "hum": 62,
  2553. "ts": 1541152489252,
  2554. },
  2555. Timestamp: 1541152489252,
  2556. },
  2557. {
  2558. Emitter: name,
  2559. Message: map[string]interface{}{
  2560. "temp": 25.5,
  2561. "hum": 62,
  2562. "ts": 1541152499252,
  2563. },
  2564. Timestamp: 1541152499252,
  2565. },
  2566. }
  2567. case "sessionDemoE":
  2568. data = []*xsql.Tuple{
  2569. {
  2570. Emitter: name,
  2571. Message: map[string]interface{}{
  2572. "temp": 25.5,
  2573. "hum": 65,
  2574. "ts": 1541152486013,
  2575. },
  2576. Timestamp: 1541152486013,
  2577. },
  2578. {
  2579. Emitter: name,
  2580. Message: map[string]interface{}{
  2581. "temp": 28.1,
  2582. "hum": 75,
  2583. "ts": 1541152487932,
  2584. },
  2585. Timestamp: 1541152487932,
  2586. },
  2587. {
  2588. Emitter: name,
  2589. Message: map[string]interface{}{
  2590. "temp": 27.5,
  2591. "hum": 59,
  2592. "ts": 1541152486823,
  2593. },
  2594. Timestamp: 1541152486823,
  2595. },
  2596. {
  2597. Emitter: name,
  2598. Message: map[string]interface{}{
  2599. "temp": 25.5,
  2600. "hum": 62,
  2601. "ts": 1541152489252,
  2602. },
  2603. Timestamp: 1541152489252,
  2604. },
  2605. {
  2606. Emitter: name,
  2607. Message: map[string]interface{}{
  2608. "temp": 27.4,
  2609. "hum": 80,
  2610. "ts": 1541152488442,
  2611. },
  2612. Timestamp: 1541152488442,
  2613. },
  2614. {
  2615. Emitter: name,
  2616. Message: map[string]interface{}{
  2617. "temp": 26.2,
  2618. "hum": 63,
  2619. "ts": 1541152490062,
  2620. },
  2621. Timestamp: 1541152490062,
  2622. },
  2623. {
  2624. Emitter: name,
  2625. Message: map[string]interface{}{
  2626. "temp": 28.9,
  2627. "hum": 85,
  2628. "ts": 1541152491682,
  2629. },
  2630. Timestamp: 1541152491682,
  2631. },
  2632. {
  2633. Emitter: name,
  2634. Message: map[string]interface{}{
  2635. "temp": 26.8,
  2636. "hum": 71,
  2637. "ts": 1541152490872,
  2638. },
  2639. Timestamp: 1541152490872,
  2640. },
  2641. {
  2642. Emitter: name,
  2643. Message: map[string]interface{}{
  2644. "temp": 29.1,
  2645. "hum": 92,
  2646. "ts": 1541152492492,
  2647. },
  2648. Timestamp: 1541152492492,
  2649. },
  2650. {
  2651. Emitter: name,
  2652. Message: map[string]interface{}{
  2653. "temp": 30.9,
  2654. "hum": 87,
  2655. "ts": 1541152494112,
  2656. },
  2657. Timestamp: 1541152494112,
  2658. },
  2659. {
  2660. Emitter: name,
  2661. Message: map[string]interface{}{
  2662. "temp": 32.2,
  2663. "hum": 99,
  2664. "ts": 1541152493202,
  2665. },
  2666. Timestamp: 1541152493202,
  2667. },
  2668. {
  2669. Emitter: name,
  2670. Message: map[string]interface{}{
  2671. "temp": 32.2,
  2672. "hum": 99,
  2673. "ts": 1541152499202,
  2674. },
  2675. Timestamp: 1541152499202,
  2676. },
  2677. }
  2678. case "demoErr":
  2679. data = []*xsql.Tuple{
  2680. {
  2681. Emitter: name,
  2682. Message: map[string]interface{}{
  2683. "color": "red",
  2684. "size": 3,
  2685. "ts": 1541152486013,
  2686. },
  2687. Timestamp: 1541152486013,
  2688. },
  2689. {
  2690. Emitter: name,
  2691. Message: map[string]interface{}{
  2692. "color": 2,
  2693. "size": "blue",
  2694. "ts": 1541152487632,
  2695. },
  2696. Timestamp: 1541152487632,
  2697. },
  2698. {
  2699. Emitter: name,
  2700. Message: map[string]interface{}{
  2701. "color": "red",
  2702. "size": 1,
  2703. "ts": 1541152489252,
  2704. },
  2705. Timestamp: 1541152489252,
  2706. },
  2707. { //dropped item
  2708. Emitter: name,
  2709. Message: map[string]interface{}{
  2710. "color": "blue",
  2711. "size": 6,
  2712. "ts": 1541152486822,
  2713. },
  2714. Timestamp: 1541152486822,
  2715. },
  2716. {
  2717. Emitter: name,
  2718. Message: map[string]interface{}{
  2719. "color": "yellow",
  2720. "size": 4,
  2721. "ts": 1541152488442,
  2722. },
  2723. Timestamp: 1541152488442,
  2724. },
  2725. { //To lift the watermark and issue all windows
  2726. Emitter: name,
  2727. Message: map[string]interface{}{
  2728. "color": "yellow",
  2729. "size": 4,
  2730. "ts": 1541152492342,
  2731. },
  2732. Timestamp: 1541152488442,
  2733. },
  2734. }
  2735. }
  2736. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, true), map[string]string{
  2737. "DATASOURCE": name,
  2738. })
  2739. }
  2740. func TestEventWindow(t *testing.T) {
  2741. var tests = []struct {
  2742. name string
  2743. sql string
  2744. size int
  2745. r [][]map[string]interface{}
  2746. m map[string]interface{}
  2747. }{
  2748. {
  2749. name: `rule1`,
  2750. sql: `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  2751. size: 6,
  2752. r: [][]map[string]interface{}{
  2753. {{
  2754. "color": "red",
  2755. "size": float64(3),
  2756. "ts": float64(1541152486013),
  2757. }},
  2758. {{
  2759. "color": "red",
  2760. "size": float64(3),
  2761. "ts": float64(1541152486013),
  2762. }, {
  2763. "color": "blue",
  2764. "size": float64(2),
  2765. "ts": float64(1541152487632),
  2766. }},
  2767. {{
  2768. "color": "blue",
  2769. "size": float64(2),
  2770. "ts": float64(1541152487632),
  2771. }, {
  2772. "color": "yellow",
  2773. "size": float64(4),
  2774. "ts": float64(1541152488442),
  2775. }}, {{
  2776. "color": "yellow",
  2777. "size": float64(4),
  2778. "ts": float64(1541152488442),
  2779. }, {
  2780. "color": "red",
  2781. "size": float64(1),
  2782. "ts": float64(1541152489252),
  2783. }}, {{
  2784. "color": "red",
  2785. "size": float64(1),
  2786. "ts": float64(1541152489252),
  2787. }},
  2788. },
  2789. m: map[string]interface{}{
  2790. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2791. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2792. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2793. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2794. "op_project_0_exceptions_total": int64(0),
  2795. "op_project_0_process_latency_ms": int64(0),
  2796. "op_project_0_records_in_total": int64(5),
  2797. "op_project_0_records_out_total": int64(5),
  2798. "sink_mockSink_0_exceptions_total": int64(0),
  2799. "sink_mockSink_0_records_in_total": int64(5),
  2800. "sink_mockSink_0_records_out_total": int64(5),
  2801. "source_demoE_0_exceptions_total": int64(0),
  2802. "source_demoE_0_records_in_total": int64(6),
  2803. "source_demoE_0_records_out_total": int64(6),
  2804. "op_window_0_exceptions_total": int64(0),
  2805. "op_window_0_process_latency_ms": int64(0),
  2806. "op_window_0_records_in_total": int64(6),
  2807. "op_window_0_records_out_total": int64(5),
  2808. },
  2809. }, {
  2810. name: `rule2`,
  2811. sql: `SELECT color, ts FROM demoE where size > 2 GROUP BY tumblingwindow(ss, 1)`,
  2812. size: 6,
  2813. r: [][]map[string]interface{}{
  2814. {{
  2815. "color": "red",
  2816. "ts": float64(1541152486013),
  2817. }},
  2818. {{
  2819. "color": "yellow",
  2820. "ts": float64(1541152488442),
  2821. }},
  2822. },
  2823. m: map[string]interface{}{
  2824. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2825. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2826. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2827. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2828. "op_project_0_exceptions_total": int64(0),
  2829. "op_project_0_process_latency_ms": int64(0),
  2830. "op_project_0_records_in_total": int64(2),
  2831. "op_project_0_records_out_total": int64(2),
  2832. "sink_mockSink_0_exceptions_total": int64(0),
  2833. "sink_mockSink_0_records_in_total": int64(2),
  2834. "sink_mockSink_0_records_out_total": int64(2),
  2835. "source_demoE_0_exceptions_total": int64(0),
  2836. "source_demoE_0_records_in_total": int64(6),
  2837. "source_demoE_0_records_out_total": int64(6),
  2838. "op_window_0_exceptions_total": int64(0),
  2839. "op_window_0_process_latency_ms": int64(0),
  2840. "op_window_0_records_in_total": int64(6),
  2841. "op_window_0_records_out_total": int64(4),
  2842. "op_filter_0_exceptions_total": int64(0),
  2843. "op_filter_0_process_latency_ms": int64(0),
  2844. "op_filter_0_records_in_total": int64(4),
  2845. "op_filter_0_records_out_total": int64(2),
  2846. },
  2847. }, {
  2848. name: `rule3`,
  2849. sql: `SELECT color, temp, ts FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  2850. size: 6,
  2851. r: [][]map[string]interface{}{
  2852. {{
  2853. "color": "red",
  2854. "temp": 25.5,
  2855. "ts": float64(1541152486013),
  2856. }}, {{
  2857. "color": "red",
  2858. "temp": 25.5,
  2859. "ts": float64(1541152486013),
  2860. }}, {{
  2861. "color": "blue",
  2862. "temp": 28.1,
  2863. "ts": float64(1541152487632),
  2864. }}, {{
  2865. "color": "blue",
  2866. "temp": 28.1,
  2867. "ts": float64(1541152487632),
  2868. }, {
  2869. "color": "yellow",
  2870. "temp": 27.4,
  2871. "ts": float64(1541152488442),
  2872. }}, {{
  2873. "color": "yellow",
  2874. "temp": 27.4,
  2875. "ts": float64(1541152488442),
  2876. }, {
  2877. "color": "red",
  2878. "temp": 25.5,
  2879. "ts": float64(1541152489252),
  2880. }},
  2881. },
  2882. m: map[string]interface{}{
  2883. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2884. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2885. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2886. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2887. "op_preprocessor_demo1E_0_exceptions_total": int64(0),
  2888. "op_preprocessor_demo1E_0_process_latency_ms": int64(0),
  2889. "op_preprocessor_demo1E_0_records_in_total": int64(6),
  2890. "op_preprocessor_demo1E_0_records_out_total": int64(6),
  2891. "op_project_0_exceptions_total": int64(0),
  2892. "op_project_0_process_latency_ms": int64(0),
  2893. "op_project_0_records_in_total": int64(5),
  2894. "op_project_0_records_out_total": int64(5),
  2895. "sink_mockSink_0_exceptions_total": int64(0),
  2896. "sink_mockSink_0_records_in_total": int64(5),
  2897. "sink_mockSink_0_records_out_total": int64(5),
  2898. "source_demoE_0_exceptions_total": int64(0),
  2899. "source_demoE_0_records_in_total": int64(6),
  2900. "source_demoE_0_records_out_total": int64(6),
  2901. "source_demo1E_0_exceptions_total": int64(0),
  2902. "source_demo1E_0_records_in_total": int64(6),
  2903. "source_demo1E_0_records_out_total": int64(6),
  2904. "op_window_0_exceptions_total": int64(0),
  2905. "op_window_0_process_latency_ms": int64(0),
  2906. "op_window_0_records_in_total": int64(12),
  2907. "op_window_0_records_out_total": int64(5),
  2908. "op_join_0_exceptions_total": int64(0),
  2909. "op_join_0_process_latency_ms": int64(0),
  2910. "op_join_0_records_in_total": int64(5),
  2911. "op_join_0_records_out_total": int64(5),
  2912. },
  2913. }, {
  2914. name: `rule4`,
  2915. sql: `SELECT color FROM demoE GROUP BY SlidingWindow(ss, 2), color ORDER BY color`,
  2916. size: 6,
  2917. r: [][]map[string]interface{}{
  2918. {{
  2919. "color": "red",
  2920. }}, {{
  2921. "color": "blue",
  2922. }, {
  2923. "color": "red",
  2924. }}, {{
  2925. "color": "blue",
  2926. }, {
  2927. "color": "yellow",
  2928. }}, {{
  2929. "color": "blue",
  2930. }, {
  2931. "color": "red",
  2932. }, {
  2933. "color": "yellow",
  2934. }},
  2935. },
  2936. m: map[string]interface{}{
  2937. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  2938. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  2939. "op_preprocessor_demoE_0_records_in_total": int64(6),
  2940. "op_preprocessor_demoE_0_records_out_total": int64(6),
  2941. "op_project_0_exceptions_total": int64(0),
  2942. "op_project_0_process_latency_ms": int64(0),
  2943. "op_project_0_records_in_total": int64(4),
  2944. "op_project_0_records_out_total": int64(4),
  2945. "sink_mockSink_0_exceptions_total": int64(0),
  2946. "sink_mockSink_0_records_in_total": int64(4),
  2947. "sink_mockSink_0_records_out_total": int64(4),
  2948. "source_demoE_0_exceptions_total": int64(0),
  2949. "source_demoE_0_records_in_total": int64(6),
  2950. "source_demoE_0_records_out_total": int64(6),
  2951. "op_window_0_exceptions_total": int64(0),
  2952. "op_window_0_process_latency_ms": int64(0),
  2953. "op_window_0_records_in_total": int64(6),
  2954. "op_window_0_records_out_total": int64(4),
  2955. "op_aggregate_0_exceptions_total": int64(0),
  2956. "op_aggregate_0_process_latency_ms": int64(0),
  2957. "op_aggregate_0_records_in_total": int64(4),
  2958. "op_aggregate_0_records_out_total": int64(4),
  2959. "op_order_0_exceptions_total": int64(0),
  2960. "op_order_0_process_latency_ms": int64(0),
  2961. "op_order_0_records_in_total": int64(4),
  2962. "op_order_0_records_out_total": int64(4),
  2963. },
  2964. }, {
  2965. name: `rule5`,
  2966. sql: `SELECT temp FROM sessionDemoE GROUP BY SessionWindow(ss, 2, 1) `,
  2967. size: 12,
  2968. r: [][]map[string]interface{}{
  2969. {{
  2970. "temp": 25.5,
  2971. }}, {{
  2972. "temp": 28.1,
  2973. }, {
  2974. "temp": 27.4,
  2975. }, {
  2976. "temp": 25.5,
  2977. }}, {{
  2978. "temp": 26.2,
  2979. }, {
  2980. "temp": 26.8,
  2981. }, {
  2982. "temp": 28.9,
  2983. }, {
  2984. "temp": 29.1,
  2985. }, {
  2986. "temp": 32.2,
  2987. }}, {{
  2988. "temp": 30.9,
  2989. }},
  2990. },
  2991. m: map[string]interface{}{
  2992. "op_preprocessor_sessionDemoE_0_exceptions_total": int64(0),
  2993. "op_preprocessor_sessionDemoE_0_process_latency_ms": int64(0),
  2994. "op_preprocessor_sessionDemoE_0_records_in_total": int64(12),
  2995. "op_preprocessor_sessionDemoE_0_records_out_total": int64(12),
  2996. "op_project_0_exceptions_total": int64(0),
  2997. "op_project_0_process_latency_ms": int64(0),
  2998. "op_project_0_records_in_total": int64(4),
  2999. "op_project_0_records_out_total": int64(4),
  3000. "sink_mockSink_0_exceptions_total": int64(0),
  3001. "sink_mockSink_0_records_in_total": int64(4),
  3002. "sink_mockSink_0_records_out_total": int64(4),
  3003. "source_sessionDemoE_0_exceptions_total": int64(0),
  3004. "source_sessionDemoE_0_records_in_total": int64(12),
  3005. "source_sessionDemoE_0_records_out_total": int64(12),
  3006. "op_window_0_exceptions_total": int64(0),
  3007. "op_window_0_process_latency_ms": int64(0),
  3008. "op_window_0_records_in_total": int64(12),
  3009. "op_window_0_records_out_total": int64(4),
  3010. },
  3011. }, {
  3012. name: `rule6`,
  3013. sql: `SELECT max(temp) as m, count(color) as c FROM demoE INNER JOIN demo1E ON demoE.ts = demo1E.ts GROUP BY SlidingWindow(ss, 1)`,
  3014. size: 6,
  3015. r: [][]map[string]interface{}{
  3016. {{
  3017. "m": 25.5,
  3018. "c": float64(1),
  3019. }}, {{
  3020. "m": 25.5,
  3021. "c": float64(1),
  3022. }}, {{
  3023. "m": 28.1,
  3024. "c": float64(1),
  3025. }}, {{
  3026. "m": 28.1,
  3027. "c": float64(2),
  3028. }}, {{
  3029. "m": 27.4,
  3030. "c": float64(2),
  3031. }},
  3032. },
  3033. m: map[string]interface{}{
  3034. "op_preprocessor_demoE_0_exceptions_total": int64(0),
  3035. "op_preprocessor_demoE_0_process_latency_ms": int64(0),
  3036. "op_preprocessor_demoE_0_records_in_total": int64(6),
  3037. "op_preprocessor_demoE_0_records_out_total": int64(6),
  3038. "op_preprocessor_demo1E_0_exceptions_total": int64(0),
  3039. "op_preprocessor_demo1E_0_process_latency_ms": int64(0),
  3040. "op_preprocessor_demo1E_0_records_in_total": int64(6),
  3041. "op_preprocessor_demo1E_0_records_out_total": int64(6),
  3042. "op_project_0_exceptions_total": int64(0),
  3043. "op_project_0_process_latency_ms": int64(0),
  3044. "op_project_0_records_in_total": int64(5),
  3045. "op_project_0_records_out_total": int64(5),
  3046. "sink_mockSink_0_exceptions_total": int64(0),
  3047. "sink_mockSink_0_records_in_total": int64(5),
  3048. "sink_mockSink_0_records_out_total": int64(5),
  3049. "source_demoE_0_exceptions_total": int64(0),
  3050. "source_demoE_0_records_in_total": int64(6),
  3051. "source_demoE_0_records_out_total": int64(6),
  3052. "source_demo1E_0_exceptions_total": int64(0),
  3053. "source_demo1E_0_records_in_total": int64(6),
  3054. "source_demo1E_0_records_out_total": int64(6),
  3055. "op_window_0_exceptions_total": int64(0),
  3056. "op_window_0_records_in_total": int64(12),
  3057. "op_window_0_records_out_total": int64(5),
  3058. "op_join_0_exceptions_total": int64(0),
  3059. "op_join_0_process_latency_ms": int64(0),
  3060. "op_join_0_records_in_total": int64(5),
  3061. "op_join_0_records_out_total": int64(5),
  3062. },
  3063. }, {
  3064. name: `rule7`,
  3065. sql: `SELECT * FROM demoErr GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  3066. size: 6,
  3067. r: [][]map[string]interface{}{
  3068. {{
  3069. "error": "error in preprocessor: invalid data type for color, expect string but found int(2)",
  3070. }},
  3071. {{
  3072. "color": "red",
  3073. "size": float64(3),
  3074. "ts": float64(1541152486013),
  3075. }},
  3076. {{
  3077. "color": "red",
  3078. "size": float64(3),
  3079. "ts": float64(1541152486013),
  3080. }},
  3081. {{
  3082. "color": "yellow",
  3083. "size": float64(4),
  3084. "ts": float64(1541152488442),
  3085. }}, {{
  3086. "color": "yellow",
  3087. "size": float64(4),
  3088. "ts": float64(1541152488442),
  3089. }, {
  3090. "color": "red",
  3091. "size": float64(1),
  3092. "ts": float64(1541152489252),
  3093. }}, {{
  3094. "color": "red",
  3095. "size": float64(1),
  3096. "ts": float64(1541152489252),
  3097. }},
  3098. },
  3099. m: map[string]interface{}{
  3100. "op_preprocessor_demoErr_0_exceptions_total": int64(1),
  3101. "op_preprocessor_demoErr_0_process_latency_ms": int64(0),
  3102. "op_preprocessor_demoErr_0_records_in_total": int64(6),
  3103. "op_preprocessor_demoErr_0_records_out_total": int64(5),
  3104. "op_project_0_exceptions_total": int64(1),
  3105. "op_project_0_process_latency_ms": int64(0),
  3106. "op_project_0_records_in_total": int64(6),
  3107. "op_project_0_records_out_total": int64(5),
  3108. "sink_mockSink_0_exceptions_total": int64(0),
  3109. "sink_mockSink_0_records_in_total": int64(6),
  3110. "sink_mockSink_0_records_out_total": int64(6),
  3111. "source_demoErr_0_exceptions_total": int64(0),
  3112. "source_demoErr_0_records_in_total": int64(6),
  3113. "source_demoErr_0_records_out_total": int64(6),
  3114. "op_window_0_exceptions_total": int64(1),
  3115. "op_window_0_process_latency_ms": int64(0),
  3116. "op_window_0_records_in_total": int64(6),
  3117. "op_window_0_records_out_total": int64(5),
  3118. },
  3119. },
  3120. }
  3121. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  3122. createEventStreams(t)
  3123. defer dropEventStreams(t)
  3124. options := []*api.RuleOption{
  3125. {
  3126. BufferLength: 100,
  3127. IsEventTime: true,
  3128. LateTol: 1000,
  3129. }, {
  3130. BufferLength: 100,
  3131. Qos: api.AtLeastOnce,
  3132. CheckpointInterval: 5000,
  3133. IsEventTime: true,
  3134. LateTol: 1000,
  3135. }, {
  3136. BufferLength: 100,
  3137. Qos: api.ExactlyOnce,
  3138. CheckpointInterval: 5000,
  3139. IsEventTime: true,
  3140. LateTol: 1000,
  3141. },
  3142. }
  3143. for j, opt := range options {
  3144. for i, tt := range tests {
  3145. test.ResetClock(1541152486000)
  3146. p := NewRuleProcessor(DbDir)
  3147. parser := xsql.NewParser(strings.NewReader(tt.sql))
  3148. var (
  3149. sources []*nodes.SourceNode
  3150. syncs []chan int
  3151. )
  3152. if stmt, err := xsql.Language.Parse(parser); err != nil {
  3153. t.Errorf("parse sql %s error: %s", tt.sql, err)
  3154. } else {
  3155. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  3156. t.Errorf("sql %s is not a select statement", tt.sql)
  3157. } else {
  3158. streams := xsql.GetStreams(selectStmt)
  3159. for _, stream := range streams {
  3160. next := make(chan int)
  3161. syncs = append(syncs, next)
  3162. source := getEventMockSource(stream, next, tt.size)
  3163. sources = append(sources, source)
  3164. }
  3165. }
  3166. }
  3167. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.name, j), Sql: tt.sql, Options: opt}, sources)
  3168. if err != nil {
  3169. t.Error(err)
  3170. }
  3171. mockSink := test.NewMockSink()
  3172. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  3173. tp.AddSink(inputs, sink)
  3174. errCh := tp.Open()
  3175. func() {
  3176. for i := 0; i < tt.size*len(syncs); i++ {
  3177. syncs[i%len(syncs)] <- i
  3178. for {
  3179. time.Sleep(1)
  3180. if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
  3181. break
  3182. }
  3183. }
  3184. select {
  3185. case err = <-errCh:
  3186. t.Log(err)
  3187. tp.Cancel()
  3188. return
  3189. default:
  3190. }
  3191. }
  3192. mockClock := test.GetMockClock()
  3193. mockClock.Add(1000 * time.Millisecond)
  3194. retry := 100
  3195. for ; retry > 0; retry-- {
  3196. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  3197. break
  3198. }
  3199. t.Logf("wait to try another %d times", retry)
  3200. time.Sleep(time.Duration(retry) * time.Millisecond)
  3201. }
  3202. if retry == 0 {
  3203. err := compareMetrics(tp, tt.m, tt.sql)
  3204. t.Errorf("could not get correct metrics: %v", err)
  3205. }
  3206. }()
  3207. results := mockSink.GetResults()
  3208. var maps [][]map[string]interface{}
  3209. for _, v := range results {
  3210. var mapRes []map[string]interface{}
  3211. err := json.Unmarshal(v, &mapRes)
  3212. if err != nil {
  3213. t.Errorf("Failed to parse the input into map")
  3214. continue
  3215. }
  3216. maps = append(maps, mapRes)
  3217. }
  3218. if !reflect.DeepEqual(tt.r, maps) {
  3219. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  3220. }
  3221. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  3222. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  3223. }
  3224. tp.Cancel()
  3225. }
  3226. cleanStateData()
  3227. }
  3228. }
  3229. func getMetric(tp *xstream.TopologyNew, name string) int {
  3230. keys, values := tp.GetMetrics()
  3231. for index, key := range keys {
  3232. if key == name {
  3233. return int(values[index].(int64))
  3234. }
  3235. }
  3236. fmt.Println("can't find " + name)
  3237. return 0
  3238. }
  3239. func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}, sql string) (err error) {
  3240. keys, values := tp.GetMetrics()
  3241. //for i, k := range keys {
  3242. // log.Printf("%s:%v", k, values[i])
  3243. //}
  3244. for k, v := range m {
  3245. var (
  3246. index int
  3247. key string
  3248. matched bool
  3249. )
  3250. for index, key = range keys {
  3251. if k == key {
  3252. if strings.HasSuffix(k, "process_latency_ms") {
  3253. if values[index].(int64) >= v.(int64) {
  3254. matched = true
  3255. continue
  3256. } else {
  3257. break
  3258. }
  3259. }
  3260. if values[index] == v {
  3261. matched = true
  3262. }
  3263. break
  3264. }
  3265. }
  3266. if matched {
  3267. continue
  3268. }
  3269. //do not find
  3270. if index < len(values) {
  3271. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
  3272. } else {
  3273. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
  3274. }
  3275. }
  3276. return nil
  3277. }
  3278. func errstring(err error) string {
  3279. if err != nil {
  3280. return err.Error()
  3281. }
  3282. return ""
  3283. }