123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464 |
- // Copyright 2022-2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package cache
- import (
- "path"
- "strconv"
- "time"
- "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/internal/pkg/store"
- "github.com/lf-edge/ekuiper/internal/topo/node/metric"
- "github.com/lf-edge/ekuiper/pkg/api"
- "github.com/lf-edge/ekuiper/pkg/infra"
- "github.com/lf-edge/ekuiper/pkg/kv"
- )
// AckResult is the message sent over cacheCtrl to report whether the last
// item pushed to Out was consumed successfully (true) or failed (false).
type AckResult bool
// page Rotate storage for in memory cache.
// It is a fixed-capacity ring buffer; one slot holds one cached item (a batch
// of tuples). Fields are exported so a page can be serialized into the KV
// store as-is.
// Not thread safe!
type page struct {
	Data [][]map[string]interface{} // ring slots
	H    int                        // head: index of the oldest item; advances on delete
	T    int                        // tail: index of the next free slot; advances on append
	L    int                        // number of items currently held
	Size int                        // capacity of the ring
}

// newPage create a new cache page
// TODO the page is created even not used, need dynamic?
func newPage(size int) *page {
	// H and T start at 0; H == T means empty when L == 0 and full when L == Size.
	return &page{
		Data: make([][]map[string]interface{}, size),
		Size: size,
	}
}

// append item if list is not full and return true; otherwise return false
func (p *page) append(item []map[string]interface{}) bool {
	if p.L >= p.Size {
		// no free slot
		return false
	}
	p.Data[p.T] = item
	p.T = (p.T + 1) % p.Size
	p.L++
	return true
}

// peak get the first item in the cache
func (p *page) peak() ([]map[string]interface{}, bool) {
	if p.isEmpty() {
		return nil, false
	}
	return p.Data[p.H], true
}

// delete drops the oldest item; returns false when the page is already empty.
func (p *page) delete() bool {
	if p.isEmpty() {
		return false
	}
	p.H = (p.H + 1) % p.Size
	p.L--
	return true
}

// isEmpty reports whether the page holds no items.
func (p *page) isEmpty() bool {
	return p.L == 0
}

// reset rewinds the indices so the page can be refilled from scratch.
// Note: slot contents are not cleared, only the bookkeeping.
func (p *page) reset() {
	p.H, p.T, p.L = 0, 0, 0
}
// SyncCache buffers sink data so items are sent out one at a time: the next
// item is only released after the previous one is acked. Items overflow from
// memory pages to a disk buffer page and finally to a disk KV store.
// All state is mutated only by the run() goroutine, so no locking is used.
type SyncCache struct {
	// The input data to the cache
	in        <-chan []map[string]interface{}
	Out       chan []map[string]interface{} // items for the sink to send
	Ack       chan bool                     // sink reports send success/failure here
	cacheCtrl chan interface{}              // CacheCtrl is the only place to control the cache; sync in and ack result
	errorCh   chan<- error
	stats     metric.StatManager
	// cache config
	cacheConf   *conf.SinkConf
	maxDiskPage int // max pages persisted to the KV store (plus one spare slot)
	maxMemPage  int // max pages kept in memCache
	// cache storage
	memCache       []*page
	diskBufferPage *page // the "cool" page filling up before being flushed to disk
	// status
	diskSize     int // how many pages has been saved
	cacheLength  int // readonly, for metrics only to save calculation
	diskPageTail int // init from the database
	diskPageHead int
	sendStatus   int // 0: idle, 1: sending and waiting for ack, 2: stopped for error
	// serialize
	store  kv.KeyValue
	exitCh chan<- struct{} // optional; signaled once when run() exits (see onClose)
}
// NewSyncCacheWithExitChanel is NewSyncCache plus an exit channel that is
// signaled when the cache's run loop terminates (see onClose).
// NOTE(review): "Chanel" is a typo for "Channel", but the name is exported so
// renaming it would break callers.
func NewSyncCacheWithExitChanel(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int, exitCh chan<- struct{}) *SyncCache {
	c := NewSyncCache(ctx, in, errCh, stats, cacheConf, bufferLength)
	c.exitCh = exitCh
	return c
}
- func NewSyncCache(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int) *SyncCache {
- c := &SyncCache{
- cacheConf: cacheConf,
- in: in,
- Out: make(chan []map[string]interface{}, bufferLength),
- Ack: make(chan bool, 10),
- cacheCtrl: make(chan interface{}, 10),
- errorCh: errCh,
- maxMemPage: cacheConf.MemoryCacheThreshold / cacheConf.BufferPageSize,
- memCache: make([]*page, 0),
- // add one more slot so that there will be at least one slot between head and tail to find out the head/tail id
- maxDiskPage: (cacheConf.MaxDiskCache / cacheConf.BufferPageSize) + 1,
- stats: stats,
- }
- go func() {
- err := infra.SafeRun(func() error {
- c.run(ctx)
- return nil
- })
- if err != nil {
- infra.DrainError(ctx, err, errCh)
- }
- }()
- return c
- }
// run is the single control loop of the cache. All cache mutation is funneled
// through the cacheCtrl channel and handled here, so no locking is needed.
func (c *SyncCache) run(ctx api.StreamContext) {
	c.initStore(ctx)
	defer c.onClose(ctx)
	if c.cacheLength > 0 { // start to send the cache
		c.send(ctx)
	}
	for {
		select {
		case item := <-c.in:
			ctx.GetLogger().Debugf("send to cache")
			go func() { // avoid deadlock when cacheCtrl is full
				c.cacheCtrl <- item
			}()
		case isSuccess := <-c.Ack:
			// only send the next sink after receiving an ack
			ctx.GetLogger().Debugf("cache ack")
			go func() { // forwarded asynchronously for the same deadlock reason as above
				c.cacheCtrl <- AckResult(isSuccess)
			}()
		case data := <-c.cacheCtrl: // The only place to manipulate cache
			switch r := data.(type) {
			case AckResult:
				if r {
					ctx.GetLogger().Debugf("deleting cache")
					c.deleteCache(ctx)
					c.sendStatus = 0
					ctx.GetLogger().Debug("send status to 0 after true ack")
				} else {
					// negative ack: pause sending until new data arrives
					c.sendStatus = 2
					ctx.GetLogger().Debug("send status to 2 after false ack")
				}
			case []map[string]interface{}:
				ctx.GetLogger().Debugf("adding cache %v", data)
				c.addCache(ctx, r)
				if c.sendStatus == 2 {
					// new data after a failure resumes sending (retries the head item)
					c.sendStatus = 0
					ctx.GetLogger().Debug("send status to 0 after adding cache in error state")
				}
			default:
				ctx.GetLogger().Errorf("unknown cache control command %v", data)
			}
			c.stats.SetBufferLength(int64(len(c.in) + c.cacheLength))
			if c.sendStatus == 0 { // idle: push the next cached item out
				c.send(ctx)
			}
		case <-ctx.Done():
			ctx.GetLogger().Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
			return
		}
	}
}
- func (c *SyncCache) send(ctx api.StreamContext) {
- if c.cacheLength > 1 && c.cacheConf.ResendInterval > 0 {
- time.Sleep(time.Duration(c.cacheConf.ResendInterval) * time.Millisecond)
- }
- d, ok := c.peakMemCache(ctx)
- if ok {
- ctx.GetLogger().Debugf("sending cache item %v", d)
- c.sendStatus = 1
- ctx.GetLogger().Debug("send status to 0 after sending tuple")
- select {
- case c.Out <- d:
- ctx.GetLogger().Debugf("sink cache send out %v", d)
- case <-ctx.Done():
- ctx.GetLogger().Debugf("stop sink cache send")
- }
- } else {
- ctx.GetLogger().Debug("no cache to send")
- }
- }
- // addCache not thread safe!
- func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{}) {
- isNotFull := c.appendMemCache(item)
- if !isNotFull {
- if c.diskBufferPage == nil {
- c.diskBufferPage = newPage(c.cacheConf.BufferPageSize)
- }
- isBufferNotFull := c.diskBufferPage.append(item)
- if !isBufferNotFull { // cool page full, save to disk
- if c.diskSize == c.maxDiskPage {
- // disk full, read the oldest page to the hot page
- c.loadFromDisk(ctx)
- ctx.GetLogger().Debug("disk full, remove the last page")
- }
- err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
- if err != nil {
- ctx.GetLogger().Errorf("fail to store disk cache %v", err)
- return
- } else {
- ctx.GetLogger().Debug("add cache to disk. the new disk buffer page is %v", c.diskBufferPage)
- c.diskPageTail++
- c.diskSize++
- err := c.store.Set("size", c.diskSize)
- if err != nil {
- ctx.GetLogger().Warnf("fail to store disk cache size %v", err)
- }
- // rotate
- if c.diskPageTail == c.maxDiskPage {
- c.diskPageTail = 0
- }
- }
- c.diskBufferPage.reset()
- c.diskBufferPage.append(item)
- } else {
- ctx.GetLogger().Debugf("added cache to disk buffer page %v", c.diskBufferPage)
- }
- } else {
- ctx.GetLogger().Debugf("added cache to mem cache %v", c.memCache)
- }
- c.cacheLength++
- ctx.GetLogger().Debugf("added cache %d", c.cacheLength)
- }
// deleteCache removes the head item (the one just acked) from the cache and,
// when the head page drains, backfills memory from disk first, then from the
// disk buffer page.
// deleteCache not thread safe!
func (c *SyncCache) deleteCache(ctx api.StreamContext) {
	ctx.GetLogger().Debugf("deleting cache. cacheLength: %d, diskSize: %d", c.cacheLength, c.diskSize)
	if len(c.memCache) == 0 {
		ctx.GetLogger().Debug("mem cache is empty")
		return
	}
	isNotEmpty := c.memCache[0].delete()
	if isNotEmpty {
		c.cacheLength--
		ctx.GetLogger().Debugf("deleted cache: %d", c.cacheLength)
	}
	if c.memCache[0].isEmpty() { // read from disk or cool list
		// drop the drained head page, then refill
		c.memCache = c.memCache[1:]
		if c.diskSize > 0 {
			c.loadFromDisk(ctx)
		} else if c.diskBufferPage != nil { // use cool page as the new page
			ctx.GetLogger().Debugf("reading from diskBufferPage: %d", c.cacheLength)
			c.memCache = append(c.memCache, c.diskBufferPage)
			c.diskBufferPage = nil
		}
	}
	ctx.GetLogger().Debugf("deleted cache. cacheLength: %d, diskSize: %d, memCache: %v", c.cacheLength, c.diskSize, c.memCache)
}
- func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
- // load page from the disk
- ctx.GetLogger().Debugf("loading from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
- hotPage := newPage(c.cacheConf.BufferPageSize)
- ok, err := c.store.Get(strconv.Itoa(c.diskPageHead), hotPage)
- if err != nil {
- ctx.GetLogger().Errorf("fail to load disk cache %v", err)
- } else if !ok {
- ctx.GetLogger().Errorf("nothing in the disk, should not happen")
- } else {
- _ = c.store.Delete(strconv.Itoa(c.diskPageHead))
- if len(c.memCache) >= c.maxMemPage {
- ctx.GetLogger().Warnf("drop a page of %d items in memory", c.memCache[0].L)
- c.cacheLength -= c.memCache[0].L
- c.memCache = c.memCache[1:]
- }
- c.memCache = append(c.memCache, hotPage)
- c.diskPageHead++
- c.diskSize--
- err := c.store.Set("size", c.diskSize)
- if err != nil {
- ctx.GetLogger().Warnf("fail to store disk cache size %v", err)
- }
- if c.diskPageHead == c.maxDiskPage {
- c.diskPageHead = 0
- }
- err = c.store.Set("head", c.diskPageHead)
- if err != nil {
- ctx.GetLogger().Warnf("fail to store disk cache size %v", err)
- }
- }
- ctx.GetLogger().Debugf("loaded from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
- }
- func (c *SyncCache) appendMemCache(item []map[string]interface{}) bool {
- if len(c.memCache) > c.maxMemPage {
- return false
- }
- if len(c.memCache) == 0 {
- c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
- }
- isNotFull := c.memCache[len(c.memCache)-1].append(item)
- if !isNotFull {
- if len(c.memCache) == c.maxMemPage {
- return false
- }
- c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
- return c.memCache[len(c.memCache)-1].append(item)
- }
- return true
- }
- func (c *SyncCache) peakMemCache(_ api.StreamContext) ([]map[string]interface{}, bool) {
- if len(c.memCache) == 0 {
- return nil, false
- }
- return c.memCache[0].peak()
- }
// initStore opens (or recreates, when CleanCacheAtStop is set) the per-sink
// KV cache table and restores the previous run's cache state: the serialized
// memory pages, the persisted disk page count and head pointer, and the
// buffer (tail) page.
func (c *SyncCache) initStore(ctx api.StreamContext) {
	// table name is unique per rule/operator/instance
	kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
	if c.cacheConf.CleanCacheAtStop {
		ctx.GetLogger().Infof("creating cache store %s", kvTable)
		store.DropCacheKV(kvTable)
	}
	var err error
	c.store, err = store.GetCacheKV(kvTable)
	if err != nil {
		infra.DrainError(ctx, err, c.errorCh)
	}
	// restore the sink cache from disk
	if !c.cacheConf.CleanCacheAtStop {
		// Save 0 when init and save 1 when close. Wait for close for newly started sink node
		var set int
		ok, _ := c.store.Get("storeSig", &set)
		if ok && set == 0 { // may be saving
			// poll up to 100 times (~1s total) for the previous instance to finish writing
			i := 0
			for ; i < 100; i++ {
				time.Sleep(time.Millisecond * 10)
				c.store.Get("storeSig", &set) // error ignored deliberately; just retry
				if set == 1 {
					ctx.GetLogger().Infof("waiting for previous cache for %d times", i)
					break
				}
			}
			if i == 100 {
				// timed out; proceed anyway and accept possibly-partial state
				ctx.GetLogger().Errorf("waiting for previous cache for %d times, exit and drop", i)
			}
		}
		c.store.Set("storeSig", 0)
		ctx.GetLogger().Infof("start to restore cache from disk")
		// restore the memCache
		_, err = c.store.Get("memcache", &c.memCache)
		if err != nil {
			ctx.GetLogger().Errorf("fail to restore mem cache %v", err)
		}
		for _, p := range c.memCache {
			c.cacheLength += p.L
		}
		err = c.store.Delete("memcache")
		if err != nil {
			ctx.GetLogger().Errorf("fail to delete mem cache %v", err)
		}
		ctx.GetLogger().Infof("restored mem cache %d", c.cacheLength)
		// restore the disk cache
		var size int
		ok, _ = c.store.Get("size", &size)
		if !ok || size == 0 { // no disk cache
			return
		}
		c.diskSize = size
		var head int
		ok, _ = c.store.Get("head", &head)
		if ok {
			c.diskPageHead = head
		}
		// all persisted pages except the buffer (tail) page are counted as full
		c.cacheLength += (c.diskSize - 1) * c.cacheConf.BufferPageSize
		c.diskPageTail = (c.diskPageHead + c.diskSize - 1) % c.maxDiskPage
		// load buffer page
		hotPage := newPage(c.cacheConf.BufferPageSize)
		ok, err = c.store.Get(strconv.Itoa(c.diskPageTail), hotPage)
		if err != nil {
			ctx.GetLogger().Errorf("fail to load disk cache to buffer %v", err)
		} else if !ok {
			ctx.GetLogger().Errorf("nothing in the disk, should not happen")
		} else {
			// the tail page becomes the in-memory buffer page again
			c.diskBufferPage = hotPage
			c.cacheLength += c.diskBufferPage.L
			c.diskSize--
		}
		ctx.GetLogger().Infof("restored all cache %d. diskSize %d", c.cacheLength, c.diskSize)
	}
}
// save memory states to disk
// onClose persists the cache state on shutdown (the buffer page, the memory
// pages, and the storeSig=1 completion marker) unless CleanCacheAtStop is
// configured, in which case the whole table is dropped. Always signals exitCh
// (when set) so NewSyncCacheWithExitChanel callers can observe termination.
func (c *SyncCache) onClose(ctx api.StreamContext) {
	defer func() {
		if c.exitCh != nil {
			c.exitCh <- struct{}{}
		}
	}()
	ctx.GetLogger().Infof("sink node %s instance cache %d closing", ctx.GetOpId(), ctx.GetInstanceId())
	if c.cacheConf.CleanCacheAtStop {
		kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
		ctx.GetLogger().Infof("cleaning cache store %s", kvTable)
		store.DropCacheKV(kvTable)
	} else {
		if c.diskBufferPage != nil {
			// persist the partially-filled buffer page at the tail slot
			err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
			if err != nil {
				ctx.GetLogger().Errorf("fail to store disk cache %v", err)
			}
			// the buffer page counts as one more persisted page
			err = c.store.Set("size", c.diskSize+1)
			if err != nil {
				ctx.GetLogger().Errorf("fail to store disk size %v", err)
			}
			ctx.GetLogger().Debug("store disk cache")
		}
		// store the memory states
		if len(c.memCache) > 0 {
			err := c.store.Set("memcache", c.memCache)
			if err != nil {
				ctx.GetLogger().Errorf("fail to store memory cache to disk %v", err)
			}
			ctx.GetLogger().Debugf("store memory cache %d", len(c.memCache))
		}
		// mark the save as complete; initStore polls for this value
		c.store.Set("storeSig", 1)
	}
}
|