瀏覽代碼

fix(planner): do not move predicate for sliding window

Not time window will depend on the events, so should not filter any before running window op

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 年之前
父節點
當前提交
7472aece6b
共有 2 個文件被更改,包括 78 次插入2 次删除
  1. 75 0
      internal/topo/planner/planner_test.go
  2. 3 2
      internal/topo/planner/windowPlan.go

+ 75 - 0
internal/topo/planner/planner_test.go

@@ -1450,6 +1450,81 @@ func Test_createLogicalPlan(t *testing.T) {
 				isAggregate: false,
 				sendMeta:    false,
 			}.Init(),
+		}, { // 17. do not optimize sliding window
+			sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY SLIDINGWINDOW(ss, 10) HAVING COUNT(*) > 2`,
+			p: ProjectPlan{
+				baseLogicalPlan: baseLogicalPlan{
+					children: []LogicalPlan{
+						HavingPlan{
+							baseLogicalPlan: baseLogicalPlan{
+								children: []LogicalPlan{
+									FilterPlan{
+										baseLogicalPlan: baseLogicalPlan{
+											children: []LogicalPlan{
+												WindowPlan{
+													baseLogicalPlan: baseLogicalPlan{
+														children: []LogicalPlan{
+															DataSourcePlan{
+																name:       "src1",
+																isWildCard: true,
+																streamFields: []interface{}{
+																	&ast.StreamField{
+																		Name:      "id1",
+																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																	},
+																	&ast.StreamField{
+																		Name:      "temp",
+																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																	},
+																	&ast.StreamField{
+																		Name:      "name",
+																		FieldType: &ast.BasicType{Type: ast.STRINGS},
+																	},
+																	&ast.StreamField{
+																		Name:      "myarray",
+																		FieldType: &ast.ArrayType{Type: ast.STRINGS},
+																	},
+																},
+																streamStmt: streams["src1"],
+																metaFields: []string{},
+															}.Init(),
+														},
+													},
+													condition: nil,
+													wtype:     ast.SLIDING_WINDOW,
+													length:    10000,
+													interval:  0,
+													limit:     0,
+												}.Init(),
+											},
+										},
+										condition: &ast.BinaryExpr{
+											LHS: &ast.FieldRef{Name: "temp", StreamName: "src1"},
+											OP:  ast.GT,
+											RHS: &ast.IntegerLiteral{Val: 20},
+										},
+									}.Init(),
+								},
+							},
+							condition: &ast.BinaryExpr{
+								LHS: &ast.Call{Name: "count", FuncId: 0, Args: []ast.Expr{&ast.Wildcard{
+									Token: ast.ASTERISK,
+								}}, FuncType: ast.FuncTypeAgg},
+								OP:  ast.GT,
+								RHS: &ast.IntegerLiteral{Val: 2},
+							},
+						}.Init(),
+					},
+				},
+				fields: []ast.Field{
+					{
+						Expr:  &ast.Wildcard{Token: ast.ASTERISK},
+						Name:  "*",
+						AName: ""},
+				},
+				isAggregate: false,
+				sendMeta:    false,
+			}.Init(),
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 3 - 2
internal/topo/planner/windowPlan.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -32,7 +32,8 @@ func (p WindowPlan) Init() *WindowPlan {
 }
 
 func (p *WindowPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
-	if p.wtype == ast.COUNT_WINDOW {
+	// not time window depends on the event, so should not filter any
+	if p.wtype == ast.COUNT_WINDOW || p.wtype == ast.SLIDING_WINDOW {
 		return condition, p
 	} else if p.isEventTime {
 		// TODO event time filter, need event window op support