Browse Source

fix(parser): meta(*) arg must have a stream name

analyzer and planner rely on the stream name, so must set defaultStream as the default

closes: #972

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 năm trước cách đây
mục cha
commit
54a9960fbb

+ 69 - 2
internal/topo/planner/planner_test.go

@@ -70,7 +70,11 @@ func Test_createLogicalPlan(t *testing.T) {
 			t.Error(err)
 			t.Fail()
 		}
-		store.Set(name, string(s))
+		err = store.Set(name, string(s))
+		if err != nil {
+			t.Error(err)
+			t.Fail()
+		}
 	}
 	streams := make(map[string]*ast.StreamStmt)
 	for n := range streamSqls {
@@ -1044,6 +1048,65 @@ func Test_createLogicalPlan(t *testing.T) {
 				isAggregate: false,
 				sendMeta:    false,
 			}.Init(),
+		}, { // 12 meta with more fields
+			sql: `SELECT temp, meta(*) as m FROM src1 WHERE meta(device)="demo2"`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						FilterPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									DataSourcePlan{
+										name: "src1",
+										streamFields: []interface{}{
+											&ast.StreamField{
+												Name:      "temp",
+												FieldType: &ast.BasicType{Type: ast.BIGINT},
+											},
+										},
+										streamStmt: streams["src1"],
+										metaFields: []string{},
+										allMeta:    true,
+									}.Init(),
+								},
+							},
+							condition: &ast.BinaryExpr{
+								LHS: &ast.Call{
+									Name: "meta",
+									Args: []ast.Expr{&ast.MetaRef{
+										Name:       "device",
+										StreamName: ast.DefaultStream,
+									}},
+								},
+								OP: ast.EQ,
+								RHS: &ast.StringLiteral{
+									Val: "demo2",
+								},
+							},
+						}.Init(),
+					},
+				},
+				fields: []ast.Field{
+					{
+						Expr:  &ast.FieldRef{Name: "temp", StreamName: "src1"},
+						Name:  "temp",
+						AName: "",
+					}, {
+						Expr: &ast.FieldRef{Name: "m", StreamName: ast.AliasStream, AliasRef: ast.MockAliasRef(
+							&ast.Call{Name: "meta", Args: []ast.Expr{&ast.MetaRef{
+								Name:       "*",
+								StreamName: ast.DefaultStream,
+							}}},
+							[]ast.StreamName{},
+							nil,
+						)},
+						Name:  "meta",
+						AName: "m",
+					},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -1104,7 +1167,11 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 			t.Error(err)
 			t.Fail()
 		}
-		store.Set(name, string(s))
+		err = store.Set(name, string(s))
+		if err != nil {
+			t.Error(err)
+			t.Fail()
+		}
 	}
 	streams := make(map[string]*ast.StreamStmt)
 	for n := range streamSqls {

+ 1 - 1
internal/xsql/parser.go

@@ -662,7 +662,7 @@ func (p *Parser) parseCall(name string) (ast.Expr, error) {
 				return nil, fmt.Errorf("found %q, expected right paren.", lit2)
 			} else {
 				if p.inmeta {
-					args = append(args, &ast.MetaRef{StreamName: "", Name: "*"})
+					args = append(args, &ast.MetaRef{StreamName: ast.DefaultStream, Name: "*"})
 				} else {
 					args = append(args, &ast.Wildcard{Token: ast.ASTERISK})
 				}

+ 21 - 0
internal/xsql/parser_test.go

@@ -1431,6 +1431,27 @@ func TestParser_ParseStatement(t *testing.T) {
 			stmt: nil,
 			err:  "found \"-\", expected expression.",
 		},
+		{
+			s: `SELECT meta(*) FROM tbl`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						AName: "",
+						Name:  "meta",
+						Expr: &ast.Call{
+							Name: "meta",
+							Args: []ast.Expr{
+								&ast.MetaRef{
+									Name:       "*",
+									StreamName: ast.DefaultStream,
+								},
+							},
+						},
+					},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
+			},
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))