|
@@ -86,8 +86,8 @@ func (c *Cache) initStore(ctx api.StreamContext) {
|
|
|
if err != nil {
|
|
|
c.drainError(err)
|
|
|
}
|
|
|
- c.store = common.GetSimpleKVStore(path.Join(dbDir, "sink"))
|
|
|
- c.key = ctx.GetRuleId() + ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
|
|
|
+ c.store = common.GetSimpleKVStore(path.Join(dbDir, "sink", ctx.GetRuleId()))
|
|
|
+ c.key = ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
|
|
|
logger.Debugf("cache saved to key %s", c.key)
|
|
|
//load cache
|
|
|
if err := c.loadCache(); err != nil {
|
|
@@ -188,10 +188,17 @@ func (c *Cache) loadCache() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (c *Cache) saveCache(_ api.Logger, p *LinkedQueue) error {
|
|
|
+func (c *Cache) saveCache(logger api.Logger, p *LinkedQueue) error {
|
|
|
err := c.store.Open()
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ logger.Errorf("save cache error while opening cache store: %s", err)
|
|
|
+ logger.Infof("clean the cache and reopen")
|
|
|
+ c.store.Clean()
|
|
|
+ err = c.store.Open()
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("save cache error after reset the cache store: %s", err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
}
|
|
|
defer c.store.Close()
|
|
|
return c.store.Replace(c.key, p)
|