// sink_cache.go
  1. package nodes
  2. import (
  3. "encoding/gob"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/common/kv"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/emqx/kuiper/xstream/checkpoints"
  9. "io"
  10. "path"
  11. "sort"
  12. "strconv"
  13. )
// CacheTuple pairs a buffered sink item with its position in the pending
// queue. The index is the value the sink is expected to report back on
// Cache.Complete once the item has been delivered.
type CacheTuple struct {
	index int         // position assigned from LinkedQueue.Tail at append time
	data  interface{} // the buffered sink payload
}
  18. type LinkedQueue struct {
  19. Data map[int]interface{}
  20. Tail int
  21. }
  22. func (l *LinkedQueue) append(item interface{}) {
  23. l.Data[l.Tail] = item
  24. l.Tail++
  25. }
  26. func (l *LinkedQueue) delete(index int) {
  27. delete(l.Data, index)
  28. }
  29. func (l *LinkedQueue) reset() {
  30. l.Tail = 0
  31. }
  32. func (l *LinkedQueue) length() int {
  33. return len(l.Data)
  34. }
  35. func (l *LinkedQueue) clone() *LinkedQueue {
  36. result := &LinkedQueue{
  37. Data: make(map[int]interface{}),
  38. Tail: l.Tail,
  39. }
  40. for k, v := range l.Data {
  41. result.Data[k] = v
  42. }
  43. return result
  44. }
  45. func (l *LinkedQueue) String() string {
  46. return fmt.Sprintf("tail: %d, data: %v", l.Tail, l.Data)
  47. }
// Cache buffers sink tuples so they can survive restarts. Items arriving on
// `in` are appended to `pending` and forwarded on `Out`; when the sink has
// delivered a tuple it sends the tuple's index back on `Complete`, which
// removes it from `pending`. The pending queue is gob-serialized into a kv
// store under `key` by the run loops (time-based or checkpoint-based).
type Cache struct {
	//Data and control channels
	in       <-chan interface{} // incoming items to buffer and forward
	Out      chan *CacheTuple   // buffered output toward the sink (capacity = limit)
	Complete chan int           // indexes of tuples the sink finished delivering
	errorCh  chan<- error       // fatal errors are drained here
	//states
	pending *LinkedQueue // tuples buffered but not yet acknowledged
	changed bool         // set when pending mutated since the last save
	//serialize
	key   string //the key for current cache (opId + instanceId)
	store kv.KeyValue
}
  61. func NewTimebasedCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache {
  62. c := &Cache{
  63. in: in,
  64. Out: make(chan *CacheTuple, limit),
  65. Complete: make(chan int),
  66. errorCh: errCh,
  67. }
  68. go c.timebasedRun(ctx, saveInterval)
  69. return c
  70. }
  71. func (c *Cache) initStore(ctx api.StreamContext) {
  72. logger := ctx.GetLogger()
  73. c.pending = &LinkedQueue{
  74. Data: make(map[int]interface{}),
  75. Tail: 0,
  76. }
  77. dbDir, err := common.GetDataLoc()
  78. logger.Debugf("cache saved to %s", dbDir)
  79. if err != nil {
  80. c.drainError(err)
  81. }
  82. c.store = kv.GetDefaultKVStore(path.Join(dbDir, "sink", ctx.GetRuleId()))
  83. c.key = ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
  84. logger.Debugf("cache saved to key %s", c.key)
  85. //load cache
  86. if err := c.loadCache(); err != nil {
  87. go c.drainError(err)
  88. return
  89. }
  90. }
// timebasedRun is the cache event loop for the timer-based strategy: buffer
// incoming items, drop acknowledged ones, and periodically persist the
// pending queue via saveCache. Runs until ctx is done.
func (c *Cache) timebasedRun(ctx api.StreamContext, saveInterval int) {
	logger := ctx.GetLogger()
	c.initStore(ctx)
	ticker := common.GetTicker(saveInterval)
	defer ticker.Stop()
	var tcount = 0 // ticks elapsed since the counter was last reset
	for {
		select {
		case item := <-c.in:
			// buffer the item and forward it together with the index the sink
			// must acknowledge on c.Complete after delivery
			index := c.pending.Tail
			c.pending.append(item)
			//non blocking until limit exceeded
			c.Out <- &CacheTuple{
				index: index,
				data:  item,
			}
			c.changed = true
		case index := <-c.Complete:
			// the sink delivered this tuple; drop it from the cache
			c.pending.delete(index)
			c.changed = true
		case <-ticker.C:
			tcount++
			l := c.pending.length()
			if l == 0 {
				// queue fully drained: rewind Tail so indexes restart at 0
				c.pending.reset()
			}
			//If the data is still changing, only do a save when the cache has more than threshold to prevent too much file IO
			//If the data is not changing in the time slot and have not saved before, save it. This is to prevent the
			//data won't be saved as the cache never pass the threshold
			//logger.Infof("ticker %t, l=%d\n", c.changed, l)
			if (c.changed && l > common.Config.Sink.CacheThreshold) || (tcount == common.Config.Sink.CacheTriggerCount && c.changed) {
				logger.Infof("save cache for rule %s, %s", ctx.GetRuleId(), c.pending.String())
				// snapshot the queue so the save can run off-loop without racing
				clone := c.pending.clone()
				c.changed = false
				go func() {
					if err := c.saveCache(logger, clone); err != nil {
						logger.Debugf("%v", err)
						c.drainError(err)
					}
				}()
			}
			// NOTE(review): the reset compares tcount against CacheThreshold
			// while the trigger above uses CacheTriggerCount; if CacheThreshold
			// is smaller, tcount can never reach the trigger value. Confirm
			// whether CacheTriggerCount was intended here.
			if tcount >= common.Config.Sink.CacheThreshold {
				tcount = 0
			}
		case <-ctx.Done():
			// final synchronous save on shutdown; a failure is only logged
			err := c.saveCache(logger, c.pending)
			if err != nil {
				logger.Warnf("Error found during saving cache: %s \n ", err)
			}
			logger.Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
			return
		}
	}
}
// loadCache restores a previously persisted pending queue from the kv store
// and replays every buffered tuple, in ascending index order, onto c.Out.
// Returns nil when the store cannot be opened cleanly or the key is absent.
func (c *Cache) loadCache() error {
	gob.Register(c.pending) // register *LinkedQueue so gob can decode it
	err := c.store.Open()
	// io.EOF is tolerated here — presumably an empty/new store file; TODO
	// confirm against the kv store implementation
	if err != nil && err != io.EOF {
		return err
	}
	defer c.store.Close()
	if err == nil {
		mt := new(LinkedQueue)
		// NOTE(review): &mt is a **LinkedQueue; assumed to be the shape
		// store.Get expects for decoding — verify against kv.KeyValue.Get.
		if f, err := c.store.Get(c.key, &mt); f {
			// key exists but decoding failed: surface as a corrupt cache
			if nil != err {
				return fmt.Errorf("load malform cache, found %v(%v)", c.key, mt)
			}
			c.pending = mt
			c.changed = true
			// To store the keys in slice in sorted order
			var keys []int
			for k := range mt.Data {
				keys = append(keys, k)
			}
			sort.Ints(keys)
			// replay the restored tuples; these sends block once c.Out's
			// buffer is full, so a large restored queue throttles startup
			for _, k := range keys {
				t := &CacheTuple{
					index: k,
					data:  mt.Data[k],
				}
				c.Out <- t
			}
			return nil
		}
	}
	return nil
}
  178. func (c *Cache) saveCache(logger api.Logger, p *LinkedQueue) error {
  179. err := c.store.Open()
  180. if err != nil {
  181. logger.Errorf("save cache error while opening cache store: %s", err)
  182. logger.Infof("clean the cache and reopen")
  183. c.store.Close()
  184. c.store.Clean()
  185. err = c.store.Open()
  186. if err != nil {
  187. logger.Errorf("save cache error after reset the cache store: %s", err)
  188. return err
  189. }
  190. }
  191. defer c.store.Close()
  192. return c.store.Set(c.key, p)
  193. }
// drainError reports a fatal cache error to the owner of errorCh.
// NOTE: the send blocks until the channel is read (or has buffer space),
// which is why most call sites invoke it in a goroutine.
func (c *Cache) drainError(err error) {
	c.errorCh <- err
}
  197. func (c *Cache) Length() int {
  198. return c.pending.length()
  199. }
  200. func NewCheckpointbasedCache(in <-chan interface{}, limit int, tch <-chan struct{}, errCh chan<- error, ctx api.StreamContext) *Cache {
  201. c := &Cache{
  202. in: in,
  203. Out: make(chan *CacheTuple, limit),
  204. Complete: make(chan int),
  205. errorCh: errCh,
  206. }
  207. go c.checkpointbasedRun(ctx, tch)
  208. return c
  209. }
  210. func (c *Cache) checkpointbasedRun(ctx api.StreamContext, tch <-chan struct{}) {
  211. logger := ctx.GetLogger()
  212. c.initStore(ctx)
  213. for {
  214. select {
  215. case item := <-c.in:
  216. // possibility of barrier, ignore if found
  217. if boe, ok := item.(*checkpoints.BufferOrEvent); ok {
  218. if _, ok := boe.Data.(*checkpoints.Barrier); ok {
  219. c.Out <- &CacheTuple{
  220. data: item,
  221. }
  222. logger.Debugf("sink cache send out barrier %v", boe.Data)
  223. break
  224. }
  225. }
  226. index := c.pending.Tail
  227. c.pending.append(item)
  228. //non blocking until limit exceeded
  229. c.Out <- &CacheTuple{
  230. index: index,
  231. data: item,
  232. }
  233. logger.Debugf("sink cache send out tuple %v", item)
  234. c.changed = true
  235. case index := <-c.Complete:
  236. c.pending.delete(index)
  237. c.changed = true
  238. case <-tch:
  239. logger.Infof("save cache for rule %s, %s", ctx.GetRuleId(), c.pending.String())
  240. clone := c.pending.clone()
  241. if c.changed {
  242. go func() {
  243. if err := c.saveCache(logger, clone); err != nil {
  244. logger.Debugf("%v", err)
  245. c.drainError(err)
  246. }
  247. }()
  248. }
  249. c.changed = false
  250. case <-ctx.Done():
  251. logger.Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
  252. return
  253. }
  254. }
  255. }