瀏覽代碼

fix(cache): fit to graph api result

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 年之前
父節點
當前提交
5391b66504
共有 3 個文件被更改,包括 113 次插入73 次删除
  1. 18 25
      internal/topo/node/cache/sync_cache.go
  2. 49 24
      internal/topo/node/cache/sync_cache_test.go
  3. 46 24
      internal/topo/node/sink_node.go

+ 18 - 25
internal/topo/node/cache/sync_cache.go

@@ -17,7 +17,6 @@ package cache
 import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
-	"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
 	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/infra"
@@ -32,7 +31,7 @@ type AckResult bool
 // page Rotate storage for in memory cache
 // Not thread safe!
 type page struct {
-	Data []interface{}
+	Data [][]map[string]interface{}
 	H    int
 	T    int
 	L    int
@@ -43,7 +42,7 @@ type page struct {
 // TODO the page is created even not used, need dynamic?
 func newPage(size int) *page {
 	return &page{
-		Data: make([]interface{}, size),
+		Data: make([][]map[string]interface{}, size),
 		H:    0, // When deleting, head++, if tail == head, it is empty
 		T:    0, // When append, tail++, if tail== head, it is full
 		Size: size,
@@ -51,7 +50,7 @@ func newPage(size int) *page {
 }
 
 // append item if list is not full and return true; otherwise return false
-func (p *page) append(item interface{}) bool {
+func (p *page) append(item []map[string]interface{}) bool {
 	if p.L == p.Size { // full
 		return false
 	}
@@ -65,7 +64,7 @@ func (p *page) append(item interface{}) bool {
 }
 
 // peak get the first item in the cache
-func (p *page) peak() (interface{}, bool) {
+func (p *page) peak() ([]map[string]interface{}, bool) {
 	if p.L == 0 {
 		return nil, false
 	}
@@ -96,8 +95,8 @@ func (p *page) reset() {
 
 type SyncCache struct {
 	// The input data to the cache
-	in        <-chan interface{}
-	Out       chan interface{}
+	in        <-chan []map[string]interface{}
+	Out       chan []map[string]interface{}
 	Ack       chan bool
 	cacheCtrl chan interface{} // CacheCtrl is the only place to control the cache; sync in and ack result
 	errorCh   chan<- error
@@ -119,11 +118,11 @@ type SyncCache struct {
 	store kv.KeyValue
 }
 
-func NewSyncCache(ctx api.StreamContext, in <-chan interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int) *SyncCache {
+func NewSyncCache(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int) *SyncCache {
 	c := &SyncCache{
 		cacheConf:  cacheConf,
 		in:         in,
-		Out:        make(chan interface{}, bufferLength),
+		Out:        make(chan []map[string]interface{}, bufferLength),
 		Ack:        make(chan bool, 10),
 		cacheCtrl:  make(chan interface{}, 10),
 		errorCh:    errCh,
@@ -154,15 +153,6 @@ func (c *SyncCache) run(ctx api.StreamContext) {
 	for {
 		select {
 		case item := <-c.in:
-			// possibility of barrier, ignore if found
-			if boe, ok := item.(*checkpoint.BufferOrEvent); ok {
-				if _, ok := boe.Data.(*checkpoint.Barrier); ok {
-					c.Out <- item
-					ctx.GetLogger().Debugf("sink cache send out barrier %v", boe.Data)
-					break
-				}
-			}
-			c.stats.IncTotalRecordsIn()
 			ctx.GetLogger().Debugf("send to cache")
 			c.cacheCtrl <- item
 		case isSuccess := <-c.Ack:
@@ -181,13 +171,15 @@ func (c *SyncCache) run(ctx api.StreamContext) {
 					c.sendStatus = 2
 					ctx.GetLogger().Debug("send status to 2 after false ack")
 				}
-			default:
+			case []map[string]interface{}:
 				ctx.GetLogger().Debugf("adding cache %v", data)
-				c.addCache(ctx, data)
+				c.addCache(ctx, r)
 				if c.sendStatus == 2 {
 					c.sendStatus = 0
 					ctx.GetLogger().Debug("send status to 0 after adding cache in error state")
 				}
+			default:
+				ctx.GetLogger().Errorf("unknown cache control command %v", data)
 			}
 			c.stats.SetBufferLength(int64(len(c.in) + c.cacheLength))
 			if c.sendStatus == 0 {
@@ -221,7 +213,7 @@ func (c *SyncCache) send(ctx api.StreamContext) {
 }
 
 // addCache not thread safe!
-func (c *SyncCache) addCache(ctx api.StreamContext, item interface{}) {
+func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{}) {
 	isNotFull := c.appendMemCache(item)
 	if !isNotFull {
 		if c.diskBufferPage == nil {
@@ -260,6 +252,7 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item interface{}) {
 		ctx.GetLogger().Debugf("added cache to mem cache %v", c.memCache)
 	}
 	c.cacheLength++
+	ctx.GetLogger().Infof("added cache %d", c.cacheLength)
 }
 
 // deleteCache not thread safe!
@@ -284,12 +277,12 @@ func (c *SyncCache) deleteCache(ctx api.StreamContext) {
 			c.diskBufferPage = nil
 		}
 	}
-	ctx.GetLogger().Debugf("deleted cache. cacheLength: %d, diskSize: %d, memCache: %v", c.cacheLength, c.diskSize, c.memCache)
+	ctx.GetLogger().Infof("deleted cache. cacheLength: %d, diskSize: %d, memCache: %v", c.cacheLength, c.diskSize, c.memCache)
 }
 
 func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
 	// load page from the disk
-	ctx.GetLogger().Debugf("loading from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
+	ctx.GetLogger().Infof("loading from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
 	hotPage := newPage(c.cacheConf.BufferPageSize)
 	ok, err := c.store.Get(strconv.Itoa(c.diskPageHead), hotPage)
 	if err != nil {
@@ -320,7 +313,7 @@ func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
 	ctx.GetLogger().Debugf("loaded from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
 }
 
-func (c *SyncCache) appendMemCache(item interface{}) bool {
+func (c *SyncCache) appendMemCache(item []map[string]interface{}) bool {
 	if len(c.memCache) > c.maxMemPage {
 		return false
 	}
@@ -338,7 +331,7 @@ func (c *SyncCache) appendMemCache(item interface{}) bool {
 	return true
 }
 
-func (c *SyncCache) peakMemCache(_ api.StreamContext) (interface{}, bool) {
+func (c *SyncCache) peakMemCache(_ api.StreamContext) ([]map[string]interface{}, bool) {
 	if len(c.memCache) == 0 {
 		return nil, false
 	}

+ 49 - 24
internal/topo/node/cache/sync_cache_test.go

@@ -34,23 +34,33 @@ func TestPage(t *testing.T) {
 	if !p.isEmpty() {
 		t.Errorf("page is not empty")
 	}
-	if !p.append(1) {
+	if !p.append([]map[string]interface{}{
+		{"a": 1},
+	}) {
 		t.Fatal("append failed")
 	}
-	if !p.append(2) {
+	if !p.append([]map[string]interface{}{
+		{"a": 2},
+	}) {
 		t.Fatal("append failed")
 	}
-	if p.append(3) {
+	if p.append([]map[string]interface{}{
+		{"a": 3},
+	}) {
 		t.Fatal("should append fail")
 	}
 	v, ok := p.peak()
 	if !ok {
 		t.Fatal("peak failed")
 	}
-	if v != 1 {
-		t.Fatalf("peak value mismatch, expect 3 but got %v", v)
+	if !reflect.DeepEqual(v, []map[string]interface{}{
+		{"a": 1},
+	}) {
+		t.Fatalf("peak value mismatch, expect 1 but got %v", v)
 	}
-	if p.append(4) {
+	if p.append([]map[string]interface{}{
+		{"a": 4},
+	}) {
 		t.Fatal("should append failed")
 	}
 	if !p.delete() {
@@ -60,11 +70,15 @@ func TestPage(t *testing.T) {
 	if !ok {
 		t.Fatal("peak failed")
 	}
-	if v != 2 {
+	if !reflect.DeepEqual(v, []map[string]interface{}{
+		{"a": 2},
+	}) {
 		t.Fatalf("peak value mismatch, expect 2 but got %v", v)
 	}
 	p.reset()
-	if !p.append(5) {
+	if !p.append([]map[string]interface{}{
+		{"a": 5},
+	}) {
 		t.Fatal("append failed")
 	}
 	if p.isEmpty() {
@@ -73,10 +87,14 @@ func TestPage(t *testing.T) {
 	if !p.delete() {
 		t.Fatal("delete failed")
 	}
-	if !p.append(5) {
+	if !p.append([]map[string]interface{}{
+		{"a": 5},
+	}) {
 		t.Fatal("append failed")
 	}
-	if !p.append(6) {
+	if !p.append([]map[string]interface{}{
+		{"a": 6},
+	}) {
 		t.Fatal("append failed")
 	}
 	if !p.delete() {
@@ -103,8 +121,8 @@ func TestPage(t *testing.T) {
 func TestRun(t *testing.T) {
 	var tests = []struct {
 		sconf   *conf.SinkConf
-		dataIn  []interface{}
-		dataOut []interface{}
+		dataIn  [][]map[string]interface{}
+		dataOut [][]map[string]interface{}
 		stopPt  int // restart the rule in this point
 	}{
 		{ // 0
@@ -116,8 +134,8 @@ func TestRun(t *testing.T) {
 				ResendInterval:       0,
 				CleanCacheAtStop:     false,
 			},
-			dataIn: []interface{}{
-				1, 2, 3, 4, 5,
+			dataIn: [][]map[string]interface{}{
+				{{"a": 1}}, {{"a": 2}}, {{"a": 3}}, {{"a": 4}}, {{"a": 5}},
 			},
 			stopPt: 4,
 		},
@@ -130,8 +148,8 @@ func TestRun(t *testing.T) {
 				ResendInterval:       0,
 				CleanCacheAtStop:     false,
 			},
-			dataIn: []interface{}{
-				1, 2, 3, 4, 5, 6,
+			dataIn: [][]map[string]interface{}{
+				{{"a": 1}}, {{"a": 2}}, {{"a": 3}}, {{"a": 4}}, {{"a": 5}}, {{"a": 6}},
 			},
 			stopPt: 5,
 		},
@@ -144,8 +162,8 @@ func TestRun(t *testing.T) {
 				ResendInterval:       0,
 				CleanCacheAtStop:     false,
 			},
-			dataIn: []interface{}{
-				1, 2, 3, 4, 5, 6,
+			dataIn: [][]map[string]interface{}{
+				{{"a": 1}}, {{"a": 2}}, {{"a": 3}}, {{"a": 4}}, {{"a": 5}}, {{"a": 6}},
 			},
 			stopPt: 4,
 		},
@@ -158,11 +176,11 @@ func TestRun(t *testing.T) {
 				ResendInterval:       0,
 				CleanCacheAtStop:     false,
 			},
-			dataIn: []interface{}{
-				1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
+			dataIn: [][]map[string]interface{}{
+				{{"a": 1}}, {{"a": 2}}, {{"a": 3}}, {{"a": 4}}, {{"a": 5}}, {{"a": 6}}, {{"a": 7}}, {{"a": 8}}, {{"a": 9}}, {{"a": 10}}, {{"a": 11}}, {{"a": 12}}, {{"a": 13}},
 			},
-			dataOut: []interface{}{
-				1, 6, 7, 8, 9, 10, 11, 12, 13,
+			dataOut: [][]map[string]interface{}{
+				{{"a": 1}}, {{"a": 6}}, {{"a": 7}}, {{"a": 8}}, {{"a": 9}}, {{"a": 10}}, {{"a": 11}}, {{"a": 12}}, {{"a": 13}},
 			},
 			stopPt: 4,
 		},
@@ -179,7 +197,7 @@ func TestRun(t *testing.T) {
 			t.Fatal(err)
 			return
 		}
-		in := make(chan interface{})
+		in := make(chan []map[string]interface{})
 		errCh := make(chan error)
 		var result []interface{}
 		go func() {
@@ -218,8 +236,15 @@ func TestRun(t *testing.T) {
 		if tt.dataOut == nil {
 			tt.dataOut = tt.dataIn
 		}
-		if !reflect.DeepEqual(tt.dataOut, result) {
+		if len(tt.dataOut) != len(result) {
 			t.Errorf("test %d data mismatch\nexpect\t%v\nbut got\t%v", i, tt.dataOut, result)
+			continue
+		}
+		for i, v := range result {
+			if !reflect.DeepEqual(tt.dataOut[i], v) {
+				t.Errorf("test %d data mismatch\nexpect\t%v\nbut got\t%v", i, tt.dataOut, result)
+				break
+			}
 		}
 	}
 }

+ 46 - 24
internal/topo/node/sink_node.go

@@ -189,17 +189,30 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 								// is not supported and validated in the configure, should not go here
 								return fmt.Errorf("async mode is not supported for cache sink")
 							} else { // sync mode, the ack is already in order
-								c := cache.NewSyncCache(ctx, m.input, result, stats, &sconf.SinkConf, sconf.BufferLength)
+								dataCh := make(chan []map[string]interface{}, sconf.BufferLength)
+								c := cache.NewSyncCache(ctx, dataCh, result, stats, &sconf.SinkConf, sconf.BufferLength)
 								for {
 									select {
-									case data := <-c.Out:
+									case data := <-m.input:
 										if temp, processed := m.preprocess(data); processed {
 											break
 										} else {
 											data = temp
 										}
+										stats.IncTotalRecordsIn()
+										outs := itemToMap(data)
+										if sconf.Omitempty && (data == nil || len(outs) == 0) {
+											ctx.GetLogger().Debugf("receive empty in sink")
+											return nil
+										}
+										select {
+										case dataCh <- outs:
+										case <-ctx.Done():
+										}
+									case data := <-c.Out:
+										stats.ProcessTimeStart()
 										ack := true
-										err := doCollect(ctx, sink, data, stats, sconf)
+										err := doCollectMaps(ctx, sink, sconf, data, stats)
 										// Only recoverable error should be cached
 										if err != nil {
 											if strings.HasPrefix(err.Error(), errorx.IOErr) { // do not log to prevent a lot of logs!
@@ -214,6 +227,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 										case c.Ack <- ack:
 										case <-ctx.Done():
 										}
+										stats.ProcessTimeEnd()
 									case <-ctx.Done():
 										logger.Infof("sink node %s instance %d done", m.name, instance)
 										if err := sink.Close(ctx); err != nil {
@@ -288,6 +302,34 @@ func (m *SinkNode) reset() {
 func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats metric.StatManager, sconf *SinkConf) error {
 	stats.ProcessTimeStart()
 	defer stats.ProcessTimeEnd()
+	outs := itemToMap(item)
+	if sconf.Omitempty && (item == nil || len(outs) == 0) {
+		ctx.GetLogger().Debugf("receive empty in sink")
+		return nil
+	}
+	return doCollectMaps(ctx, sink, sconf, outs, stats)
+}
+
+func doCollectMaps(ctx api.StreamContext, sink api.Sink, sconf *SinkConf, outs []map[string]interface{}, stats metric.StatManager) error {
+	if !sconf.SendSingle {
+		return doCollectData(ctx, sink, outs, stats)
+	} else {
+		var err error
+		for _, d := range outs {
+			if sconf.Omitempty && (d == nil || len(d) == 0) {
+				ctx.GetLogger().Debugf("receive empty in sink")
+				continue
+			}
+			newErr := doCollectData(ctx, sink, d, stats)
+			if newErr != nil {
+				err = newErr
+			}
+		}
+		return err
+	}
+}
+
+func itemToMap(item interface{}) []map[string]interface{} {
 	var outs []map[string]interface{}
 	switch val := item.(type) {
 	case error:
@@ -311,26 +353,7 @@ func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats met
 			{"error": fmt.Sprintf("result is not a map slice but found %#v", val)},
 		}
 	}
-	if sconf.Omitempty && (item == nil || len(outs) == 0) {
-		ctx.GetLogger().Debugf("receive empty in sink")
-		return nil
-	}
-	if !sconf.SendSingle {
-		return doCollectData(ctx, sink, outs, stats)
-	} else {
-		var err error
-		for _, d := range outs {
-			if sconf.Omitempty && (d == nil || len(d) == 0) {
-				ctx.GetLogger().Debugf("receive empty in sink")
-				continue
-			}
-			newErr := doCollectData(ctx, sink, d, stats)
-			if newErr != nil {
-				err = newErr
-			}
-		}
-		return err
-	}
+	return outs
 }
 
 // doCollectData outData must be map or []map
@@ -341,7 +364,6 @@ func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, st
 		return nil
 	default:
 		if err := sink.Collect(ctx, outData); err != nil {
-			ctx.GetLogger().Errorf("sink node %s instance %d send data error %v", ctx.GetOpId(), ctx.GetInstanceId(), err)
 			stats.IncTotalExceptions()
 			return err
 		} else {