Quellcode durchsuchen

feat(rule): options to auto restart rule

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang vor 2 Jahren
Ursprung
Commit
a6dfe100da

+ 12 - 1
etc/kuiper.yaml

@@ -41,7 +41,18 @@ rule:
   checkpointInterval: 300000
   # Whether to send errors to sinks
   sendError: true
-
+  # The strategy to retry for rule errors.
+  restartStrategy:
+    # The maximum number of retries
+    attempts: 0
+    # The interval in milliseconds before the first retry
+    delay: 1000
+    # The maximum interval in milliseconds between retries
+    maxDelay: 30000
+    # The factor by which the retry interval grows after each attempt. The value must be a positive number (fractional values such as 1.5 are accepted). The default value is 2.
+    multiplier: 2
+    # How large a random value is added to or subtracted from the delay, to prevent multiple rules from restarting at the same time. The value must be greater than 0 and less than 1.
+    jitterFactor: 0.1
 sink:
   # Control to enable cache or not. If it's set to true, then the cache will be enabled, otherwise, it will be disabled.
   enableCache: false

+ 61 - 0
internal/conf/conf.go

@@ -163,6 +163,13 @@ func InitConf() {
 			BufferLength:       1024,
 			CheckpointInterval: 300000, //5 minutes
 			SendError:          true,
+			Restart: &api.RestartStrategy{
+				Attempts:     0,
+				Delay:        1000,
+				Multiplier:   2,
+				MaxDelay:     30000,
+				JitterFactor: 0.1,
+			},
 		},
 	}
 
@@ -227,6 +234,60 @@ func InitConf() {
 		Config.Sink = &SinkConf{}
 	}
 	_ = Config.Sink.Validate()
+
+	_ = ValidateRuleOption(&Config.Rule)
+}
+
+// ValidateRuleOption checks the numeric fields of option and repairs them in
+// place: each out-of-range value is replaced by a fallback, a warning is
+// logged, and an entry is recorded in the collected errors.
+//
+// The restart strategy is inspected only when option.Restart is non-nil.
+// The return value is whatever errorx.MultiError.GetError reports for the
+// recorded entries, so a caller can either reject the option or ignore the
+// result and continue with the repaired values.
+func ValidateRuleOption(option *api.RuleOption) error {
+	e := make(errorx.MultiError)
+	// The following four options accept 0; only negative values are repaired.
+	if option.CheckpointInterval < 0 {
+		option.CheckpointInterval = 0
+		Log.Warnf("checkpointInterval is negative, set to 0")
+		e["invalidCheckpointInterval"] = fmt.Errorf("checkpointInterval must not be negative")
+	}
+	if option.Concurrency < 0 {
+		option.Concurrency = 1
+		Log.Warnf("concurrency is negative, set to 1")
+		e["invalidConcurrency"] = fmt.Errorf("concurrency must not be negative")
+	}
+	if option.BufferLength < 0 {
+		option.BufferLength = 1024
+		Log.Warnf("bufferLength is negative, set to 1024")
+		e["invalidBufferLength"] = fmt.Errorf("bufferLength must not be negative")
+	}
+	if option.LateTol < 0 {
+		option.LateTol = 1000
+		Log.Warnf("lateTol is negative, set to 1000")
+		e["invalidLateTol"] = fmt.Errorf("lateTol must not be negative")
+	}
+	if option.Restart != nil {
+		if option.Restart.Multiplier <= 0 {
+			option.Restart.Multiplier = 2
+			Log.Warnf("restart multiplier is not positive, set to 2")
+			e["invalidRestartMultiplier"] = fmt.Errorf("restart multiplier must be greater than 0")
+		}
+		// Attempts may be 0; only negative values are repaired.
+		if option.Restart.Attempts < 0 {
+			option.Restart.Attempts = 0
+			Log.Warnf("restart attempts is negative, set to 0")
+			e["invalidRestartAttempts"] = fmt.Errorf("restart attempts must not be negative")
+		}
+		if option.Restart.Delay <= 0 {
+			option.Restart.Delay = 1000
+			Log.Warnf("restart delay is not positive, set to 1000")
+			e["invalidRestartDelay"] = fmt.Errorf("restart delay must be greater than 0")
+		}
+		// Delay has already been repaired above, so it is a safe fallback here.
+		if option.Restart.MaxDelay <= 0 {
+			option.Restart.MaxDelay = option.Restart.Delay
+			Log.Warnf("restart maxDelay is not positive, set to %d", option.Restart.Delay)
+			e["invalidRestartMaxDelay"] = fmt.Errorf("restart maxDelay must be greater than 0")
+		}
+		// Both bounds are exclusive: 0 and 1 are rejected as well.
+		if option.Restart.JitterFactor <= 0 || option.Restart.JitterFactor >= 1 {
+			option.Restart.JitterFactor = 0.1
+			Log.Warnf("restart jitterFactor must be greater than 0 and less than 1, set to 0.1")
+			e["invalidRestartJitterFactor"] = fmt.Errorf("restart jitterFactor must be in (0, 1)")
+		}
+	}
+	return e.GetError()
 }
 
 func init() {

+ 145 - 0
internal/conf/conf_test.go

@@ -15,6 +15,7 @@ package conf
 
 import (
 	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/api"
 	"reflect"
 	"testing"
 )
@@ -79,3 +80,147 @@ func TestSourceConfValidate(t *testing.T) {
 		}
 	}
 }
+
+// TestRuleOptionValidate verifies that ValidateRuleOption leaves valid
+// options untouched, repairs invalid restart settings in place, and reports
+// an error exactly when a repair was necessary.
+func TestRuleOptionValidate(t *testing.T) {
+	// newOption returns a fresh option with valid base fields and the given
+	// restart strategy. A new value is built on every call because
+	// ValidateRuleOption mutates its argument.
+	newOption := func(restart *api.RestartStrategy) *api.RuleOption {
+		return &api.RuleOption{
+			LateTol:            1000,
+			Concurrency:        1,
+			BufferLength:       1024,
+			CheckpointInterval: 300000, //5 minutes
+			SendError:          true,
+			Restart:            restart,
+		}
+	}
+	var tests = []struct {
+		s   *api.RuleOption // input, repaired in place by ValidateRuleOption
+		e   *api.RuleOption // expected content of s after validation
+		err string          // non-empty when validation must report an error
+	}{
+		{
+			s: &api.RuleOption{},
+			e: &api.RuleOption{},
+		},
+		{
+			s: newOption(&api.RestartStrategy{Attempts: 0, Delay: 1000, Multiplier: 1, MaxDelay: 1000, JitterFactor: 0.1}),
+			e: newOption(&api.RestartStrategy{Attempts: 0, Delay: 1000, Multiplier: 1, MaxDelay: 1000, JitterFactor: 0.1}),
+		},
+		{
+			s: newOption(&api.RestartStrategy{Attempts: 3, Delay: 1000, Multiplier: 1, MaxDelay: 1000, JitterFactor: 0.1}),
+			e: newOption(&api.RestartStrategy{Attempts: 3, Delay: 1000, Multiplier: 1, MaxDelay: 1000, JitterFactor: 0.1}),
+		},
+		{
+			s: newOption(&api.RestartStrategy{Attempts: 3, Delay: 1000, Multiplier: 1.5, MaxDelay: 10000, JitterFactor: 0.1}),
+			e: newOption(&api.RestartStrategy{Attempts: 3, Delay: 1000, Multiplier: 1.5, MaxDelay: 10000, JitterFactor: 0.1}),
+		},
+		{
+			// Every restart field is invalid and must fall back to its default.
+			s:   newOption(&api.RestartStrategy{Attempts: -2, Delay: 0, Multiplier: 0, MaxDelay: 0, JitterFactor: 1.1}),
+			e:   newOption(&api.RestartStrategy{Attempts: 0, Delay: 1000, Multiplier: 2, MaxDelay: 1000, JitterFactor: 0.1}),
+			err: "multiple errors",
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		err := ValidateRuleOption(tt.s)
+		// Only the presence of an error is compared, not its text.
+		switch {
+		case err != nil && tt.err == "":
+			t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, err)
+		case err == nil && tt.err != "":
+			t.Errorf("%d: error mismatch:\n  exp=%s\n  got=<nil>\n\n", i, tt.err)
+		}
+		if !reflect.DeepEqual(tt.s, tt.e) {
+			t.Errorf("%d\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.e, tt.s)
+		}
+	}
+}

+ 31 - 13
internal/processor/rule.go

@@ -124,6 +124,13 @@ func (p *RuleProcessor) getDefaultRule(name, sql string) *api.Rule {
 			SendError:          true,
 			Qos:                api.AtMostOnce,
 			CheckpointInterval: 300000,
+			Restart: &api.RestartStrategy{
+				Attempts:     0,
+				Delay:        1000,
+				Multiplier:   2,
+				MaxDelay:     30000,
+				JitterFactor: 0.1,
+			},
 		},
 	}
 }
@@ -132,7 +139,7 @@ func (p *RuleProcessor) getRuleByJson(id, ruleJson string) (*api.Rule, error) {
 	opt := conf.Config.Rule
 	//set default rule options
 	rule := &api.Rule{
-		Options: &opt,
+		Options: clone(opt),
 	}
 	if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
 		return nil, fmt.Errorf("Parse rule %s error : %s.", ruleJson, err)
@@ -166,22 +173,33 @@ func (p *RuleProcessor) getRuleByJson(id, ruleJson string) (*api.Rule, error) {
 	if rule.Options == nil {
 		rule.Options = &opt
 	}
-	//Setnx default options
-	if rule.Options.CheckpointInterval < 0 {
-		return nil, fmt.Errorf("rule option checkpointInterval %d is invalid, require a positive integer", rule.Options.CheckpointInterval)
-	}
-	if rule.Options.Concurrency < 0 {
-		return nil, fmt.Errorf("rule option concurrency %d is invalid, require a positive integer", rule.Options.Concurrency)
-	}
-	if rule.Options.BufferLength < 0 {
-		return nil, fmt.Errorf("rule option bufferLength %d is invalid, require a positive integer", rule.Options.BufferLength)
-	}
-	if rule.Options.LateTol < 0 {
-		return nil, fmt.Errorf("rule option lateTolerance %d is invalid, require a positive integer", rule.Options.LateTol)
+	err := conf.ValidateRuleOption(rule.Options)
+	if err != nil {
+		return nil, fmt.Errorf("Rule %s has invalid options: %s.", rule.Id, err)
 	}
 	return rule, nil
 }
 
+// clone returns a copy of opt whose restart strategy is not shared with the
+// original, so changes made to the result do not leak back into opt.
+// A nil opt.Restart is preserved as nil instead of being dereferenced.
+func clone(opt api.RuleOption) *api.RuleOption {
+	// opt was passed by value, so its plain fields are already a private copy.
+	if opt.Restart != nil {
+		// Restart is a pointer and has to be duplicated explicitly.
+		restart := *opt.Restart
+		opt.Restart = &restart
+	}
+	return &opt
+}
+
 func (p *RuleProcessor) ExecDesc(name string) (string, error) {
 	var s1 string
 	f, _ := p.db.Get(name, &s1)

+ 1 - 1
internal/server/rest.go

@@ -411,7 +411,7 @@ func rulesHandler(w http.ResponseWriter, r *http.Request) {
 				if err != nil {
 					return err
 				} else {
-					err = doStartRule(rs)
+					err = doStartRule(rs, r.Options.Restart)
 					return err
 				}
 			})

+ 1 - 1
internal/server/rpc.go

@@ -154,7 +154,7 @@ func (t *Server) CreateRule(rule *model.RPCArgDesc, reply *string) error {
 	if err != nil {
 		return err
 	}
-	err = doStartRule(rs)
+	err = doStartRule(rs, r.Options.Restart)
 	if err != nil {
 		return err
 	}

+ 46 - 11
internal/server/rule_manager.go

@@ -25,8 +25,11 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/infra"
+	"math"
+	"math/rand"
 	"sort"
 	"sync"
+	"time"
 )
 
 var registry *RuleRegistry
@@ -101,7 +104,7 @@ func createRuleState(rule *api.Rule) (*RuleState, error) {
 }
 
 // Assume rs is started with topo instantiated
-func doStartRule(rs *RuleState) error {
+func doStartRule(rs *RuleState, option *api.RestartStrategy) error {
 	err := ruleProcessor.ExecReplaceRuleState(rs.RuleId, true)
 	if err != nil {
 		return err
@@ -109,19 +112,50 @@ func doStartRule(rs *RuleState) error {
 	go func() {
 		tp := rs.Topology
 		err := infra.SafeRun(func() error {
-			select {
-			case err := <-tp.Open():
-				return err
+			// count is the number of restarts performed so far; d is the next
+			// delay in milliseconds.
+			count := 0
+			d := option.Delay
+			var (
+				er error
+			)
+			for {
+				select {
+				case e := <-tp.Open():
+					er = e
+					if er != nil { // Only restart rule for errors
+						// Report the error received from the topology (er); the
+						// enclosing function's err is always nil at this point.
+						tp.GetContext().SetError(er)
+						logger.Errorf("closing rule %s for error: %v", rs.RuleId, er)
+						tp.Cancel()
+						rs.Triggered = false
+					} else {
+						return nil
+					}
+				}
+				// Retry at most option.Attempts times; 0 disables restarting.
+				if count < option.Attempts {
+					if d > option.MaxDelay {
+						d = option.MaxDelay
+					}
+					if option.JitterFactor > 0 {
+						// Spread the delay uniformly within +/- JitterFactor of d.
+						d = int(math.Round(float64(d) * ((rand.Float64()*2-1)*option.JitterFactor + 1)))
+						conf.Log.Infof("Rule %s will restart with jitterred delay %d", rs.RuleId, d)
+					} else {
+						conf.Log.Infof("Rule %s will restart with delay %d", rs.RuleId, d)
+					}
+					time.Sleep(time.Duration(d) * time.Millisecond)
+					count++
+					if option.Multiplier > 0 {
+						// Scale in floating point so a fractional multiplier such as 1.5
+						// is not truncated to 1, and cap at MaxDelay before converting
+						// back so a large power cannot overflow int.
+						next := float64(option.Delay) * math.Pow(option.Multiplier, float64(count))
+						d = int(math.Min(next, float64(option.MaxDelay)))
+					}
+				} else {
+					return er
+				}
 			}
 		})
-		if err != nil {
-			tp.GetContext().SetError(err)
-			logger.Errorf("closing rule %s for error: %v", rs.RuleId, err)
-			tp.Cancel()
-			rs.Triggered = false
-		} else {
+		// If the rule is stopped manually
+		if err == nil {
 			rs.Triggered = false
 			logger.Infof("closing rule %s", rs.RuleId)
+		} else {
+			logger.Infof("closing rule %s after %d retries with error: %v", rs.RuleId, option.Attempts, err)
 		}
 	}()
 	return nil
@@ -250,7 +284,7 @@ func startRule(name string) error {
 		if err != nil {
 			return err
 		}
-		err = doStartRule(rs)
+		err = doStartRule(rs, r.Options.Restart)
 		if err != nil {
 			return err
 		}
@@ -288,6 +322,7 @@ func deleteRule(name string) (result string) {
 
 func restartRule(name string) error {
 	stopRule(name)
+	time.Sleep(1 * time.Millisecond) // pause 1ms between stop and start; NOTE(review): presumably gives the stopped rule time to wind down, but a fixed sleep does not guarantee that - confirm intent
 	return startRule(name)
 }
 

+ 17 - 8
pkg/api/stream.go

@@ -117,14 +117,23 @@ type Rewindable interface {
 }
 
 type RuleOption struct {
-	IsEventTime        bool  `json:"isEventTime" yaml:"isEventTime"`
-	LateTol            int64 `json:"lateTolerance" yaml:"lateTolerance"`
-	Concurrency        int   `json:"concurrency" yaml:"concurrency"`
-	BufferLength       int   `json:"bufferLength" yaml:"bufferLength"`
-	SendMetaToSink     bool  `json:"sendMetaToSink" yaml:"sendMetaToSink"`
-	SendError          bool  `json:"sendError" yaml:"sendError"`
-	Qos                Qos   `json:"qos" yaml:"qos"`
-	CheckpointInterval int   `json:"checkpointInterval" yaml:"checkpointInterval"`
+	IsEventTime        bool             `json:"isEventTime" yaml:"isEventTime"`
+	LateTol            int64            `json:"lateTolerance" yaml:"lateTolerance"`
+	Concurrency        int              `json:"concurrency" yaml:"concurrency"`
+	BufferLength       int              `json:"bufferLength" yaml:"bufferLength"`
+	SendMetaToSink     bool             `json:"sendMetaToSink" yaml:"sendMetaToSink"`
+	SendError          bool             `json:"sendError" yaml:"sendError"`
+	Qos                Qos              `json:"qos" yaml:"qos"`
+	CheckpointInterval int              `json:"checkpointInterval" yaml:"checkpointInterval"`
+	Restart            *RestartStrategy `json:"restartStrategy" yaml:"restartStrategy"` // retry settings handed to doStartRule; ValidateRuleOption skips the restart checks when this is nil
+}
+
+// RestartStrategy configures how a rule is restarted after it stops with an
+// error. Delays are expressed in milliseconds.
+type RestartStrategy struct {
+	// Attempts is the maximum number of retries.
+	Attempts int `json:"attempts" yaml:"attempts"`
+	// Delay is the base interval before a retry.
+	Delay int `json:"delay" yaml:"delay"`
+	// Multiplier is the factor used to grow the interval between retries.
+	Multiplier float64 `json:"multiplier" yaml:"multiplier"`
+	// MaxDelay is the upper bound applied to the retry interval.
+	MaxDelay int `json:"maxDelay" yaml:"maxDelay"`
+	// JitterFactor controls the random variation applied to the interval.
+	// The key is "jitterFactor" so that it matches the name used in
+	// etc/kuiper.yaml, like every other field of this struct.
+	JitterFactor float64 `json:"jitterFactor" yaml:"jitterFactor"`
 }
 
 type PrintableTopo struct {