Selaa lähdekoodia

feat: support long running schedule rule (#2150)

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 1 vuosi sitten
vanhempi
commit
e2b6399139

+ 7 - 0
docs/en_US/configuration/global_configurations.md

@@ -97,6 +97,13 @@ basic:
   authentication: false
   authentication: false
 ```
 ```
 
 
+## Rule Patrol Configuration
+
+```yaml
+basic:
+  rulePatrolInterval: "10s"
+```
+
 ## Prometheus Configuration
 ## Prometheus Configuration
 
 
 eKuiper can export metrics to prometheus if `prometheus` option is true. The prometheus will be served with the port specified by `prometheusPort` option.
 eKuiper can export metrics to prometheus if `prometheus` option is true. The prometheus will be served with the port specified by `prometheusPort` option.

+ 4 - 0
docs/en_US/guide/rules/overview.md

@@ -201,6 +201,10 @@ When a periodic rule is stopped by [stop rule](../../api/restapi/rules.md#stop-a
 }
 }
 ```
 ```
 
 
+#### Phase run rules
+
+When `cronDatetimeRange` is configured but `cron` and `duration` are empty, the rule will run according to the time period specified by `cronDatetimeRange` until the time period is exceeded.
+
 ## View rule status
 ## View rule status
 
 
 When a rule is deployed to eKuiper, we can use the rule indicator to understand the current running status of the rule.
 When a rule is deployed to eKuiper, we can use the rule indicator to understand the current running status of the rule.

+ 7 - 0
docs/zh_CN/configuration/global_configurations.md

@@ -95,6 +95,13 @@ basic:
   authentication: false
   authentication: false
 ```
 ```
 
 
+## 巡检规则配置
+
+```yaml
+basic:
+  rulePatrolInterval: "10s"
+```
+
 ## Prometheus 配置
 ## Prometheus 配置
 
 
 如果 `prometheus` 参数设置为 true,eKuiper 将把运行指标暴露到 prometheus。Prometheus 将运行在 `prometheusPort` 参数指定的端口上。
 如果 `prometheus` 参数设置为 true,eKuiper 将把运行指标暴露到 prometheus。Prometheus 将运行在 `prometheusPort` 参数指定的端口上。

+ 5 - 1
docs/zh_CN/guide/rules/overview.md

@@ -151,7 +151,7 @@ eKuiper 已经内置了丰富的 sink connector 类型,如 mqtt、rest 和 fil
 | restartStrategy    | 结构         | 指定规则运行失败后自动重新启动规则的策略。这可以帮助从可恢复的故障中回复,而无需手动操作。请查看[规则重启策略](#规则重启策略)了解详细的配置项目。                    |
 | restartStrategy    | 结构         | 指定规则运行失败后自动重新启动规则的策略。这可以帮助从可恢复的故障中回复,而无需手动操作。请查看[规则重启策略](#规则重启策略)了解详细的配置项目。                    |
 | cron               | string: "" | 指定规则的周期性触发策略,该周期通过 [cron 表达式](https://zh.wikipedia.org/wiki/Cron) 进行描述。                        |
 | cron               | string: "" | 指定规则的周期性触发策略,该周期通过 [cron 表达式](https://zh.wikipedia.org/wiki/Cron) 进行描述。                        |
 | duration           | string: "" | 指定规则的运行持续时间,只有当指定了 cron 后才有效。duration 不应该超过两次 cron 周期之间的时间间隔,否则会引起非预期的行为。                      |
 | duration           | string: "" | 指定规则的运行持续时间,只有当指定了 cron 后才有效。duration 不应该超过两次 cron 周期之间的时间间隔,否则会引起非预期的行为。                      |
-| cronDatetimeRange  | 结构体数组      | 指定周期性规则的生效时间段,只有当指定了 cron 后才有效。当指定了该参数后,周期性规则只有在这个参数所制定的时间范围内才生效。请查看 [周期性规则](#周期性规则) 了解详细的配置项目 |
+| cronDatetimeRange  | 结构体数组      | 指定周期性规则的生效时间段。当指定了该参数后,周期性规则只有在这个参数所制定的时间范围内才生效。请查看 [周期性规则](#周期性规则) 了解详细的配置项目 |
 
 
 有关 `qos` 和 `checkpointInterval` 的详细信息,请查看[状态和容错](./state_and_fault_tolerance.md)。
 有关 `qos` 和 `checkpointInterval` 的详细信息,请查看[状态和容错](./state_and_fault_tolerance.md)。
 
 
@@ -203,6 +203,10 @@ cronDatetimeRange 支持结构体数组,你可以声明一组时间段来表
 }
 }
 ```
 ```
 
 
+#### 阶段运行规则
+
+当 `cronDatetimeRange` 配置了但是 `cron` 与 `duration` 为空时,则该规则会按照 `cronDatetimeRange` 所指定的时间阶段内一直运行,直到超出该时间阶段。
+
 ## 查看规则状态
 ## 查看规则状态
 
 
 当一条规则被部署到 eKuiper 中后,我们可以通过规则指标来了解到当前的规则运行状态。
 当一条规则被部署到 eKuiper 中后,我们可以通过规则指标来了解到当前的规则运行状态。

+ 20 - 17
internal/conf/conf.go

@@ -130,23 +130,24 @@ type SQLConf struct {
 
 
 type KuiperConf struct {
 type KuiperConf struct {
 	Basic struct {
 	Basic struct {
-		Debug          bool     `yaml:"debug"`
-		ConsoleLog     bool     `yaml:"consoleLog"`
-		FileLog        bool     `yaml:"fileLog"`
-		RotateTime     int      `yaml:"rotateTime"`
-		MaxAge         int      `yaml:"maxAge"`
-		TimeZone       string   `yaml:"timezone"`
-		Ip             string   `yaml:"ip"`
-		Port           int      `yaml:"port"`
-		RestIp         string   `yaml:"restIp"`
-		RestPort       int      `yaml:"restPort"`
-		RestTls        *tlsConf `yaml:"restTls"`
-		Prometheus     bool     `yaml:"prometheus"`
-		PrometheusPort int      `yaml:"prometheusPort"`
-		PluginHosts    string   `yaml:"pluginHosts"`
-		Authentication bool     `yaml:"authentication"`
-		IgnoreCase     bool     `yaml:"ignoreCase"`
-		SQLConf        *SQLConf `yaml:"sql"`
+		Debug              bool     `yaml:"debug"`
+		ConsoleLog         bool     `yaml:"consoleLog"`
+		FileLog            bool     `yaml:"fileLog"`
+		RotateTime         int      `yaml:"rotateTime"`
+		MaxAge             int      `yaml:"maxAge"`
+		TimeZone           string   `yaml:"timezone"`
+		Ip                 string   `yaml:"ip"`
+		Port               int      `yaml:"port"`
+		RestIp             string   `yaml:"restIp"`
+		RestPort           int      `yaml:"restPort"`
+		RestTls            *tlsConf `yaml:"restTls"`
+		Prometheus         bool     `yaml:"prometheus"`
+		PrometheusPort     int      `yaml:"prometheusPort"`
+		PluginHosts        string   `yaml:"pluginHosts"`
+		Authentication     bool     `yaml:"authentication"`
+		IgnoreCase         bool     `yaml:"ignoreCase"`
+		SQLConf            *SQLConf `yaml:"sql"`
+		RulePatrolInterval string   `yaml:"rulePatrolInterval"`
 	}
 	}
 	Rule   api.RuleOption
 	Rule   api.RuleOption
 	Sink   *SinkConf
 	Sink   *SinkConf
@@ -248,6 +249,8 @@ func InitConf() {
 		Config.Basic.RestIp = "0.0.0.0"
 		Config.Basic.RestIp = "0.0.0.0"
 	}
 	}
 
 
+	Config.Basic.RulePatrolInterval = "10s"
+
 	if Config.Basic.Debug {
 	if Config.Basic.Debug {
 		SetDebugLevel(true)
 		SetDebugLevel(true)
 	}
 	}

+ 26 - 0
internal/server/rule_manager.go

@@ -281,6 +281,32 @@ func getAllRulesWithStatus() ([]map[string]interface{}, error) {
 	return result, nil
 	return result, nil
 }
 }
 
 
+type ruleWrapper struct {
+	rule  *api.Rule
+	state string
+}
+
+func getAllRulesWithState() ([]ruleWrapper, error) {
+	ruleIds, err := ruleProcessor.GetAllRules()
+	if err != nil {
+		return nil, err
+	}
+	sort.Strings(ruleIds)
+	rules := make([]ruleWrapper, 0, len(ruleIds))
+	for _, id := range ruleIds {
+		r, err := ruleProcessor.GetRuleById(id)
+		if err != nil {
+			return nil, err
+		}
+		s, err := getRuleState(id)
+		if err != nil {
+			return nil, err
+		}
+		rules = append(rules, ruleWrapper{rule: r, state: s})
+	}
+	return rules, nil
+}
+
 func getRuleState(name string) (string, error) {
 func getRuleState(name string) (string, error) {
 	if rs, ok := registry.Load(name); ok {
 	if rs, ok := registry.Load(name); ok {
 		return rs.GetState()
 		return rs.GetState()

+ 75 - 0
internal/server/server.go

@@ -38,8 +38,10 @@ import (
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/factory"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/factory"
 	"github.com/lf-edge/ekuiper/internal/topo/rule"
 	"github.com/lf-edge/ekuiper/internal/topo/rule"
+	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/schedule"
 )
 )
 
 
 var (
 var (
@@ -145,6 +147,8 @@ func StartUp(Version string) {
 			}
 			}
 		}
 		}
 	}
 	}
+	exit := make(chan struct{})
+	go runScheduleRuleChecker(exit)
 
 
 	// Start rest service
 	// Start rest service
 	srvRest := createRestServer(conf.Config.Basic.RestIp, conf.Config.Basic.RestPort, conf.Config.Basic.Authentication)
 	srvRest := createRestServer(conf.Config.Basic.RestIp, conf.Config.Basic.RestPort, conf.Config.Basic.Authentication)
@@ -179,6 +183,7 @@ func StartUp(Version string) {
 	sigint := make(chan os.Signal, 1)
 	sigint := make(chan os.Signal, 1)
 	signal.Notify(sigint, os.Interrupt, syscall.SIGTERM)
 	signal.Notify(sigint, os.Interrupt, syscall.SIGTERM)
 	<-sigint
 	<-sigint
+	exit <- struct{}{}
 
 
 	if err = srvRest.Shutdown(context.TODO()); err != nil {
 	if err = srvRest.Shutdown(context.TODO()); err != nil {
 		logger.Errorf("rest server shutdown error: %v", err)
 		logger.Errorf("rest server shutdown error: %v", err)
@@ -258,3 +263,73 @@ func resetAllStreams() error {
 	}
 	}
 	return nil
 	return nil
 }
 }
+
+func runScheduleRuleCheckerByInterval(d time.Duration, exit <-chan struct{}) {
+	conf.Log.Infof("start patroling schedule rule state")
+	ticker := time.NewTicker(d)
+	defer func() {
+		ticker.Stop()
+		conf.Log.Infof("exit partoling schedule rule state")
+	}()
+	for {
+		select {
+		case <-exit:
+			return
+		case <-ticker.C:
+			rs, err := getAllRulesWithState()
+			if err != nil {
+				conf.Log.Errorf("get all rules with stated failed, err:%v", err)
+				continue
+			}
+			now := conf.GetNow()
+			for _, r := range rs {
+				if err := handleScheduleRuleState(now, r.rule, r.state); err != nil {
+					conf.Log.Errorf("handle schedule rule %v state failed, err:%v", r.rule.Id, err)
+				}
+			}
+		}
+	}
+}
+
+func runScheduleRuleChecker(exit <-chan struct{}) {
+	d, err := time.ParseDuration(conf.Config.Basic.RulePatrolInterval)
+	if err != nil {
+		conf.Log.Errorf("parse rulePatrolInterval failed, err:%v", err)
+		return
+	}
+	runScheduleRuleCheckerByInterval(d, exit)
+}
+
+func handleScheduleRuleState(now time.Time, r *api.Rule, state string) error {
+	toStart, toStop := handleScheduleRule(now, r, state)
+	if toStart {
+		return startRule(r.Id)
+	} else if toStop {
+		stopRule(r.Id)
+	}
+	return nil
+}
+
+func handleScheduleRule(now time.Time, r *api.Rule, state string) (bool, bool) {
+	options := r.Options
+	if options != nil && options.Cron == "" && options.Duration == "" && len(options.CronDatetimeRange) > 0 {
+		var isInRange bool
+		var err error
+		for _, cRange := range options.CronDatetimeRange {
+			isInRange, err = schedule.IsInScheduleRange(now, cRange.Begin, cRange.End)
+			if err != nil {
+				conf.Log.Errorf("check rule %v schedule failed, err:%v", r.Id, err)
+				return false, false
+			}
+			if isInRange {
+				break
+			}
+		}
+		if isInRange && state != "Running" {
+			return true, false
+		} else if !isInRange && state == "Running" {
+			return false, true
+		}
+	}
+	return false, false
+}

+ 111 - 0
internal/server/server_test.go

@@ -0,0 +1,111 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
+)
+
+func TestHandleScheduleRule(t *testing.T) {
+	now, err := time.Parse("2006-01-02 15:04:05", "2006-01-02 15:04:05")
+	require.NoError(t, err)
+	testcases := []struct {
+		state   string
+		begin   string
+		end     string
+		toStart bool
+		toStop  bool
+	}{
+		{
+			state:   "Running",
+			begin:   "2006-01-02 15:04:01",
+			end:     "2006-01-02 15:04:06",
+			toStart: false,
+			toStop:  false,
+		},
+		{
+			state:   "Stopped",
+			begin:   "2006-01-02 15:04:01",
+			end:     "2006-01-02 15:04:06",
+			toStart: true,
+			toStop:  false,
+		},
+		{
+			state:   "Stopped",
+			begin:   "2006-01-02 15:04:01",
+			end:     "2006-01-02 15:04:04",
+			toStart: false,
+			toStop:  false,
+		},
+		{
+			state:   "Running",
+			begin:   "2006-01-02 15:04:01",
+			end:     "2006-01-02 15:04:04",
+			toStart: false,
+			toStop:  true,
+		},
+	}
+	for _, tc := range testcases {
+		r := &api.Rule{
+			Options: &api.RuleOption{
+				Cron:     "",
+				Duration: "",
+				CronDatetimeRange: []api.DatetimeRange{
+					{
+						Begin: tc.begin,
+						End:   tc.end,
+					},
+				},
+			},
+		}
+		toStart, toStop := handleScheduleRule(now, r, tc.state)
+		require.Equal(t, tc.toStart, toStart)
+		require.Equal(t, tc.toStop, toStop)
+	}
+}
+
+func TestRunScheduleRuleChecker(t *testing.T) {
+	exit := make(chan struct{})
+	go runScheduleRuleCheckerByInterval(3*time.Second, exit)
+	time.Sleep(1 * time.Second)
+	exit <- struct{}{}
+}
+
+func TestHandleScheduleRuleState(t *testing.T) {
+	r := &api.Rule{}
+	r.Options = &api.RuleOption{}
+	now, err := time.Parse("2006-01-02 15:04:05", "2006-01-02 15:04:05")
+	require.NoError(t, err)
+	require.NoError(t, handleScheduleRuleState(now, r, "Running"))
+	r.Options.CronDatetimeRange = []api.DatetimeRange{
+		{
+			Begin: "2006-01-02 15:04:01",
+			End:   "2006-01-02 15:04:06",
+		},
+	}
+	require.NoError(t, handleScheduleRuleState(now, r, "Running"))
+	r.Options.CronDatetimeRange = []api.DatetimeRange{
+		{
+			Begin: "2006-01-02 15:04:01",
+			End:   "2006-01-02 15:04:02",
+		},
+	}
+	require.NoError(t, handleScheduleRuleState(now, r, "Running"))
+}

+ 15 - 59
internal/topo/rule/ruleState.go

@@ -30,6 +30,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/topo/planner"
 	"github.com/lf-edge/ekuiper/internal/topo/planner"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 	"github.com/lf-edge/ekuiper/pkg/infra"
+	"github.com/lf-edge/ekuiper/pkg/schedule"
 )
 )
 
 
 type ActionSignal int
 type ActionSignal int
@@ -243,6 +244,16 @@ func (rs *RuleState) Start() error {
 	if rs.triggered == -1 {
 	if rs.triggered == -1 {
 		return fmt.Errorf("rule %s is already deleted", rs.RuleId)
 		return fmt.Errorf("rule %s is already deleted", rs.RuleId)
 	}
 	}
+	if rs.Rule.IsLongRunningScheduleRule() {
+		isIn, err := schedule.IsInScheduleRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange)
+		if err != nil {
+			return err
+		}
+		// When rule is created, we need to check its schedule range before start it.
+		if !isIn {
+			return nil
+		}
+	}
 	if rs.Rule.IsScheduleRule() {
 	if rs.Rule.IsScheduleRule() {
 		return rs.startScheduleRule()
 		return rs.startScheduleRule()
 	}
 	}
@@ -423,7 +434,7 @@ func (rs *RuleState) GetState() (string, error) {
 				result = "Running"
 				result = "Running"
 			case context.Canceled:
 			case context.Canceled:
 				if rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule {
 				if rs.Rule.IsScheduleRule() && rs.cronState.isInSchedule {
-					if isAfterTimeRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange) {
+					if schedule.IsAfterTimeRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange) {
 						result = "Stopped: schedule terminated."
 						result = "Stopped: schedule terminated."
 					} else {
 					} else {
 						result = "Stopped: waiting for next schedule."
 						result = "Stopped: waiting for next schedule."
@@ -438,7 +449,7 @@ func (rs *RuleState) GetState() (string, error) {
 			}
 			}
 		} else {
 		} else {
 			if rs.cronState.isInSchedule {
 			if rs.cronState.isInSchedule {
-				if isAfterTimeRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange) {
+				if schedule.IsAfterTimeRanges(conf.GetNow(), rs.Rule.Options.CronDatetimeRange) {
 					result = "Stopped: schedule terminated."
 					result = "Stopped: schedule terminated."
 				} else {
 				} else {
 					result = "Stopped: waiting for next schedule."
 					result = "Stopped: waiting for next schedule."
@@ -466,46 +477,6 @@ func (rs *RuleState) GetTopoGraph() *api.PrintableTopo {
 	}
 	}
 }
 }
 
 
-const layout = "2006-01-02 15:04:05"
-
-func isInScheduleRange(now time.Time, start string, end string) (bool, error) {
-	s, err := time.Parse(layout, start)
-	if err != nil {
-		return false, err
-	}
-	e, err := time.Parse(layout, end)
-	if err != nil {
-		return false, err
-	}
-	isBefore := s.Before(now)
-	isAfter := e.After(now)
-	if isBefore && isAfter {
-		return true, nil
-	}
-	return false, nil
-}
-
-func isAfterTimeRanges(now time.Time, ranges []api.DatetimeRange) bool {
-	if len(ranges) < 1 {
-		return false
-	}
-	for _, r := range ranges {
-		isAfter, err := isAfterTimeRange(now, r.End)
-		if err != nil || !isAfter {
-			return false
-		}
-	}
-	return true
-}
-
-func isAfterTimeRange(now time.Time, end string) (bool, error) {
-	e, err := time.Parse(layout, end)
-	if err != nil {
-		return false, err
-	}
-	return now.After(e), nil
-}
-
 func (rs *RuleState) isInRunningSchedule(now time.Time, d time.Duration) (bool, time.Duration, error) {
 func (rs *RuleState) isInRunningSchedule(now time.Time, d time.Duration) (bool, time.Duration, error) {
 	allowed, err := rs.isInAllowedTimeRange(now)
 	allowed, err := rs.isInAllowedTimeRange(now)
 	if err != nil {
 	if err != nil {
@@ -518,14 +489,14 @@ func (rs *RuleState) isInRunningSchedule(now time.Time, d time.Duration) (bool,
 	if strings.HasPrefix(cronExpr, "mock") {
 	if strings.HasPrefix(cronExpr, "mock") {
 		return false, 0, nil
 		return false, 0, nil
 	}
 	}
-	return isInRunningSchedule(cronExpr, now, d)
+	return schedule.IsInRunningSchedule(cronExpr, now, d)
 }
 }
 
 
 func (rs *RuleState) isInAllowedTimeRange(now time.Time) (bool, error) {
 func (rs *RuleState) isInAllowedTimeRange(now time.Time) (bool, error) {
 	allowed := true
 	allowed := true
 	var err error
 	var err error
 	for _, timeRange := range rs.Rule.Options.CronDatetimeRange {
 	for _, timeRange := range rs.Rule.Options.CronDatetimeRange {
-		allowed, err = isInScheduleRange(now, timeRange.Begin, timeRange.End)
+		allowed, err = schedule.IsInScheduleRange(now, timeRange.Begin, timeRange.End)
 		if err != nil {
 		if err != nil {
 			return false, err
 			return false, err
 		}
 		}
@@ -535,18 +506,3 @@ func (rs *RuleState) isInAllowedTimeRange(now time.Time) (bool, error) {
 	}
 	}
 	return allowed, nil
 	return allowed, nil
 }
 }
-
-// isInRunningSchedule checks whether the rule should be running, eg:
-// If the duration is 10min, and cron is "0 0 * * *", and the current time is 00:00:02
-// And the rule should be started immediately instead of checking it on the next day.
-func isInRunningSchedule(cronExpr string, now time.Time, d time.Duration) (bool, time.Duration, error) {
-	s, err := cron.ParseStandard(cronExpr)
-	if err != nil {
-		return false, 0, err
-	}
-	previousSchedule := s.Next(now.Add(-d))
-	if now.After(previousSchedule) && now.Before(previousSchedule.Add(d)) {
-		return true, previousSchedule.Add(d).Sub(now), nil
-	}
-	return false, 0, nil
-}

+ 57 - 9
internal/topo/rule/ruleState_test.go

@@ -671,13 +671,61 @@ func TestScheduleRuleInRange(t *testing.T) {
 	}()
 	}()
 }
 }
 
 
-func TestIsRuleInRunningSchedule(t *testing.T) {
-	now, err := time.Parse(layout, "2006-01-02 15:04:01")
-	require.NoError(t, err)
-	d, err := time.ParseDuration("2s")
-	require.NoError(t, err)
-	isInSchedule, remainedDuration, err := isInRunningSchedule("4 15 * * *", now, d)
-	require.NoError(t, err)
-	require.True(t, isInSchedule)
-	require.Equal(t, remainedDuration, time.Second)
+const layout = "2006-01-02 15:04:05"
+
+func TestStartLongRunningScheduleRule(t *testing.T) {
+	conf.IsTesting = true
+	sp := processor.NewStreamProcessor()
+	sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
+	defer sp.ExecStmt(`DROP STREAM demo`)
+	now := time.Now()
+	m := conf.Clock.(*clock.Mock)
+	m.Set(now)
+	before := now.AddDate(-10, -10, -10)
+	after := now.Add(10 * time.Second)
+	r := &api.Rule{
+		Triggered: false,
+		Id:        "test",
+		Sql:       "SELECT ts FROM demo",
+		Actions: []map[string]interface{}{
+			{
+				"log": map[string]interface{}{},
+			},
+		},
+		Options: defaultOption,
+	}
+	r.Options.Cron = ""
+	r.Options.Duration = ""
+	r.Options.CronDatetimeRange = []api.DatetimeRange{
+		{
+			Begin: before.Format(layout),
+			End:   after.Format(layout),
+		},
+	}
+	const ruleStopped = "Stopped: canceled manually."
+	const ruleStarted = "Running"
+	func() {
+		rs, err := NewRuleState(r)
+		require.NoError(t, err)
+		require.NoError(t, rs.Start())
+		time.Sleep(500 * time.Millisecond)
+		state, err := rs.GetState()
+		require.NoError(t, err)
+		require.Equal(t, state, ruleStarted)
+	}()
+	r.Options.CronDatetimeRange = []api.DatetimeRange{
+		{
+			Begin: before.Format(layout),
+			End:   before.Format(layout),
+		},
+	}
+	func() {
+		rs, err := NewRuleState(r)
+		require.NoError(t, err)
+		require.NoError(t, rs.Start())
+		time.Sleep(500 * time.Millisecond)
+		state, err := rs.GetState()
+		require.NoError(t, err)
+		require.Equal(t, state, ruleStopped)
+	}()
 }
 }

+ 7 - 0
pkg/api/stream.go

@@ -207,6 +207,13 @@ type Rule struct {
 	Options   *RuleOption              `json:"options,omitempty"`
 	Options   *RuleOption              `json:"options,omitempty"`
 }
 }
 
 
+func (r *Rule) IsLongRunningScheduleRule() bool {
+	if r.Options == nil {
+		return false
+	}
+	return len(r.Options.Cron) == 0 && len(r.Options.Duration) == 0 && len(r.Options.CronDatetimeRange) > 0
+}
+
 func (r *Rule) IsScheduleRule() bool {
 func (r *Rule) IsScheduleRule() bool {
 	if r.Options == nil {
 	if r.Options == nil {
 		return false
 		return false

+ 18 - 0
pkg/api/stream_test.go

@@ -19,6 +19,7 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 )
 
 
 func TestDefaultSourceTupleResult(t *testing.T) {
 func TestDefaultSourceTupleResult(t *testing.T) {
@@ -46,3 +47,20 @@ func TestDefaultSourceTupleResult(t *testing.T) {
 	assert.Nil(t, st.Meta())
 	assert.Nil(t, st.Meta())
 	assert.NotEqual(t, now, st.Timestamp())
 	assert.NotEqual(t, now, st.Timestamp())
 }
 }
+
+func TestIsLongRunningScheduleRule(t *testing.T) {
+	r := &Rule{}
+	require.False(t, r.IsLongRunningScheduleRule())
+	r.Options = &RuleOption{
+		CronDatetimeRange: []DatetimeRange{
+			{
+				Begin: "1",
+				End:   "2",
+			},
+		},
+	}
+	require.True(t, r.IsLongRunningScheduleRule())
+	r.Options.Cron = "123"
+	r.Options.Duration = "123"
+	require.False(t, r.IsLongRunningScheduleRule())
+}

+ 91 - 0
pkg/schedule/schedule.go

@@ -0,0 +1,91 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schedule
+
+import (
+	"time"
+
+	"github.com/robfig/cron/v3"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
+)
+
+const layout = "2006-01-02 15:04:05"
+
+func IsInScheduleRanges(now time.Time, timeRanges []api.DatetimeRange) (bool, error) {
+	for _, tRange := range timeRanges {
+		isIn, err := IsInScheduleRange(now, tRange.Begin, tRange.End)
+		if err != nil {
+			return false, err
+		}
+		if isIn {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+func IsInScheduleRange(now time.Time, start string, end string) (bool, error) {
+	s, err := time.Parse(layout, start)
+	if err != nil {
+		return false, err
+	}
+	e, err := time.Parse(layout, end)
+	if err != nil {
+		return false, err
+	}
+	isBefore := s.Before(now)
+	isAfter := e.After(now)
+	if isBefore && isAfter {
+		return true, nil
+	}
+	return false, nil
+}
+
+func IsAfterTimeRanges(now time.Time, ranges []api.DatetimeRange) bool {
+	if len(ranges) < 1 {
+		return false
+	}
+	for _, r := range ranges {
+		isAfter, err := IsAfterTimeRange(now, r.End)
+		if err != nil || !isAfter {
+			return false
+		}
+	}
+	return true
+}
+
+func IsAfterTimeRange(now time.Time, end string) (bool, error) {
+	e, err := time.Parse(layout, end)
+	if err != nil {
+		return false, err
+	}
+	return now.After(e), nil
+}
+
+// IsInRunningSchedule checks whether the rule should be running, eg:
+// If the duration is 10min, and cron is "0 0 * * *", and the current time is 00:00:02
+// And the rule should be started immediately instead of checking it on the next day.
+func IsInRunningSchedule(cronExpr string, now time.Time, d time.Duration) (bool, time.Duration, error) {
+	s, err := cron.ParseStandard(cronExpr)
+	if err != nil {
+		return false, 0, err
+	}
+	previousSchedule := s.Next(now.Add(-d))
+	if now.After(previousSchedule) && now.Before(previousSchedule.Add(d)) {
+		return true, previousSchedule.Add(d).Sub(now), nil
+	}
+	return false, 0, nil
+}

+ 90 - 0
pkg/schedule/schedule_test.go

@@ -0,0 +1,90 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schedule
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
+)
+
+func TestIsInScheduleRanges(t *testing.T) {
+	now, err := time.Parse(layout, "2006-01-02 15:04:01")
+	require.NoError(t, err)
+	rs := []api.DatetimeRange{
+		{
+			Begin: "2006-01-02 15:04:00",
+			End:   "2006-01-02 15:04:03",
+		},
+	}
+	isIn, err := IsInScheduleRanges(now, rs)
+	require.NoError(t, err)
+	require.True(t, isIn)
+}
+
+func TestIsRuleInRunningSchedule(t *testing.T) {
+	now, err := time.Parse(layout, "2006-01-02 15:04:01")
+	require.NoError(t, err)
+	d, err := time.ParseDuration("2s")
+	require.NoError(t, err)
+	isInSchedule, remainedDuration, err := IsInRunningSchedule("4 15 * * *", now, d)
+	require.NoError(t, err)
+	require.True(t, isInSchedule)
+	require.Equal(t, remainedDuration, time.Second)
+}
+
+func TestIsInScheduleRange(t *testing.T) {
+	now, err := time.Parse(layout, "2006-01-02 15:04:01")
+	require.NoError(t, err)
+	_, err = IsInScheduleRange(now, "", "")
+	require.Error(t, err)
+	_, err = IsInScheduleRange(now, "2006-01-02 15:04:01", "")
+	require.Error(t, err)
+	isIn, err := IsInScheduleRange(now, "2006-01-02 15:04:00", "2006-01-02 15:04:03")
+	require.NoError(t, err)
+	require.True(t, isIn)
+	isIn, err = IsInScheduleRange(now, "2006-01-02 15:05:00", "2006-01-02 15:05:03")
+	require.NoError(t, err)
+	require.False(t, isIn)
+}
+
+func TestIsAfterTimeRange(t *testing.T) {
+	now, err := time.Parse(layout, "2006-01-02 15:04:01")
+	require.NoError(t, err)
+	_, err = IsAfterTimeRange(now, "")
+	require.Error(t, err)
+	isAfter, err := IsAfterTimeRange(now, "2006-01-02 15:04:00")
+	require.NoError(t, err)
+	require.True(t, isAfter)
+	isAfter, err = IsAfterTimeRange(now, "2006-01-02 15:04:06")
+	require.NoError(t, err)
+	require.False(t, isAfter)
+}
+
+func TestIsInRunningSchedule(t *testing.T) {
+	now, err := time.Parse(layout, "2006-01-02 15:04:02")
+	require.NoError(t, err)
+	_, _, err = IsInRunningSchedule("", now, time.Second)
+	require.Error(t, err)
+	isIn, _, err := IsInRunningSchedule("4 15 * * *", now, 3*time.Second)
+	require.NoError(t, err)
+	require.True(t, isIn)
+	isIn, _, err = IsInRunningSchedule("4 15 * * *", now, time.Second)
+	require.NoError(t, err)
+	require.False(t, isIn)
+}