浏览代码

fix(func): analytic depending on alias must run alias firstly

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 年之前
父节点
当前提交
5fa2a51672
共有 2 个文件被更改,包括 44 次插入9 次删除
  1. 25 0
      internal/topo/planner/analyzer.go
  2. 19 9
      internal/topo/planner/planner_test.go

+ 25 - 0
internal/topo/planner/analyzer.go

@@ -120,6 +120,31 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*ast.StreamStmt,
 	// Collect all analytic function calls so that we can let them run firstly
 	ast.WalkFunc(s, func(n ast.Node) bool {
 		switch f := n.(type) {
+		case ast.Fields:
+			return false
+		case *ast.Call:
+			if function.IsAnalyticFunc(f.Name) {
+				f.CachedField = fmt.Sprintf("%s_%s_%d", function.AnalyticPrefix, f.Name, f.FuncId)
+				f.Cached = true
+				analyticFuncs = append(analyticFuncs, &ast.Call{
+					Name:        f.Name,
+					FuncId:      f.FuncId,
+					FuncType:    f.FuncType,
+					Args:        f.Args,
+					CachedField: f.CachedField,
+					Partition:   f.Partition,
+				})
+			}
+		}
+		return true
+	})
+	if walkErr != nil {
+		return nil, nil, walkErr
+	}
+	// walk sources at last to let them run firstly
+	// because other clause may depend on the alias defined here
+	ast.WalkFunc(s.Fields, func(n ast.Node) bool {
+		switch f := n.(type) {
 		case *ast.Call:
 			if function.IsAnalyticFunc(f.Name) {
 				f.CachedField = fmt.Sprintf("%s_%s_%d", function.AnalyticPrefix, f.Name, f.FuncId)

+ 19 - 9
internal/topo/planner/planner_test.go

@@ -1102,7 +1102,7 @@ func Test_createLogicalPlan(t *testing.T) {
 				sendMeta:    false,
 			}.Init(),
 		}, { // 13 analytic function plan
-			sql: `SELECT lag(name), id1 FROM src1 WHERE lag(temp) > temp`,
+			sql: `SELECT latest(lag(name)), id1 FROM src1 WHERE lag(temp) > temp`,
 			p: ProjectPlan{
 				baseLogicalPlan: baseLogicalPlan{
 					children: []LogicalPlan{
@@ -1135,16 +1135,20 @@ func Test_createLogicalPlan(t *testing.T) {
 										},
 										funcs: []*ast.Call{
 											{
-												Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
-											}, {
 												Name:        "lag",
-												FuncId:      1,
-												CachedField: "$$a_lag_1",
+												FuncId:      2,
+												CachedField: "$$a_lag_2",
 												Args: []ast.Expr{&ast.FieldRef{
 													Name:       "temp",
 													StreamName: "src1",
 												}},
 											},
+											{
+												Name: "latest", FuncId: 1, CachedField: "$$a_latest_1", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
+											},
+											{
+												Name: "lag", FuncId: 0, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}},
+											},
 										},
 									}.Init(),
 								},
@@ -1152,12 +1156,12 @@ func Test_createLogicalPlan(t *testing.T) {
 							condition: &ast.BinaryExpr{
 								LHS: &ast.Call{
 									Name:   "lag",
-									FuncId: 1,
+									FuncId: 2,
 									Args: []ast.Expr{&ast.FieldRef{
 										Name:       "temp",
 										StreamName: "src1",
 									}},
-									CachedField: "$$a_lag_1",
+									CachedField: "$$a_lag_2",
 									Cached:      true,
 								},
 								OP: ast.GT,
@@ -1171,8 +1175,14 @@ func Test_createLogicalPlan(t *testing.T) {
 				},
 				fields: []ast.Field{
 					{
-						Expr: &ast.Call{Name: "lag", FuncId: 0, FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}, CachedField: "$$a_lag_0", Cached: true},
-						Name: "lag",
+						Expr: &ast.Call{
+							Name:        "latest",
+							FuncId:      1,
+							Args:        []ast.Expr{&ast.Call{Name: "lag", FuncId: 0, Cached: true, CachedField: "$$a_lag_0", FuncType: ast.FuncTypeScalar, Args: []ast.Expr{&ast.FieldRef{Name: "name", StreamName: "src1"}}}},
+							CachedField: "$$a_latest_1",
+							Cached:      true,
+						},
+						Name: "latest",
 					}, {
 						Expr: &ast.FieldRef{Name: "id1", StreamName: "src1"},
 						Name: "id1",