|
@@ -1622,6 +1622,174 @@ func Test_createLogicalPlan(t *testing.T) {
|
|
|
sendMeta: false,
|
|
|
}.Init(),
|
|
|
},
|
|
|
+ {
|
|
|
+ // 18 analytic function over when plan
|
|
|
+ sql: `SELECT CASE WHEN lag(temp) OVER (WHEN lag(id1) > 1) BETWEEN 0 AND 10 THEN 1 ELSE 0 END FROM src1`,
|
|
|
+ p: ProjectPlan{
|
|
|
+ baseLogicalPlan: baseLogicalPlan{
|
|
|
+ children: []LogicalPlan{
|
|
|
+ AnalyticFuncsPlan{
|
|
|
+ baseLogicalPlan: baseLogicalPlan{
|
|
|
+ children: []LogicalPlan{
|
|
|
+ DataSourcePlan{
|
|
|
+ name: "src1",
|
|
|
+ streamFields: map[string]*ast.JsonStreamField{
|
|
|
+ "id1": {
|
|
|
+ Type: "bigint",
|
|
|
+ },
|
|
|
+ "temp": {
|
|
|
+ Type: "bigint",
|
|
|
+ },
|
|
|
+ },
|
|
|
+ streamStmt: &ast.StreamStmt{
|
|
|
+ Name: "src1",
|
|
|
+ StreamFields: []ast.StreamField{
|
|
|
+ {
|
|
|
+ Name: "id1",
|
|
|
+ FieldType: &ast.BasicType{
|
|
|
+ Type: ast.DataType(1),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ Name: "temp",
|
|
|
+ FieldType: &ast.BasicType{
|
|
|
+ Type: ast.DataType(1),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ Name: "name",
|
|
|
+ FieldType: &ast.BasicType{
|
|
|
+ Type: ast.DataType(3),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ Name: "myarray",
|
|
|
+ FieldType: &ast.ArrayType{
|
|
|
+ Type: ast.DataType(3),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ Options: &ast.Options{
|
|
|
+ DATASOURCE: "src1",
|
|
|
+ KEY: "ts",
|
|
|
+ FORMAT: "json",
|
|
|
+ },
|
|
|
+ StreamType: ast.StreamType(0),
|
|
|
+ },
|
|
|
+ metaFields: []string{},
|
|
|
+ }.Init(),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ funcs: []*ast.Call{
|
|
|
+ {
|
|
|
+ Name: "lag",
|
|
|
+ FuncId: 0,
|
|
|
+ FuncType: ast.FuncType(0),
|
|
|
+ Args: []ast.Expr{
|
|
|
+ &ast.FieldRef{
|
|
|
+ StreamName: "src1",
|
|
|
+ Name: "temp",
|
|
|
+ },
|
|
|
+ },
|
|
|
+ CachedField: "$$a_lag_0",
|
|
|
+ WhenExpr: &ast.BinaryExpr{
|
|
|
+ OP: ast.GT,
|
|
|
+ LHS: &ast.Call{
|
|
|
+ Name: "lag",
|
|
|
+ FuncId: 1,
|
|
|
+ FuncType: ast.FuncType(0),
|
|
|
+ Args: []ast.Expr{
|
|
|
+ &ast.FieldRef{
|
|
|
+ StreamName: "src1",
|
|
|
+ Name: "id1",
|
|
|
+ },
|
|
|
+ },
|
|
|
+ CachedField: "$$a_lag_1",
|
|
|
+ Cached: true,
|
|
|
+ },
|
|
|
+ RHS: &ast.IntegerLiteral{
|
|
|
+ Val: 1,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ Name: "lag",
|
|
|
+ FuncId: 1,
|
|
|
+ FuncType: ast.FuncType(0),
|
|
|
+ Args: []ast.Expr{
|
|
|
+ &ast.FieldRef{
|
|
|
+ StreamName: "src1",
|
|
|
+ Name: "id1",
|
|
|
+ },
|
|
|
+ },
|
|
|
+ CachedField: "$$a_lag_1",
|
|
|
+ },
|
|
|
+ },
|
|
|
+ }.Init(),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ fields: []ast.Field{
|
|
|
+ {
|
|
|
+ Name: "kuiper_field_0",
|
|
|
+ Expr: &ast.CaseExpr{
|
|
|
+ WhenClauses: []*ast.WhenClause{
|
|
|
+ {
|
|
|
+ Expr: &ast.BinaryExpr{
|
|
|
+ OP: ast.BETWEEN,
|
|
|
+ LHS: &ast.Call{
|
|
|
+ Name: "lag",
|
|
|
+ FuncId: 0,
|
|
|
+ FuncType: ast.FuncType(0),
|
|
|
+ Args: []ast.Expr{
|
|
|
+ &ast.FieldRef{
|
|
|
+ StreamName: "src1",
|
|
|
+ Name: "temp",
|
|
|
+ },
|
|
|
+ },
|
|
|
+ CachedField: "$$a_lag_0",
|
|
|
+ Cached: true,
|
|
|
+ WhenExpr: &ast.BinaryExpr{
|
|
|
+ OP: ast.GT,
|
|
|
+ LHS: &ast.Call{
|
|
|
+ Name: "lag",
|
|
|
+ FuncId: 1,
|
|
|
+ FuncType: ast.FuncType(0),
|
|
|
+ Args: []ast.Expr{
|
|
|
+ &ast.FieldRef{
|
|
|
+ StreamName: "src1",
|
|
|
+ Name: "id1",
|
|
|
+ },
|
|
|
+ },
|
|
|
+ CachedField: "$$a_lag_1",
|
|
|
+ Cached: true,
|
|
|
+ },
|
|
|
+ RHS: &ast.IntegerLiteral{
|
|
|
+ Val: 1,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ RHS: &ast.BetweenExpr{
|
|
|
+ Lower: &ast.IntegerLiteral{
|
|
|
+ Val: 0,
|
|
|
+ },
|
|
|
+ Higher: &ast.IntegerLiteral{
|
|
|
+ Val: 10,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ Result: &ast.IntegerLiteral{
|
|
|
+ Val: 1,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ ElseClause: &ast.IntegerLiteral{
|
|
|
+ Val: 0,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ }.Init(),
|
|
|
+ },
|
|
|
}
|
|
|
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
|
|
|
|