Bläddra i källkod

feat: support time range for cron (#2015)

* support range

Signed-off-by: yisaer <disxiaofei@163.com>

* fix lint

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* add docs

Signed-off-by: yisaer <disxiaofei@163.com>

* fix lint

Signed-off-by: yisaer <disxiaofei@163.com>

* fix lint

Signed-off-by: yisaer <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 1 år sedan
förälder
incheckning
02cdcfc8da

+ 25 - 0
docs/en_US/guide/rules/overview.md

@@ -147,6 +147,7 @@ The current options includes:
 | restartStrategy    | struct               | Specify the strategy to automatic restarting rule after failures. This can help to get over recoverable failures without manual operations. Please check [Rule Restart Strategy](#rule-restart-strategy) for detail configuration items.                                                                                                          |
 | restartStrategy    | struct               | Specify the strategy to automatic restarting rule after failures. This can help to get over recoverable failures without manual operations. Please check [Rule Restart Strategy](#rule-restart-strategy) for detail configuration items.                                                                                                          |
 | cron | string: "" | Specify the periodic trigger strategy of the rule, which is described by [cron expression](https://en.wikipedia.org/wiki/Cron) |
 | cron | string: "" | Specify the periodic trigger strategy of the rule, which is described by [cron expression](https://en.wikipedia.org/wiki/Cron) |
 | duration | string: "" | Specifies the running duration of the rule, only valid when cron is specified. The duration should not exceed the time interval between two cron cycles, otherwise it will cause unexpected behavior. |
 | duration | string: "" | Specifies the running duration of the rule, only valid when cron is specified. The duration should not exceed the time interval between two cron cycles, otherwise it will cause unexpected behavior. |
+| cronDatetimeRange | lists of struct | Specify the effective time period of the Scheduled Rule, which is only valid when `cron` is specified. When this `cronDatetimeRange` is specified, the Scheduled Rule will only take effect within the time range specified. Please see [Scheduled Rule](#Scheduled Rule) for detailed configuration items|
 
 
 For detail about `qos` and `checkpointInterval`, please check [state and fault tolerance](./state_and_fault_tolerance.md).
 For detail about `qos` and `checkpointInterval`, please check [state and fault tolerance](./state_and_fault_tolerance.md).
 
 
@@ -174,6 +175,30 @@ When `cron` is every 1 hour and `duration` is 30 minutes, then the rule will be
 
 
 When a periodic rule is stopped by [stop rule](../../api/restapi/rules.md#stop-a-rule), the rule will be removed from the periodic scheduler and will no longer be scheduled to run. If the rule is running, it will also be paused.
 When a periodic rule is stopped by [stop rule](../../api/restapi/rules.md#stop-a-rule), the rule will be removed from the periodic scheduler and will no longer be scheduled to run. If the rule is running, it will also be paused.
 
 
+`cronDatetimeRange`configuration items are like following:
+
+| Option name  | Type & Default Value | Description                                                                                                                           |
+|--------------|------------|-----------------------------------------------------------|
+| begin     | string    | The begin time of the effective period of the scheduled rule, the format is `YYYY-MM-DD hh:mm:ss'                           |
+| end        | string  | The end time of the effective period of the scheduled rule, the format is `YYYY-MM-DD hh:mm:ss'       |
+
+`cronDatetimeRange` supports lists of struct, you can declare a set of time ranges to express multiple time ranges for scheduled rules to take effect:
+
+```json
+{
+    "cronDatetimeRange": [
+        {
+            "begin": "2023-06-26 10:00:00",
+            "end": "2023-06-26 20:00:00"
+        },
+        {
+            "begin": "2023-06-27 10:00:00",
+            "end": "2023-06-27 20:00:00"
+        }
+    ]
+}
+```
+
 ## View rule status
 ## View rule status
 
 
 When a rule is deployed to eKuiper, we can use the rule indicator to understand the current running status of the rule.
 When a rule is deployed to eKuiper, we can use the rule indicator to understand the current running status of the rule.

+ 25 - 0
docs/zh_CN/guide/rules/overview.md

@@ -149,6 +149,7 @@ eKuiper 已经内置了丰富的 sink connector 类型,如 mqtt、rest 和 fil
 | restartStrategy    | 结构         | 指定规则运行失败后自动重新启动规则的策略。这可以帮助从可恢复的故障中回复,而无需手动操作。请查看[规则重启策略](#规则重启策略)了解详细的配置项目。                    |
 | restartStrategy    | 结构         | 指定规则运行失败后自动重新启动规则的策略。这可以帮助从可恢复的故障中回复,而无需手动操作。请查看[规则重启策略](#规则重启策略)了解详细的配置项目。                    |
 | cron               | string: ""   | 指定规则的周期性触发策略,该周期通过 [cron 表达式](https://zh.wikipedia.org/wiki/Cron) 进行描述。 |
 | cron               | string: ""   | 指定规则的周期性触发策略,该周期通过 [cron 表达式](https://zh.wikipedia.org/wiki/Cron) 进行描述。 |
 | duration           | string: ""   | 指定规则的运行持续时间,只有当指定了 cron 后才有效。duration 不应该超过两次 cron 周期之间的时间间隔,否则会引起非预期的行为。   |
 | duration           | string: ""   | 指定规则的运行持续时间,只有当指定了 cron 后才有效。duration 不应该超过两次 cron 周期之间的时间间隔,否则会引起非预期的行为。   |
+| cronDatetimeRange | 结构体数组 | 指定周期性规则的生效时间段,只有当指定了 cron 后才有效。当指定了该参数后,周期性规则只有在这个参数所制定的时间范围内才生效。请查看 [周期性规则](#周期性规则) 了解详细的配置项目|
 
 
 有关 `qos` 和 `checkpointInterval` 的详细信息,请查看[状态和容错](./state_and_fault_tolerance.md)。
 有关 `qos` 和 `checkpointInterval` 的详细信息,请查看[状态和容错](./state_and_fault_tolerance.md)。
 
 
@@ -176,6 +177,30 @@ eKuiper 已经内置了丰富的 sink connector 类型,如 mqtt、rest 和 fil
 
 
 通过 [停止规则](../../api/restapi/rules.md#停止规则) 停止一个周期性规则时,便会将该规则从周期性调度器中移除,从而不再被调度运行。如果该周期性规则正在运行,那么该运行也会被暂停。
 通过 [停止规则](../../api/restapi/rules.md#停止规则) 停止一个周期性规则时,便会将该规则从周期性调度器中移除,从而不再被调度运行。如果该周期性规则正在运行,那么该运行也会被暂停。
 
 
+`cronDatetimeRange` 的配置项如下:
+
+| 选项名          | 类型和默认值     | 说明                                                        |
+|--------------|------------|-----------------------------------------------------------|
+| begin     | string    | 周期性规则生效时间段的起始时间,格式为 `YYYY-MM-DD hh:mm:ss"                            |
+| end        | string  | 周期性规则生效时间段的结束时间,格式为 `YYYY-MM-DD hh:mm:ss"       |
+
+cronDatetimeRange 支持结构体数组,你可以声明一组时间段来表达周期性规则生效的多个时间段:
+
+```json
+{
+    "cronDatetimeRange": [
+        {
+            "begin": "2023-06-26 10:00:00",
+            "end": "2023-06-26 20:00:00"
+        },
+        {
+            "begin": "2023-06-27 10:00:00",
+            "end": "2023-06-27 20:00:00"
+        }
+    ]
+}
+```
+
 ## 查看规则状态
 ## 查看规则状态
 
 
 当一条规则被部署到 eKuiper 中后,我们可以通过规则指标来了解到当前的规则运行状态。
 当一条规则被部署到 eKuiper 中后,我们可以通过规则指标来了解到当前的规则运行状态。

+ 54 - 15
internal/topo/rule/ruleState.go

@@ -263,7 +263,9 @@ func (rs *RuleState) startScheduleRule() error {
 	var cronCtx context.Context
 	var cronCtx context.Context
 	cronCtx, rs.cronState.cancel = context.WithCancel(context.Background())
 	cronCtx, rs.cronState.cancel = context.WithCancel(context.Background())
 	entryID, err := backgroundCron.AddFunc(rs.Rule.Options.Cron, func() {
 	entryID, err := backgroundCron.AddFunc(rs.Rule.Options.Cron, func() {
-		if err := func() error {
+		var started bool
+		var err error
+		if started, err = func() (bool, error) {
 			switch backgroundCron.(type) {
 			switch backgroundCron.(type) {
 			case *MockCron:
 			case *MockCron:
 				// skip mutex if this is a unit test
 				// skip mutex if this is a unit test
@@ -271,9 +273,25 @@ func (rs *RuleState) startScheduleRule() error {
 				rs.Lock()
 				rs.Lock()
 				defer rs.Unlock()
 				defer rs.Unlock()
 			}
 			}
+			now := conf.GetNow()
+			var err error
+			allowed := true
+			for _, timeRange := range rs.Rule.Options.CronDatetimeRange {
+				allowed, err = isInScheduleRange(now, timeRange.Begin, timeRange.End)
+				if err != nil {
+					return false, err
+				}
+				if allowed {
+					break
+				}
+			}
+			if !allowed {
+				return false, nil
+			}
+
 			rs.cronState.cron = rs.Rule.Options.Cron
 			rs.cronState.cron = rs.Rule.Options.Cron
 			rs.cronState.duration = rs.Rule.Options.Duration
 			rs.cronState.duration = rs.Rule.Options.Duration
-			return rs.start()
+			return true, rs.start()
 		}(); err != nil {
 		}(); err != nil {
 			rs.Lock()
 			rs.Lock()
 			rs.cronState.startFailedCnt++
 			rs.cronState.startFailedCnt++
@@ -281,20 +299,22 @@ func (rs *RuleState) startScheduleRule() error {
 			conf.Log.Errorf(err.Error())
 			conf.Log.Errorf(err.Error())
 			return
 			return
 		}
 		}
-		after := time.After(d)
-		go func(ctx context.Context) {
-			select {
-			case <-after:
-				rs.Lock()
-				defer rs.Unlock()
-				if err := rs.stop(); err != nil {
-					conf.Log.Errorf("close rule %s failed, err: %v", rs.RuleId, err)
+		if started {
+			after := time.After(d)
+			go func(ctx context.Context) {
+				select {
+				case <-after:
+					rs.Lock()
+					defer rs.Unlock()
+					if err := rs.stop(); err != nil {
+						conf.Log.Errorf("close rule %s failed, err: %v", rs.RuleId, err)
+					}
+					return
+				case <-cronCtx.Done():
+					return
 				}
 				}
-				return
-			case <-cronCtx.Done():
-				return
-			}
-		}(cronCtx)
+			}(cronCtx)
+		}
 	})
 	})
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -416,3 +436,22 @@ func (rs *RuleState) GetTopoGraph() *api.PrintableTopo {
 		return nil
 		return nil
 	}
 	}
 }
 }
+
+const layout = "2006-01-02 15:04:05"
+
+func isInScheduleRange(now time.Time, start string, end string) (bool, error) {
+	s, err := time.Parse(layout, start)
+	if err != nil {
+		return false, err
+	}
+	e, err := time.Parse(layout, end)
+	if err != nil {
+		return false, err
+	}
+	isBefore := s.Before(now)
+	isAfter := e.After(now)
+	if isBefore && isAfter {
+		return true, nil
+	}
+	return false, nil
+}

+ 92 - 0
internal/topo/rule/ruleState_test.go

@@ -22,6 +22,7 @@ import (
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
+	"github.com/benbjohnson/clock"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/require"
 
 
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
@@ -524,3 +525,94 @@ func TestScheduleRule(t *testing.T) {
 		require.Equal(t, "Stopped: waiting for next schedule.", status)
 		require.Equal(t, "Stopped: waiting for next schedule.", status)
 	}()
 	}()
 }
 }
+
+func TestScheduleRuleInRange(t *testing.T) {
+	now := time.Now()
+	m := conf.Clock.(*clock.Mock)
+	m.Set(now)
+	before := now.AddDate(-10, -10, -10)
+	after := now.Add(10 * time.Second)
+	conf.IsTesting = true
+	sp := processor.NewStreamProcessor()
+	sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
+	defer sp.ExecStmt(`DROP STREAM demo`)
+	// Test rule not triggered
+	r := &api.Rule{
+		Triggered: false,
+		Id:        "test",
+		Sql:       "SELECT ts FROM demo",
+		Actions: []map[string]interface{}{
+			{
+				"log": map[string]interface{}{},
+			},
+		},
+		Options: defaultOption,
+	}
+	r.Options.Cron = "mockCron"
+	r.Options.Duration = "1s"
+	r.Options.CronDatetimeRange = []api.DatetimeRange{
+		{
+			Begin: before.Format(layout),
+			End:   after.Format(layout),
+		},
+	}
+	const ruleStarted = "Running"
+	const ruleStopped = "Stopped: waiting for next schedule."
+	func() {
+		rs, err := NewRuleState(r)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		if err := rs.startScheduleRule(); err != nil {
+			t.Error(err)
+			return
+		}
+		time.Sleep(500 * time.Millisecond)
+		state, err := rs.GetState()
+		if err != nil {
+			t.Errorf("get rule state error: %v", err)
+			return
+		}
+		if state != ruleStarted {
+			t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStarted, state)
+			return
+		}
+		if !rs.cronState.isInSchedule {
+			t.Error("cron state should be in schedule")
+			return
+		}
+	}()
+
+	r.Options.CronDatetimeRange = []api.DatetimeRange{
+		{
+			Begin: after.Format(layout),
+			End:   after.Format(layout),
+		},
+	}
+	func() {
+		rs, err := NewRuleState(r)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		if err := rs.startScheduleRule(); err != nil {
+			t.Error(err)
+			return
+		}
+		time.Sleep(500 * time.Millisecond)
+		state, err := rs.GetState()
+		if err != nil {
+			t.Errorf("get rule state error: %v", err)
+			return
+		}
+		if state != ruleStopped {
+			t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
+			return
+		}
+		if !rs.cronState.isInSchedule {
+			t.Error("cron state should be in schedule")
+			return
+		}
+	}()
+}

+ 6 - 0
pkg/api/stream.go

@@ -146,6 +146,12 @@ type RuleOption struct {
 	Restart            *RestartStrategy `json:"restartStrategy" yaml:"restartStrategy"`
 	Restart            *RestartStrategy `json:"restartStrategy" yaml:"restartStrategy"`
 	Cron               string           `json:"cron" yaml:"cron"`
 	Cron               string           `json:"cron" yaml:"cron"`
 	Duration           string           `json:"duration" yaml:"duration"`
 	Duration           string           `json:"duration" yaml:"duration"`
+	CronDatetimeRange  []DatetimeRange  `json:"cronDatetimeRange" yaml:"cronDatetimeRange"`
+}
+
+type DatetimeRange struct {
+	Begin string `json:"begin" yaml:"begin"`
+	End   string `json:"end" yaml:"end"`
 }
 }
 
 
 type RestartStrategy struct {
 type RestartStrategy struct {