Browse Source

fix(rule): recover rule problem when using deleted plugin

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 năm trước cách đây
mục cha
commit
8c8acd8656

+ 14 - 6
internal/processor/rule.go

@@ -120,7 +120,7 @@ func (p *RuleProcessor) GetRuleById(id string) (*api.Rule, error) {
 	if !f {
 	if !f {
 		return nil, errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found.", id))
 		return nil, errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found.", id))
 	}
 	}
-	return p.GetRuleByJson(id, s1)
+	return p.GetRuleByJsonValidated(s1)
 }
 }
 
 
 func (p *RuleProcessor) getDefaultRule(name, sql string) *api.Rule {
 func (p *RuleProcessor) getDefaultRule(name, sql string) *api.Rule {
@@ -147,7 +147,8 @@ func (p *RuleProcessor) getDefaultRule(name, sql string) *api.Rule {
 	}
 	}
 }
 }
 
 
-func (p *RuleProcessor) GetRuleByJson(id, ruleJson string) (*api.Rule, error) {
+// GetRuleByJsonValidated called when the json is getting from trusted source like db
+func (p *RuleProcessor) GetRuleByJsonValidated(ruleJson string) (*api.Rule, error) {
 	opt := conf.Config.Rule
 	opt := conf.Config.Rule
 	//set default rule options
 	//set default rule options
 	rule := &api.Rule{
 	rule := &api.Rule{
@@ -157,7 +158,17 @@ func (p *RuleProcessor) GetRuleByJson(id, ruleJson string) (*api.Rule, error) {
 	if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
 	if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
 		return nil, fmt.Errorf("Parse rule %s error : %s.", ruleJson, err)
 		return nil, fmt.Errorf("Parse rule %s error : %s.", ruleJson, err)
 	}
 	}
+	if rule.Options == nil {
+		rule.Options = &opt
+	}
+	return rule, nil
+}
 
 
+func (p *RuleProcessor) GetRuleByJson(id, ruleJson string) (*api.Rule, error) {
+	rule, err := p.GetRuleByJsonValidated(ruleJson)
+	if err != nil {
+		return rule, err
+	}
 	//validation
 	//validation
 	if rule.Id == "" && id == "" {
 	if rule.Id == "" && id == "" {
 		return nil, fmt.Errorf("Missing rule id.")
 		return nil, fmt.Errorf("Missing rule id.")
@@ -183,10 +194,7 @@ func (p *RuleProcessor) GetRuleByJson(id, ruleJson string) (*api.Rule, error) {
 			return nil, fmt.Errorf("Rule %s has neither sql nor graph.", rule.Id)
 			return nil, fmt.Errorf("Rule %s has neither sql nor graph.", rule.Id)
 		}
 		}
 	}
 	}
-	if rule.Options == nil {
-		rule.Options = &opt
-	}
-	err := conf.ValidateRuleOption(rule.Options)
+	err = conf.ValidateRuleOption(rule.Options)
 	if err != nil {
 	if err != nil {
 		return nil, fmt.Errorf("Rule %s has invalid options: %s.", rule.Id, err)
 		return nil, fmt.Errorf("Rule %s has invalid options: %s.", rule.Id, err)
 	}
 	}

+ 7 - 7
internal/server/rule_manager.go

@@ -92,8 +92,7 @@ func createRule(name, ruleJson string) (string, error) {
 		go func() {
 		go func() {
 			panicOrError := infra.SafeRun(func() error {
 			panicOrError := infra.SafeRun(func() error {
 				//Start the rule which runs async
 				//Start the rule which runs async
-				rs.Start()
-				return nil
+				return rs.Start()
 			})
 			})
 			if panicOrError != nil {
 			if panicOrError != nil {
 				logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError)
 				logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError)
@@ -109,7 +108,7 @@ func createRule(name, ruleJson string) (string, error) {
 func createRuleState(r *api.Rule) (*rule.RuleState, error) {
 func createRuleState(r *api.Rule) (*rule.RuleState, error) {
 	rs, err := rule.NewRuleState(r)
 	rs, err := rule.NewRuleState(r)
 	if err != nil {
 	if err != nil {
-		return nil, err
+		return rs, err
 	}
 	}
 	registry.Store(r.Id, rs)
 	registry.Store(r.Id, rs)
 	return rs, nil
 	return rs, nil
@@ -118,8 +117,10 @@ func createRuleState(r *api.Rule) (*rule.RuleState, error) {
 func recoverRule(r *api.Rule) string {
 func recoverRule(r *api.Rule) string {
 	// Validate the topo
 	// Validate the topo
 	rs, err := createRuleState(r)
 	rs, err := createRuleState(r)
-	if err != nil {
-		return fmt.Sprintf("Create rule topo error: %v", err)
+	if err != nil { // when recovering rules, assume the rules are valid, so always add it to the registry
+		conf.Log.Errorf("Create rule topo error: %v", err)
+		r.Triggered = false
+		registry.Store(r.Id, rs)
 	}
 	}
 	if !r.Triggered {
 	if !r.Triggered {
 		return fmt.Sprintf("Rule %s was stopped.", r.Id)
 		return fmt.Sprintf("Rule %s was stopped.", r.Id)
@@ -164,9 +165,8 @@ func startRule(name string) error {
 	if !ok {
 	if !ok {
 		return fmt.Errorf("Rule %s is not found in registry, please check if it is created", name)
 		return fmt.Errorf("Rule %s is not found in registry, please check if it is created", name)
 	} else {
 	} else {
-		rs.Start()
+		return rs.Start()
 	}
 	}
-	return nil
 }
 }
 
 
 func stopRule(name string) (result string) {
 func stopRule(name string) (result string) {

+ 6 - 6
internal/topo/rule/ruleState.go

@@ -61,14 +61,14 @@ type RuleState struct {
 // Errors are possible during plan the topo.
 // Errors are possible during plan the topo.
 // If error happens return immediately without add it to the registry
 // If error happens return immediately without add it to the registry
 func NewRuleState(rule *api.Rule) (*RuleState, error) {
 func NewRuleState(rule *api.Rule) (*RuleState, error) {
+	rs := &RuleState{
+		RuleId:   rule.Id,
+		Rule:     rule,
+		ActionCh: make(chan ActionSignal),
+	}
 	if tp, err := planner.Plan(rule); err != nil {
 	if tp, err := planner.Plan(rule); err != nil {
-		return nil, err
+		return rs, err
 	} else {
 	} else {
-		rs := &RuleState{
-			RuleId:   rule.Id,
-			Rule:     rule,
-			ActionCh: make(chan ActionSignal),
-		}
 		rs.Topology = tp
 		rs.Topology = tp
 		rs.Run()
 		rs.Run()
 		return rs, nil
 		return rs, nil

+ 2 - 2
test/change_rule_status.jmx

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
 <!--
-  ~ Copyright 2021 EMQ Technologies Co., Ltd.
+  ~ Copyright 2021-2022 EMQ Technologies Co., Ltd.
   ~
   ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
   ~ you may not use this file except in compliance with the License.
@@ -241,7 +241,7 @@
           <hashTree>
           <hashTree>
             <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
             <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
               <collectionProp name="Asserion.test_strings">
               <collectionProp name="Asserion.test_strings">
-                <stringProp name="-652969125">Rule rule1 was started</stringProp>
+                <stringProp name="-652969125">Rule rule1 is already starting</stringProp>
               </collectionProp>
               </collectionProp>
               <stringProp name="Assertion.custom_message"></stringProp>
               <stringProp name="Assertion.custom_message"></stringProp>
               <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
               <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>