Prechádzať zdrojové kódy

bug fix for rule management (#1291)

* fix(python): function return bool value fix

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* fix(processor): fix problem for abnormal rules

1. Get rule will not check rule syntax, just return the json
2. Delete rule will not check rule syntax and should run all clean jobs regardless of job failures

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 2 rokov pred
rodič
commit
6b66baa13c

+ 13 - 7
internal/processor/rule.go

@@ -94,6 +94,15 @@ func (p *RuleProcessor) ExecReplaceRuleState(name string, triggered bool) (err e
 	return err
 	return err
 }
 }
 
 
+func (p *RuleProcessor) GetRuleJson(name string) (string, error) {
+	var s1 string
+	f, _ := p.db.Get(name, &s1)
+	if !f {
+		return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found.", name))
+	}
+	return s1, nil
+}
+
 func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
 func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
 	var s1 string
 	var s1 string
 	f, _ := p.db.Get(name, &s1)
 	f, _ := p.db.Get(name, &s1)
@@ -190,16 +199,13 @@ func (p *RuleProcessor) ExecDrop(name string) (string, error) {
 	result := fmt.Sprintf("Rule %s is dropped.", name)
 	result := fmt.Sprintf("Rule %s is dropped.", name)
 	var ruleJson string
 	var ruleJson string
 	if ok, _ := p.db.Get(name, &ruleJson); ok {
 	if ok, _ := p.db.Get(name, &ruleJson); ok {
-		rule, err := p.getRuleByJson(name, ruleJson)
-		if err != nil {
-			return "", err
-		}
-		if err := cleanSinkCache(rule); err != nil {
+		if err := cleanSinkCache(name); err != nil {
 			result = fmt.Sprintf("%s. Clean sink cache faile: %s.", result, err)
 			result = fmt.Sprintf("%s. Clean sink cache faile: %s.", result, err)
 		}
 		}
 		if err := cleanCheckpoint(name); err != nil {
 		if err := cleanCheckpoint(name); err != nil {
 			result = fmt.Sprintf("%s. Clean checkpoint cache faile: %s.", result, err)
 			result = fmt.Sprintf("%s. Clean checkpoint cache faile: %s.", result, err)
 		}
 		}
+
 	}
 	}
 	err := p.db.Delete(name)
 	err := p.db.Delete(name)
 	if err != nil {
 	if err != nil {
@@ -217,8 +223,8 @@ func cleanCheckpoint(name string) error {
 	return nil
 	return nil
 }
 }
 
 
-func cleanSinkCache(rule *api.Rule) error {
-	err := store.DropKV(path.Join("sink", rule.Id))
+func cleanSinkCache(name string) error {
+	err := store.DropKV(path.Join("sink", name))
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}

+ 3 - 2
internal/server/rest.go

@@ -284,12 +284,13 @@ func ruleHandler(w http.ResponseWriter, r *http.Request) {
 
 
 	switch r.Method {
 	switch r.Method {
 	case http.MethodGet:
 	case http.MethodGet:
-		rule, err := ruleProcessor.GetRuleByName(name)
+		rule, err := ruleProcessor.GetRuleJson(name)
 		if err != nil {
 		if err != nil {
 			handleError(w, err, "describe rule error", logger)
 			handleError(w, err, "describe rule error", logger)
 			return
 			return
 		}
 		}
-		jsonResponse(rule, w, logger)
+		w.Header().Add(ContentType, ContentTypeJSON)
+		w.Write([]byte(rule))
 	case http.MethodDelete:
 	case http.MethodDelete:
 		deleteRule(name)
 		deleteRule(name)
 		content, err := ruleProcessor.ExecDrop(name)
 		content, err := ruleProcessor.ExecDrop(name)

+ 1 - 1
sdk/python/ekuiper/runtime/function.py

@@ -106,4 +106,4 @@ def encode_reply(state: bool, arg: any):
     try:
     try:
         return str.encode(json.dumps({'state': state, 'result': arg}))
         return str.encode(json.dumps({'state': state, 'result': arg}))
     except Exception:
     except Exception:
-        return str.encode(json.dumps({'state': false, 'result': traceback.format_exc()}))
+        return str.encode(json.dumps({'state': False, 'result': traceback.format_exc()}))