Forráskód Böngészése

chore: add log for rule checker (#2186)

Signed-off-by: Song Gao <disxiaofei@163.com>
Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 1 éve
szülő
commit
ee9e1efc26

+ 4 - 0
internal/conf/conf.go

@@ -28,6 +28,7 @@ import (
 
 
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/schedule"
 )
 )
 
 
 const ConfFileName = "kuiper.yaml"
 const ConfFileName = "kuiper.yaml"
@@ -347,6 +348,9 @@ func ValidateRuleOption(option *api.RuleOption) error {
 			errs = errors.Join(errs, errors.New("invalidRestartJitterFactor:restart jitterFactor must between [0, 1)"))
 			errs = errors.Join(errs, errors.New("invalidRestartJitterFactor:restart jitterFactor must between [0, 1)"))
 		}
 		}
 	}
 	}
+	if err := schedule.ValidateRanges(option.CronDatetimeRange); err != nil {
+		errs = errors.Join(errs, fmt.Errorf("validate cronDatetimeRange failed, err:%v", err))
+	}
 	return errs
 	return errs
 }
 }
 
 

+ 1 - 0
internal/server/server.go

@@ -302,6 +302,7 @@ func runScheduleRuleChecker(exit <-chan struct{}) {
 
 
 func handleScheduleRuleState(now time.Time, r *api.Rule, state string) error {
 func handleScheduleRuleState(now time.Time, r *api.Rule, state string) error {
 	toStart, toStop := handleScheduleRule(now, r, state)
 	toStart, toStop := handleScheduleRule(now, r, state)
+	conf.Log.Debugf("rule %v origin state: %v, going to start:%v, to stop:%v", r.Id, state, toStart, toStop)
 	if toStart {
 	if toStart {
 		return startRule(r.Id)
 		return startRule(r.Id)
 	} else if toStop {
 	} else if toStop {

+ 6 - 1
internal/topo/rule/ruleState.go

@@ -147,7 +147,6 @@ func (rs *RuleState) run() {
 				if ctx != nil {
 				if ctx != nil {
 					conf.Log.Warnf("rule %s is already started", rs.RuleId)
 					conf.Log.Warnf("rule %s is already started", rs.RuleId)
 				} else {
 				} else {
-
 					ctx, cancel = context.WithCancel(context.Background())
 					ctx, cancel = context.WithCancel(context.Background())
 					go rs.runTopo(ctx)
 					go rs.runTopo(ctx)
 				}
 				}
@@ -369,6 +368,9 @@ func (rs *RuleState) start() error {
 		}
 		}
 		rs.triggered = 1
 		rs.triggered = 1
 	}
 	}
+	if rs.Rule.IsScheduleRule() || rs.Rule.IsLongRunningScheduleRule() {
+		conf.Log.Debugf("rule %v started", rs.RuleId)
+	}
 	rs.ActionCh <- ActionSignalStart
 	rs.ActionCh <- ActionSignalStart
 	return nil
 	return nil
 }
 }
@@ -377,6 +379,9 @@ func (rs *RuleState) start() error {
 func (rs *RuleState) Stop() error {
 func (rs *RuleState) Stop() error {
 	rs.Lock()
 	rs.Lock()
 	defer rs.Unlock()
 	defer rs.Unlock()
+	if rs.Rule.IsScheduleRule() || rs.Rule.IsLongRunningScheduleRule() {
+		conf.Log.Debugf("rule %v stopped", rs.RuleId)
+	}
 	rs.stopScheduleRule()
 	rs.stopScheduleRule()
 	return rs.stop()
 	return rs.stop()
 }
 }

+ 28 - 0
pkg/schedule/schedule.go

@@ -15,6 +15,7 @@
 package schedule
 package schedule
 
 
 import (
 import (
+	"fmt"
 	"time"
 	"time"
 
 
 	"github.com/robfig/cron/v3"
 	"github.com/robfig/cron/v3"
@@ -94,3 +95,30 @@ func IsInRunningSchedule(cronExpr string, now time.Time, d time.Duration) (bool,
 	}
 	}
 	return false, 0, nil
 	return false, 0, nil
 }
 }
+
+func ValidateRanges(ranges []api.DatetimeRange) error {
+	if len(ranges) < 1 {
+		return nil
+	}
+	for _, r := range ranges {
+		if err := validateRange(r); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func validateRange(r api.DatetimeRange) error {
+	s, err := cast.InterfaceToTime(r.Begin, layout)
+	if err != nil {
+		return err
+	}
+	e, err := cast.InterfaceToTime(r.End, layout)
+	if err != nil {
+		return err
+	}
+	if s.After(e) {
+		return fmt.Errorf("begin time shouldn't after end time")
+	}
+	return nil
+}

+ 46 - 0
pkg/schedule/schedule_test.go

@@ -15,10 +15,13 @@
 package schedule
 package schedule
 
 
 import (
 import (
+	"errors"
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/require"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
 )
 )
 
 
 func TestIsInTimeRange(t *testing.T) {
 func TestIsInTimeRange(t *testing.T) {
@@ -69,3 +72,46 @@ func TestIsInRunningSchedule(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, err)
 	require.False(t, isIn)
 	require.False(t, isIn)
 }
 }
+
+func TestValidateSchedule(t *testing.T) {
+	tests := []struct {
+		begin string
+		end   string
+		err   error
+	}{
+		{
+			begin: "123",
+			end:   "123",
+			err:   errors.New("Can't parse string as time: 123"),
+		},
+		{
+			begin: layout,
+			end:   "123",
+			err:   errors.New("Can't parse string as time: 123"),
+		},
+		{
+			begin: "2006-01-02 15:04:02",
+			end:   "2006-01-02 15:04:01",
+			err:   errors.New("begin time shouldn't after end time"),
+		},
+		{
+			begin: "2006-01-02 15:04:00",
+			end:   "2006-01-02 15:04:01",
+			err:   nil,
+		},
+	}
+	for _, tc := range tests {
+		rs := []api.DatetimeRange{
+			{
+				Begin: tc.begin,
+				End:   tc.end,
+			},
+		}
+		err := ValidateRanges(rs)
+		if tc.err != nil {
+			require.Equal(t, err, tc.err)
+		} else {
+			require.NoError(t, err)
+		}
+	}
+}