testExtension.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package main
  2. import (
  3. "engine/common"
  4. "engine/xsql/processors"
  5. "fmt"
  6. "path"
  7. "time"
  8. )
  9. func main() {
  10. log := common.Log
  11. BadgerDir, err := common.GetAndCreateDataLoc("test")
  12. if err != nil {
  13. log.Panic(err)
  14. }
  15. log.Infof("badge location is %s", BadgerDir)
  16. demo := `DROP STREAM ext`
  17. processors.NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  18. demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\")"
  19. _, err = processors.NewStreamProcessor(demo, path.Join(BadgerDir, "stream")).Exec()
  20. if err != nil{
  21. panic(err)
  22. }
  23. rp := processors.NewRuleProcessor(BadgerDir)
  24. rp.ExecDrop("$$test1")
  25. rs, err := rp.ExecCreate("$$test1", "{\"sql\": \"SELECT count FROM ext where ext.count > 3\",\"actions\": [{\"memory\": {}}]}")
  26. if err != nil {
  27. msg := fmt.Sprintf("failed to create rule: %s.", err)
  28. log.Printf(msg)
  29. }
  30. tp, err := rp.ExecInitRule(rs)
  31. if err != nil{
  32. log.Panicf("fail to init rule: %v", err)
  33. }
  34. go func() {
  35. select {
  36. case err := <-tp.Open():
  37. log.Println(err)
  38. tp.Cancel()
  39. }
  40. }()
  41. time.Sleep(5000000 * time.Millisecond)
  42. log.Infof("exit main program")
  43. }