test.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package main
  2. import (
  3. "engine/common"
  4. "engine/xsql"
  5. "engine/xsql/plans"
  6. "engine/xstream"
  7. "engine/xstream/collectors"
  8. "engine/xstream/extensions"
  9. "strings"
  10. )
  11. func main() {
  12. log := common.Log
  13. demo1Stream, err := xsql.NewParser(strings.NewReader("CREATE STREAM demo1 (count bigint) WITH (source=\"users\", FORMAT=\"AVRO\", KEY=\"USERID\")")).ParseCreateStreamStmt()
  14. demo2Stream, err := xsql.NewParser(strings.NewReader("CREATE STREAM demo2 (abc bigint) WITH (source=\"users\", FORMAT=\"AVRO\", KEY=\"USERID\")")).ParseCreateStreamStmt()
  15. stmt, err := xsql.NewParser(strings.NewReader("SELECT count FROM demo1 where demo1.count > 3")).Parse()
  16. if err != nil {
  17. log.Fatal("Failed to parse SQL for %s. \n", err)
  18. }
  19. tp := xstream.New()
  20. mqs1, err := extensions.NewWithName("srv1", "demo1", "")
  21. if err != nil {
  22. log.Fatalf("Found error %s.\n", err)
  23. return
  24. }
  25. tp.AddSrc(mqs1)
  26. mqs2, err := extensions.NewWithName("srv2", "demo2", "")
  27. if err != nil {
  28. log.Fatalf("Found error %s.\n", err)
  29. return
  30. }
  31. tp.AddSrc(mqs2)
  32. preprocessorOp1 := xstream.Transform(&plans.Preprocessor{StreamStmt: demo1Stream}, "preprocessor1")
  33. tp.AddOperator([]xstream.Emitter{mqs1}, preprocessorOp1)
  34. preprocessorOp2 := xstream.Transform(&plans.Preprocessor{StreamStmt: demo2Stream}, "preprocessor2")
  35. tp.AddOperator([]xstream.Emitter{mqs2}, preprocessorOp2)
  36. filterOp := xstream.Transform(&plans.FilterPlan{Condition: stmt.Condition}, "filter plan")
  37. filterOp.SetConcurrency(3)
  38. tp.AddOperator([]xstream.Emitter{preprocessorOp1, preprocessorOp2}, filterOp)
  39. projectOp := xstream.Transform(&plans.ProjectPlan{Fields: stmt.Fields}, "project plan")
  40. tp.AddOperator([]xstream.Emitter{filterOp}, projectOp)
  41. tp.AddSink([]xstream.Emitter{projectOp}, collectors.Func(func(data interface{}) error {
  42. log.Println("sink result %s", data)
  43. return nil
  44. }))
  45. if err := <-tp.Open(); err != nil {
  46. log.Fatal(err)
  47. return
  48. }
  49. }