123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- package nodes
- import (
- "encoding/gob"
- "fmt"
- "github.com/emqx/kuiper/common"
- "github.com/emqx/kuiper/xstream/api"
- "github.com/prometheus/common/log"
- "io"
- "path"
- "sort"
- "strconv"
- )
- type CacheTuple struct {
- index int
- data interface{}
- }
- type LinkedQueue struct {
- Data map[int]interface{}
- Tail int
- }
- func (l *LinkedQueue) append(item interface{}) {
- l.Data[l.Tail] = item
- l.Tail++
- }
- func (l *LinkedQueue) delete(index int) {
- delete(l.Data, index)
- }
- func (l *LinkedQueue) reset() {
- l.Tail = 0
- }
- func (l *LinkedQueue) length() int {
- return len(l.Data)
- }
- func (l *LinkedQueue) clone() *LinkedQueue {
- result := &LinkedQueue{
- Data: make(map[int]interface{}),
- Tail: l.Tail,
- }
- for k, v := range l.Data {
- result.Data[k] = v
- }
- return result
- }
- type Cache struct {
- //Data and control channels
- in <-chan interface{}
- Out chan *CacheTuple
- Complete chan int
- errorCh chan<- error
- //states
- pending *LinkedQueue
- //serialize
- key string //the key for current cache
- store common.KeyValue
- changed bool
- saved int
- //configs
- limit int
- saveInterval int
- }
- const THRESHOLD int = 10
- func NewCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache {
- c := &Cache{
- in: in,
- Out: make(chan *CacheTuple, limit),
- Complete: make(chan int),
- errorCh: errCh,
- limit: limit,
- saveInterval: saveInterval,
- }
- go c.run(ctx)
- return c
- }
- func (c *Cache) run(ctx api.StreamContext) {
- logger := ctx.GetLogger()
- dbDir, err := common.GetAndCreateDataLoc("sink")
- logger.Debugf("cache saved to %s", dbDir)
- if err != nil {
- c.drainError(err)
- }
- c.store = common.GetSimpleKVStore(path.Join(dbDir, "cache"))
- c.key = ctx.GetRuleId() + ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
- //load cache
- if err := c.loadCache(); err != nil {
- go c.drainError(err)
- return
- }
- ticker := common.GetTicker(c.saveInterval)
- for {
- select {
- case item := <-c.in:
- index := c.pending.Tail
- c.pending.append(item)
- //non blocking until limit exceeded
- c.Out <- &CacheTuple{
- index: index,
- data: item,
- }
- c.changed = true
- case index := <-c.Complete:
- c.pending.delete(index)
- c.changed = true
- case <-ticker.C:
- l := c.pending.length()
- if l == 0 {
- c.pending.reset()
- }
- //If the data is still changing, only do a save when the cache has more than threshold to prevent too much file IO
- //If the data is not changing in the time slot and have not saved before, save it. This is to prevent the
- //data won't be saved as the cache never pass the threshold
- if (c.changed && l > THRESHOLD) || (!c.changed && c.saved != l) {
- logger.Infof("save cache for rule %s", ctx.GetRuleId())
- clone := c.pending.clone()
- go func() {
- if err := c.saveCache(clone); err != nil {
- logger.Debugf("%v", err)
- c.drainError(err)
- }
- }()
- c.saved = l
- } else if c.changed {
- c.saved = 0
- }
- c.changed = false
- case <-ctx.Done():
- if c.changed {
- c.saveCache(c.pending)
- }
- return
- }
- }
- }
- func (c *Cache) loadCache() error {
- c.pending = &LinkedQueue{
- Data: make(map[int]interface{}),
- Tail: 0,
- }
- gob.Register(c.pending)
- err := c.store.Open()
- if err != nil && err != io.EOF {
- return err
- }
- defer c.store.Close()
- if err == nil {
- if t, f := c.store.Get(c.key); f {
- if mt, ok := t.(*LinkedQueue); ok {
- c.pending = mt
- // To store the keys in slice in sorted order
- var keys []int
- for k := range mt.Data {
- keys = append(keys, k)
- }
- sort.Ints(keys)
- for _, k := range keys {
- log.Debugf("send by cache %d", k)
- c.Out <- &CacheTuple{
- index: k,
- data: mt.Data[k],
- }
- }
- return nil
- } else {
- return fmt.Errorf("load malform cache, found %t(%v)", t, t)
- }
- }
- }
- return nil
- }
- func (c *Cache) saveCache(p *LinkedQueue) error {
- err := c.store.Open()
- if err != nil {
- return err
- }
- defer c.store.Close()
- return c.store.Replace(c.key, p)
- }
- func (c *Cache) drainError(err error) {
- c.errorCh <- err
- }
- func (c *Cache) Length() int {
- return c.pending.length()
- }
|