123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- package main
- import (
- "fmt"
- "github.com/emqx/kuiper/common"
- "github.com/emqx/kuiper/xsql/processors"
- "github.com/emqx/kuiper/xstream/planner"
- "path"
- "time"
- )
- func main() {
- log := common.Log
- dbDir, err := common.GetDataLoc()
- if err != nil {
- log.Panic(err)
- }
- log.Infof("db location is %s", dbDir)
- demo := `DROP STREAM ext`
- processors.NewStreamProcessor(path.Join(dbDir, "stream")).ExecStmt(demo)
- demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\")"
- _, err = processors.NewStreamProcessor(path.Join(dbDir, "stream")).ExecStmt(demo)
- if err != nil {
- panic(err)
- }
- rp := processors.NewRuleProcessor(dbDir)
- rp.ExecDrop("$$test1")
- rs, err := rp.ExecCreate("$$test1", "{\"sql\": \"SELECT echo(count) FROM ext where count > 3\",\"actions\": [{\"memory\": {}}]}")
- if err != nil {
- msg := fmt.Sprintf("failed to create rule: %s.", err)
- log.Printf(msg)
- }
- tp, err := planner.Plan(rs, dbDir)
- if err != nil {
- log.Panicf("fail to init rule: %v", err)
- }
- go func() {
- select {
- case err := <-tp.Open():
- log.Println(err)
- tp.Cancel()
- }
- }()
- time.Sleep(5000000 * time.Millisecond)
- log.Infof("exit main program")
- }
|