Переглянути джерело

fix(row): do not write out the internal fields for join tuple

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 роки тому
батько
коміт
2bdbc8d0e0

+ 21 - 27
internal/topo/operator/analyticfuncs_operator_test.go

@@ -31,7 +31,7 @@ func TestAnalyticFuncs(t *testing.T) {
 	var tests = []struct {
 		funcs  []*ast.Call
 		data   []interface{}
-		result [][]map[string]interface{}
+		result []map[string]interface{}
 	}{
 		{ // 0 Lag test
 			funcs: []*ast.Call{
@@ -85,17 +85,16 @@ func TestAnalyticFuncs(t *testing.T) {
 					},
 				},
 			},
-			result: [][]map[string]interface{}{{{
+			result: []map[string]interface{}{{
 				"$$a_lag_0": nil,
 				"$$a_lag_1": nil,
-				"a":         "a1", "b": "b1", "c": "c1",
-			}}, {{
-				"$$a_lag_0": "a1", "$$a_lag_1": "b1", "a": "a1", "b": "b2", "c": "c1",
-			}}, {{
-				"$$a_lag_0": "a1", "$$a_lag_1": "b2", "a": "a1", "c": "c1",
-			}}, {{
-				"$$a_lag_0": "a1", "$$a_lag_1": interface{}(nil), "a": "a1", "b": "b2", "c": "c2",
-			}}},
+			}, {
+				"$$a_lag_0": "a1", "$$a_lag_1": "b1",
+			}, {
+				"$$a_lag_0": "a1", "$$a_lag_1": "b2",
+			}, {
+				"$$a_lag_0": "a1", "$$a_lag_1": interface{}(nil),
+			}},
 		},
 		{ // 1 changed test
 			funcs: []*ast.Call{
@@ -157,16 +156,16 @@ func TestAnalyticFuncs(t *testing.T) {
 					},
 				},
 			},
-			result: [][]map[string]interface{}{
-				{{
-					"$$a_changed_col_0": "a1", "$$a_had_changed_0": false, "$$a_lag_1": nil, "a": "a1", "b": "b1",
-				}}, {{
-					"$$a_changed_col_0": nil, "$$a_had_changed_0": true, "$$a_lag_1": "b1", "a": "a1", "c": "c1",
-				}}, {{
-					"$$a_changed_col_0": nil, "$$a_had_changed_0": false, "$$a_lag_1": nil, "a": "a1", "c": "c1",
-				}}, {{
-					"$$a_changed_col_0": nil, "$$a_had_changed_0": true, "$$a_lag_1": nil, "a": "a1", "b": "b2", "c": "c2",
-				}},
+			result: []map[string]interface{}{
+				{
+					"$$a_changed_col_0": "a1", "$$a_had_changed_0": false, "$$a_lag_1": nil,
+				}, {
+					"$$a_changed_col_0": nil, "$$a_had_changed_0": true, "$$a_lag_1": "b1",
+				}, {
+					"$$a_changed_col_0": nil, "$$a_had_changed_0": false, "$$a_lag_1": nil,
+				}, {
+					"$$a_changed_col_0": nil, "$$a_had_changed_0": true, "$$a_lag_1": nil,
+				},
 			},
 		},
 	}
@@ -179,15 +178,10 @@ func TestAnalyticFuncs(t *testing.T) {
 
 		pp := &AnalyticFuncsOp{Funcs: tt.funcs}
 		fv, afv := xsql.NewFunctionValuersForOp(ctx)
-		r := make([][]map[string]interface{}, 0, len(tt.data))
+		r := make([]map[string]interface{}, 0, len(tt.data))
 		for _, d := range tt.data {
 			opResult := pp.Apply(ctx, d, fv, afv)
-			result, err := parseResult(opResult, false)
-			if err != nil {
-				t.Errorf("parse result error: %s", err)
-				continue
-			}
-			r = append(r, result)
+			r = append(r, opResult.(*xsql.Tuple).CalCols)
 		}
 
 		if !reflect.DeepEqual(tt.result, r) {

+ 4 - 1
internal/xsql/row.go

@@ -154,7 +154,10 @@ func (d *AffiliateRow) MergeMap(cachedMap map[string]interface{}) {
 	d.lock.RLock()
 	defer d.lock.RUnlock()
 	for k, v := range d.CalCols {
-		cachedMap[k] = v
+		// Do not write out the internal fields
+		if !strings.HasPrefix(k, "$$") {
+			cachedMap[k] = v
+		}
 	}
 	for k, v := range d.AliasMap {
 		cachedMap[k] = v

+ 2 - 2
internal/xsql/row_test.go

@@ -75,12 +75,12 @@ func TestCollectionRow(t *testing.T) {
 			result:   []interface{}{4, "v1", 4, Message{"a": 4, "b": "v1", "d": 4}},
 		}, {
 			rowC: &JoinTuples{Content: []*JoinTuple{{Tuples: []TupleRow{
-				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
+				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"b": "v2", "$$lag_a": 1}}},
 				&Tuple{Emitter: "src2", Message: Message{"a": 2, "c": "w2"}},
 			}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			value:    []string{"a", "b", "d"},
 			wildcard: []string{""},
-			result:   []interface{}{4, "v1", 4, Message{"a": 4, "b": "v1", "c": "w2", "d": 4}},
+			result:   []interface{}{4, "v2", 4, Message{"a": 4, "b": "v2", "c": "w2", "d": 4}},
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))