Browse Source

fix(rule): fix schedule rule state (#1916)

* fix state

Signed-off-by: yisaer <disxiaofei@163.com>

* fix

Signed-off-by: yisaer <disxiaofei@163.com>

* fix

Signed-off-by: yisaer <disxiaofei@163.com>

* fix

Signed-off-by: yisaer <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 1 year ago
parent
commit
34b363ce8a
2 changed files with 54 additions and 5 deletions
  1. 9 5
      internal/topo/rule/ruleState.go
  2. 45 0
      internal/topo/rule/ruleState_test.go

+ 9 - 5
internal/topo/rule/ruleState.go

@@ -324,9 +324,11 @@ func (rs *RuleState) start() error {
 func (rs *RuleState) Stop() error {
 	rs.Lock()
 	defer rs.Unlock()
-	if rs.Rule.IsScheduleRule() {
+	if rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule {
 		rs.cronState.isInSchedule = false
-		rs.cronState.cancel()
+		if rs.cronState.cancel != nil {
+			rs.cronState.cancel()
+		}
 		rs.cronState.startFailedCnt = 0
 		backgroundCron.Remove(rs.cronState.entryID)
 	}
@@ -355,9 +357,11 @@ func (rs *RuleState) Close() error {
 		rs.Topology.Cancel()
 	}
 	rs.triggered = -1
-	if rs.Rule.IsScheduleRule() {
+	if rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule {
 		rs.cronState.isInSchedule = false
-		rs.cronState.cancel()
+		if rs.cronState.cancel != nil {
+			rs.cronState.cancel()
+		}
 		rs.cronState.startFailedCnt = 0
 		backgroundCron.Remove(rs.cronState.entryID)
 	}
@@ -379,7 +383,7 @@ func (rs *RuleState) GetState() (string, error) {
 			case nil:
 				result = "Running"
 			case context.Canceled:
-				if rs.Rule.IsScheduleRule() {
+				if rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule {
 					result = "Stopped: waiting for next schedule."
 				} else {
 					result = "Stopped: canceled manually."

+ 45 - 0
internal/topo/rule/ruleState_test.go

@@ -417,4 +417,49 @@ func TestScheduleRule(t *testing.T) {
 			}
 		}
 	}()
+
+	func() {
+		rs, err := NewRuleState(r)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		if err := rs.startScheduleRule(); err != nil {
+			t.Error(err)
+			return
+		}
+		if err := rs.Stop(); err != nil {
+			t.Error(err)
+			return
+		}
+		state, err := rs.GetState()
+		if err != nil {
+			t.Errorf("get rule state error: %v", err)
+			return
+		}
+		if state != "Stopped: canceled manually." {
+			t.Errorf("rule state mismatch: exp=%v, got=%v", "Stopped: canceled manually.", state)
+			return
+		}
+		if rs.cronState.isInSchedule {
+			t.Error("cron state shouldn't be in schedule")
+			return
+		}
+	}()
+
+	func() {
+		rs, err := NewRuleState(r)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		if err := rs.Stop(); err != nil {
+			t.Error(err)
+			return
+		}
+		if err := rs.Close(); err != nil {
+			t.Error(err)
+			return
+		}
+	}()
 }