|
@@ -19,6 +19,7 @@ import (
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
|
+ "github.com/lf-edge/ekuiper/internal/pkg/tskv"
|
|
|
"github.com/lf-edge/ekuiper/internal/topo"
|
|
|
"github.com/lf-edge/ekuiper/internal/topo/node"
|
|
|
"github.com/lf-edge/ekuiper/internal/topo/planner"
|
|
@@ -27,7 +28,6 @@ import (
|
|
|
"github.com/lf-edge/ekuiper/pkg/cast"
|
|
|
"github.com/lf-edge/ekuiper/pkg/errorx"
|
|
|
"github.com/lf-edge/ekuiper/pkg/kv"
|
|
|
- "os"
|
|
|
"path"
|
|
|
)
|
|
|
|
|
@@ -270,9 +270,11 @@ func (p *RuleProcessor) ExecDrop(name string) (string, error) {
|
|
|
}
|
|
|
|
|
|
func cleanCheckpoint(name string) error {
|
|
|
- dbDir, _ := conf.GetDataLoc()
|
|
|
- c := path.Join(dbDir, name)
|
|
|
- return os.RemoveAll(c)
|
|
|
+ db, err := tskv.NewSqlite(name)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return db.Drop()
|
|
|
}
|
|
|
|
|
|
func cleanSinkCache(rule *api.Rule) error {
|