// sink_cache.go
  1. package nodes
  2. import (
  3. "encoding/gob"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/emqx/kuiper/xstream/checkpoints"
  8. "io"
  9. "path"
  10. "sort"
  11. "strconv"
  12. )
// CacheTuple pairs a buffered sink item with the index it was assigned in
// the pending LinkedQueue, so consumers can acknowledge delivery by sending
// that index back on Cache.Complete. Barrier pass-throughs are emitted with
// the zero-value index (see checkpointbasedRun).
type CacheTuple struct {
	index int         // position taken from LinkedQueue.Tail at append time
	data  interface{} // the buffered sink payload
}
  17. type LinkedQueue struct {
  18. Data map[int]interface{}
  19. Tail int
  20. }
  21. func (l *LinkedQueue) append(item interface{}) {
  22. l.Data[l.Tail] = item
  23. l.Tail++
  24. }
  25. func (l *LinkedQueue) delete(index int) {
  26. delete(l.Data, index)
  27. }
  28. func (l *LinkedQueue) reset() {
  29. l.Tail = 0
  30. }
  31. func (l *LinkedQueue) length() int {
  32. return len(l.Data)
  33. }
  34. func (l *LinkedQueue) clone() *LinkedQueue {
  35. result := &LinkedQueue{
  36. Data: make(map[int]interface{}),
  37. Tail: l.Tail,
  38. }
  39. for k, v := range l.Data {
  40. result.Data[k] = v
  41. }
  42. return result
  43. }
  44. func (l *LinkedQueue) String() string {
  45. return fmt.Sprintf("tail: %d, data: %v", l.Tail, l.Data)
  46. }
// Cache buffers sink tuples so they can survive restarts. Items read from in
// are mirrored onto Out for delivery and tracked in pending until the
// consumer acknowledges their index on Complete; the pending queue is
// persisted to a key-value store by the run loops (unless the sink cache is
// disabled in the configuration).
type Cache struct {
	//Data and control channels
	in       <-chan interface{} // items arriving from the sink pipeline
	Out      chan *CacheTuple   // buffered fan-out toward the sink writer
	Complete chan int           // indices acknowledged by the consumer
	errorCh  chan<- error       // fatal errors reported upstream
	//states
	pending *LinkedQueue // unacknowledged tuples keyed by index
	changed bool         // true when pending mutated since the last save
	//serialize
	key   string          //the key for current cache (opId + instanceId, see initStore)
	store common.KeyValue // backing store; left nil when caching is disabled
}
  60. func NewTimebasedCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache {
  61. c := &Cache{
  62. in: in,
  63. Out: make(chan *CacheTuple, limit),
  64. Complete: make(chan int),
  65. errorCh: errCh,
  66. }
  67. go c.timebasedRun(ctx, saveInterval)
  68. return c
  69. }
// initStore prepares the in-memory pending queue and, unless the sink cache
// is disabled in the configuration, opens the on-disk store (one per rule,
// keyed by operator id + instance id) and replays any persisted tuples.
func (c *Cache) initStore(ctx api.StreamContext) {
	logger := ctx.GetLogger()
	c.pending = &LinkedQueue{
		Data: make(map[int]interface{}),
		Tail: 0,
	}
	if common.Config.Sink.DisableCache {
		logger.Infof("The cache is disabled, and skip the initialization of cache.")
		return
	}
	dbDir, err := common.GetDataLoc()
	logger.Debugf("cache saved to %s", dbDir)
	if err != nil {
		// NOTE(review): this send is synchronous (unlike the `go` call below)
		// and execution still falls through to use dbDir after the error —
		// confirm both are intended.
		c.drainError(err)
	}
	c.store = common.GetSimpleKVStore(path.Join(dbDir, "sink", ctx.GetRuleId()))
	c.key = ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
	logger.Debugf("cache saved to key %s", c.key)
	// load cache persisted by a previous run, if any
	if err := c.loadCache(); err != nil {
		go c.drainError(err)
		return
	}
}
// timebasedRun is the event loop for a time-based cache: it buffers incoming
// items, drops acknowledged ones, and persists the pending queue on a ticker
// whenever it has changed and either exceeds the configured size threshold or
// the trigger count of ticks has elapsed. On ctx.Done it performs one final
// synchronous save before returning.
func (c *Cache) timebasedRun(ctx api.StreamContext, saveInterval int) {
	logger := ctx.GetLogger()
	c.initStore(ctx)
	ticker := common.GetTicker(saveInterval)
	defer ticker.Stop()
	var tcount = 0 // ticks elapsed since the last counter reset
	for {
		select {
		case item := <-c.in:
			index := c.pending.Tail
			c.pending.append(item)
			//non blocking until limit exceeded
			c.Out <- &CacheTuple{
				index: index,
				data:  item,
			}
			c.changed = true
		case index := <-c.Complete:
			// Consumer confirmed delivery; forget the tuple.
			c.pending.delete(index)
			c.changed = true
		case <-ticker.C:
			tcount++
			l := c.pending.length()
			if l == 0 {
				// Queue fully drained: rewind the index counter.
				c.pending.reset()
			}
			//If the data is still changing, only do a save when the cache has more than threshold to prevent too much file IO
			//If the data is not changing in the time slot and have not saved before, save it. This is to prevent the
			//data won't be saved as the cache never pass the threshold
			//logger.Infof("ticker %t, l=%d\n", c.changed, l)
			if (c.changed && l > common.Config.Sink.CacheThreshold) || (tcount == common.Config.Sink.CacheTriggerCount && c.changed) {
				logger.Infof("save cache for rule %s, %s", ctx.GetRuleId(), c.pending.String())
				// Snapshot so the save can run concurrently with the loop.
				clone := c.pending.clone()
				c.changed = false
				go func() {
					if err := c.saveCache(logger, clone); err != nil {
						logger.Debugf("%v", err)
						c.drainError(err)
					}
				}()
			}
			// NOTE(review): the reset below compares against CacheThreshold
			// while the trigger above uses CacheTriggerCount — if Threshold <
			// TriggerCount the `== CacheTriggerCount` branch can never fire;
			// confirm which constant is intended here.
			if tcount >= common.Config.Sink.CacheThreshold {
				tcount = 0
			}
		case <-ctx.Done():
			// Final synchronous save before shutting the loop down.
			err := c.saveCache(logger, c.pending)
			if err != nil {
				logger.Warnf("Error found during saving cache: %s \n ", err)
			}
			logger.Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
			return
		}
	}
}
  148. func (c *Cache) loadCache() error {
  149. gob.Register(c.pending)
  150. err := c.store.Open()
  151. if err != nil && err != io.EOF {
  152. return err
  153. }
  154. defer c.store.Close()
  155. if err == nil {
  156. if t, f := c.store.Get(c.key); f {
  157. if mt, ok := t.(*LinkedQueue); ok {
  158. c.pending = mt
  159. c.changed = true
  160. // To store the keys in slice in sorted order
  161. var keys []int
  162. for k := range mt.Data {
  163. keys = append(keys, k)
  164. }
  165. sort.Ints(keys)
  166. for _, k := range keys {
  167. t := &CacheTuple{
  168. index: k,
  169. data: mt.Data[k],
  170. }
  171. c.Out <- t
  172. }
  173. return nil
  174. } else {
  175. return fmt.Errorf("load malform cache, found %t(%v)", t, t)
  176. }
  177. }
  178. }
  179. return nil
  180. }
  181. func (c *Cache) saveCache(logger api.Logger, p *LinkedQueue) error {
  182. if common.Config.Sink.DisableCache {
  183. return nil
  184. }
  185. err := c.store.Open()
  186. if err != nil {
  187. logger.Errorf("save cache error while opening cache store: %s", err)
  188. logger.Infof("clean the cache and reopen")
  189. c.store.Close()
  190. c.store.Clean()
  191. err = c.store.Open()
  192. if err != nil {
  193. logger.Errorf("save cache error after reset the cache store: %s", err)
  194. return err
  195. }
  196. }
  197. defer c.store.Close()
  198. return c.store.Replace(c.key, p)
  199. }
// drainError forwards err to the error channel supplied at construction.
// This send may block if the channel is unbuffered and nobody is receiving.
func (c *Cache) drainError(err error) {
	c.errorCh <- err
}
// Length returns the number of tuples still awaiting acknowledgement.
func (c *Cache) Length() int {
	return c.pending.length()
}
  206. func NewCheckpointbasedCache(in <-chan interface{}, limit int, tch <-chan struct{}, errCh chan<- error, ctx api.StreamContext) *Cache {
  207. c := &Cache{
  208. in: in,
  209. Out: make(chan *CacheTuple, limit),
  210. Complete: make(chan int),
  211. errorCh: errCh,
  212. }
  213. go c.checkpointbasedRun(ctx, tch)
  214. return c
  215. }
// checkpointbasedRun is the event loop for a checkpoint-based cache: items
// are buffered until acknowledged on Complete, and the pending queue is
// persisted whenever a signal arrives on tch. Checkpoint barriers pass
// straight through to Out without being cached.
func (c *Cache) checkpointbasedRun(ctx api.StreamContext, tch <-chan struct{}) {
	logger := ctx.GetLogger()
	c.initStore(ctx)
	for {
		select {
		case item := <-c.in:
			// possibility of barrier, ignore if found
			if boe, ok := item.(*checkpoints.BufferOrEvent); ok {
				if _, ok := boe.Data.(*checkpoints.Barrier); ok {
					// Forward the barrier without caching; note the tuple
					// carries the zero-value index.
					c.Out <- &CacheTuple{
						data: item,
					}
					logger.Debugf("sink cache send out barrier %v", boe.Data)
					break
				}
			}
			index := c.pending.Tail
			c.pending.append(item)
			//non blocking until limit exceeded
			c.Out <- &CacheTuple{
				index: index,
				data:  item,
			}
			logger.Debugf("sink cache send out tuple %v", item)
			c.changed = true
		case index := <-c.Complete:
			// Consumer confirmed delivery; forget the tuple.
			c.pending.delete(index)
			c.changed = true
		case <-tch:
			// Checkpoint signal: snapshot and save asynchronously if dirty.
			logger.Infof("save cache for rule %s, %s", ctx.GetRuleId(), c.pending.String())
			clone := c.pending.clone()
			if c.changed {
				go func() {
					if err := c.saveCache(logger, clone); err != nil {
						logger.Debugf("%v", err)
						c.drainError(err)
					}
				}()
			}
			c.changed = false
		case <-ctx.Done():
			// NOTE(review): unlike timebasedRun, no final save happens on
			// shutdown — confirm a checkpoint signal is guaranteed first.
			logger.Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
			return
		}
	}
}