Browse Source

feat(rule): support schedule rule (#1901)

* support schedule rule

Signed-off-by: yisaer <disxiaofei@163.com>

* fix test

Signed-off-by: yisaer <disxiaofei@163.com>

* add doc

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 1 year ago
parent
commit
48047fb973

+ 11 - 1
docs/en_US/guide/rules/overview.md

@@ -145,6 +145,8 @@ The current options includes:
 | qos                | int:0                | Specify the qos of the stream. The options are 0: At most once; 1: At least once and 2: Exactly once. If qos is bigger than 0, the checkpoint mechanism will be activated to save states periodically so that the rule can be resumed from errors.                                                                                                |
 | checkpointInterval | int:300000           | Specify the time interval in milliseconds to trigger a checkpoint. This is only effective when qos is bigger than 0.                                                                                                                                                                                                                              |
 | restartStrategy    | struct               | Specify the strategy to automatic restarting rule after failures. This can help to get over recoverable failures without manual operations. Please check [Rule Restart Strategy](#rule-restart-strategy) for detail configuration items.                                                                                                          |
+| cron | string: "" | Specify the periodic trigger strategy of the rule, which is described by [cron expression](https://en.wikipedia.org/wiki/Cron) |
+| duration | string: "" | Specifies the running duration of the rule, only valid when cron is specified |
 
 For detail about `qos` and `checkpointInterval`, please check [state and fault tolerance](./state_and_fault_tolerance.md).
 
@@ -162,4 +164,12 @@ The restart strategy options include:
 | multiplier   | float: 2             | The exponential to increase the interval.                                                                                             |
 | jitterFactor | float: 0.1           | How large random value will be added or subtracted to the delay to prevent restarting multiple rules at the same time.                |
 
-The default values can be changed by editing the `etc/kuiper.yaml` file. 
+The default values can be changed by editing the `etc/kuiper.yaml` file. 
+
+### Scheduled Rule
+
+Rules support periodic start, run and pause. In options, `cron` expresses the starting policy of the periodic rule, such as starting every 1 hour, and `duration` expresses the running time when the rule is started each time, such as running for 30 minutes.
+
+When `cron` is every 1 hour and `duration` is 30 minutes, then the rule will be started every 1 hour, and will be suspended after 30 minutes each time, waiting for the next startup.
+
+When a periodic rule is stopped by [stop rule](../../api/restapi/rules.md#stop-a-rule), the rule will be removed from the periodic scheduler and will no longer be scheduled to run. If the rule is running, it will also be paused.

+ 10 - 0
docs/zh_CN/guide/rules/overview.md

@@ -147,6 +147,8 @@ eKuiper 已经内置了丰富的 sink connector 类型,如 mqtt、rest 和 fil
 | qos                | int:0      | 指定流的 qos。 值为0对应最多一次; 1对应至少一次,2对应恰好一次。 如果 qos 大于0,将激活检查点机制以定期保存状态,以便可以从错误中恢复规则。                 |
 | checkpointInterval | int:300000 | 指定触发检查点的时间间隔(单位为 ms)。 仅当 qos 大于0时才有效。                                                          |
 | restartStrategy    | 结构         | 指定规则运行失败后自动重新启动规则的策略。这可以帮助从可恢复的故障中回复,而无需手动操作。请查看[规则重启策略](#规则重启策略)了解详细的配置项目。                    |
+| cron               | string: ""   | 指定规则的周期性触发策略,该周期通过[ cron 表达式](https://zh.wikipedia.org/wiki/Cron) 进行描述。 |
+| duration           | string: ""   | 指定规则的运行持续时间,只有当指定了 cron 后才有效。   |
 
 有关 `qos` 和 `checkpointInterval` 的详细信息,请查看[状态和容错](./state_and_fault_tolerance.md)。
 
@@ -165,3 +167,11 @@ eKuiper 已经内置了丰富的 sink connector 类型,如 mqtt、rest 和 fil
 | jitterFactor | float: 0.1 | 添加或减去延迟的随机值系数,防止在同一时间重新启动多个规则。                            |
 
 这些选项的默认值定义于 `etc/kuiper.yaml` 配置文件,可通过修改该文件更改默认值。
+
+### 周期性规则
+
+规则支持周期性的启动、运行和暂停。在 options 中,`cron` 表达了周期性规则的启动策略,如每 1 小时启动一次,而 `duration` 则表达了每次启动规则时的运行时间,如运行 30 分钟。
+
+当 `cron` 是每 1 小时一次,而 `duration` 是 30 分钟时,那么该规则会每隔 1 小时启动一次,每次运行 30 分钟后便被暂停,等待下一次的启动运行。
+
+通过 [停止规则](../../api/restapi/rules.md#停止规则) 停止一个周期性规则时,便会将该规则从周期性调度器中移除,从而不再被调度运行。如果该周期性规则正在运行,那么该运行也会被暂停。

+ 2 - 1
go.mod

@@ -28,11 +28,13 @@ require (
 	github.com/pebbe/zmq4 v1.2.9
 	github.com/prometheus/client_golang v1.14.0
 	github.com/redis/go-redis/v9 v9.0.3
+	github.com/robfig/cron/v3 v3.0.0
 	github.com/second-state/WasmEdge-go v0.12.0-alpha.2
 	github.com/sirupsen/logrus v1.9.0
 	github.com/ugorji/go/codec v1.2.10
 	github.com/urfave/cli v1.22.12
 	go.nanomsg.org/mangos/v3 v3.4.2
+	golang.org/x/text v0.8.0
 	google.golang.org/genproto v0.0.0-20230227214838-9b19f0bdc514
 	google.golang.org/grpc v1.53.0
 	google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8
@@ -94,7 +96,6 @@ require (
 	golang.org/x/net v0.8.0 // indirect
 	golang.org/x/sync v0.1.0 // indirect
 	golang.org/x/sys v0.6.0 // indirect
-	golang.org/x/text v0.8.0 // indirect
 	golang.org/x/tools v0.6.0 // indirect
 	lukechampine.com/uint128 v1.2.0 // indirect
 	modernc.org/cc/v3 v3.40.0 // indirect

+ 2 - 0
go.sum

@@ -199,6 +199,8 @@ github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDO
 github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
+github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E=
+github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
 github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
 github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
 github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=

+ 33 - 0
internal/topo/rule/mockCron.go

@@ -0,0 +1,33 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rule
+
+import "github.com/robfig/cron/v3"
+
+type MockCron struct {
+	index int
+}
+
+func (m *MockCron) Start() {}
+
+// AddFunc
+// MockCron execute function immediately at once
+func (m *MockCron) AddFunc(_ string, cmd func()) (cron.EntryID, error) {
+	cmd()
+	m.index++
+	return cron.EntryID(m.index), nil
+}
+
+func (m *MockCron) Remove(id cron.EntryID) {}

+ 110 - 3
internal/topo/rule/ruleState.go

@@ -22,6 +22,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/robfig/cron/v3"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo"
 	"github.com/lf-edge/ekuiper/internal/topo/planner"
@@ -36,6 +38,31 @@ const (
 	ActionSignalStop
 )
 
+type cronInterface interface {
+	Start()
+	AddFunc(spec string, cmd func()) (cron.EntryID, error)
+	Remove(id cron.EntryID)
+}
+
+var backgroundCron cronInterface
+
+func init() {
+	if !conf.IsTesting {
+		backgroundCron = cron.New()
+	} else {
+		backgroundCron = &MockCron{}
+	}
+	backgroundCron.Start()
+}
+
+type cronStateCtx struct {
+	cancel  context.CancelFunc
+	entryID cron.EntryID
+	// isInSchedule indicates the current rule is in scheduled in backgroundCron
+	isInSchedule   bool
+	startFailedCnt int
+}
+
 /*********
  *  RuleState is created for each rule. Each ruleState runs two loops:
  *  1. action event loop to accept commands, such as start, stop, getStatus, delete
@@ -56,6 +83,7 @@ type RuleState struct {
 	// temporary storage for topo graph to make sure even rule close, the graph is still available
 	topoGraph *api.PrintableTopo
 	sync.RWMutex
+	cronState cronStateCtx
 }
 
 // NewRuleState Create and initialize a rule state.
@@ -67,7 +95,7 @@ func NewRuleState(rule *api.Rule) (*RuleState, error) {
 		Rule:     rule,
 		ActionCh: make(chan ActionSignal),
 	}
-	rs.Run()
+	rs.run()
 	if tp, err := planner.Plan(rule); err != nil {
 		return rs, err
 	} else {
@@ -103,7 +131,7 @@ func (rs *RuleState) UpdateTopo(rule *api.Rule) error {
 }
 
 // Run start to run the two loops, do not access any changeable states
-func (rs *RuleState) Run() {
+func (rs *RuleState) run() {
 	var (
 		ctx    context.Context
 		cancel context.CancelFunc
@@ -220,6 +248,62 @@ func (rs *RuleState) Start() error {
 	if rs.triggered == -1 {
 		return fmt.Errorf("rule %s is already deleted", rs.RuleId)
 	}
+	if rs.Rule.IsScheduleRule() {
+		return rs.startScheduleRule()
+	}
+	return rs.start()
+}
+
+// startScheduleRule will register the job in the backgroundCron to run.
+// Job will do following 2 things:
+// 1. start the rule in cron if else the job is already stopped
+// 2. after the rule started, start an extract goroutine to stop the rule after specific duration
+func (rs *RuleState) startScheduleRule() error {
+	if rs.cronState.isInSchedule {
+		return fmt.Errorf("rule %s is already in schedule", rs.RuleId)
+	}
+	d, err := time.ParseDuration(rs.Rule.Options.Duration)
+	if err != nil {
+		return err
+	}
+	var cronCtx context.Context
+	cronCtx, rs.cronState.cancel = context.WithCancel(context.Background())
+	entryID, err := backgroundCron.AddFunc(rs.Rule.Options.Cron, func() {
+		if err := func() error {
+			rs.Lock()
+			defer rs.Unlock()
+			return rs.start()
+		}(); err != nil {
+			rs.Lock()
+			rs.cronState.startFailedCnt++
+			rs.Unlock()
+			conf.Log.Errorf(err.Error())
+			return
+		}
+		after := time.After(d)
+		go func(ctx context.Context) {
+			select {
+			case <-after:
+				rs.Lock()
+				defer rs.Unlock()
+				if err := rs.stop(); err != nil {
+					conf.Log.Errorf("close rule %s failed, err: %v", rs.RuleId, err)
+				}
+				return
+			case <-cronCtx.Done():
+				return
+			}
+		}(cronCtx)
+	})
+	if err != nil {
+		return err
+	}
+	rs.cronState.isInSchedule = true
+	rs.cronState.entryID = entryID
+	return nil
+}
+
+func (rs *RuleState) start() error {
 	if rs.triggered != 1 {
 		// If the rule has been stopped due to error, the topology is not nil
 		if rs.Topology != nil {
@@ -240,6 +324,16 @@ func (rs *RuleState) Start() error {
 func (rs *RuleState) Stop() error {
 	rs.Lock()
 	defer rs.Unlock()
+	if rs.Rule.IsScheduleRule() {
+		rs.cronState.isInSchedule = false
+		rs.cronState.cancel()
+		rs.cronState.startFailedCnt = 0
+		backgroundCron.Remove(rs.cronState.entryID)
+	}
+	return rs.stop()
+}
+
+func (rs *RuleState) stop() error {
 	if rs.triggered == -1 {
 		return fmt.Errorf("rule %s is already deleted", rs.RuleId)
 	}
@@ -261,6 +355,12 @@ func (rs *RuleState) Close() error {
 		rs.Topology.Cancel()
 	}
 	rs.triggered = -1
+	if rs.Rule.IsScheduleRule() {
+		rs.cronState.isInSchedule = false
+		rs.cronState.cancel()
+		rs.cronState.startFailedCnt = 0
+		backgroundCron.Remove(rs.cronState.entryID)
+	}
 	close(rs.ActionCh)
 	return nil
 }
@@ -279,7 +379,11 @@ func (rs *RuleState) GetState() (string, error) {
 			case nil:
 				result = "Running"
 			case context.Canceled:
-				result = "Stopped: canceled manually."
+				if rs.Rule.IsScheduleRule() {
+					result = "Stopped: waiting for next schedule."
+				} else {
+					result = "Stopped: canceled manually."
+				}
 			case context.DeadlineExceeded:
 				result = "Stopped: deadline exceed."
 			default:
@@ -289,6 +393,9 @@ func (rs *RuleState) GetState() (string, error) {
 			result = "Stopped: canceled manually."
 		}
 	}
+	if rs.Rule.IsScheduleRule() && rs.cronState.startFailedCnt > 0 {
+		result = result + fmt.Sprintf(" Start failed count: %v.", rs.cronState.startFailedCnt)
+	}
 	return result, nil
 }
 

+ 96 - 0
internal/topo/rule/ruleState_test.go

@@ -22,6 +22,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/pkg/api"
@@ -322,3 +323,98 @@ func TestRuleState_Start(t *testing.T) {
 		}
 	})
 }
+
+func TestScheduleRule(t *testing.T) {
+	conf.IsTesting = true
+	sp := processor.NewStreamProcessor()
+	sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
+	defer sp.ExecStmt(`DROP STREAM demo`)
+	// Test rule not triggered
+	r := &api.Rule{
+		Triggered: false,
+		Id:        "test",
+		Sql:       "SELECT ts FROM demo",
+		Actions: []map[string]interface{}{
+			{
+				"log": map[string]interface{}{},
+			},
+		},
+		Options: defaultOption,
+	}
+	r.Options.Cron = "mockCron"
+	r.Options.Duration = "1s"
+	const ruleStarted = "Running"
+	const ruleStopped = "Stopped: waiting for next schedule."
+	func() {
+		rs, err := NewRuleState(r)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		if err := rs.startScheduleRule(); err != nil {
+			t.Error(err)
+			return
+		}
+		time.Sleep(500 * time.Millisecond)
+		state, err := rs.GetState()
+		if err != nil {
+			t.Errorf("get rule state error: %v", err)
+			return
+		}
+		if state != ruleStarted {
+			t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStarted, state)
+			return
+		}
+		if !rs.cronState.isInSchedule {
+			t.Error("cron state should be in schedule")
+			return
+		}
+	}()
+
+	func() {
+		rs, err := NewRuleState(r)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		if err := rs.startScheduleRule(); err != nil {
+			t.Error(err)
+			return
+		}
+		time.Sleep(1500 * time.Millisecond)
+		state, err := rs.GetState()
+		if err != nil {
+			t.Errorf("get rule state error: %v", err)
+			return
+		}
+		if state != ruleStopped {
+			t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
+			return
+		}
+		if !rs.cronState.isInSchedule {
+			t.Error("cron state should be in schedule")
+			return
+		}
+	}()
+
+	func() {
+		rs, err := NewRuleState(r)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		if err := rs.startScheduleRule(); err != nil {
+			t.Error(err)
+			return
+		}
+		if err := rs.startScheduleRule(); err == nil {
+			t.Error("rule can't be register in cron twice")
+			return
+		} else {
+			if err.Error() != "rule test is already in schedule" {
+				t.Error("error message wrong")
+				return
+			}
+		}
+	}()
+}

+ 9 - 0
pkg/api/stream.go

@@ -144,6 +144,8 @@ type RuleOption struct {
 	Qos                Qos              `json:"qos" yaml:"qos"`
 	CheckpointInterval int              `json:"checkpointInterval" yaml:"checkpointInterval"`
 	Restart            *RestartStrategy `json:"restartStrategy" yaml:"restartStrategy"`
+	Cron               string           `json:"cron" yaml:"cron"`
+	Duration           string           `json:"duration" yaml:"duration"`
 }
 
 type RestartStrategy struct {
@@ -191,6 +193,13 @@ type Rule struct {
 	Options   *RuleOption              `json:"options,omitempty"`
 }
 
+func (r *Rule) IsScheduleRule() bool {
+	if r.Options == nil {
+		return false
+	}
+	return len(r.Options.Cron) > 0 && len(r.Options.Duration) > 0
+}
+
 type StreamContext interface {
 	context.Context
 	GetLogger() Logger