Jelajahi Sumber

fix: fix schedule rule state (#2196)

Signed-off-by: yisaer <disxiaofei@163.com>
Signed-off-by: Song Gao <disxiaofei@163.com>
Song Gao 1 tahun lalu
induk
melakukan
c9750139db

+ 22 - 5
internal/server/rule_manager.go

@@ -181,25 +181,42 @@ func deleteRule(name string) (result string) {
 }
 
 func startRule(name string) error {
-	return reRunRule(name)
+	return reRunRule(name, false)
+}
+
+func startRuleInternal(name string) error {
+	return reRunRule(name, true)
 }
 
 // reRunRule rerun the rule from optimize to Open the operator in order to refresh the schema
-func reRunRule(name string) error {
+func reRunRule(name string, isInternal bool) error {
 	rs, ok := registry.Load(name)
 	if !ok {
 		return fmt.Errorf("Rule %s is not found in registry, please check if it is created", name)
 	} else {
-		if err := ruleProcessor.ExecReplaceRuleState(rs.RuleId, true); err != nil {
-			return err
+		if !isInternal {
+			if err := ruleProcessor.ExecReplaceRuleState(rs.RuleId, true); err != nil {
+				return err
+			}
 		}
 		return rs.UpdateTopo(rs.Rule)
 	}
 }
 
+func stopRuleInternal(name string) {
+	var err error
+	if rs, ok := registry.Load(name); ok {
+		err = rs.InternalStop()
+		if err != nil {
+			conf.Log.Warn(err)
+		}
+	}
+}
+
 func stopRule(name string) (result string) {
+	var err error
 	if rs, ok := registry.Load(name); ok {
-		err := rs.Stop()
+		err = rs.Stop()
 		if err != nil {
 			conf.Log.Warn(err)
 		}

+ 22 - 13
internal/server/server.go

@@ -301,17 +301,26 @@ func runScheduleRuleChecker(exit <-chan struct{}) {
 }
 
 func handleScheduleRuleState(now time.Time, r *api.Rule, state string) error {
-	toStart, toStop := handleScheduleRule(now, r, state)
-	conf.Log.Debugf("rule %v origin state: %v, going to start:%v, to stop:%v", r.Id, state, toStart, toStop)
-	if toStart {
-		return startRule(r.Id)
-	} else if toStop {
-		stopRule(r.Id)
+	scheduleActionSignal := handleScheduleRule(now, r, state)
+	conf.Log.Debugf("rule %v origin state: %v, sginal: %v", r.Id, state, scheduleActionSignal)
+	switch scheduleActionSignal {
+	case scheduleRuleActionStart:
+		return startRuleInternal(r.Id)
+	case scheduleRuleActionStop:
+		stopRuleInternal(r.Id)
 	}
 	return nil
 }
 
-func handleScheduleRule(now time.Time, r *api.Rule, state string) (bool, bool) {
+type scheduleRuleAction int
+
+const (
+	scheduleRuleActionDoNothing scheduleRuleAction = iota
+	scheduleRuleActionStart
+	scheduleRuleActionStop
+)
+
+func handleScheduleRule(now time.Time, r *api.Rule, state string) scheduleRuleAction {
 	options := r.Options
 	if options != nil && options.Cron == "" && options.Duration == "" && len(options.CronDatetimeRange) > 0 {
 		var isInRange bool
@@ -320,17 +329,17 @@ func handleScheduleRule(now time.Time, r *api.Rule, state string) (bool, bool) {
 			isInRange, err = schedule.IsInScheduleRange(now, cRange.Begin, cRange.End)
 			if err != nil {
 				conf.Log.Errorf("check rule %v schedule failed, err:%v", r.Id, err)
-				return false, false
+				return scheduleRuleActionDoNothing
 			}
 			if isInRange {
 				break
 			}
 		}
-		if isInRange && state != "Running" {
-			return true, false
-		} else if !isInRange && state == "Running" {
-			return false, true
+		if isInRange && state == rule.RuleWait && r.Triggered {
+			return scheduleRuleActionStart
+		} else if !isInRange && state == rule.RuleStarted && r.Triggered {
+			return scheduleRuleActionStop
 		}
 	}
-	return false, false
+	return scheduleRuleActionDoNothing
 }

+ 44 - 32
internal/server/server_test.go

@@ -15,55 +15,60 @@
 package server
 
 import (
+	"fmt"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/require"
 
+	"github.com/lf-edge/ekuiper/internal/topo/rule"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 func TestHandleScheduleRule(t *testing.T) {
+	defer func() {
+		cast.SetTimeZone(cast.GetConfiguredTimeZone().String())
+	}()
+	err := cast.SetTimeZone("UTC")
+	require.NoError(t, err)
 	now, err := time.Parse("2006-01-02 15:04:05", "2006-01-02 15:04:05")
 	require.NoError(t, err)
+	now = now.In(cast.GetConfiguredTimeZone())
 	testcases := []struct {
-		state   string
-		begin   string
-		end     string
-		toStart bool
-		toStop  bool
+		state  string
+		begin  string
+		end    string
+		action scheduleRuleAction
 	}{
 		{
-			state:   "Running",
-			begin:   "2006-01-02 15:04:01",
-			end:     "2006-01-02 15:04:06",
-			toStart: false,
-			toStop:  false,
+			state:  "Running",
+			begin:  "2006-01-02 15:04:01",
+			end:    "2006-01-02 15:04:06",
+			action: scheduleRuleActionDoNothing,
 		},
 		{
-			state:   "Stopped",
-			begin:   "2006-01-02 15:04:01",
-			end:     "2006-01-02 15:04:06",
-			toStart: true,
-			toStop:  false,
+			state:  rule.RuleWait,
+			begin:  "2006-01-02 15:04:01",
+			end:    "2006-01-02 15:04:06",
+			action: scheduleRuleActionStart,
 		},
 		{
-			state:   "Stopped",
-			begin:   "2006-01-02 15:04:01",
-			end:     "2006-01-02 15:04:04",
-			toStart: false,
-			toStop:  false,
+			state:  rule.RuleTerminated,
+			begin:  "2006-01-02 15:04:01",
+			end:    "2006-01-02 15:04:04",
+			action: scheduleRuleActionDoNothing,
 		},
 		{
-			state:   "Running",
-			begin:   "2006-01-02 15:04:01",
-			end:     "2006-01-02 15:04:04",
-			toStart: false,
-			toStop:  true,
+			state:  rule.RuleStarted,
+			begin:  "2006-01-02 15:04:01",
+			end:    "2006-01-02 15:04:04",
+			action: scheduleRuleActionStop,
 		},
 	}
-	for _, tc := range testcases {
+	for i, tc := range testcases {
 		r := &api.Rule{
+			Triggered: true,
 			Options: &api.RuleOption{
 				Cron:     "",
 				Duration: "",
@@ -75,9 +80,8 @@ func TestHandleScheduleRule(t *testing.T) {
 				},
 			},
 		}
-		toStart, toStop := handleScheduleRule(now, r, tc.state)
-		require.Equal(t, tc.toStart, toStart)
-		require.Equal(t, tc.toStop, toStop)
+		scheduleRuleSignal := handleScheduleRule(now, r, tc.state)
+		require.Equal(t, tc.action, scheduleRuleSignal, fmt.Sprintf("case %v", i))
 	}
 }
 
@@ -89,23 +93,31 @@ func TestRunScheduleRuleChecker(t *testing.T) {
 }
 
 func TestHandleScheduleRuleState(t *testing.T) {
+	defer func() {
+		cast.SetTimeZone(cast.GetConfiguredTimeZone().String())
+	}()
+	err := cast.SetTimeZone("UTC")
+	require.NoError(t, err)
 	r := &api.Rule{}
 	r.Options = &api.RuleOption{}
 	now, err := time.Parse("2006-01-02 15:04:05", "2006-01-02 15:04:05")
 	require.NoError(t, err)
-	require.NoError(t, handleScheduleRuleState(now, r, "Running"))
+	require.NoError(t, handleScheduleRuleState(now, r, rule.RuleStarted))
+	require.NoError(t, handleScheduleRuleState(now, r, rule.RuleWait))
 	r.Options.CronDatetimeRange = []api.DatetimeRange{
 		{
 			Begin: "2006-01-02 15:04:01",
 			End:   "2006-01-02 15:04:06",
 		},
 	}
-	require.NoError(t, handleScheduleRuleState(now, r, "Running"))
+	require.NoError(t, handleScheduleRuleState(now, r, rule.RuleStarted))
+	require.NoError(t, handleScheduleRuleState(now, r, rule.RuleWait))
 	r.Options.CronDatetimeRange = []api.DatetimeRange{
 		{
 			Begin: "2006-01-02 15:04:01",
 			End:   "2006-01-02 15:04:02",
 		},
 	}
-	require.NoError(t, handleScheduleRuleState(now, r, "Running"))
+	require.NoError(t, handleScheduleRuleState(now, r, rule.RuleStarted))
+	require.NoError(t, handleScheduleRuleState(now, r, rule.RuleWait))
 }

+ 52 - 22
internal/topo/rule/ruleState.go

@@ -33,6 +33,13 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/schedule"
 )
 
+const (
+	RuleStarted    = "Running"
+	RuleStopped    = "Stopped: canceled manually."
+	RuleTerminated = "Stopped: schedule terminated."
+	RuleWait       = "Stopped: waiting for next schedule."
+)
+
 type ActionSignal int
 
 const (
@@ -84,7 +91,7 @@ type RuleState struct {
 	Rule *api.Rule
 	// States, create through rule in each rule start
 	Topology *topo.Topo
-	// 0 stop, 1 start, -1 delete, changed in actions
+	// 0 stop, 1 running, -1 delete, 2 internal stop, changed in actions
 	triggered int
 	// temporary storage for topo graph to make sure even rule close, the graph is still available
 	topoGraph *api.PrintableTopo
@@ -338,6 +345,7 @@ func (rs *RuleState) runScheduleRule() error {
 	return nil
 }
 
+// stopAfterDuration only for schedule rule
 func (rs *RuleState) stopAfterDuration(d time.Duration, cronCtx context.Context) {
 	after := time.After(d)
 	go func(ctx context.Context) {
@@ -345,7 +353,7 @@ func (rs *RuleState) stopAfterDuration(d time.Duration, cronCtx context.Context)
 		case <-after:
 			rs.Lock()
 			defer rs.Unlock()
-			if err := rs.stop(); err != nil {
+			if err := rs.internalStop(); err != nil {
 				conf.Log.Errorf("close rule %s failed, err: %v", rs.RuleId, err)
 			}
 			return
@@ -380,7 +388,7 @@ func (rs *RuleState) Stop() error {
 	rs.Lock()
 	defer rs.Unlock()
 	if rs.Rule.IsScheduleRule() || rs.Rule.IsLongRunningScheduleRule() {
-		conf.Log.Debugf("rule %v stopped", rs.RuleId)
+		conf.Log.Debugf("rule %v manual stopped", rs.RuleId)
 	}
 	rs.stopScheduleRule()
 	return rs.stop()
@@ -409,6 +417,31 @@ func (rs *RuleState) stop() error {
 	return nil
 }
 
+func (rs *RuleState) InternalStop() error {
+	rs.Lock()
+	defer rs.Unlock()
+	if !rs.Rule.IsLongRunningScheduleRule() {
+		err := fmt.Errorf("rule %v isn't allowed to execute Internal stop as it's not long running schedule rule", rs.RuleId)
+		conf.Log.Errorf(err.Error())
+		return err
+	} else {
+		conf.Log.Debugf("rule %v internal stopped", rs.RuleId)
+	}
+	return rs.internalStop()
+}
+
+func (rs *RuleState) internalStop() error {
+	if rs.triggered == -1 {
+		return fmt.Errorf("rule %s is already deleted", rs.RuleId)
+	}
+	rs.triggered = 2
+	if rs.Topology != nil {
+		rs.Topology.Cancel()
+	}
+	rs.ActionCh <- ActionSignalStop
+	return nil
+}
+
 func (rs *RuleState) Close() error {
 	rs.Lock()
 	defer rs.Unlock()
@@ -436,32 +469,16 @@ func (rs *RuleState) GetState() (string, error) {
 			err := c.Err()
 			switch err {
 			case nil:
-				result = "Running"
+				result = RuleStarted
 			case context.Canceled:
-				if (rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule) || rs.Rule.IsLongRunningScheduleRule() {
-					if schedule.IsAfterTimeRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange) {
-						result = "Stopped: schedule terminated."
-					} else {
-						result = "Stopped: waiting for next schedule."
-					}
-				} else {
-					result = "Stopped: canceled manually."
-				}
+				result = rs.getStoppedRuleState()
 			case context.DeadlineExceeded:
 				result = "Stopped: deadline exceed."
 			default:
 				result = fmt.Sprintf("Stopped: %v.", err)
 			}
 		} else {
-			if rs.cronState.isInSchedule || rs.Rule.IsLongRunningScheduleRule() {
-				if schedule.IsAfterTimeRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange) {
-					result = "Stopped: schedule terminated."
-				} else {
-					result = "Stopped: waiting for next schedule."
-				}
-			} else {
-				result = "Stopped: canceled manually."
-			}
+			result = rs.getStoppedRuleState()
 		}
 	}
 	if rs.Rule.IsScheduleRule() && rs.cronState.startFailedCnt > 0 {
@@ -470,6 +487,19 @@ func (rs *RuleState) GetState() (string, error) {
 	return result, nil
 }
 
+func (rs *RuleState) getStoppedRuleState() (result string) {
+	if schedule.IsAfterTimeRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange) {
+		result = RuleTerminated
+	} else if rs.cronState.isInSchedule {
+		result = RuleWait
+	} else if rs.triggered == 0 || rs.triggered == -1 {
+		result = RuleStopped
+	} else if rs.triggered == 2 {
+		result = RuleWait
+	}
+	return result
+}
+
 func (rs *RuleState) GetTopoGraph() *api.PrintableTopo {
 	rs.RLock()
 	defer rs.RUnlock()

+ 59 - 69
internal/topo/rule/ruleState_test.go

@@ -556,33 +556,16 @@ func TestScheduleRuleInRange(t *testing.T) {
 			End:   after.Format(layout),
 		},
 	}
-	const ruleStarted = "Running"
-	const ruleStopped = "Stopped: waiting for next schedule."
-	const ruleTerminated = "Stopped: schedule terminated."
 	func() {
 		rs, err := NewRuleState(r)
-		if err != nil {
-			t.Error(err)
-			return
-		}
-		if err := rs.startScheduleRule(); err != nil {
-			t.Error(err)
-			return
-		}
+		require.NoError(t, err)
+		err = rs.startScheduleRule()
+		require.NoError(t, err)
 		time.Sleep(500 * time.Millisecond)
 		state, err := rs.GetState()
-		if err != nil {
-			t.Errorf("get rule state error: %v", err)
-			return
-		}
-		if state != ruleStarted {
-			t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStarted, state)
-			return
-		}
-		if !rs.cronState.isInSchedule {
-			t.Error("cron state should be in schedule")
-			return
-		}
+		require.NoError(t, err)
+		require.Equal(t, RuleStarted, state)
+		require.True(t, rs.cronState.isInSchedule)
 	}()
 
 	r.Options.CronDatetimeRange = []api.DatetimeRange{
@@ -593,28 +576,14 @@ func TestScheduleRuleInRange(t *testing.T) {
 	}
 	func() {
 		rs, err := NewRuleState(r)
-		if err != nil {
-			t.Error(err)
-			return
-		}
-		if err := rs.startScheduleRule(); err != nil {
-			t.Error(err)
-			return
-		}
+		require.NoError(t, err)
+		err = rs.startScheduleRule()
+		require.NoError(t, err)
 		time.Sleep(500 * time.Millisecond)
 		state, err := rs.GetState()
-		if err != nil {
-			t.Errorf("get rule state error: %v", err)
-			return
-		}
-		if state != ruleStopped {
-			t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
-			return
-		}
-		if !rs.cronState.isInSchedule {
-			t.Error("cron state should be in schedule")
-			return
-		}
+		require.NoError(t, err)
+		require.Equal(t, RuleWait, state)
+		require.True(t, rs.cronState.isInSchedule)
 	}()
 
 	r.Options.CronDatetimeRange = []api.DatetimeRange{
@@ -625,28 +594,14 @@ func TestScheduleRuleInRange(t *testing.T) {
 	}
 	func() {
 		rs, err := NewRuleState(r)
-		if err != nil {
-			t.Error(err)
-			return
-		}
-		if err := rs.startScheduleRule(); err != nil {
-			t.Error(err)
-			return
-		}
+		require.NoError(t, err)
+		err = rs.startScheduleRule()
+		require.NoError(t, err)
 		time.Sleep(500 * time.Millisecond)
 		state, err := rs.GetState()
-		if err != nil {
-			t.Errorf("get rule state error: %v", err)
-			return
-		}
-		if state != ruleTerminated {
-			t.Errorf("rule state mismatch: exp=%v, got=%v", ruleTerminated, state)
-			return
-		}
-		if !rs.cronState.isInSchedule {
-			t.Error("cron state should be in schedule")
-			return
-		}
+		require.NoError(t, err)
+		require.Equal(t, RuleTerminated, state)
+		require.True(t, rs.cronState.isInSchedule)
 	}()
 
 	now2, err := time.Parse(layout, "2006-01-02 15:04:01")
@@ -663,11 +618,11 @@ func TestScheduleRuleInRange(t *testing.T) {
 		time.Sleep(500 * time.Millisecond)
 		state, err := rs.GetState()
 		require.NoError(t, err)
-		require.Equal(t, state, ruleStarted)
+		require.Equal(t, state, RuleStarted)
 		time.Sleep(3 * time.Second)
 		state, err = rs.GetState()
 		require.NoError(t, err)
-		require.Equal(t, state, ruleStopped)
+		require.Equal(t, state, RuleWait)
 	}()
 }
 
@@ -702,8 +657,6 @@ func TestStartLongRunningScheduleRule(t *testing.T) {
 			End:   after.Format(layout),
 		},
 	}
-	const ruleStopped = "Stopped: schedule terminated."
-	const ruleStarted = "Running"
 	func() {
 		rs, err := NewRuleState(r)
 		require.NoError(t, err)
@@ -711,7 +664,7 @@ func TestStartLongRunningScheduleRule(t *testing.T) {
 		time.Sleep(500 * time.Millisecond)
 		state, err := rs.GetState()
 		require.NoError(t, err)
-		require.Equal(t, state, ruleStarted)
+		require.Equal(t, state, RuleStarted)
 	}()
 	r.Options.CronDatetimeRange = []api.DatetimeRange{
 		{
@@ -726,6 +679,43 @@ func TestStartLongRunningScheduleRule(t *testing.T) {
 		time.Sleep(500 * time.Millisecond)
 		state, err := rs.GetState()
 		require.NoError(t, err)
-		require.Equal(t, state, ruleStopped)
+		require.Equal(t, state, RuleTerminated)
 	}()
 }
+
+func TestRuleStateInternalStop(t *testing.T) {
+	conf.IsTesting = true
+	sp := processor.NewStreamProcessor()
+	sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
+	defer sp.ExecStmt(`DROP STREAM demo`)
+	r := &api.Rule{
+		Triggered: false,
+		Id:        "test",
+		Sql:       "SELECT ts FROM demo",
+		Actions: []map[string]interface{}{
+			{
+				"log": map[string]interface{}{},
+			},
+		},
+		Options: defaultOption,
+	}
+	r.Options.Cron = "123"
+	rs, err := NewRuleState(r)
+	require.NoError(t, err)
+	err = rs.InternalStop()
+	require.Error(t, err)
+
+	r.Options.Cron = ""
+	r.Options.Duration = ""
+	r.Options.CronDatetimeRange = []api.DatetimeRange{
+		{
+			Begin: layout,
+			End:   layout,
+		},
+	}
+	rs, err = NewRuleState(r)
+	require.NoError(t, err)
+	err = rs.InternalStop()
+	require.NoError(t, err)
+	require.Equal(t, rs.triggered, 2)
+}