Ver código fonte

sink cache problems (#108)

* bug(sink): fix concurrent map access when saving the cache

* feat(kv): close kv should not save to file again

Saving again on close doubles the I/O.

* feat(sink): sink only saves the cache when idle or above the threshold
ngjaying 5 anos atrás
pai
commit
cc9e78e1b4
2 arquivos alterados com 32 adições e 10 exclusões
  1. 2 2
      common/kv.go
  2. 30 8
      xstream/nodes/sink_cache.go

+ 2 - 2
common/kv.go

@@ -132,9 +132,9 @@ func (m *SimpleKVStore) Close() error {
 }
 
 func (m *SimpleKVStore) doClose() error {
-	e := m.c.SaveFile(m.path)
+	//e := m.c.SaveFile(m.path)
 	m.c.Flush() //Delete all of the values from memory.
-	return e
+	return nil
 }
 
 func (m *SimpleKVStore) saveToFile() error {

+ 30 - 8
xstream/nodes/sink_cache.go

@@ -39,6 +39,17 @@ func (l *LinkedQueue) length() int {
 	return len(l.Data)
 }
 
+func (l *LinkedQueue) clone() *LinkedQueue {
+	result := &LinkedQueue{
+		Data: make(map[int]interface{}),
+		Tail: l.Tail,
+	}
+	for k, v := range l.Data {
+		result.Data[k] = v
+	}
+	return result
+}
+
 type Cache struct {
 	//Data and control channels
 	in       <-chan interface{}
@@ -51,11 +62,14 @@ type Cache struct {
 	key     string //the key for current cache
 	store   common.KeyValue
 	changed bool
+	saved   int
 	//configs
 	limit        int
 	saveInterval int
 }
 
+const THRESHOLD int = 10
+
 func NewCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache {
 	c := &Cache{
 		in:       in,
@@ -101,22 +115,30 @@ func (c *Cache) run(ctx api.StreamContext) {
 			c.pending.delete(index)
 			c.changed = true
 		case <-ticker.C:
-			if c.pending.length() == 0 {
+			l := c.pending.length()
+			if l == 0 {
 				c.pending.reset()
 			}
-			if c.changed {
-				logger.Debugf("save cache")
+			//If the data is still changing, only save when the cache holds more than THRESHOLD items, to avoid excessive file IO.
+			//If the data did not change during this interval and the current length has not been saved yet, save it. This ensures
+			//the data is still saved even when the cache never exceeds the threshold.
+			if (c.changed && l > THRESHOLD) || (!c.changed && c.saved != l) {
+				logger.Infof("save cache for rule %s", ctx.GetRuleId())
+				clone := c.pending.clone()
 				go func() {
-					if err := c.saveCache(); err != nil {
+					if err := c.saveCache(clone); err != nil {
 						logger.Debugf("%v", err)
 						c.drainError(err)
 					}
 				}()
-				c.changed = false
+				c.saved = l
+			} else if c.changed {
+				c.saved = 0
 			}
+			c.changed = false
 		case <-ctx.Done():
 			if c.changed {
-				c.saveCache()
+				c.saveCache(c.pending)
 			}
 			return
 		}
@@ -160,13 +182,13 @@ func (c *Cache) loadCache() error {
 	return nil
 }
 
-func (c *Cache) saveCache() error {
+func (c *Cache) saveCache(p *LinkedQueue) error {
 	err := c.store.Open()
 	if err != nil {
 		return err
 	}
 	defer c.store.Close()
-	return c.store.Replace(c.key, c.pending)
+	return c.store.Replace(c.key, p)
 }
 
 func (c *Cache) drainError(err error) {