Browse Source

feat(checkpoint): add rule options to kuiper conf

ngjaying 4 years ago
parent
commit
736b128804
4 changed files with 29 additions and 14 deletions
  1. 10 1
      common/util.go
  2. 10 0
      etc/kuiper.yaml
  3. 2 6
      xsql/processors/xsql_processor.go
  4. 7 7
      xstream/api/stream.go

+ 10 - 1
common/util.go

@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/benbjohnson/clock"
+	"github.com/emqx/kuiper/xstream/api"
 	"github.com/go-yaml/yaml"
 	"github.com/sirupsen/logrus"
 	"io"
@@ -68,6 +69,7 @@ type KuiperConf struct {
 		Prometheus     bool     `yaml:"prometheus"`
 		PrometheusPort int      `yaml:"prometheusPort"`
 	}
+	Rule api.RuleOption
 	Sink struct {
 		CacheThreshold    int `yaml:"cacheThreshold"`
 		CacheTriggerCount int `yaml:"cacheTriggerCount"`
@@ -106,7 +108,14 @@ func InitConf() {
 		Log.Fatal(err)
 	}
 
-	kc := KuiperConf{}
+	kc := KuiperConf{
+		Rule: api.RuleOption{
+			LateTol:            1000,
+			Concurrency:        1,
+			BufferLength:       1024,
+			CheckpointInterval: 300000, //5 minutes
+		},
+	}
 	if err := yaml.Unmarshal(b, &kc); err != nil {
 		Log.Fatal(err)
 	} else {

+ 10 - 0
etc/kuiper.yaml

@@ -16,6 +16,16 @@ basic:
   prometheus: false
   prometheusPort: 20499
 
+# The default options for all rules. Each rule can override this setting by defining its own option
+rule:
+  # The qos of the rule. The values can be 0: At most once; 1: At least once; 2: Exactly once
+  # If qos is bigger than 0, the checkpoint mechanism will launch to save states so that they can be
+  # restored for unintended interrupt or planned restart of the rule. The performance may be affected
+  # to enable the checkpoint mechanism
+  qos: 0
+  # The interval in millisecond to run the checkpoint mechanism.
+  checkpointInterval: 300000
+
 sink:
   # The cache persistence threshold size. If the message in sink cache is larger than 10, then it triggers persistence. If you find
   # the remote system is slow to response, or sink throughput is small, then it's recommend to increase below 2 configurations.

+ 2 - 6
xsql/processors/xsql_processor.go

@@ -270,14 +270,10 @@ func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
 }
 
 func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error) {
+	opt := common.Config.Rule
 	//set default rule options
 	rule := &api.Rule{
-		Options: &api.RuleOption{
-			LateTol:            1000,
-			Concurrency:        1,
-			BufferLength:       1024,
-			CheckpointInterval: 300000, //5 minutes
-		},
+		Options: &opt,
 	}
 	if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
 		return nil, fmt.Errorf("Parse rule %s error : %s.", ruleJson, err)

+ 7 - 7
xstream/api/stream.go

@@ -91,13 +91,13 @@ type Rewindable interface {
 }
 
 type RuleOption struct {
-	IsEventTime        bool  `json:"isEventTime"`
-	LateTol            int64 `json:"lateTolerance"`
-	Concurrency        int   `json:"concurrency"`
-	BufferLength       int   `json:"bufferLength"`
-	SendMetaToSink     bool  `json:"sendMetaToSink"`
-	Qos                Qos   `json:"qos"`
-	CheckpointInterval int   `json:"checkpointInterval"`
+	IsEventTime        bool  `json:"isEventTime" yaml:"isEventTime"`
+	LateTol            int64 `json:"lateTolerance" yaml:"lateTolerance"`
+	Concurrency        int   `json:"concurrency" yaml:"concurrency"`
+	BufferLength       int   `json:"bufferLength" yaml:"bufferLength"`
+	SendMetaToSink     bool  `json:"sendMetaToSink" yaml:"sendMetaToSink"`
+	Qos                Qos   `json:"qos" yaml:"qos"`
+	CheckpointInterval int   `json:"checkpointInterval" yaml:"checkpointInterval"`
 }
 
 type Rule struct {