sink_cache.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package nodes
  2. import (
  3. "encoding/gob"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/prometheus/common/log"
  8. "io"
  9. "path"
  10. "sort"
  11. "strconv"
  12. )
  13. type CacheTuple struct {
  14. index int
  15. data interface{}
  16. }
  17. type LinkedQueue struct {
  18. Data map[int]interface{}
  19. Tail int
  20. }
  21. func (l *LinkedQueue) append(item interface{}) {
  22. l.Data[l.Tail] = item
  23. l.Tail++
  24. }
  25. func (l *LinkedQueue) delete(index int) {
  26. delete(l.Data, index)
  27. }
  28. func (l *LinkedQueue) reset() {
  29. l.Tail = 0
  30. }
  31. func (l *LinkedQueue) length() int {
  32. return len(l.Data)
  33. }
  34. type Cache struct {
  35. //Data and control channels
  36. in <-chan interface{}
  37. Out chan *CacheTuple
  38. Complete chan int
  39. errorCh chan<- error
  40. //states
  41. pending *LinkedQueue
  42. //serialize
  43. key string //the key for current cache
  44. store common.KeyValue
  45. changed bool
  46. //configs
  47. limit int
  48. saveInterval int
  49. }
  50. func NewCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache {
  51. c := &Cache{
  52. in: in,
  53. Out: make(chan *CacheTuple, limit),
  54. Complete: make(chan int),
  55. errorCh: errCh,
  56. limit: limit,
  57. saveInterval: saveInterval,
  58. }
  59. go c.run(ctx)
  60. return c
  61. }
  62. func (c *Cache) run(ctx api.StreamContext) {
  63. logger := ctx.GetLogger()
  64. dbDir, err := common.GetAndCreateDataLoc("sink")
  65. logger.Debugf("cache saved to %s", dbDir)
  66. if err != nil {
  67. c.drainError(err)
  68. }
  69. c.store = common.GetSimpleKVStore(path.Join(dbDir, "cache"))
  70. c.key = ctx.GetRuleId() + ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
  71. //load cache
  72. if err := c.loadCache(); err != nil {
  73. go c.drainError(err)
  74. return
  75. }
  76. ticker := common.GetTicker(c.saveInterval)
  77. for {
  78. select {
  79. case item := <-c.in:
  80. index := c.pending.Tail
  81. c.pending.append(item)
  82. //non blocking until limit exceeded
  83. c.Out <- &CacheTuple{
  84. index: index,
  85. data: item,
  86. }
  87. c.changed = true
  88. case index := <-c.Complete:
  89. c.pending.delete(index)
  90. c.changed = true
  91. case <-ticker.C:
  92. if c.pending.length() == 0 {
  93. c.pending.reset()
  94. }
  95. if c.changed {
  96. logger.Debugf("save cache")
  97. go func() {
  98. if err := c.saveCache(); err != nil {
  99. logger.Debugf("%v", err)
  100. c.drainError(err)
  101. }
  102. }()
  103. c.changed = false
  104. }
  105. case <-ctx.Done():
  106. if c.changed {
  107. c.saveCache()
  108. }
  109. return
  110. }
  111. }
  112. }
  113. func (c *Cache) loadCache() error {
  114. c.pending = &LinkedQueue{
  115. Data: make(map[int]interface{}),
  116. Tail: 0,
  117. }
  118. gob.Register(c.pending)
  119. err := c.store.Open()
  120. if err != nil && err != io.EOF {
  121. return err
  122. }
  123. defer c.store.Close()
  124. if err == nil {
  125. if t, f := c.store.Get(c.key); f {
  126. if mt, ok := t.(*LinkedQueue); ok {
  127. c.pending = mt
  128. // To store the keys in slice in sorted order
  129. var keys []int
  130. for k := range mt.Data {
  131. keys = append(keys, k)
  132. }
  133. sort.Ints(keys)
  134. for _, k := range keys {
  135. log.Debugf("send by cache %d", k)
  136. c.Out <- &CacheTuple{
  137. index: k,
  138. data: mt.Data[k],
  139. }
  140. }
  141. return nil
  142. } else {
  143. return fmt.Errorf("load malform cache, found %t(%v)", t, t)
  144. }
  145. }
  146. }
  147. return nil
  148. }
  149. func (c *Cache) saveCache() error {
  150. err := c.store.Open()
  151. if err != nil {
  152. return err
  153. }
  154. defer c.store.Close()
  155. return c.store.Replace(c.key, c.pending)
  156. }
  157. func (c *Cache) drainError(err error) {
  158. c.errorCh <- err
  159. }
  160. func (c *Cache) Length() int {
  161. return c.pending.length()
  162. }