Quellcode durchsuchen

feat(meta): initial support for meta func

ngjaying vor 5 Jahren
Ursprung
Commit
0620b5bf38

+ 84 - 22
xsql/ast.go

@@ -270,6 +270,14 @@ type FieldRef struct {
 func (fr *FieldRef) expr() {}
 func (fr *FieldRef) expr() {}
 func (fr *FieldRef) node() {}
 func (fr *FieldRef) node() {}
 
 
+type MetaRef struct {
+	StreamName StreamName
+	Name       string
+}
+
+func (fr *MetaRef) expr() {}
+func (fr *MetaRef) node() {}
+
 // The stream AST tree
 // The stream AST tree
 type Options map[string]string
 type Options map[string]string
 
 
@@ -459,6 +467,7 @@ func (fn walkFuncVisitor) Visit(n Node) Visitor { fn(n); return fn }
 type Valuer interface {
 type Valuer interface {
 	// Value returns the value and existence flag for a given key.
 	// Value returns the value and existence flag for a given key.
 	Value(key string) (interface{}, bool)
 	Value(key string) (interface{}, bool)
+	Meta(key string) (interface{}, bool)
 }
 }
 
 
 // CallValuer implements the Call method for evaluating function calls.
 // CallValuer implements the Call method for evaluating function calls.
@@ -502,6 +511,10 @@ func (wv *WildcardValuer) Value(key string) (interface{}, bool) {
 	}
 	}
 }
 }
 
 
+func (wv *WildcardValuer) Meta(key string) (interface{}, bool) {
+	return nil, false
+}
+
 /**********************************
 /**********************************
 **	Various Data Types for SQL transformation
 **	Various Data Types for SQL transformation
  */
  */
@@ -527,6 +540,11 @@ func (m Message) Value(key string) (interface{}, bool) {
 	return nil, false
 	return nil, false
 }
 }
 
 
+func (m Message) Meta(key string) (interface{}, bool) {
+	common.Log.Println("Message cannot get meta")
+	return nil, false
+}
+
 type Event interface {
 type Event interface {
 	GetTimestamp() int64
 	GetTimestamp() int64
 	IsWatermark() bool
 	IsWatermark() bool
@@ -556,11 +574,11 @@ func (m Metadata) Value(key string) (interface{}, bool) {
 }
 }
 
 
 func (t *Tuple) Value(key string) (interface{}, bool) {
 func (t *Tuple) Value(key string) (interface{}, bool) {
-	if v, ok := t.Message.Value(key); ok {
-		return v, ok
-	} else {
-		return t.Metadata.Value(key)
-	}
+	return t.Message.Value(key)
+}
+
+func (t *Tuple) Meta(key string) (interface{}, bool) {
+	return t.Metadata.Value(key)
 }
 }
 
 
 func (t *Tuple) All(stream string) (interface{}, bool) {
 func (t *Tuple) All(stream string) (interface{}, bool) {
@@ -708,6 +726,40 @@ func (jt *JoinTuple) Value(key string) (interface{}, bool) {
 	}
 	}
 }
 }
 
 
+func (jt *JoinTuple) Meta(key string) (interface{}, bool) {
+	keys := strings.Split(key, ".")
+	tuples := jt.Tuples
+	switch len(keys) {
+	case 1:
+		if len(tuples) > 1 {
+			for _, tuple := range tuples { //TODO support key without modifier?
+				v, ok := tuple.Metadata[key]
+				if ok {
+					return v, ok
+				}
+			}
+			common.Log.Infoln("Wrong key: ", key, ", not found")
+			return nil, false
+		} else {
+			v, ok := tuples[0].Metadata[key]
+			return v, ok
+		}
+	case 2:
+		emitter, key := keys[0], keys[1]
+		//TODO should use hash here
+		for _, tuple := range tuples {
+			if tuple.Emitter == emitter {
+				v, ok := tuple.Metadata[key]
+				return v, ok
+			}
+		}
+		return nil, false
+	default:
+		common.Log.Infoln("Wrong key: ", key, ", expect dot in the expression.")
+		return nil, false
+	}
+}
+
 func (jt *JoinTuple) All(stream string) (interface{}, bool) {
 func (jt *JoinTuple) All(stream string) (interface{}, bool) {
 	if stream != "" {
 	if stream != "" {
 		for _, t := range jt.Tuples {
 		for _, t := range jt.Tuples {
@@ -927,6 +979,15 @@ func (a multiValuer) Value(key string) (interface{}, bool) {
 	return nil, false
 	return nil, false
 }
 }
 
 
+func (a multiValuer) Meta(key string) (interface{}, bool) {
+	for _, valuer := range a {
+		if v, ok := valuer.Meta(key); ok {
+			return v, true
+		}
+	}
+	return nil, false
+}
+
 func (a multiValuer) Call(name string, args []interface{}) (interface{}, bool) {
 func (a multiValuer) Call(name string, args []interface{}) (interface{}, bool) {
 	for _, valuer := range a {
 	for _, valuer := range a {
 		if valuer, ok := valuer.(CallValuer); ok {
 		if valuer, ok := valuer.(CallValuer); ok {
@@ -941,30 +1002,21 @@ func (a multiValuer) Call(name string, args []interface{}) (interface{}, bool) {
 }
 }
 
 
 type multiAggregateValuer struct {
 type multiAggregateValuer struct {
-	data    AggregateData
-	valuers []Valuer
+	data AggregateData
+	multiValuer
 }
 }
 
 
 func MultiAggregateValuer(data AggregateData, valuers ...Valuer) Valuer {
 func MultiAggregateValuer(data AggregateData, valuers ...Valuer) Valuer {
 	return &multiAggregateValuer{
 	return &multiAggregateValuer{
-		data:    data,
-		valuers: valuers,
+		data:        data,
+		multiValuer: valuers,
 	}
 	}
 }
 }
 
 
-func (a *multiAggregateValuer) Value(key string) (interface{}, bool) {
-	for _, valuer := range a.valuers {
-		if v, ok := valuer.Value(key); ok {
-			return v, true
-		}
-	}
-	return nil, false
-}
-
 //The args is [][] for aggregation
 //The args is [][] for aggregation
 func (a *multiAggregateValuer) Call(name string, args []interface{}) (interface{}, bool) {
 func (a *multiAggregateValuer) Call(name string, args []interface{}) (interface{}, bool) {
 	var singleArgs []interface{} = nil
 	var singleArgs []interface{} = nil
-	for _, valuer := range a.valuers {
+	for _, valuer := range a.multiValuer {
 		if a, ok := valuer.(AggregateCallValuer); ok {
 		if a, ok := valuer.(AggregateCallValuer); ok {
 			if v, ok := a.Call(name, args); ok {
 			if v, ok := a.Call(name, args); ok {
 				return v, true
 				return v, true
@@ -1058,6 +1110,15 @@ func (v *ValuerEval) Eval(expr Expr) interface{} {
 			val, _ := v.Valuer.Value(string(expr.StreamName) + "." + expr.Name)
 			val, _ := v.Valuer.Value(string(expr.StreamName) + "." + expr.Name)
 			return val
 			return val
 		}
 		}
+	case *MetaRef:
+		if expr.StreamName == "" {
+			val, _ := v.Valuer.Meta(expr.Name)
+			return val
+		} else {
+			//The field specified with stream source
+			val, _ := v.Valuer.Meta(string(expr.StreamName) + "." + expr.Name)
+			return val
+		}
 	case *Wildcard:
 	case *Wildcard:
 		val, _ := v.Valuer.Value("")
 		val, _ := v.Valuer.Value("")
 		return val
 		return val
@@ -1088,10 +1149,11 @@ func (v *ValuerEval) evalJsonExpr(result interface{}, op Token, expr Expr) inter
 	if val, ok := result.(map[string]interface{}); ok {
 	if val, ok := result.(map[string]interface{}); ok {
 		switch op {
 		switch op {
 		case ARROW:
 		case ARROW:
-			if exp, ok := expr.(*FieldRef); ok {
+			switch e := expr.(type) {
+			case *FieldRef, *MetaRef:
 				ve := &ValuerEval{Valuer: Message(val)}
 				ve := &ValuerEval{Valuer: Message(val)}
-				return ve.Eval(exp)
-			} else {
+				return ve.Eval(e)
+			default:
 				return fmt.Errorf("the right expression is not a field reference node")
 				return fmt.Errorf("the right expression is not a field reference node")
 			}
 			}
 		default:
 		default:

+ 4 - 0
xsql/funcs_aggregate.go

@@ -16,6 +16,10 @@ func (v AggregateFunctionValuer) Value(key string) (interface{}, bool) {
 	return nil, false
 	return nil, false
 }
 }
 
 
+func (v AggregateFunctionValuer) Meta(key string) (interface{}, bool) {
+	return nil, false
+}
+
 func (v AggregateFunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
 func (v AggregateFunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
 	lowerName := strings.ToLower(name)
 	lowerName := strings.ToLower(name)
 	switch lowerName {
 	switch lowerName {

+ 10 - 0
xsql/funcs_ast_validator.go

@@ -301,6 +301,16 @@ func validateOtherFunc(name string, args []Expr) error {
 				return fmt.Errorf("Parameter of mqtt function can be only topic or messageid.")
 				return fmt.Errorf("Parameter of mqtt function can be only topic or messageid.")
 			}
 			}
 		}
 		}
+	case "meta":
+		if err := validateLen(name, 1, len); err != nil {
+			return err
+		}
+		if isIntegerArg(args[0]) || isTimeArg(args[0]) || isBooleanArg(args[0]) || isStringArg(args[0]) || isFloatArg(args[0]) {
+			return produceErrInfo(name, 0, "meta reference")
+		}
+		if _, ok := args[0].(*MetaRef); !ok {
+			return produceErrInfo(name, 0, "meta reference")
+		}
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 14 - 1
xsql/funcs_ast_validator_test.go

@@ -409,12 +409,25 @@ func TestFuncValidator(t *testing.T) {
 			stmt: nil,
 			stmt: nil,
 			err:  "Expect string type for 2 parameter of function split_value.",
 			err:  "Expect string type for 2 parameter of function split_value.",
 		},
 		},
-
 		{
 		{
 			s:    `SELECT split_value(topic1, "hello", -1) FROM tbl`,
 			s:    `SELECT split_value(topic1, "hello", -1) FROM tbl`,
 			stmt: nil,
 			stmt: nil,
 			err:  "The index should not be a nagtive integer.",
 			err:  "The index should not be a nagtive integer.",
 		},
 		},
+		{
+			s:    `SELECT meta(tbl, "timestamp", 1) FROM tbl`,
+			stmt: nil,
+			err:  "The arguments for meta should be 1.",
+		},
+		{
+			s:    `SELECT meta("src1.device") FROM tbl`,
+			stmt: nil,
+			err:  "Expect meta reference type for 1 parameter of function meta.",
+		},
+		{
+			s:    `SELECT meta(device) FROM tbl`,
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "meta", Expr: &Call{Name: "meta", Args: []Expr{&MetaRef{Name: "device"}}}}}, Sources: []Source{&Table{Name: "tbl"}}},
+		},
 	}
 	}
 
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 2 - 0
xsql/funcs_misc.go

@@ -206,6 +206,8 @@ func otherCall(name string, args []interface{}) (interface{}, bool) {
 			return v, true
 			return v, true
 		}
 		}
 		return nil, false
 		return nil, false
+	case "meta":
+		return args[0], true
 	default:
 	default:
 		return fmt.Errorf("unknown function name %s", name), false
 		return fmt.Errorf("unknown function name %s", name), false
 	}
 	}

+ 5 - 1
xsql/functions.go

@@ -13,6 +13,10 @@ func (*FunctionValuer) Value(key string) (interface{}, bool) {
 	return nil, false
 	return nil, false
 }
 }
 
 
+func (*FunctionValuer) Meta(key string) (interface{}, bool) {
+	return nil, false
+}
+
 var aggFuncMap = map[string]string{"avg": "",
 var aggFuncMap = map[string]string{"avg": "",
 	"count": "",
 	"count": "",
 	"max":   "", "min": "",
 	"max":   "", "min": "",
@@ -53,7 +57,7 @@ var hashFuncMap = map[string]string{"md5": "",
 }
 }
 
 
 var otherFuncMap = map[string]string{"isNull": "",
 var otherFuncMap = map[string]string{"isNull": "",
-	"newuuid": "", "timestamp": "", "mqtt": "",
+	"newuuid": "", "timestamp": "", "mqtt": "", "meta": "",
 }
 }
 
 
 func (*FunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
 func (*FunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {

+ 17 - 3
xsql/parser.go

@@ -17,6 +17,7 @@ type Parser struct {
 		tok Token
 		tok Token
 		lit string
 		lit string
 	}
 	}
+	inmeta bool
 }
 }
 
 
 func (p *Parser) parseCondition() (Expr, error) {
 func (p *Parser) parseCondition() (Expr, error) {
@@ -505,10 +506,17 @@ func (p *Parser) parseUnaryExpr() (Expr, error) {
 		if n, err := p.parseFieldNameSections(); err != nil {
 		if n, err := p.parseFieldNameSections(); err != nil {
 			return nil, err
 			return nil, err
 		} else {
 		} else {
-			if len(n) == 2 {
-				return &FieldRef{StreamName: StreamName(n[0]), Name: n[1]}, nil
+			if p.inmeta {
+				if len(n) == 2 {
+					return &MetaRef{StreamName: StreamName(n[0]), Name: n[1]}, nil
+				}
+				return &MetaRef{StreamName: "", Name: n[0]}, nil
+			} else {
+				if len(n) == 2 {
+					return &FieldRef{StreamName: StreamName(n[0]), Name: n[1]}, nil
+				}
+				return &FieldRef{StreamName: "", Name: n[0]}, nil
 			}
 			}
-			return &FieldRef{StreamName: StreamName(""), Name: n[0]}, nil
 		}
 		}
 	} else if tok == STRING {
 	} else if tok == STRING {
 		return &StringLiteral{Val: lit}, nil
 		return &StringLiteral{Val: lit}, nil
@@ -587,6 +595,12 @@ func (p *Parser) parseAs(f *Field) (*Field, error) {
 }
 }
 
 
 func (p *Parser) parseCall(name string) (Expr, error) {
 func (p *Parser) parseCall(name string) (Expr, error) {
+	if strings.ToLower(name) == "meta" {
+		p.inmeta = true
+		defer func() {
+			p.inmeta = false
+		}()
+	}
 	var args []Expr
 	var args []Expr
 	for {
 	for {
 		if tok, _ := p.scanIgnoreWhitespace(); tok == RPAREN {
 		if tok, _ := p.scanIgnoreWhitespace(); tok == RPAREN {

+ 53 - 0
xsql/plans/misc_func_test.go

@@ -210,3 +210,56 @@ func TestMqttFunc_Apply2(t *testing.T) {
 		}
 		}
 	}
 	}
 }
 }
+
+func TestMetaFunc_Apply1(t *testing.T) {
+	var tests = []struct {
+		sql    string
+		data   interface{}
+		result interface{}
+	}{
+		{
+			sql: "SELECT topic, meta(topic) AS a FROM test",
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"topic": "fff",
+				},
+				Metadata: xsql.Metadata{
+					"topic": "devices/device_001/message",
+				},
+			},
+			result: []map[string]interface{}{{
+				"topic": "fff",
+				"a":     "devices/device_001/message",
+			}},
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestHashFunc_Apply1")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil || stmt == nil {
+			t.Errorf("parse sql %s error %v", tt.sql, err)
+		}
+		pp := &ProjectPlan{Fields: stmt.Fields}
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
+		var mapRes []map[string]interface{}
+		if v, ok := result.([]byte); ok {
+			err := json.Unmarshal(v, &mapRes)
+			if err != nil {
+				t.Errorf("Failed to parse the input into map.\n")
+				continue
+			}
+			//fmt.Printf("%t\n", mapRes["rengine_field_0"])
+
+			if !reflect.DeepEqual(tt.result, mapRes) {
+				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
+			}
+		} else {
+			t.Errorf("The returned result is not type of []byte\n")
+		}
+	}
+}