Quellcode durchsuchen

feat(row): let affiliate row become thread safe (#1429)

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying vor 2 Jahren
Ursprung
Commit
cf625179c0
2 geänderte Dateien mit 41 neuen und 30 gelöschten Zeilen
  1. 36 25
      internal/xsql/row.go
  2. 5 5
      internal/xsql/row_test.go

+ 36 - 25
internal/xsql/row.go

@@ -18,6 +18,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"strings"
 	"strings"
+	"sync"
 )
 )
 
 
 // The original message map may be big. Make sure it is immutable so that never make a copy of it.
 // The original message map may be big. Make sure it is immutable so that never make a copy of it.
@@ -74,11 +75,34 @@ type CollectionRow interface {
 
 
 // AffiliateRow part of other row types do help calculation of newly added cols
 // AffiliateRow part of other row types do help calculation of newly added cols
 type AffiliateRow struct {
 type AffiliateRow struct {
-	CalCols map[string]interface{} // mutable and must be cloned when broadcast
-	Alias
+	lock     sync.RWMutex
+	CalCols  map[string]interface{} // mutable and must be cloned when broadcast
+	AliasMap map[string]interface{}
+}
+
+func (d *AffiliateRow) AppendAlias(key string, value interface{}) bool {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+	if d.AliasMap == nil {
+		d.AliasMap = make(map[string]interface{})
+	}
+	d.AliasMap[key] = value
+	return true
+}
+
+func (d *AffiliateRow) AliasValue(key string) (interface{}, bool) {
+	d.lock.RLock()
+	defer d.lock.RUnlock()
+	if d.AliasMap == nil {
+		return nil, false
+	}
+	v, ok := d.AliasMap[key]
+	return v, ok
 }
 }
 
 
 func (d *AffiliateRow) Value(key, table string) (interface{}, bool) {
 func (d *AffiliateRow) Value(key, table string) (interface{}, bool) {
+	d.lock.RLock()
+	defer d.lock.RUnlock()
 	if table == "" {
 	if table == "" {
 		r, ok := d.AliasValue(key)
 		r, ok := d.AliasValue(key)
 		if ok {
 		if ok {
@@ -93,6 +117,8 @@ func (d *AffiliateRow) Value(key, table string) (interface{}, bool) {
 }
 }
 
 
 func (d *AffiliateRow) Set(col string, value interface{}) {
 func (d *AffiliateRow) Set(col string, value interface{}) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
 	if d.CalCols == nil {
 	if d.CalCols == nil {
 		d.CalCols = make(map[string]interface{})
 		d.CalCols = make(map[string]interface{})
 	}
 	}
@@ -100,6 +126,8 @@ func (d *AffiliateRow) Set(col string, value interface{}) {
 }
 }
 
 
 func (d *AffiliateRow) Clone() AffiliateRow {
 func (d *AffiliateRow) Clone() AffiliateRow {
+	d.lock.RLock()
+	defer d.lock.RUnlock()
 	nd := &AffiliateRow{}
 	nd := &AffiliateRow{}
 	if d.CalCols != nil && len(d.CalCols) > 0 {
 	if d.CalCols != nil && len(d.CalCols) > 0 {
 		nd.CalCols = make(map[string]interface{}, len(d.CalCols))
 		nd.CalCols = make(map[string]interface{}, len(d.CalCols))
@@ -117,10 +145,14 @@ func (d *AffiliateRow) Clone() AffiliateRow {
 }
 }
 
 
 func (d *AffiliateRow) IsEmpty() bool {
 func (d *AffiliateRow) IsEmpty() bool {
+	d.lock.RLock()
+	defer d.lock.RUnlock()
 	return len(d.CalCols) == 0 && len(d.AliasMap) == 0
 	return len(d.CalCols) == 0 && len(d.AliasMap) == 0
 }
 }
 
 
 func (d *AffiliateRow) MergeMap(cachedMap map[string]interface{}) {
 func (d *AffiliateRow) MergeMap(cachedMap map[string]interface{}) {
+	d.lock.RLock()
+	defer d.lock.RUnlock()
 	for k, v := range d.CalCols {
 	for k, v := range d.CalCols {
 		cachedMap[k] = v
 		cachedMap[k] = v
 	}
 	}
@@ -129,12 +161,9 @@ func (d *AffiliateRow) MergeMap(cachedMap map[string]interface{}) {
 	}
 	}
 }
 }
 
 
-func (d *AffiliateRow) Reset() {
-	d.CalCols = nil
-	d.AliasMap = nil
-}
-
 func (d *AffiliateRow) Pick(cols [][]string) [][]string {
 func (d *AffiliateRow) Pick(cols [][]string) [][]string {
+	d.lock.Lock()
+	defer d.lock.Unlock()
 	if len(cols) > 0 {
 	if len(cols) > 0 {
 		newAliasMap := make(map[string]interface{})
 		newAliasMap := make(map[string]interface{})
 		newCalCols := make(map[string]interface{})
 		newCalCols := make(map[string]interface{})
@@ -284,24 +313,6 @@ func (m Metadata) Meta(key, table string) (interface{}, bool) {
 	return msg.Meta(key, table)
 	return msg.Meta(key, table)
 }
 }
 
 
-// Alias implementation
-
-func (a *Alias) AppendAlias(key string, value interface{}) bool {
-	if a.AliasMap == nil {
-		a.AliasMap = make(map[string]interface{})
-	}
-	a.AliasMap[key] = value
-	return true
-}
-
-func (a *Alias) AliasValue(key string) (interface{}, bool) {
-	if a.AliasMap == nil {
-		return nil, false
-	}
-	v, ok := a.AliasMap[key]
-	return v, ok
-}
-
 // Tuple implementation
 // Tuple implementation
 
 
 func (t *Tuple) Value(key, table string) (interface{}, bool) {
 func (t *Tuple) Value(key, table string) (interface{}, bool) {

+ 5 - 5
internal/xsql/row_test.go

@@ -38,7 +38,7 @@ func TestCollectionRow(t *testing.T) {
 			wildcard: []string{""},
 			wildcard: []string{""},
 			result:   []interface{}{1, "2", Message{"a": 1, "b": "2"}},
 			result:   []interface{}{1, "2", Message{"a": 1, "b": "2"}},
 		}, {
 		}, {
-			rowC:     &Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "c": 3}, Alias: Alias{AliasMap: map[string]interface{}{"b": "b1"}}}},
+			rowC:     &Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "c": 3}, AliasMap: map[string]interface{}{"b": "b1"}}},
 			value:    []string{"a", "b", "c"},
 			value:    []string{"a", "b", "c"},
 			wildcard: []string{""},
 			wildcard: []string{""},
 			result:   []interface{}{4, "b1", 3, Message{"a": 4, "b": "b1", "c": 3}},
 			result:   []interface{}{4, "b1", 3, Message{"a": 4, "b": "b1", "c": 3}},
@@ -54,7 +54,7 @@ func TestCollectionRow(t *testing.T) {
 			rowC: &JoinTuple{Tuples: []TupleRow{
 			rowC: &JoinTuple{Tuples: []TupleRow{
 				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
 				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
 				&Tuple{Emitter: "src2", Message: Message{"a": 2, "c": "w2"}},
 				&Tuple{Emitter: "src2", Message: Message{"a": 2, "c": "w2"}},
-			}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, Alias: Alias{AliasMap: map[string]interface{}{"d": 4}}}},
+			}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			value:    []string{"a", "src2.a", "b", "c", "d"},
 			value:    []string{"a", "src2.a", "b", "c", "d"},
 			wildcard: []string{"", "src1"},
 			wildcard: []string{"", "src1"},
 			result:   []interface{}{4, 2, "v1", "w2", 4, Message{"a": 4, "b": "v1", "c": "w2", "d": 4}, Message{"a": 1, "b": "v1"}},
 			result:   []interface{}{4, 2, "v1", "w2", 4, Message{"a": 4, "b": "v1", "c": "w2", "d": 4}, Message{"a": 1, "b": "v1"}},
@@ -64,12 +64,12 @@ func TestCollectionRow(t *testing.T) {
 			wildcard: []string{""},
 			wildcard: []string{""},
 			result:   []interface{}{1, "v1", Message{"a": 1, "b": "v1"}},
 			result:   []interface{}{1, "v1", Message{"a": 1, "b": "v1"}},
 		}, {
 		}, {
-			rowC:     &GroupedTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, Alias: Alias{AliasMap: map[string]interface{}{"d": 4}}}},
+			rowC:     &GroupedTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			value:    []string{"a", "b", "d"},
 			value:    []string{"a", "b", "d"},
 			wildcard: []string{""},
 			wildcard: []string{""},
 			result:   []interface{}{4, "v1", 4, Message{"a": 4, "b": "v1", "d": 4}},
 			result:   []interface{}{4, "v1", 4, Message{"a": 4, "b": "v1", "d": 4}},
 		}, {
 		}, {
-			rowC:     &WindowTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, Alias: Alias{AliasMap: map[string]interface{}{"d": 4}}}},
+			rowC:     &WindowTuples{Content: []TupleRow{&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}}, &Tuple{Emitter: "src1", Message: Message{"a": 2, "b": "v2"}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			value:    []string{"a", "b", "d"},
 			value:    []string{"a", "b", "d"},
 			wildcard: []string{""},
 			wildcard: []string{""},
 			result:   []interface{}{4, "v1", 4, Message{"a": 4, "b": "v1", "d": 4}},
 			result:   []interface{}{4, "v1", 4, Message{"a": 4, "b": "v1", "d": 4}},
@@ -77,7 +77,7 @@ func TestCollectionRow(t *testing.T) {
 			rowC: &JoinTuples{Content: []*JoinTuple{{Tuples: []TupleRow{
 			rowC: &JoinTuples{Content: []*JoinTuple{{Tuples: []TupleRow{
 				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
 				&Tuple{Emitter: "src1", Message: Message{"a": 1, "b": "v1"}},
 				&Tuple{Emitter: "src2", Message: Message{"a": 2, "c": "w2"}},
 				&Tuple{Emitter: "src2", Message: Message{"a": 2, "c": "w2"}},
-			}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, Alias: Alias{AliasMap: map[string]interface{}{"d": 4}}}},
+			}}}, AffiliateRow: AffiliateRow{CalCols: map[string]interface{}{"a": 4, "d": 3}, AliasMap: map[string]interface{}{"d": 4}}},
 			value:    []string{"a", "b", "d"},
 			value:    []string{"a", "b", "d"},
 			wildcard: []string{""},
 			wildcard: []string{""},
 			result:   []interface{}{4, "v1", 4, Message{"a": 4, "b": "v1", "c": "w2", "d": 4}},
 			result:   []interface{}{4, "v1", 4, Message{"a": 4, "b": "v1", "c": "w2", "d": 4}},