Bläddra i källkod

fix: wildcard contains alias (#2060)

* fix wildcard

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix lint

Signed-off-by: Rui-Gan <1171530954@qq.com>

---------

Signed-off-by: Rui-Gan <1171530954@qq.com>
Regina 1 år sedan
förälder
incheckning
9ddd7ba1ad
3 ändrade filer med 24 tillägg och 14 borttagningar
  1. 6 2
      internal/xsql/collection.go
  2. 12 6
      internal/xsql/row.go
  3. 6 6
      internal/xsql/row_test.go

+ 6 - 2
internal/xsql/collection.go

@@ -223,7 +223,11 @@ func (w *WindowTuples) Meta(key, table string) (interface{}, bool) {
 }
 }
 
 
 func (w *WindowTuples) All(_ string) (map[string]interface{}, bool) {
 func (w *WindowTuples) All(_ string) (map[string]interface{}, bool) {
-	return w.ToMap(), true
+	if len(w.Content) == 0 {
+		m := make(map[string]interface{})
+		return m, true
+	}
+	return w.Content[0].All("")
 }
 }
 
 
 func (w *WindowTuples) ToMap() map[string]interface{} {
 func (w *WindowTuples) ToMap() map[string]interface{} {
@@ -363,7 +367,7 @@ func (s *JoinTuples) Meta(key, table string) (interface{}, bool) {
 }
 }
 
 
 func (s *JoinTuples) All(_ string) (map[string]interface{}, bool) {
 func (s *JoinTuples) All(_ string) (map[string]interface{}, bool) {
-	return s.ToMap(), true
+	return s.Content[0].All("")
 }
 }
 
 
 func (s *JoinTuples) ToMap() map[string]interface{} {
 func (s *JoinTuples) ToMap() map[string]interface{} {

+ 12 - 6
internal/xsql/row.go

@@ -358,7 +358,7 @@ func (t *Tuple) Value(key, table string) (interface{}, bool) {
 }
 }
 
 
 func (t *Tuple) All(string) (map[string]interface{}, bool) {
 func (t *Tuple) All(string) (map[string]interface{}, bool) {
-	return t.ToMap(), true
+	return t.Message, true
 }
 }
 
 
 func (t *Tuple) Clone() CloneAbleRow {
 func (t *Tuple) Clone() CloneAbleRow {
@@ -519,13 +519,19 @@ func (jt *JoinTuple) All(stream string) (map[string]interface{}, bool) {
 	if stream != "" {
 	if stream != "" {
 		for _, t := range jt.Tuples {
 		for _, t := range jt.Tuples {
 			if t.GetEmitter() == stream {
 			if t.GetEmitter() == stream {
-				return t.ToMap(), true
+				return t.All("")
 			}
 			}
 		}
 		}
-	} else {
-		return jt.ToMap(), true
 	}
 	}
-	return nil, false
+	result := make(map[string]interface{})
+	for _, t := range jt.Tuples {
+		if m, ok := t.All(""); ok {
+			for k, v := range m {
+				result[k] = v
+			}
+		}
+	}
+	return result, true
 }
 }
 
 
 func (jt *JoinTuple) Clone() CloneAbleRow {
 func (jt *JoinTuple) Clone() CloneAbleRow {
@@ -598,7 +604,7 @@ func (s *GroupedTuples) Meta(key, table string) (interface{}, bool) {
 }
 }
 
 
 func (s *GroupedTuples) All(_ string) (map[string]interface{}, bool) {
 func (s *GroupedTuples) All(_ string) (map[string]interface{}, bool) {
-	return s.ToMap(), true
+	return s.Content[0].All("")
 }
 }
 
 
 func (s *GroupedTuples) ToMap() map[string]interface{} {
 func (s *GroupedTuples) ToMap() map[string]interface{} {

+ 6 - 6
internal/xsql/row_test.go

@@ -42,7 +42,7 @@ func TestCollectionRow(t *testing.T) {
 			rowC:     &Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "c": 3}, AliasMap: map[string]interface{}{"b": "b1"}}},
 			rowC:     &Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "c": 3}, AliasMap: map[string]interface{}{"b": "b1"}}},
 			value:    []string{"a", "b", "c"},
 			value:    []string{"a", "b", "c"},
 			wildcard: []string{""},
 			wildcard: []string{""},
-			result:   []interface{}{4, "b1", 3, map[string]interface{}{"a": 4, "b": "b1", "c": 3}},
+			result:   []interface{}{4, "b1", 3, map[string]interface{}{"a": 1, "b": "2"}},
 		}, {
 		}, {
 			rowC: &JoinTuple{Tuples: []TupleRow{
 			rowC: &JoinTuple{Tuples: []TupleRow{
 				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
 				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
@@ -50,7 +50,7 @@ func TestCollectionRow(t *testing.T) {
 			}},
 			}},
 			value:    []string{"a", "src2.a", "b", "c"},
 			value:    []string{"a", "src2.a", "b", "c"},
 			wildcard: []string{"", "src1"},
 			wildcard: []string{"", "src1"},
-			result:   []interface{}{1, 2, "v1", "w2", map[string]interface{}{"a": 1, "b": "v1", "c": "w2"}, map[string]interface{}{"a": 1, "b": "v1"}},
+			result:   []interface{}{1, 2, "v1", "w2", map[string]interface{}{"a": 2, "b": "v1", "c": "w2"}, map[string]interface{}{"a": 1, "b": "v1"}},
 		}, {
 		}, {
 			rowC: &JoinTuple{Tuples: []TupleRow{
 			rowC: &JoinTuple{Tuples: []TupleRow{
 				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
 				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
@@ -58,7 +58,7 @@ func TestCollectionRow(t *testing.T) {
 			}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			value:    []string{"a", "src2.a", "b", "c", "d"},
 			value:    []string{"a", "src2.a", "b", "c", "d"},
 			wildcard: []string{"", "src1"},
 			wildcard: []string{"", "src1"},
-			result:   []interface{}{4, 2, "v1", "w2", 4, map[string]interface{}{"a": 4, "b": "v1", "c": "w2", "d": 4}, map[string]interface{}{"a": 1, "b": "v1"}},
+			result:   []interface{}{4, 2, "v1", "w2", 4, map[string]interface{}{"a": 2, "b": "v1", "c": "w2"}, map[string]interface{}{"a": 1, "b": "v1"}},
 		}, {
 		}, {
 			rowC:     &GroupedTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}},
 			rowC:     &GroupedTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}},
 			value:    []string{"a", "b"},
 			value:    []string{"a", "b"},
@@ -68,12 +68,12 @@ func TestCollectionRow(t *testing.T) {
 			rowC:     &GroupedTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			rowC:     &GroupedTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			value:    []string{"a", "b", "d"},
 			value:    []string{"a", "b", "d"},
 			wildcard: []string{""},
 			wildcard: []string{""},
-			result:   []interface{}{4, "v1", 4, map[string]interface{}{"a": 4, "b": "v1", "d": 4}},
+			result:   []interface{}{4, "v1", 4, map[string]interface{}{"a": 1, "b": "v1"}},
 		}, {
 		}, {
 			rowC:     &WindowTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			rowC:     &WindowTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			value:    []string{"a", "b", "d"},
 			value:    []string{"a", "b", "d"},
 			wildcard: []string{""},
 			wildcard: []string{""},
-			result:   []interface{}{4, "v1", 4, map[string]interface{}{"a": 4, "b": "v1", "d": 4}},
+			result:   []interface{}{4, "v1", 4, map[string]interface{}{"a": 1, "b": "v1"}},
 		}, {
 		}, {
 			rowC: &JoinTuples{Content: []*JoinTuple{{Tuples: []TupleRow{
 			rowC: &JoinTuples{Content: []*JoinTuple{{Tuples: []TupleRow{
 				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"b": "v2", "$$lag_a": 1}}},
 				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"b": "v2", "$$lag_a": 1}}},
@@ -81,7 +81,7 @@ func TestCollectionRow(t *testing.T) {
 			}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			value:    []string{"a", "b", "d"},
 			value:    []string{"a", "b", "d"},
 			wildcard: []string{""},
 			wildcard: []string{""},
-			result:   []interface{}{4, "v2", 4, map[string]interface{}{"a": 4, "b": "v2", "c": "w2", "d": 4}},
+			result:   []interface{}{4, "v2", 4, map[string]interface{}{"a": 2, "b": "v1", "c": "w2"}},
 		},
 		},
 	}
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))