Pārlūkot izejas kodu

fix(cache): problems in several cases

1. The pending status is now expanded into 3 statuses: idle, waiting and error. Resending is only allowed in the idle status, which avoids massive error messages.
2. Fix the MemCache problem with cap/memSize. The memory cache is now a dynamic slice, so just use its length and drop those status fields.
3. Fix the cacheLength calculation problem when dropping items: it should subtract the real page size.
4. Add more debug logs
5. Fix the disk save/restore problem with the size/head status.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 gadi atpakaļ
vecāks
revīzija
fd07d789a8

+ 22 - 0
internal/pkg/store/stores.go

@@ -19,6 +19,8 @@ import (
 	"github.com/lf-edge/ekuiper/internal/pkg/store/definition"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/definition"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/sql"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/sql"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 	"github.com/lf-edge/ekuiper/pkg/kv"
+	"path"
+	"strings"
 	"sync"
 	"sync"
 )
 )
 
 
@@ -84,6 +86,18 @@ func (s *stores) DropKV(table string) {
 	}
 	}
 }
 }
 
 
+func (s *stores) DropRefKVs(tablePrefix string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	for table, ks := range s.kv {
+		if strings.HasPrefix(table, tablePrefix) {
+			_ = ks.Drop()
+			delete(s.kv, table)
+		}
+	}
+}
+
 func (s *stores) GetTS(table string) (error, kv.Tskv) {
 func (s *stores) GetTS(table string) (error, kv.Tskv) {
 	s.mu.Lock()
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	defer s.mu.Unlock()
@@ -152,3 +166,11 @@ func DropCacheKV(table string) error {
 	cacheStores.DropKV(table)
 	cacheStores.DropKV(table)
 	return nil
 	return nil
 }
 }
+
+func DropCacheKVForRule(rule string) error {
+	if cacheStores == nil {
+		return fmt.Errorf("cache stores are not initialized")
+	}
+	cacheStores.DropRefKVs(path.Join("sink", rule))
+	return nil
+}

+ 1 - 2
internal/processor/rule.go

@@ -24,7 +24,6 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 	"github.com/lf-edge/ekuiper/pkg/kv"
-	"path"
 )
 )
 
 
 type RuleProcessor struct {
 type RuleProcessor struct {
@@ -230,7 +229,7 @@ func cleanCheckpoint(name string) error {
 }
 }
 
 
 func cleanSinkCache(name string) error {
 func cleanSinkCache(name string) error {
-	err := store.DropKV(path.Join("sink", name))
+	err := store.DropCacheKVForRule(name)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}

+ 94 - 73
internal/topo/node/cache/sync_cache.go

@@ -23,7 +23,6 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/infra"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 	"path"
 	"path"
-	"sort"
 	"strconv"
 	"strconv"
 	"time"
 	"time"
 )
 )
@@ -106,29 +105,30 @@ type SyncCache struct {
 	// cache config
 	// cache config
 	cacheConf   *conf.SinkConf
 	cacheConf   *conf.SinkConf
 	maxDiskPage int
 	maxDiskPage int
+	maxMemPage  int
 	// cache storage
 	// cache storage
 	memCache       []*page
 	memCache       []*page
 	diskBufferPage *page
 	diskBufferPage *page
 	// status
 	// status
-	memSize      int // how many pages in memory has been saved
 	diskSize     int // how many pages has been saved
 	diskSize     int // how many pages has been saved
 	cacheLength  int //readonly, for metrics only to save calculation
 	cacheLength  int //readonly, for metrics only to save calculation
 	diskPageTail int // init from the database
 	diskPageTail int // init from the database
 	diskPageHead int
 	diskPageHead int
-	pending      bool
+	sendStatus   int // 0: idle, 1: sending and waiting for ack, 2: stopped for error
 	//serialize
 	//serialize
 	store kv.KeyValue
 	store kv.KeyValue
 }
 }
 
 
 func NewSyncCache(ctx api.StreamContext, in <-chan interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int) *SyncCache {
 func NewSyncCache(ctx api.StreamContext, in <-chan interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int) *SyncCache {
 	c := &SyncCache{
 	c := &SyncCache{
-		cacheConf: cacheConf,
-		in:        in,
-		Out:       make(chan interface{}, bufferLength),
-		Ack:       make(chan bool, 10),
-		cacheCtrl: make(chan interface{}, 10),
-		errorCh:   errCh,
-		memCache:  make([]*page, 0, cacheConf.MemoryCacheThreshold/cacheConf.BufferPageSize),
+		cacheConf:  cacheConf,
+		in:         in,
+		Out:        make(chan interface{}, bufferLength),
+		Ack:        make(chan bool, 10),
+		cacheCtrl:  make(chan interface{}, 10),
+		errorCh:    errCh,
+		maxMemPage: cacheConf.MemoryCacheThreshold / cacheConf.BufferPageSize,
+		memCache:   make([]*page, 0),
 		// add one more slot so that there will be at least one slot between head and tail to find out the head/tail id
 		// add one more slot so that there will be at least one slot between head and tail to find out the head/tail id
 		maxDiskPage: (cacheConf.MaxDiskCache / cacheConf.BufferPageSize) + 1,
 		maxDiskPage: (cacheConf.MaxDiskCache / cacheConf.BufferPageSize) + 1,
 		stats:       stats,
 		stats:       stats,
@@ -148,6 +148,9 @@ func NewSyncCache(ctx api.StreamContext, in <-chan interface{}, errCh chan<- err
 func (c *SyncCache) run(ctx api.StreamContext) {
 func (c *SyncCache) run(ctx api.StreamContext) {
 	c.initStore(ctx)
 	c.initStore(ctx)
 	defer c.onClose(ctx)
 	defer c.onClose(ctx)
+	if c.cacheLength > 0 { // start to send the cache
+		c.send(ctx)
+	}
 	for {
 	for {
 		select {
 		select {
 		case item := <-c.in:
 		case item := <-c.in:
@@ -162,42 +165,33 @@ func (c *SyncCache) run(ctx api.StreamContext) {
 			c.stats.IncTotalRecordsIn()
 			c.stats.IncTotalRecordsIn()
 			ctx.GetLogger().Debugf("send to cache")
 			ctx.GetLogger().Debugf("send to cache")
 			c.cacheCtrl <- item
 			c.cacheCtrl <- item
-			ctx.GetLogger().Debugf("cache done")
 		case isSuccess := <-c.Ack:
 		case isSuccess := <-c.Ack:
 			// only send the next sink after receiving an ack
 			// only send the next sink after receiving an ack
 			ctx.GetLogger().Debugf("cache ack")
 			ctx.GetLogger().Debugf("cache ack")
 			c.cacheCtrl <- AckResult(isSuccess)
 			c.cacheCtrl <- AckResult(isSuccess)
-			ctx.GetLogger().Debugf("cache ack done")
 		case data := <-c.cacheCtrl: // The only place to manipulate cache
 		case data := <-c.cacheCtrl: // The only place to manipulate cache
 			switch r := data.(type) {
 			switch r := data.(type) {
 			case AckResult:
 			case AckResult:
 				if r {
 				if r {
 					ctx.GetLogger().Debugf("deleting cache")
 					ctx.GetLogger().Debugf("deleting cache")
 					c.deleteCache(ctx)
 					c.deleteCache(ctx)
+					c.sendStatus = 0
+					ctx.GetLogger().Debug("send status to 0 after true ack")
+				} else {
+					c.sendStatus = 2
+					ctx.GetLogger().Debug("send status to 2 after false ack")
 				}
 				}
-				c.pending = false
 			default:
 			default:
 				ctx.GetLogger().Debugf("adding cache %v", data)
 				ctx.GetLogger().Debugf("adding cache %v", data)
 				c.addCache(ctx, data)
 				c.addCache(ctx, data)
+				if c.sendStatus == 2 {
+					c.sendStatus = 0
+					ctx.GetLogger().Debug("send status to 0 after adding cache in error state")
+				}
 			}
 			}
 			c.stats.SetBufferLength(int64(len(c.in) + c.cacheLength))
 			c.stats.SetBufferLength(int64(len(c.in) + c.cacheLength))
-			if !c.pending {
-				if c.pending {
-					time.Sleep(time.Duration(c.cacheConf.ResendInterval) * time.Millisecond)
-				}
-				d, ok := c.peakMemCache(ctx)
-				if ok {
-					ctx.GetLogger().Debugf("sending cache item %v", d)
-					c.pending = true
-					select {
-					case c.Out <- d:
-						ctx.GetLogger().Debugf("sink cache send out %v", d)
-					case <-ctx.Done():
-						ctx.GetLogger().Debugf("stop sink cache send")
-					}
-				} else {
-					c.pending = false
-				}
+			if c.sendStatus == 0 {
+				c.send(ctx)
 			}
 			}
 		case <-ctx.Done():
 		case <-ctx.Done():
 			ctx.GetLogger().Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
 			ctx.GetLogger().Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
@@ -206,6 +200,26 @@ func (c *SyncCache) run(ctx api.StreamContext) {
 	}
 	}
 }
 }
 
 
+func (c *SyncCache) send(ctx api.StreamContext) {
+	if c.cacheLength > 1 && c.cacheConf.ResendInterval > 0 {
+		time.Sleep(time.Duration(c.cacheConf.ResendInterval) * time.Millisecond)
+	}
+	d, ok := c.peakMemCache(ctx)
+	if ok {
+		ctx.GetLogger().Debugf("sending cache item %v", d)
+		c.sendStatus = 1
+		ctx.GetLogger().Debug("send status to 0 after sending tuple")
+		select {
+		case c.Out <- d:
+			ctx.GetLogger().Debugf("sink cache send out %v", d)
+		case <-ctx.Done():
+			ctx.GetLogger().Debugf("stop sink cache send")
+		}
+	} else {
+		ctx.GetLogger().Debug("no cache to send")
+	}
+}
+
 // addCache not thread safe!
 // addCache not thread safe!
 func (c *SyncCache) addCache(ctx api.StreamContext, item interface{}) {
 func (c *SyncCache) addCache(ctx api.StreamContext, item interface{}) {
 	isNotFull := c.appendMemCache(item)
 	isNotFull := c.appendMemCache(item)
@@ -218,15 +232,20 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item interface{}) {
 			if c.diskSize == c.maxDiskPage {
 			if c.diskSize == c.maxDiskPage {
 				// disk full, read the oldest page to the hot page
 				// disk full, read the oldest page to the hot page
 				c.loadFromDisk(ctx)
 				c.loadFromDisk(ctx)
-				c.cacheLength -= c.cacheConf.BufferPageSize
+				ctx.GetLogger().Debug("disk full, remove the last page")
 			}
 			}
 			err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
 			err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
 			if err != nil {
 			if err != nil {
 				ctx.GetLogger().Errorf("fail to store disk cache %v", err)
 				ctx.GetLogger().Errorf("fail to store disk cache %v", err)
 				return
 				return
 			} else {
 			} else {
+				ctx.GetLogger().Debug("add cache to disk. the new disk buffer page is %v", c.diskBufferPage)
 				c.diskPageTail++
 				c.diskPageTail++
 				c.diskSize++
 				c.diskSize++
+				err := c.store.Set("size", c.diskSize)
+				if err != nil {
+					ctx.GetLogger().Warnf("fail to store disk cache size %v", err)
+				}
 				// rotate
 				// rotate
 				if c.diskPageTail == c.maxDiskPage {
 				if c.diskPageTail == c.maxDiskPage {
 					c.diskPageTail = 0
 					c.diskPageTail = 0
@@ -234,15 +253,20 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item interface{}) {
 			}
 			}
 			c.diskBufferPage.reset()
 			c.diskBufferPage.reset()
 			c.diskBufferPage.append(item)
 			c.diskBufferPage.append(item)
+		} else {
+			ctx.GetLogger().Debugf("added cache to disk buffer page %v", c.diskBufferPage)
 		}
 		}
+	} else {
+		ctx.GetLogger().Debugf("added cache to mem cache %v", c.memCache)
 	}
 	}
 	c.cacheLength++
 	c.cacheLength++
-	ctx.GetLogger().Debugf("added cache")
 }
 }
 
 
 // deleteCache not thread safe!
 // deleteCache not thread safe!
 func (c *SyncCache) deleteCache(ctx api.StreamContext) {
 func (c *SyncCache) deleteCache(ctx api.StreamContext) {
+	ctx.GetLogger().Debugf("deleting cache. cacheLength: %d, diskSize: %d", c.cacheLength, c.diskSize)
 	if len(c.memCache) == 0 {
 	if len(c.memCache) == 0 {
+		ctx.GetLogger().Debug("mem cache is empty")
 		return
 		return
 	}
 	}
 	isNotEmpty := c.memCache[0].delete()
 	isNotEmpty := c.memCache[0].delete()
@@ -255,14 +279,17 @@ func (c *SyncCache) deleteCache(ctx api.StreamContext) {
 		if c.diskSize > 0 {
 		if c.diskSize > 0 {
 			c.loadFromDisk(ctx)
 			c.loadFromDisk(ctx)
 		} else if c.diskBufferPage != nil { // use cool page as the new page
 		} else if c.diskBufferPage != nil { // use cool page as the new page
+			ctx.GetLogger().Debugf("reading from diskBufferPage: %d", c.cacheLength)
 			c.memCache = append(c.memCache, c.diskBufferPage)
 			c.memCache = append(c.memCache, c.diskBufferPage)
 			c.diskBufferPage = nil
 			c.diskBufferPage = nil
 		}
 		}
 	}
 	}
+	ctx.GetLogger().Debugf("deleted cache. cacheLength: %d, diskSize: %d, memCache: %v", c.cacheLength, c.diskSize, c.memCache)
 }
 }
 
 
 func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
 func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
 	// load page from the disk
 	// load page from the disk
+	ctx.GetLogger().Debugf("loading from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
 	hotPage := newPage(c.cacheConf.BufferPageSize)
 	hotPage := newPage(c.cacheConf.BufferPageSize)
 	ok, err := c.store.Get(strconv.Itoa(c.diskPageHead), hotPage)
 	ok, err := c.store.Get(strconv.Itoa(c.diskPageHead), hotPage)
 	if err != nil {
 	if err != nil {
@@ -270,39 +297,48 @@ func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
 	} else if !ok {
 	} else if !ok {
 		ctx.GetLogger().Errorf("nothing in the disk, should not happen")
 		ctx.GetLogger().Errorf("nothing in the disk, should not happen")
 	} else {
 	} else {
-		if len(c.memCache) > 0 {
-			ctx.GetLogger().Debugf("drop a page in memory")
+		if len(c.memCache) >= c.maxMemPage {
+			ctx.GetLogger().Warnf("drop a page of %d items in memory", c.memCache[0].L)
+			c.cacheLength -= c.memCache[0].L
 			c.memCache = c.memCache[1:]
 			c.memCache = c.memCache[1:]
 		}
 		}
 		c.memCache = append(c.memCache, hotPage)
 		c.memCache = append(c.memCache, hotPage)
 		c.diskPageHead++
 		c.diskPageHead++
 		c.diskSize--
 		c.diskSize--
+		err := c.store.Set("size", c.diskSize)
+		if err != nil {
+			ctx.GetLogger().Warnf("fail to store disk cache size %v", err)
+		}
 		if c.diskPageHead == c.maxDiskPage {
 		if c.diskPageHead == c.maxDiskPage {
 			c.diskPageHead = 0
 			c.diskPageHead = 0
 		}
 		}
+		err = c.store.Set("head", c.diskPageHead)
+		if err != nil {
+			ctx.GetLogger().Warnf("fail to store disk cache size %v", err)
+		}
 	}
 	}
+	ctx.GetLogger().Debugf("loaded from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
 }
 }
 
 
 func (c *SyncCache) appendMemCache(item interface{}) bool {
 func (c *SyncCache) appendMemCache(item interface{}) bool {
-	if c.memSize == cap(c.memCache) {
+	if len(c.memCache) > c.maxMemPage {
 		return false
 		return false
 	}
 	}
-	if len(c.memCache) <= c.memSize {
+	if len(c.memCache) == 0 {
 		c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
 		c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
 	}
 	}
-	isNotFull := c.memCache[c.memSize].append(item)
+	isNotFull := c.memCache[len(c.memCache)-1].append(item)
 	if !isNotFull {
 	if !isNotFull {
-		c.memSize++
-		if c.memSize == cap(c.memCache) {
+		if len(c.memCache) == c.maxMemPage {
 			return false
 			return false
 		}
 		}
 		c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
 		c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
-		return c.memCache[c.memSize].append(item)
+		return c.memCache[len(c.memCache)-1].append(item)
 	}
 	}
 	return true
 	return true
 }
 }
 
 
-func (c *SyncCache) peakMemCache(ctx api.StreamContext) (interface{}, bool) {
+func (c *SyncCache) peakMemCache(_ api.StreamContext) (interface{}, bool) {
 	if len(c.memCache) == 0 {
 	if len(c.memCache) == 0 {
 		return nil, false
 		return nil, false
 	}
 	}
@@ -346,7 +382,6 @@ func (c *SyncCache) initStore(ctx api.StreamContext) {
 		if err != nil {
 		if err != nil {
 			ctx.GetLogger().Errorf("fail to restore mem cache %v", err)
 			ctx.GetLogger().Errorf("fail to restore mem cache %v", err)
 		}
 		}
-		c.memSize = len(c.memCache)
 		for _, p := range c.memCache {
 		for _, p := range c.memCache {
 			c.cacheLength += p.L
 			c.cacheLength += p.L
 		}
 		}
@@ -356,38 +391,19 @@ func (c *SyncCache) initStore(ctx api.StreamContext) {
 		}
 		}
 		ctx.GetLogger().Infof("restored mem cache %d", c.cacheLength)
 		ctx.GetLogger().Infof("restored mem cache %d", c.cacheLength)
 		// restore the disk cache
 		// restore the disk cache
-		dks, err := c.store.Keys()
-		if err != nil {
-			ctx.GetLogger().Errorf("fail to restore disk cache %v", err)
+		var size int
+		ok, _ = c.store.Get("size", &size)
+		if !ok || size == 0 { // no disk cache
 			return
 			return
 		}
 		}
-		if len(dks) == 0 {
-			return
-		}
-		dk := make([]int, 0, len(dks))
-		for _, d := range dks {
-			aint, err := strconv.Atoi(d)
-			if err == nil { // filter mem cache
-				dk = append(dk, aint)
-			}
-		}
-		if len(dk) == 0 {
-			return
-		}
-		c.diskSize = len(dk) - 1
-		c.cacheLength += c.diskSize * c.cacheConf.BufferPageSize
-		sort.Ints(dk)
-		// default
-		c.diskPageHead = dk[0]
-		c.diskPageTail = dk[len(dk)-1]
-		for i, k := range dk {
-			if i-1 >= 0 {
-				if k-dk[i-1] > 1 {
-					c.diskPageTail = i - 1
-					c.diskPageHead = k
-				}
-			}
+		c.diskSize = size
+		var head int
+		ok, _ = c.store.Get("head", &head)
+		if ok {
+			c.diskPageHead = head
 		}
 		}
+		c.cacheLength += (c.diskSize - 1) * c.cacheConf.BufferPageSize
+		c.diskPageTail = (c.diskPageHead + c.diskSize - 1) % c.maxDiskPage
 		// load buffer page
 		// load buffer page
 		hotPage := newPage(c.cacheConf.BufferPageSize)
 		hotPage := newPage(c.cacheConf.BufferPageSize)
 		ok, err = c.store.Get(strconv.Itoa(c.diskPageTail), hotPage)
 		ok, err = c.store.Get(strconv.Itoa(c.diskPageTail), hotPage)
@@ -398,8 +414,9 @@ func (c *SyncCache) initStore(ctx api.StreamContext) {
 		} else {
 		} else {
 			c.diskBufferPage = hotPage
 			c.diskBufferPage = hotPage
 			c.cacheLength += c.diskBufferPage.L
 			c.cacheLength += c.diskBufferPage.L
+			c.diskSize--
 		}
 		}
-		ctx.GetLogger().Infof("restored all cache %d", c.cacheLength)
+		ctx.GetLogger().Infof("restored all cache %d. diskSize %d", c.cacheLength, c.diskSize)
 	}
 	}
 }
 }
 
 
@@ -416,6 +433,10 @@ func (c *SyncCache) onClose(ctx api.StreamContext) {
 			if err != nil {
 			if err != nil {
 				ctx.GetLogger().Errorf("fail to store disk cache %v", err)
 				ctx.GetLogger().Errorf("fail to store disk cache %v", err)
 			}
 			}
+			err = c.store.Set("size", c.diskSize+1)
+			if err != nil {
+				ctx.GetLogger().Errorf("fail to store disk size %v", err)
+			}
 			ctx.GetLogger().Debug("store disk cache")
 			ctx.GetLogger().Debug("store disk cache")
 		}
 		}
 		// store the memory states
 		// store the memory states
@@ -424,7 +445,7 @@ func (c *SyncCache) onClose(ctx api.StreamContext) {
 			if err != nil {
 			if err != nil {
 				ctx.GetLogger().Errorf("fail to store memory cache to disk %v", err)
 				ctx.GetLogger().Errorf("fail to store memory cache to disk %v", err)
 			}
 			}
-			ctx.GetLogger().Debugf("store memory cache %d", c.memSize)
+			ctx.GetLogger().Debugf("store memory cache %d", len(c.memCache))
 		}
 		}
 		c.store.Set("storeSig", 1)
 		c.store.Set("storeSig", 1)
 	}
 	}

+ 2 - 2
internal/topo/node/cache/sync_cache_test.go

@@ -109,9 +109,9 @@ func TestRun(t *testing.T) {
 	}{
 	}{
 		{ // 0
 		{ // 0
 			sconf: &conf.SinkConf{
 			sconf: &conf.SinkConf{
-				MemoryCacheThreshold: 6,
+				MemoryCacheThreshold: 4,
 				MaxDiskCache:         12,
 				MaxDiskCache:         12,
-				BufferPageSize:       3,
+				BufferPageSize:       2,
 				EnableCache:          true,
 				EnableCache:          true,
 				ResendInterval:       0,
 				ResendInterval:       0,
 				CleanCacheAtStop:     false,
 				CleanCacheAtStop:     false,