Преглед на файлове

fix(rule): obey triggered value when creating rules

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang преди 2 години
родител
ревизия
2325b17e9b
променени са 4 файла, в които са добавени 51 реда и са изтрити 16 реда
  1. 36 0
      internal/processor/rule_test.go
  2. 1 1
      internal/server/rest.go
  3. 1 1
      internal/server/rpc.go
  4. 13 14
      internal/server/rule_manager.go

+ 36 - 0
internal/processor/rule_test.go

@@ -158,6 +158,42 @@ func TestRuleActionParse_Apply(t *testing.T) {
 					},
 				},
 			},
+		}, {
+			ruleStr: `{
+			  "id": "ruleTest",
+			  "sql": "SELECT * from demo",
+			  "actions": [
+			  	{"log": {}}
+			  ],
+              "triggered": false
+			}`,
+			result: &api.Rule{
+				Triggered: false,
+				Id:        "ruleTest",
+				Sql:       "SELECT * from demo",
+				Actions: []map[string]interface{}{
+					{
+						"log": map[string]interface{}{},
+					},
+				},
+				Options: &api.RuleOption{
+					IsEventTime:        false,
+					LateTol:            1000,
+					Concurrency:        1,
+					BufferLength:       1024,
+					SendMetaToSink:     false,
+					Qos:                api.AtMostOnce,
+					CheckpointInterval: 300000,
+					SendError:          true,
+					Restart: &api.RestartStrategy{
+						Attempts:     0,
+						Delay:        1000,
+						Multiplier:   2,
+						MaxDelay:     30000,
+						JitterFactor: 0.1,
+					},
+				},
+			},
 		},
 	}
 

+ 1 - 1
internal/server/rest.go

@@ -396,7 +396,7 @@ func rulesHandler(w http.ResponseWriter, r *http.Request) {
 			handleError(w, err, "Invalid body", logger)
 			return
 		}
-		id, err := createRule("", string(body), true)
+		id, err := createRule("", string(body))
 		if err != nil {
 			handleError(w, err, "", logger)
 			return

+ 1 - 1
internal/server/rpc.go

@@ -144,7 +144,7 @@ func (t *Server) Stream(stream string, reply *string) error {
 }
 
 func (t *Server) CreateRule(rule *model.RPCArgDesc, reply *string) error {
-	id, err := createRule(rule.Name, rule.Json, true)
+	id, err := createRule(rule.Name, rule.Json)
 	if err != nil {
 		return fmt.Errorf("Create rule %s error : %s.", id, err)
 	} else {

+ 13 - 14
internal/server/rule_manager.go

@@ -69,15 +69,12 @@ func (rr *RuleRegistry) Delete(key string) (*rule.RuleState, bool) {
 	return result, ok
 }
 
-func createRule(name, ruleJson string, shouldStart bool) (string, error) {
+func createRule(name, ruleJson string) (string, error) {
 	// Validate the rule json
 	r, err := ruleProcessor.GetRuleByJson(name, ruleJson)
 	if err != nil {
 		return "", fmt.Errorf("Invalid rule json: %v", err)
 	}
-	if shouldStart {
-		r.Triggered = true
-	}
 	// Validate the topo
 	rs, err := createRuleState(r)
 	if err != nil {
@@ -91,16 +88,18 @@ func createRule(name, ruleJson string, shouldStart bool) (string, error) {
 		return r.Id, fmt.Errorf("Store the rule error: %v", err)
 	}
 	// Start the rule asyncly
-	go func() {
-		panicOrError := infra.SafeRun(func() error {
-			//Start the rule which runs async
-			rs.Start()
-			return nil
-		})
-		if panicOrError != nil {
-			logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError)
-		}
-	}()
+	if r.Triggered {
+		go func() {
+			panicOrError := infra.SafeRun(func() error {
+				//Start the rule which runs async
+				rs.Start()
+				return nil
+			})
+			if panicOrError != nil {
+				logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError)
+			}
+		}()
+	}
 	return r.Id, nil
 }