Переглянути джерело

refactor(rule): rule option parse

ngjaying 4 роки тому
батько
коміт
274820b7b4
2 змінених файлів з 115 додано та 24 видалено
  1. 89 19
      xsql/processors/simple_processor_test.go
  2. 26 5
      xsql/processors/xsql_processor.go

+ 89 - 19
xsql/processors/simple_processor_test.go

@@ -1,6 +1,7 @@
 package processors
 
 import (
+	"github.com/emqx/kuiper/xstream/api"
 	"reflect"
 	"testing"
 )
@@ -8,7 +9,7 @@ import (
 func TestRuleActionParse_Apply(t *testing.T) {
 	var tests = []struct {
 		ruleStr string
-		result  []map[string]interface{}
+		result  *api.Rule
 	}{
 		{
 			ruleStr: `{
@@ -34,36 +35,105 @@ func TestRuleActionParse_Apply(t *testing.T) {
 				}
 			  ]
 			}`,
-			result: []map[string]interface{}{
-				{
-					"funcName": "RFC_READ_TABLE",
-					"ashost":   "192.168.1.100",
-					"sysnr":    "02",
-					"client":   "900",
-					"user":     "SPERF",
-					"passwd":   "PASSPASS",
-					"params": map[string]interface{}{
-						"QUERY_TABLE": "VBAP",
-						"ROWCOUNT":    float64(10),
-						"FIELDS": []interface{}{
-							map[string]interface{}{"FIELDNAME": "MANDT"},
-							map[string]interface{}{"FIELDNAME": "VBELN"},
-							map[string]interface{}{"FIELDNAME": "POSNR"},
+			result: &api.Rule{
+				Triggered: false,
+				Id:        "ruleTest",
+				Sql:       "SELECT * from demo",
+				Actions: []map[string]interface{}{
+					{
+						"funcName": "RFC_READ_TABLE",
+						"ashost":   "192.168.1.100",
+						"sysnr":    "02",
+						"client":   "900",
+						"user":     "SPERF",
+						"passwd":   "PASSPASS",
+						"params": map[string]interface{}{
+							"QUERY_TABLE": "VBAP",
+							"ROWCOUNT":    float64(10),
+							"FIELDS": []interface{}{
+								map[string]interface{}{"FIELDNAME": "MANDT"},
+								map[string]interface{}{"FIELDNAME": "VBELN"},
+								map[string]interface{}{"FIELDNAME": "POSNR"},
+							},
+						},
+					},
+				},
+				Options: &api.RuleOption{
+					IsEventTime:        false,
+					LateTol:            1000,
+					Concurrency:        1,
+					BufferLength:       1024,
+					SendMetaToSink:     false,
+					Qos:                api.AtMostOnce,
+					CheckpointInterval: 300000,
+				},
+			},
+		}, {
+			ruleStr: `{
+				"id": "ruleTest2",
+				"sql": "SELECT * from demo",
+				"actions": [
+					{
+						"log": ""
+					},
+					{
+						"sap": {
+							"funcName": "RFC_READ_TABLE",
+							"ashost": "192.168.100.10",
+							"sysnr": "02",
+							"client": "900",
+							"user": "uuu",
+							"passwd": "ppp."
+						}
+					}
+				],
+				"options": {
+					"isEventTime": true,
+					"lateTolerance": 1000,
+					"bufferLength": 10240,
+					"qos": 2,
+					"checkpointInterval": 60000
+				}
+			}`,
+			result: &api.Rule{
+				Triggered: false,
+				Id:        "ruleTest2",
+				Sql:       "SELECT * from demo",
+				Actions: []map[string]interface{}{
+					{
+						"log": "",
+					}, {
+						"sap": map[string]interface{}{
+							"funcName": "RFC_READ_TABLE",
+							"ashost":   "192.168.100.10",
+							"sysnr":    "02",
+							"client":   "900",
+							"user":     "uuu",
+							"passwd":   "ppp.",
 						},
 					},
 				},
+				Options: &api.RuleOption{
+					IsEventTime:        true,
+					LateTol:            1000,
+					Concurrency:        1,
+					BufferLength:       10240,
+					SendMetaToSink:     false,
+					Qos:                api.ExactlyOnce,
+					CheckpointInterval: 60000,
+				},
 			},
 		},
 	}
 
 	p := NewRuleProcessor(DbDir)
 	for i, tt := range tests {
-		r, err := p.getRuleByJson("ruleTest", tt.ruleStr)
+		r, err := p.getRuleByJson(tt.result.Id, tt.ruleStr)
 		if err != nil {
 			t.Errorf("get rule error: %s", err)
 		}
-		if !reflect.DeepEqual(tt.result, r.Actions) {
-			t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, r.Actions)
+		if !reflect.DeepEqual(tt.result, r) {
+			t.Errorf("%d \tresult mismatch:\n\nexp=%+v\n\ngot=%+v\n\n", i, tt.result, r)
 		}
 	}
 

+ 26 - 5
xsql/processors/xsql_processor.go

@@ -270,7 +270,15 @@ func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
 }
 
 func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error) {
-	var rule api.Rule
+	//set default rule options
+	rule := &api.Rule{
+		Options: &api.RuleOption{
+			LateTol:            1000,
+			Concurrency:        1,
+			BufferLength:       1024,
+			CheckpointInterval: 300000, //5 minutes
+		},
+	}
 	if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
 		return nil, fmt.Errorf("Parse rule %s error : %s.", ruleJson, err)
 	}
@@ -291,10 +299,23 @@ func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error)
 	if rule.Actions == nil || len(rule.Actions) == 0 {
 		return nil, fmt.Errorf("Missing rule actions.")
 	}
-	rule.Options.BufferLength = 1024
-	rule.Options.Concurrency = 1
-	rule.Options.CheckpointInterval = 300000 //5 minutes
-	return &rule, nil
+	if rule.Options == nil {
+		rule.Options = &api.RuleOption{}
+	}
+	//Set default options
+	if rule.Options.CheckpointInterval < 0 {
+		return nil, fmt.Errorf("rule option checkpointInterval %d is invalid, require a positive integer", rule.Options.CheckpointInterval)
+	}
+	if rule.Options.Concurrency < 0 {
+		return nil, fmt.Errorf("rule option concurrency %d is invalid, require a positive integer", rule.Options.Concurrency)
+	}
+	if rule.Options.BufferLength < 0 {
+		return nil, fmt.Errorf("rule option bufferLength %d is invalid, require a positive integer", rule.Options.BufferLength)
+	}
+	if rule.Options.LateTol < 0 {
+		return nil, fmt.Errorf("rule option lateTolerance %d is invalid, require a positive integer", rule.Options.LateTol)
+	}
+	return rule, nil
 }
 
 func (p *RuleProcessor) ExecInitRule(rule *api.Rule) (*xstream.TopologyNew, error) {