Ver código fonte

fix(valuer): allow window_start() and window_end() as parameter (#1264)

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 3 anos atrás
pai
commit
4af3751c20

+ 2 - 0
internal/topo/operator/project_operator.go

@@ -106,6 +106,8 @@ func (pp *ProjectOp) getVE(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xs
 			wr = input.WindowRange
 		case *xsql.JoinTupleSets:
 			wr = input.WindowRange
+		case xsql.GroupedTuples:
+			wr = input.WindowRange
 		}
 		if wr != nil {
 			return &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.WindowRangeValuer{WindowRange: wr}, fv, &xsql.WildcardValuer{Data: tuple})}

+ 3 - 3
internal/xsql/collections.go

@@ -304,7 +304,7 @@ func (w WindowTuplesSet) AggregateEval(expr ast.Expr, v CallValuer) []interface{
 		return nil
 	}
 	for _, t := range w.Content[0].Tuples {
-		result = append(result, Eval(expr, MultiValuer(&t, v, &WildcardValuer{&t})))
+		result = append(result, Eval(expr, MultiValuer(&t, &WindowRangeValuer{WindowRange: w.WindowRange}, v, &WildcardValuer{&t})))
 	}
 	return result
 }
@@ -411,7 +411,7 @@ func (s *JoinTupleSets) Index(i int) Valuer { return &(s.Content[i]) }
 func (s *JoinTupleSets) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
 	var result []interface{}
 	for _, t := range s.Content {
-		result = append(result, Eval(expr, MultiValuer(&t, v, &WildcardValuer{&t})))
+		result = append(result, Eval(expr, MultiValuer(&t, &WindowRangeValuer{WindowRange: s.WindowRange}, v, &WildcardValuer{&t})))
 	}
 	return result
 }
@@ -424,7 +424,7 @@ type GroupedTuples struct {
 func (s GroupedTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{} {
 	var result []interface{}
 	for _, t := range s.Content {
-		result = append(result, Eval(expr, MultiValuer(t, v, &WildcardValuer{t})))
+		result = append(result, Eval(expr, MultiValuer(t, &WindowRangeValuer{WindowRange: s.WindowRange}, v, &WildcardValuer{t})))
 	}
 	return result
 }