Kaynağa Gözat

feat(meta): support meta(*)

ngjaying 5 yıl önce
ebeveyn
işleme
4e61ccbd7a
3 değiştirilmiş dosya ile 42 ekleme ve 1 silme
  1. 9 0
      xsql/ast.go
  2. 5 1
      xsql/parser.go
  3. 28 0
      xsql/plans/misc_func_test.go

+ 9 - 0
xsql/ast.go

@@ -542,6 +542,9 @@ func (m Message) Value(key string) (interface{}, bool) {
 }
 }
 
 
 func (m Message) Meta(key string) (interface{}, bool) {
 func (m Message) Meta(key string) (interface{}, bool) {
+	if key == "*" {
+		return map[string]interface{}(m), true
+	}
 	return m.Value(key)
 	return m.Value(key)
 }
 }
 
 
@@ -558,6 +561,9 @@ func (m Metadata) Value(key string) (interface{}, bool) {
 }
 }
 
 
 func (m Metadata) Meta(key string) (interface{}, bool) {
 func (m Metadata) Meta(key string) (interface{}, bool) {
+	if key == "*" {
+		return map[string]interface{}(m), true
+	}
 	msg := Message(m)
 	msg := Message(m)
 	return msg.Meta(key)
 	return msg.Meta(key)
 }
 }
@@ -574,6 +580,9 @@ func (t *Tuple) Value(key string) (interface{}, bool) {
 }
 }
 
 
 func (t *Tuple) Meta(key string) (interface{}, bool) {
 func (t *Tuple) Meta(key string) (interface{}, bool) {
+	if key == "*" {
+		return map[string]interface{}(t.Metadata), true
+	}
 	return t.Metadata.Value(key)
 	return t.Metadata.Value(key)
 }
 }
 
 

+ 5 - 1
xsql/parser.go

@@ -609,7 +609,11 @@ func (p *Parser) parseCall(name string) (Expr, error) {
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != RPAREN {
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != RPAREN {
 				return nil, fmt.Errorf("found %q, expected right paren.", lit2)
 				return nil, fmt.Errorf("found %q, expected right paren.", lit2)
 			} else {
 			} else {
-				args = append(args, &StringLiteral{Val: "*"})
+				if p.inmeta {
+					args = append(args, &MetaRef{StreamName: "", Name: "*"})
+				} else {
+					args = append(args, &StringLiteral{Val: "*"})
+				}
 				return &Call{Name: name, Args: args}, nil
 				return &Call{Name: name, Args: args}, nil
 			}
 			}
 		} else {
 		} else {

+ 28 - 0
xsql/plans/misc_func_test.go

@@ -296,12 +296,40 @@ func TestMetaFunc_Apply1(t *testing.T) {
 				"r": "device2",
 				"r": "device2",
 			}},
 			}},
 		},
 		},
+		{
+			sql: "SELECT meta(*) as r FROM test",
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"temperature": 43.2,
+				},
+				Metadata: xsql.Metadata{
+					"temperature": map[string]interface{}{
+						"id":     "dfadfasfas",
+						"device": "device2",
+					},
+					"device": "gateway",
+				},
+			},
+			result: []map[string]interface{}{{
+				"r": map[string]interface{}{
+					"temperature": map[string]interface{}{
+						"id":     "dfadfasfas",
+						"device": "device2",
+					},
+					"device": "gateway",
+				},
+			}},
+		},
 	}
 	}
 
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	contextLogger := common.Log.WithField("rule", "TestMetaFunc_Apply1")
 	contextLogger := common.Log.WithField("rule", "TestMetaFunc_Apply1")
 	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 	for i, tt := range tests {
+		if i != 2 {
+			continue
+		}
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil || stmt == nil {
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 			t.Errorf("parse sql %s error %v", tt.sql, err)