Browse Source

feat(stream): sql processor for aggregate alias

ngjaying 4 years atrás
parent
commit
bf2a316b40
2 changed files with 16 additions and 12 deletions
  1. 1 1
      xsql/processors/xsql_processor.go
  2. 15 11
      xsql/processors/xsql_processor_test.go

+ 1 - 1
xsql/processors/xsql_processor.go

@@ -471,7 +471,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			var ds xsql.Dimensions
 			if dimensions != nil || len(aggregateAlias) > 0 {
 				ds = dimensions.GetGroups()
-				if ds != nil && len(ds) > 0 {
+				if (ds != nil && len(ds) > 0) || len(aggregateAlias) > 0 {
 					aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds, Alias: aggregateAlias}, "aggregate", bufferLength)
 					aggregateOp.SetConcurrency(concurrency)
 					tp.AddOperator(inputs, aggregateOp)

+ 15 - 11
xsql/processors/xsql_processor_test.go

@@ -1903,10 +1903,7 @@ func TestWindow(t *testing.T) {
 				{{
 					"color": "red",
 					"ts":    float64(1541152486013),
-					"c":     2,
-				}, {
-					"color": "blue",
-					"ts":    float64(1541152486822),
+					"c":     float64(2),
 				}},
 			},
 			m: map[string]interface{}{
@@ -1917,12 +1914,12 @@ func TestWindow(t *testing.T) {
 
 				"op_project_0_exceptions_total":   int64(0),
 				"op_project_0_process_latency_ms": int64(0),
-				"op_project_0_records_in_total":   int64(2),
-				"op_project_0_records_out_total":  int64(2),
+				"op_project_0_records_in_total":   int64(1),
+				"op_project_0_records_out_total":  int64(1),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
-				"sink_mockSink_0_records_in_total":  int64(2),
-				"sink_mockSink_0_records_out_total": int64(2),
+				"sink_mockSink_0_records_in_total":  int64(1),
+				"sink_mockSink_0_records_out_total": int64(1),
 
 				"source_demo_0_exceptions_total":  int64(0),
 				"source_demo_0_records_in_total":  int64(5),
@@ -1937,6 +1934,16 @@ func TestWindow(t *testing.T) {
 				"op_filter_0_process_latency_ms": int64(0),
 				"op_filter_0_records_in_total":   int64(3),
 				"op_filter_0_records_out_total":  int64(2),
+
+				"op_aggregate_0_exceptions_total":   int64(0),
+				"op_aggregate_0_process_latency_ms": int64(0),
+				"op_aggregate_0_records_in_total":   int64(2),
+				"op_aggregate_0_records_out_total":  int64(2),
+
+				"op_having_0_exceptions_total":   int64(0),
+				"op_having_0_process_latency_ms": int64(0),
+				"op_having_0_records_in_total":   int64(2),
+				"op_having_0_records_out_total":  int64(1),
 			},
 		},
 	}
@@ -1944,9 +1951,6 @@ func TestWindow(t *testing.T) {
 	createStreams(t)
 	defer dropStreams(t)
 	for i, tt := range tests {
-		if i != 7 {
-			continue
-		}
 		test.ResetClock(1541152486000)
 		p := NewRuleProcessor(DbDir)
 		parser := xsql.NewParser(strings.NewReader(tt.sql))