Selaa lähdekoodia

fix: fix walk function (#1972)

* fix walk func

Signed-off-by: Rui-Gan <1171530954@qq.com>

* add ut

Signed-off-by: Rui-Gan <1171530954@qq.com>

---------

Signed-off-by: Rui-Gan <1171530954@qq.com>
Regina 1 vuosi sitten
vanhempi
commit
9d14920bc2
2 muutettua tiedostoa jossa 171 lisäystä ja 0 poistoa
  1. 168 0
      internal/topo/planner/planner_test.go
  2. 3 0
      pkg/ast/visitor.go

+ 168 - 0
internal/topo/planner/planner_test.go

@@ -1622,6 +1622,174 @@ func Test_createLogicalPlan(t *testing.T) {
 				sendMeta:    false,
 			}.Init(),
 		},
+		{
+			// 18 analytic function over when plan
+			sql: `SELECT CASE WHEN lag(temp) OVER (WHEN lag(id1) > 1) BETWEEN 0 AND 10 THEN 1 ELSE 0 END FROM src1`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						AnalyticFuncsPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									DataSourcePlan{
+										name: "src1",
+										streamFields: map[string]*ast.JsonStreamField{
+											"id1": {
+												Type: "bigint",
+											},
+											"temp": {
+												Type: "bigint",
+											},
+										},
+										streamStmt: &ast.StreamStmt{
+											Name: "src1",
+											StreamFields: []ast.StreamField{
+												{
+													Name: "id1",
+													FieldType: &ast.BasicType{
+														Type: ast.DataType(1),
+													},
+												},
+												{
+													Name: "temp",
+													FieldType: &ast.BasicType{
+														Type: ast.DataType(1),
+													},
+												},
+												{
+													Name: "name",
+													FieldType: &ast.BasicType{
+														Type: ast.DataType(3),
+													},
+												},
+												{
+													Name: "myarray",
+													FieldType: &ast.ArrayType{
+														Type: ast.DataType(3),
+													},
+												},
+											},
+											Options: &ast.Options{
+												DATASOURCE: "src1",
+												KEY:        "ts",
+												FORMAT:     "json",
+											},
+											StreamType: ast.StreamType(0),
+										},
+										metaFields: []string{},
+									}.Init(),
+								},
+							},
+							funcs: []*ast.Call{
+								{
+									Name:     "lag",
+									FuncId:   0,
+									FuncType: ast.FuncType(0),
+									Args: []ast.Expr{
+										&ast.FieldRef{
+											StreamName: "src1",
+											Name:       "temp",
+										},
+									},
+									CachedField: "$$a_lag_0",
+									WhenExpr: &ast.BinaryExpr{
+										OP: ast.GT,
+										LHS: &ast.Call{
+											Name:     "lag",
+											FuncId:   1,
+											FuncType: ast.FuncType(0),
+											Args: []ast.Expr{
+												&ast.FieldRef{
+													StreamName: "src1",
+													Name:       "id1",
+												},
+											},
+											CachedField: "$$a_lag_1",
+											Cached:      true,
+										},
+										RHS: &ast.IntegerLiteral{
+											Val: 1,
+										},
+									},
+								},
+								{
+									Name:     "lag",
+									FuncId:   1,
+									FuncType: ast.FuncType(0),
+									Args: []ast.Expr{
+										&ast.FieldRef{
+											StreamName: "src1",
+											Name:       "id1",
+										},
+									},
+									CachedField: "$$a_lag_1",
+								},
+							},
+						}.Init(),
+					},
+				},
+				fields: []ast.Field{
+					{
+						Name: "kuiper_field_0",
+						Expr: &ast.CaseExpr{
+							WhenClauses: []*ast.WhenClause{
+								{
+									Expr: &ast.BinaryExpr{
+										OP: ast.BETWEEN,
+										LHS: &ast.Call{
+											Name:     "lag",
+											FuncId:   0,
+											FuncType: ast.FuncType(0),
+											Args: []ast.Expr{
+												&ast.FieldRef{
+													StreamName: "src1",
+													Name:       "temp",
+												},
+											},
+											CachedField: "$$a_lag_0",
+											Cached:      true,
+											WhenExpr: &ast.BinaryExpr{
+												OP: ast.GT,
+												LHS: &ast.Call{
+													Name:     "lag",
+													FuncId:   1,
+													FuncType: ast.FuncType(0),
+													Args: []ast.Expr{
+														&ast.FieldRef{
+															StreamName: "src1",
+															Name:       "id1",
+														},
+													},
+													CachedField: "$$a_lag_1",
+													Cached:      true,
+												},
+												RHS: &ast.IntegerLiteral{
+													Val: 1,
+												},
+											},
+										},
+										RHS: &ast.BetweenExpr{
+											Lower: &ast.IntegerLiteral{
+												Val: 0,
+											},
+											Higher: &ast.IntegerLiteral{
+												Val: 10,
+											},
+										},
+									},
+									Result: &ast.IntegerLiteral{
+										Val: 1,
+									},
+								},
+							},
+							ElseClause: &ast.IntegerLiteral{
+								Val: 0,
+							},
+						},
+					},
+				},
+			}.Init(),
+		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 

+ 3 - 0
pkg/ast/visitor.go

@@ -142,6 +142,9 @@ func Walk(v Visitor, node Node) {
 
 	case *LikePattern:
 		Walk(v, n.Expr)
+
+	case *WhenClause:
+		Walk(v, n.Expr)
 	}
 }