Browse Source

fix: fix updateTopo for schedule rule (#1982)

* fix update schedule rule

Signed-off-by: yisaer <disxiaofei@163.com>

* fix test and lint

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 1 year ago
parent
commit
74439586bc

+ 3 - 0
internal/server/rule_manager.go

@@ -189,6 +189,9 @@ func reRunRule(name string) error {
 	if !ok {
 		return fmt.Errorf("Rule %s is not found in registry, please check if it is created", name)
 	} else {
+		if err := ruleProcessor.ExecReplaceRuleState(rs.RuleId, true); err != nil {
+			return err
+		}
 		return rs.UpdateTopo(rs.Rule)
 	}
 }

+ 26 - 30
internal/topo/rule/ruleState.go

@@ -61,6 +61,10 @@ type cronStateCtx struct {
 	// isInSchedule indicates the current rule is in scheduled in backgroundCron
 	isInSchedule   bool
 	startFailedCnt int
+
+	// only used for test
+	cron     string
+	duration string
 }
 
 /*********
@@ -107,27 +111,15 @@ func NewRuleState(rule *api.Rule) (*RuleState, error) {
 // UpdateTopo update the rule and the topology AND restart the topology
 // Do not need to call restart after update
 func (rs *RuleState) UpdateTopo(rule *api.Rule) error {
-	if tp, err := planner.Plan(rule); err != nil {
+	if _, err := planner.Plan(rule); err != nil {
+		return err
+	}
+	if err := rs.Stop(); err != nil {
 		return err
-	} else {
-		rs.Lock()
-		defer rs.Unlock()
-		// Update rule
-		rs.Rule = rule
-		rs.topoGraph = nil
-		// Stop the old topo
-		if rs.triggered == 1 {
-			rs.Topology.Cancel()
-			rs.ActionCh <- ActionSignalStop
-			// wait a little to make sure the old topo is stopped
-			time.Sleep(1 * time.Millisecond)
-		}
-		// Update the topo and start
-		rs.Topology = tp
-		rs.triggered = 1
-		rs.ActionCh <- ActionSignalStart
-		return nil
 	}
+	time.Sleep(1 * time.Millisecond)
+	rs.Rule = rule
+	return rs.Start()
 }
 
 // Run start to run the two loops, do not access any changeable states
@@ -270,8 +262,15 @@ func (rs *RuleState) startScheduleRule() error {
 	cronCtx, rs.cronState.cancel = context.WithCancel(context.Background())
 	entryID, err := backgroundCron.AddFunc(rs.Rule.Options.Cron, func() {
 		if err := func() error {
-			rs.Lock()
-			defer rs.Unlock()
+			switch backgroundCron.(type) {
+			case *MockCron:
+				// skip mutex if this is a unit test
+			default:
+				rs.Lock()
+				defer rs.Unlock()
+			}
+			rs.cronState.cron = rs.Rule.Options.Cron
+			rs.cronState.duration = rs.Rule.Options.Duration
 			return rs.start()
 		}(); err != nil {
 			rs.Lock()
@@ -324,6 +323,11 @@ func (rs *RuleState) start() error {
 func (rs *RuleState) Stop() error {
 	rs.Lock()
 	defer rs.Unlock()
+	rs.stopScheduleRule()
+	return rs.stop()
+}
+
+func (rs *RuleState) stopScheduleRule() {
 	if rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule {
 		rs.cronState.isInSchedule = false
 		if rs.cronState.cancel != nil {
@@ -332,7 +336,6 @@ func (rs *RuleState) Stop() error {
 		rs.cronState.startFailedCnt = 0
 		backgroundCron.Remove(rs.cronState.entryID)
 	}
-	return rs.stop()
 }
 
 func (rs *RuleState) stop() error {
@@ -357,14 +360,7 @@ func (rs *RuleState) Close() error {
 		rs.Topology.Cancel()
 	}
 	rs.triggered = -1
-	if rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule {
-		rs.cronState.isInSchedule = false
-		if rs.cronState.cancel != nil {
-			rs.cronState.cancel()
-		}
-		rs.cronState.startFailedCnt = 0
-		backgroundCron.Remove(rs.cronState.entryID)
-	}
+	rs.stopScheduleRule()
 	close(rs.ActionCh)
 	return nil
 }

+ 47 - 0
internal/topo/rule/ruleState_test.go

@@ -191,6 +191,53 @@ func TestUpdate(t *testing.T) {
 	}
 }
 
+func TestUpdateScheduleRule(t *testing.T) {
+	sp := processor.NewStreamProcessor()
+	sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
+	defer sp.ExecStmt(`DROP STREAM demo`)
+	scheduleOption1 := *defaultOption
+	scheduleOption1.Cron = "mockCron1"
+	scheduleOption1.Duration = "1s"
+	rule1 := &api.Rule{
+		Triggered: false,
+		Id:        "test",
+		Sql:       "SELECT ts FROM demo",
+		Actions: []map[string]interface{}{
+			{
+				"log": map[string]interface{}{},
+			},
+		},
+		Options: &scheduleOption1,
+	}
+	rs, err := NewRuleState(rule1)
+	require.NoError(t, err)
+	defer rs.Close()
+	err = rs.startScheduleRule()
+	require.NoError(t, err)
+	require.True(t, rs.cronState.isInSchedule)
+	require.Equal(t, "mockCron1", rs.cronState.cron)
+	require.Equal(t, "1s", rs.cronState.duration)
+
+	scheduleOption2 := *defaultOption
+	scheduleOption2.Cron = "mockCron2"
+	scheduleOption2.Duration = "2s"
+	rule2 := &api.Rule{
+		Triggered: false,
+		Id:        "test",
+		Sql:       "SELECT ts FROM demo",
+		Actions: []map[string]interface{}{
+			{
+				"log": map[string]interface{}{},
+			},
+		},
+		Options: &scheduleOption2,
+	}
+	err = rs.UpdateTopo(rule2)
+	require.NoError(t, err)
+	require.Equal(t, "mockCron2", rs.cronState.cron)
+	require.Equal(t, "2s", rs.cronState.duration)
+}
+
 func TestMultipleAccess(t *testing.T) {
 	sp := processor.NewStreamProcessor()
 	sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)