Переглянути джерело

fix(tuple): group tuple content must clone before pick

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 роки тому
батько
коміт
1d6c16c465
2 змінених файлів з 47 додано та 3 видалено
  1. 40 0
      internal/topo/topotest/window_rule_test.go
  2. 7 3
      internal/xsql/row.go

+ 40 - 0
internal/topo/topotest/window_rule_test.go

@@ -659,6 +659,46 @@ func TestWindow(t *testing.T) {
 				"source_table1_0_records_in_total":  int64(4),
 				"source_table1_0_records_in_total":  int64(4),
 				"source_table1_0_records_out_total": int64(1),
 				"source_table1_0_records_out_total": int64(1),
 			},
 			},
+		}, {
+			Name: `TestWindowRule12`,
+			Sql:  `SELECT collect(size) as allSize FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1), color ORDER BY color`,
+			R: [][]map[string]interface{}{
+				{{
+					"allSize": []interface{}{float64(6)},
+				}, {
+					"allSize": []interface{}{float64(3)},
+				}},
+				{{
+					"allSize": []interface{}{float64(6), float64(2)},
+				}, {
+
+					"allSize": []interface{}{float64(3)},
+				}},
+				{{
+					"allSize": []interface{}{float64(2)},
+				}, {
+					"allSize": []interface{}{float64(4)},
+				}},
+				{{
+					"allSize": []interface{}{float64(1)},
+				}, {
+					"allSize": []interface{}{float64(4)},
+				}},
+			},
+			M: map[string]interface{}{
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(4),
+				"sink_mockSink_0_records_out_total": int64(4),
+
+				"source_demo_0_exceptions_total":  int64(0),
+				"source_demo_0_records_in_total":  int64(5),
+				"source_demo_0_records_out_total": int64(5),
+
+				"op_2_window_0_exceptions_total":   int64(0),
+				"op_2_window_0_process_latency_us": int64(0),
+				"op_2_window_0_records_in_total":   int64(5),
+				"op_2_window_0_records_out_total":  int64(4),
+			},
 		},
 		},
 	}
 	}
 	HandleStream(true, streamList, t)
 	HandleStream(true, streamList, t)

+ 7 - 3
internal/xsql/row.go

@@ -514,11 +514,13 @@ func (jt *JoinTuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters ma
 	cols = jt.AffiliateRow.Pick(cols)
 	cols = jt.AffiliateRow.Pick(cols)
 	if !allWildcard {
 	if !allWildcard {
 		if len(cols) > 0 {
 		if len(cols) > 0 {
-			for _, tuple := range jt.Tuples {
+			for i, tuple := range jt.Tuples {
 				if _, ok := wildcardEmitters[tuple.GetEmitter()]; ok {
 				if _, ok := wildcardEmitters[tuple.GetEmitter()]; ok {
 					continue
 					continue
 				}
 				}
-				tuple.Pick(allWildcard, cols, wildcardEmitters)
+				nt := tuple.Clone()
+				nt.Pick(allWildcard, cols, wildcardEmitters)
+				jt.Tuples[i] = nt
 			}
 			}
 		} else {
 		} else {
 			jt.Tuples = jt.Tuples[:0]
 			jt.Tuples = jt.Tuples[:0]
@@ -582,5 +584,7 @@ func (s *GroupedTuples) Clone() CollectionRow {
 
 
 func (s *GroupedTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
 func (s *GroupedTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
 	cols = s.AffiliateRow.Pick(cols)
 	cols = s.AffiliateRow.Pick(cols)
-	s.Content[0].Pick(allWildcard, cols, wildcardEmitters)
+	sc := s.Content[0].Clone()
+	sc.Pick(allWildcard, cols, wildcardEmitters)
+	s.Content[0] = sc
 }
 }