testExtension.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xsql/processors"
  6. "github.com/emqx/kuiper/xstream/planner"
  7. "path"
  8. "time"
  9. )
  10. func main() {
  11. log := common.Log
  12. dbDir, err := common.GetDataLoc()
  13. if err != nil {
  14. log.Panic(err)
  15. }
  16. log.Infof("db location is %s", dbDir)
  17. demo := `DROP STREAM ext`
  18. processors.NewStreamProcessor(path.Join(dbDir, "stream")).ExecStmt(demo)
  19. demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\")"
  20. _, err = processors.NewStreamProcessor(path.Join(dbDir, "stream")).ExecStmt(demo)
  21. if err != nil {
  22. panic(err)
  23. }
  24. rp := processors.NewRuleProcessor(dbDir)
  25. rp.ExecDrop("$$test1")
  26. rs, err := rp.ExecCreate("$$test1", "{\"sql\": \"SELECT echo(count) FROM ext where count > 3\",\"actions\": [{\"memory\": {}}]}")
  27. if err != nil {
  28. msg := fmt.Sprintf("failed to create rule: %s.", err)
  29. log.Printf(msg)
  30. }
  31. tp, err := planner.Plan(rs, dbDir)
  32. if err != nil {
  33. log.Panicf("fail to init rule: %v", err)
  34. }
  35. go func() {
  36. select {
  37. case err := <-tp.Open():
  38. log.Println(err)
  39. tp.Cancel()
  40. }
  41. }()
  42. time.Sleep(5000000 * time.Millisecond)
  43. log.Infof("exit main program")
  44. }