Przeglądaj źródła

fix(sql): should not change message map to avoid concurrent read/write (#1407)

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 2 lat temu
rodzic
commit
3cf8726628
1 zmienionych plików z 4 dodań i 9 usunięć
  1. 4 9
      internal/xsql/row.go

+ 4 - 9
internal/xsql/row.go

@@ -18,7 +18,6 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"strings"
-	"sync"
 )
 
 // The original message map may be big. Make sure it is immutable so that never make a copy of it.
@@ -192,7 +191,6 @@ type Tuple struct {
 	Metadata  Metadata // immutable
 
 	AffiliateRow
-	lock sync.Mutex
 
 	cachedMap map[string]interface{} // clone of the row and cached for performance
 }
@@ -330,8 +328,6 @@ func (t *Tuple) Clone() TupleRow {
 
 // ToMap should only use in sink.
 func (t *Tuple) ToMap() map[string]interface{} {
-	t.lock.Lock()
-	defer t.lock.Unlock()
 	if t.AffiliateRow.IsEmpty() {
 		return t.Message
 	}
@@ -341,7 +337,6 @@ func (t *Tuple) ToMap() map[string]interface{} {
 			m[k] = v
 		}
 		t.cachedMap = m
-		t.Message = t.cachedMap
 	}
 	t.AffiliateRow.MergeMap(t.cachedMap)
 	return t.cachedMap
@@ -377,18 +372,18 @@ func (t *Tuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[str
 	}
 	if !allWildcard {
 		if len(cols) > 0 {
-			t.cachedMap = make(map[string]interface{})
+			pickedMap := make(map[string]interface{})
 			for _, colTab := range cols {
 				if colTab[1] == "" || colTab[1] == string(ast.DefaultStream) || colTab[1] == t.Emitter {
 					if v, ok := t.Message.Value(colTab[0], colTab[1]); ok {
-						t.cachedMap[colTab[0]] = v
+						pickedMap[colTab[0]] = v
 					}
 				}
 			}
-			t.Message = t.cachedMap
+			t.Message = pickedMap
 		} else {
 			t.Message = make(map[string]interface{})
-			t.cachedMap = t.Message
+			t.cachedMap = make(map[string]interface{})
 		}
 	}
 }