sink_cache.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package nodes
  2. import (
  3. "encoding/gob"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/prometheus/common/log"
  8. "io"
  9. "path"
  10. "sort"
  11. "strconv"
  12. )
// CacheTuple pairs a buffered sink item with the index assigned to it by the
// pending queue, so the consumer can acknowledge exactly that item back on
// Cache.Complete once it has been delivered.
type CacheTuple struct {
	index int         // position assigned by the pending LinkedQueue
	data  interface{} // the buffered sink payload
}
  17. type LinkedQueue struct {
  18. Data map[int]interface{}
  19. Tail int
  20. }
  21. func (l *LinkedQueue) append(item interface{}) {
  22. l.Data[l.Tail] = item
  23. l.Tail++
  24. }
  25. func (l *LinkedQueue) delete(index int) {
  26. delete(l.Data, index)
  27. }
  28. func (l *LinkedQueue) reset() {
  29. l.Tail = 0
  30. }
  31. func (l *LinkedQueue) length() int {
  32. return len(l.Data)
  33. }
  34. func (l *LinkedQueue) clone() *LinkedQueue {
  35. result := &LinkedQueue{
  36. Data: make(map[int]interface{}),
  37. Tail: l.Tail,
  38. }
  39. for k, v := range l.Data {
  40. result.Data[k] = v
  41. }
  42. return result
  43. }
// Cache buffers data flowing to a sink and periodically persists the
// unacknowledged portion to disk, so pending items survive a restart.
type Cache struct {
	//Data and control channels
	in       <-chan interface{} // items arriving from the rule pipeline
	Out      chan *CacheTuple   // items handed to the sink; buffered with capacity `limit`
	Complete chan int           // acks from the sink: index of an item that was delivered
	errorCh  chan<- error       // fatal errors forwarded out of the cache
	//states
	pending *LinkedQueue // items emitted on Out but not yet acknowledged
	//serialize
	key     string //the key for current cache
	store   common.KeyValue
	changed bool // pending was mutated since the last save tick
	saved   int  // pending length at the time of the last save
	//configs
	limit        int
	saveInterval int // save tick interval passed to common.GetTicker (unit per GetTicker — presumably ms; confirm)
}
// THRESHOLD is the minimum number of pending items required before a periodic
// save is performed while the cache is still changing, limiting file IO.
const THRESHOLD int = 10
  62. func NewCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache {
  63. c := &Cache{
  64. in: in,
  65. Out: make(chan *CacheTuple, limit),
  66. Complete: make(chan int),
  67. errorCh: errCh,
  68. limit: limit,
  69. saveInterval: saveInterval,
  70. }
  71. go c.run(ctx)
  72. return c
  73. }
  74. func (c *Cache) run(ctx api.StreamContext) {
  75. logger := ctx.GetLogger()
  76. dbDir, err := common.GetAndCreateDataLoc("sink")
  77. logger.Debugf("cache saved to %s", dbDir)
  78. if err != nil {
  79. c.drainError(err)
  80. }
  81. c.store = common.GetSimpleKVStore(path.Join(dbDir, "cache"))
  82. c.key = ctx.GetRuleId() + ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
  83. //load cache
  84. if err := c.loadCache(); err != nil {
  85. go c.drainError(err)
  86. return
  87. }
  88. ticker := common.GetTicker(c.saveInterval)
  89. for {
  90. select {
  91. case item := <-c.in:
  92. index := c.pending.Tail
  93. c.pending.append(item)
  94. //non blocking until limit exceeded
  95. c.Out <- &CacheTuple{
  96. index: index,
  97. data: item,
  98. }
  99. c.changed = true
  100. case index := <-c.Complete:
  101. c.pending.delete(index)
  102. c.changed = true
  103. case <-ticker.C:
  104. l := c.pending.length()
  105. if l == 0 {
  106. c.pending.reset()
  107. }
  108. //If the data is still changing, only do a save when the cache has more than threshold to prevent too much file IO
  109. //If the data is not changing in the time slot and have not saved before, save it. This is to prevent the
  110. //data won't be saved as the cache never pass the threshold
  111. if (c.changed && l > THRESHOLD) || (!c.changed && c.saved != l) {
  112. logger.Infof("save cache for rule %s", ctx.GetRuleId())
  113. clone := c.pending.clone()
  114. go func() {
  115. if err := c.saveCache(clone); err != nil {
  116. logger.Debugf("%v", err)
  117. c.drainError(err)
  118. }
  119. }()
  120. c.saved = l
  121. } else if c.changed {
  122. c.saved = 0
  123. }
  124. c.changed = false
  125. case <-ctx.Done():
  126. if c.changed {
  127. c.saveCache(c.pending)
  128. }
  129. logger.Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
  130. return
  131. }
  132. }
  133. }
  134. func (c *Cache) loadCache() error {
  135. c.pending = &LinkedQueue{
  136. Data: make(map[int]interface{}),
  137. Tail: 0,
  138. }
  139. gob.Register(c.pending)
  140. err := c.store.Open()
  141. if err != nil && err != io.EOF {
  142. return err
  143. }
  144. defer c.store.Close()
  145. if err == nil {
  146. if t, f := c.store.Get(c.key); f {
  147. if mt, ok := t.(*LinkedQueue); ok {
  148. c.pending = mt
  149. // To store the keys in slice in sorted order
  150. var keys []int
  151. for k := range mt.Data {
  152. keys = append(keys, k)
  153. }
  154. sort.Ints(keys)
  155. for _, k := range keys {
  156. log.Debugf("send by cache %d", k)
  157. c.Out <- &CacheTuple{
  158. index: k,
  159. data: mt.Data[k],
  160. }
  161. }
  162. return nil
  163. } else {
  164. return fmt.Errorf("load malform cache, found %t(%v)", t, t)
  165. }
  166. }
  167. }
  168. return nil
  169. }
  170. func (c *Cache) saveCache(p *LinkedQueue) error {
  171. err := c.store.Open()
  172. if err != nil {
  173. return err
  174. }
  175. defer c.store.Close()
  176. return c.store.Replace(c.key, p)
  177. }
// drainError forwards err to the rule's error channel. The send may block
// until the channel is read, which is why callers on a live loop invoke it
// in its own goroutine.
func (c *Cache) drainError(err error) {
	c.errorCh <- err
}
// Length returns the number of items handed to the sink but not yet
// acknowledged via Complete.
func (c *Cache) Length() int {
	return c.pending.length()
}