Browse Source

fix(planner): add validation for stream name of field ref (#1072)

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 3 years ago
parent
commit
815c86ff06
2 changed files with 19 additions and 1 deletions
  1. 14 0
      internal/topo/planner/analyzer.go
  2. 5 1
      internal/topo/planner/analyzer_test.go

+ 14 - 0
internal/topo/planner/analyzer.go

@@ -99,6 +99,20 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*ast.StreamStmt,
 		case ast.Fields: // do not bind selection fields, should have done above
 			return false
 		case *ast.FieldRef:
+			if f.StreamName != "" && f.StreamName != ast.DefaultStream {
+				// check if stream exists
+				found := false
+				for _, sn := range streamsFromStmt {
+					if sn == string(f.StreamName) {
+						found = true
+						break
+					}
+				}
+				if !found {
+					walkErr = fmt.Errorf("stream %s not found", f.StreamName)
+					return true
+				}
+			}
 			walkErr = fieldsMap.bind(f)
 		}
 		return true

+ 5 - 1
internal/topo/planner/analyzer_test.go

@@ -119,10 +119,14 @@ var tests = []struct {
 		sql: `SELECT collect(*)[-1] as current FROM src1 GROUP BY COUNTWINDOW(2, 1) HAVING isNull(current->name) = false`,
 		r:   newErrorStruct(""),
 	},
-	{ // 14
+	{ // 15
 		sql: `SELECT sum(next->nid) as nid FROM src1 WHERE next->nid > 20 `,
 		r:   newErrorStruct(""),
 	},
+	{ // 16
+		sql: `SELECT collect(*)[0] as last FROM src1 GROUP BY SlidingWindow(ss,5) HAVING last.temp > 30`,
+		r:   newErrorStruct("stream last not found"),
+	},
 }
 
 func Test_validation(t *testing.T) {