Browse Source

feat(rule): remove checkpoint and sink cache when dropping a rule

ngjaying 4 years atrás
parent
commit
f4e3436dfe
2 changed files with 52 additions and 1 deletions
  1. 51 1
      xsql/processors/xsql_processor.go
  2. 1 0
      xstream/nodes/sink_cache.go

+ 51 - 1
xsql/processors/xsql_processor.go

@@ -10,6 +10,7 @@ import (
 	"github.com/emqx/kuiper/xstream"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/nodes"
+	"os"
 	"path"
 	"strings"
 )
@@ -382,12 +383,61 @@ func (p *RuleProcessor) ExecDrop(name string) (string, error) {
 		return "", err
 	}
 	defer p.db.Close()
+	result := fmt.Sprintf("Rule %s is dropped.", name)
+	if ruleJson, ok := p.db.Get(name); ok {
+		rule, err := p.getRuleByJson(name, ruleJson.(string))
+		if err != nil {
+			return "", err
+		}
+		if err := cleanSinkCache(rule); err != nil {
+			result = fmt.Sprintf("%s. Clean sink cache faile: %s.", result, err)
+		}
+		if err := cleanCheckpoint(name); err != nil {
+			result = fmt.Sprintf("%s. Clean checkpoint cache faile: %s.", result, err)
+		}
+	}
 	err = p.db.Delete(name)
 	if err != nil {
 		return "", err
 	} else {
-		return fmt.Sprintf("Rule %s is dropped.", name), nil
+		return result, nil
+	}
+}
+
+func cleanCheckpoint(name string) error {
+	dbDir, _ := common.GetDataLoc()
+	c := path.Join(dbDir, "checkpoints", name)
+	return os.RemoveAll(c)
+}
+
+func cleanSinkCache(rule *api.Rule) error {
+	dbDir, err := common.GetDataLoc()
+	if err != nil {
+		return err
+	}
+	store := common.GetSimpleKVStore(path.Join(dbDir, "sink"))
+	err = store.Open()
+	if err != nil {
+		return err
+	}
+	defer store.Close()
+	for d, m := range rule.Actions {
+		con := 1
+		for name, action := range m {
+			props, _ := action.(map[string]interface{})
+			if c, ok := props["concurrency"]; ok {
+				if t, err := common.ToInt(c); err == nil && t > 0 {
+					con = t
+				}
+			}
+			for i := 0; i < con; i++ {
+				key := fmt.Sprintf("%s%s_%d%d", rule.Id, name, d, i)
+				common.Log.Debugf("delete cache key %s", key)
+				store.Delete(key)
+			}
+		}
 	}
+	return nil
 }
 
 func (p *RuleProcessor) createTopo(rule *api.Rule) (*xstream.TopologyNew, []api.Emitter, error) {

+ 1 - 0
xstream/nodes/sink_cache.go

@@ -88,6 +88,7 @@ func (c *Cache) initStore(ctx api.StreamContext) {
 	}
 	c.store = common.GetSimpleKVStore(path.Join(dbDir, "sink"))
 	c.key = ctx.GetRuleId() + ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
+	logger.Debugf("cache saved to key %s", c.key)
 	//load cache
 	if err := c.loadCache(); err != nil {
 		go c.drainError(err)