Преглед на файлове

fix: fix updateTopo ignore triggered (#2236)

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao преди 1 година
родител
ревизия
fa164f7738
променени са 4 файла, в които са добавени 53 реда и са изтрити 35 реда
  1. 2 2
      internal/server/rest_test.go
  2. 1 1
      internal/server/rule_manager.go
  3. 7 1
      internal/topo/rule/ruleState.go
  4. 43 31
      internal/topo/rule/ruleState_test.go

+ 2 - 2
internal/server/rest_test.go

@@ -258,7 +258,7 @@ func (suite *RestTestSuite) Test_rulesManageHandler() {
 	_, _ = io.ReadAll(w3.Result().Body)
 
 	// update rule, will set rule to triggered
-	ruleJson = `{"id": "rule1","triggered": false,"sql": "select * from alert","actions": [{"nop": {}}]}`
+	ruleJson = `{"id": "rule1","triggered": true,"sql": "select * from alert","actions": [{"nop": {}}]}`
 
 	buf2 = bytes.NewBuffer([]byte(ruleJson))
 	req1, _ = http.NewRequest(http.MethodPut, "http://localhost:8080/rules/rule1", buf2)
@@ -283,7 +283,7 @@ func (suite *RestTestSuite) Test_rulesManageHandler() {
 	suite.r.ServeHTTP(w1, req1)
 
 	returnVal, _ = io.ReadAll(w1.Result().Body)
-	expect = `{"id": "rule1","triggered": false,"sql": "select * from alert","actions": [{"nop": {}}]}`
+	expect = `{"id": "rule1","triggered": true,"sql": "select * from alert","actions": [{"nop": {}}]}`
 	assert.Equal(suite.T(), expect, string(returnVal))
 
 	// get rule status

+ 1 - 1
internal/server/rule_manager.go

@@ -163,7 +163,7 @@ func updateRule(ruleId, ruleJson string) error {
 		if err != nil {
 			return err
 		}
-		err = ruleProcessor.ExecReplaceRuleState(rs.RuleId, true)
+		err = ruleProcessor.ExecReplaceRuleState(rs.RuleId, r.Triggered)
 		return err
 	} else {
 		return fmt.Errorf("Rule %s registry not found, try to delete it and recreate", r.Id)

+ 7 - 1
internal/topo/rule/ruleState.go

@@ -128,7 +128,13 @@ func (rs *RuleState) UpdateTopo(rule *api.Rule) error {
 	}
 	time.Sleep(1 * time.Millisecond)
 	rs.Rule = rule
-	return rs.Start()
+	// If not triggered, just ignore start the rule
+	if rule.Triggered {
+		if err := rs.Start(); err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
 // Run start to run the two loops, do not access any changeable states

+ 43 - 31
internal/topo/rule/ruleState_test.go

@@ -116,30 +116,10 @@ func TestUpdate(t *testing.T) {
 	sp := processor.NewStreamProcessor()
 	sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
 	defer sp.ExecStmt(`DROP STREAM demo`)
-	rs, err := NewRuleState(&api.Rule{
-		Triggered: false,
-		Id:        "test",
-		Sql:       "SELECT ts FROM demo",
-		Actions: []map[string]interface{}{
-			{
-				"log": map[string]interface{}{},
-			},
-		},
-		Options: defaultOption,
-	})
-	if err != nil {
-		t.Error(err)
-		return
-	}
-	defer rs.Close()
-	err = rs.Start()
-	if err != nil {
-		t.Error(err)
-		return
-	}
 	tests := []struct {
-		r *api.Rule
-		e error
+		r         *api.Rule
+		e         error
+		triggered int
 	}{
 		{
 			r: &api.Rule{
@@ -153,7 +133,8 @@ func TestUpdate(t *testing.T) {
 				},
 				Options: defaultOption,
 			},
-			e: errors.New("Parse SQL SELECT FROM demo error: found \"FROM\", expected expression.."),
+			e:         errors.New("Parse SQL SELECT FROM demo error: found \"FROM\", expected expression.."),
+			triggered: 1,
 		},
 		{
 			r: &api.Rule{
@@ -167,7 +148,23 @@ func TestUpdate(t *testing.T) {
 				},
 				Options: defaultOption,
 			},
-			e: errors.New("fail to get stream demo1, please check if stream is created"),
+			e:         errors.New("fail to get stream demo1, please check if stream is created"),
+			triggered: 1,
+		},
+		{
+			r: &api.Rule{
+				Triggered: true,
+				Id:        "test",
+				Sql:       "SELECT * FROM demo",
+				Actions: []map[string]interface{}{
+					{
+						"log": map[string]interface{}{},
+					},
+				},
+				Options: defaultOption,
+			},
+			e:         nil,
+			triggered: 1,
 		},
 		{
 			r: &api.Rule{
@@ -181,14 +178,29 @@ func TestUpdate(t *testing.T) {
 				},
 				Options: defaultOption,
 			},
-			e: nil,
+			e:         nil,
+			triggered: 0,
 		},
 	}
 	for i, tt := range tests {
+		rs, err := NewRuleState(&api.Rule{
+			Triggered: false,
+			Id:        "test",
+			Sql:       "SELECT ts FROM demo",
+			Actions: []map[string]interface{}{
+				{
+					"log": map[string]interface{}{},
+				},
+			},
+			Options: defaultOption,
+		})
+		require.NoError(t, err)
+		err = rs.Start()
+		require.NoError(t, err)
 		err = rs.UpdateTopo(tt.r)
-		if !reflect.DeepEqual(err, tt.e) {
-			t.Errorf("%d.\n\nerror mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.e, err)
-		}
+		require.Equal(t, tt.e, err, fmt.Sprintf("case %v failed", i))
+		require.Equal(t, tt.triggered, rs.triggered, fmt.Sprintf("case %v failed", i))
+		rs.Close()
 	}
 }
 
@@ -200,7 +212,7 @@ func TestUpdateScheduleRule(t *testing.T) {
 	scheduleOption1.Cron = "mockCron"
 	scheduleOption1.Duration = "1s"
 	rule1 := &api.Rule{
-		Triggered: false,
+		Triggered: true,
 		Id:        "test",
 		Sql:       "SELECT ts FROM demo",
 		Actions: []map[string]interface{}{
@@ -223,7 +235,7 @@ func TestUpdateScheduleRule(t *testing.T) {
 	scheduleOption2.Cron = "mockCron2"
 	scheduleOption2.Duration = "2s"
 	rule2 := &api.Rule{
-		Triggered: false,
+		Triggered: true,
 		Id:        "test",
 		Sql:       "SELECT ts FROM demo",
 		Actions: []map[string]interface{}{