Przeglądaj źródła

fix: support start schedule rule in schedule (#2145)

* support_start_schedule_ruel

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 1 rok temu
rodzic
commit
3a72679e60
2 zmienionych plików z 124 dodań i 26 usunięć
  1. 90 24
      internal/topo/rule/ruleState.go
  2. 34 2
      internal/topo/rule/ruleState_test.go

+ 90 - 24
internal/topo/rule/ruleState.go

@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"math"
 	"math/rand"
+	"strings"
 	"sync"
 	"time"
 
@@ -262,6 +263,17 @@ func (rs *RuleState) startScheduleRule() error {
 	}
 	var cronCtx context.Context
 	cronCtx, rs.cronState.cancel = context.WithCancel(context.Background())
+	now := conf.GetNow()
+	isInRunningSchedule, remainedDuration, err := rs.isInRunningSchedule(now, d)
+	if err != nil {
+		return err
+	}
+	if isInRunningSchedule {
+		if err := rs.runScheduleRule(); err != nil {
+			return err
+		}
+		rs.stopAfterDuration(remainedDuration, cronCtx)
+	}
 	entryID, err := backgroundCron.AddFunc(rs.Rule.Options.Cron, func() {
 		var started bool
 		var err error
@@ -274,16 +286,9 @@ func (rs *RuleState) startScheduleRule() error {
 				defer rs.Unlock()
 			}
 			now := conf.GetNow()
-			var err error
-			allowed := true
-			for _, timeRange := range rs.Rule.Options.CronDatetimeRange {
-				allowed, err = isInScheduleRange(now, timeRange.Begin, timeRange.End)
-				if err != nil {
-					return false, err
-				}
-				if allowed {
-					break
-				}
+			allowed, err := rs.isInAllowedTimeRange(now)
+			if err != nil {
+				return false, err
 			}
 			if !allowed {
 				return false, nil
@@ -300,20 +305,7 @@ func (rs *RuleState) startScheduleRule() error {
 			return
 		}
 		if started {
-			after := time.After(d)
-			go func(ctx context.Context) {
-				select {
-				case <-after:
-					rs.Lock()
-					defer rs.Unlock()
-					if err := rs.stop(); err != nil {
-						conf.Log.Errorf("close rule %s failed, err: %v", rs.RuleId, err)
-					}
-					return
-				case <-cronCtx.Done():
-					return
-				}
-			}(cronCtx)
+			rs.stopAfterDuration(d, cronCtx)
 		}
 	})
 	if err != nil {
@@ -324,6 +316,35 @@ func (rs *RuleState) startScheduleRule() error {
 	return nil
 }
 
+func (rs *RuleState) runScheduleRule() error {
+	rs.Lock()
+	defer rs.Unlock()
+	rs.cronState.cron = rs.Rule.Options.Cron
+	rs.cronState.duration = rs.Rule.Options.Duration
+	err := rs.start()
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (rs *RuleState) stopAfterDuration(d time.Duration, cronCtx context.Context) {
+	after := time.After(d)
+	go func(ctx context.Context) {
+		select {
+		case <-after:
+			rs.Lock()
+			defer rs.Unlock()
+			if err := rs.stop(); err != nil {
+				conf.Log.Errorf("close rule %s failed, err: %v", rs.RuleId, err)
+			}
+			return
+		case <-cronCtx.Done():
+			return
+		}
+	}(cronCtx)
+}
+
 func (rs *RuleState) start() error {
 	if rs.triggered != 1 {
 		// If the rule has been stopped due to error, the topology is not nil
@@ -484,3 +505,48 @@ func isAfterTimeRange(now time.Time, end string) (bool, error) {
 	}
 	return now.After(e), nil
 }
+
+func (rs *RuleState) isInRunningSchedule(now time.Time, d time.Duration) (bool, time.Duration, error) {
+	allowed, err := rs.isInAllowedTimeRange(now)
+	if err != nil {
+		return false, 0, err
+	}
+	if !allowed {
+		return false, 0, nil
+	}
+	cronExpr := rs.Rule.Options.Cron
+	if strings.HasPrefix(cronExpr, "mock") {
+		return false, 0, nil
+	}
+	return isInRunningSchedule(cronExpr, now, d)
+}
+
+func (rs *RuleState) isInAllowedTimeRange(now time.Time) (bool, error) {
+	allowed := true
+	var err error
+	for _, timeRange := range rs.Rule.Options.CronDatetimeRange {
+		allowed, err = isInScheduleRange(now, timeRange.Begin, timeRange.End)
+		if err != nil {
+			return false, err
+		}
+		if allowed {
+			break
+		}
+	}
+	return allowed, nil
+}
+
+// isInRunningSchedule checks whether the rule should be running, eg:
+// If the duration is 10min, and cron is "0 0 * * *", and the current time is 00:00:02
+// And the rule should be started immediately instead of checking it on the next day.
+func isInRunningSchedule(cronExpr string, now time.Time, d time.Duration) (bool, time.Duration, error) {
+	s, err := cron.ParseStandard(cronExpr)
+	if err != nil {
+		return false, 0, err
+	}
+	previousSchedule := s.Next(now.Add(-d))
+	if now.After(previousSchedule) && now.Before(previousSchedule.Add(d)) {
+		return true, previousSchedule.Add(d).Sub(now), nil
+	}
+	return false, 0, nil
+}

+ 34 - 2
internal/topo/rule/ruleState_test.go

@@ -197,7 +197,7 @@ func TestUpdateScheduleRule(t *testing.T) {
 	sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
 	defer sp.ExecStmt(`DROP STREAM demo`)
 	scheduleOption1 := *defaultOption
-	scheduleOption1.Cron = "mockCron1"
+	scheduleOption1.Cron = "mockCron"
 	scheduleOption1.Duration = "1s"
 	rule1 := &api.Rule{
 		Triggered: false,
@@ -216,7 +216,7 @@ func TestUpdateScheduleRule(t *testing.T) {
 	err = rs.startScheduleRule()
 	require.NoError(t, err)
 	require.True(t, rs.cronState.isInSchedule)
-	require.Equal(t, "mockCron1", rs.cronState.cron)
+	require.Equal(t, "mockCron", rs.cronState.cron)
 	require.Equal(t, "1s", rs.cronState.duration)
 
 	scheduleOption2 := *defaultOption
@@ -648,4 +648,36 @@ func TestScheduleRuleInRange(t *testing.T) {
 			return
 		}
 	}()
+
+	now2, err := time.Parse(layout, "2006-01-02 15:04:01")
+	require.NoError(t, err)
+	r.Options.Cron = "4 15 * * *"
+	r.Options.CronDatetimeRange = nil
+	r.Options.Duration = "2s"
+	m.Set(now2)
+
+	func() {
+		rs, err := NewRuleState(r)
+		require.NoError(t, err)
+		require.NoError(t, rs.startScheduleRule())
+		time.Sleep(500 * time.Millisecond)
+		state, err := rs.GetState()
+		require.NoError(t, err)
+		require.Equal(t, state, ruleStarted)
+		time.Sleep(3 * time.Second)
+		state, err = rs.GetState()
+		require.NoError(t, err)
+		require.Equal(t, state, ruleStopped)
+	}()
+}
+
+func TestIsRuleInRunningSchedule(t *testing.T) {
+	now, err := time.Parse(layout, "2006-01-02 15:04:01")
+	require.NoError(t, err)
+	d, err := time.ParseDuration("2s")
+	require.NoError(t, err)
+	isInSchedule, remainedDuration, err := isInRunningSchedule("4 15 * * *", now, d)
+	require.NoError(t, err)
+	require.True(t, isInSchedule)
+	require.Equal(t, remainedDuration, time.Second)
 }