testExtension.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xsql/processors"
  6. "path"
  7. "time"
  8. )
  9. func main() {
  10. log := common.Log
  11. dbDir, err := common.GetDataLoc()
  12. if err != nil {
  13. log.Panic(err)
  14. }
  15. log.Infof("db location is %s", dbDir)
  16. demo := `DROP STREAM ext`
  17. processors.NewStreamProcessor(path.Join(dbDir, "stream")).ExecStmt(demo)
  18. demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\")"
  19. _, err = processors.NewStreamProcessor(path.Join(dbDir, "stream")).ExecStmt(demo)
  20. if err != nil {
  21. panic(err)
  22. }
  23. rp := processors.NewRuleProcessor(dbDir)
  24. rp.ExecDrop("$$test1")
  25. rs, err := rp.ExecCreate("$$test1", "{\"sql\": \"SELECT echo(count) FROM ext where count > 3\",\"actions\": [{\"memory\": {}}]}")
  26. if err != nil {
  27. msg := fmt.Sprintf("failed to create rule: %s.", err)
  28. log.Printf(msg)
  29. }
  30. tp, err := rp.ExecInitRule(rs)
  31. if err != nil {
  32. log.Panicf("fail to init rule: %v", err)
  33. }
  34. go func() {
  35. select {
  36. case err := <-tp.Open():
  37. log.Println(err)
  38. tp.Cancel()
  39. }
  40. }()
  41. time.Sleep(5000000 * time.Millisecond)
  42. log.Infof("exit main program")
  43. }