testWindow.go 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package main
  2. import (
  3. "engine/common"
  4. "engine/xsql"
  5. "engine/xsql/plans"
  6. "engine/xstream"
  7. "engine/xstream/collectors"
  8. "engine/xstream/extensions"
  9. "engine/xstream/operators"
  10. "strings"
  11. )
  12. func main() {
  13. log := common.Log
  14. demo1Stream, err := xsql.NewParser(strings.NewReader("CREATE STREAM demo (count bigint) WITH (datasource=\"demo\", FORMAT=\"AVRO\", KEY=\"USERID\")")).ParseCreateStreamStmt()
  15. //demo2Stream, err := xsql.NewParser(strings.NewReader("CREATE STREAM demo2 (abc bigint) WITH (datasource=\"demo2\", FORMAT=\"AVRO\", KEY=\"USERID\")")).ParseCreateStreamStmt()
  16. //stmt, err := xsql.NewParser(strings.NewReader("SELECT count FROM demo1 where demo1.count > 3")).Parse()
  17. if err != nil {
  18. log.Fatal("Failed to parse SQL for %s. \n", err)
  19. }
  20. tp := xstream.New()
  21. mqs1, err := extensions.NewWithName("srv1", "demo", "")
  22. if err != nil {
  23. log.Fatalf("Found error %s.\n", err)
  24. return
  25. }
  26. tp.AddSrc(mqs1)
  27. //mqs2, err := extensions.NewWithName("srv1", "demo2")
  28. //if err != nil {
  29. // log.Fatalf("Found error %s.\n", err)
  30. // return
  31. //}
  32. //tp.AddSrc(mqs2)
  33. preprocessorOp1 := xstream.Transform(&plans.Preprocessor{StreamStmt: demo1Stream}, "preprocessor1")
  34. tp.AddOperator([]xstream.Emitter{mqs1}, preprocessorOp1)
  35. //preprocessorOp2 := xstream.Transform(&plans.Preprocessor{StreamStmt: demo2Stream}, "preprocessor2")
  36. //tp.AddOperator([]xstream.Emitter{mqs2}, preprocessorOp2)
  37. //filterOp := xstream.Transform(&plans.FilterPlan{Condition: stmt.Condition}, "filter plan")
  38. //filterOp.SetConcurrency(3)
  39. //tp.AddOperator([]xstream.Emitter{preprocessorOp1, preprocessorOp2}, filterOp)
  40. //
  41. //projectOp := xstream.Transform(&plans.ProjectPlan{Fields: stmt.Fields}, "project plan")
  42. //tp.AddOperator([]xstream.Emitter{filterOp}, projectOp)
  43. //windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
  44. // Type: operators.NO_WINDOW,
  45. //})
  46. //windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
  47. // Type: operators.TUMBLING_WINDOW,
  48. // Length: 30000,
  49. //})
  50. //windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
  51. // Type: operators.HOPPING_WINDOW,
  52. // Length: 20000,
  53. // Interval: 10000,
  54. //})
  55. //
  56. //windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
  57. // Type: operators.SLIDING_WINDOW,
  58. // Length: 20000,
  59. //})
  60. windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
  61. Type: operators.SESSION_WINDOW,
  62. Length: 20000,
  63. Interval: 6000,
  64. })
  65. tp.AddOperator([]xstream.Emitter{preprocessorOp1}, windowOp)
  66. tp.AddSink([]xstream.Emitter{windowOp}, collectors.Func(func(data interface{}) error {
  67. log.Println("sink result %s", data)
  68. return nil
  69. }))
  70. if err := <-tp.Open(); err != nil {
  71. log.Fatal(err)
  72. return
  73. }
  74. }