sink_cache.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "encoding/gob"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/checkpoint"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "github.com/lf-edge/ekuiper/pkg/kv"
  22. "path"
  23. "sort"
  24. "strconv"
  25. )
  // CacheTuple is the unit sent on Cache.Out: a cached item together with the
  // index under which it is tracked in the pending queue. The run loops delete
  // the pending entry when that index arrives on Cache.Complete.
  type CacheTuple struct {
  	// index is the key of the item inside LinkedQueue.Data.
  	index int
  	// data is the cached payload itself.
  	data interface{}
  }
  // LinkedQueue tracks the items that have been handed out but not yet
  // acknowledged. Items are stored in Data keyed by the value Tail had when
  // they were appended; Tail only grows until reset is called.
  type LinkedQueue struct {
  	Data map[int]interface{}
  	Tail int
  }

  // append stores item under the current Tail and advances Tail by one.
  // The map is created lazily so that a zero-value LinkedQueue (for example
  // one obtained with new(LinkedQueue)) can be appended to without panicking
  // on a nil map write.
  func (l *LinkedQueue) append(item interface{}) {
  	if l.Data == nil {
  		l.Data = make(map[int]interface{})
  	}
  	l.Data[l.Tail] = item
  	l.Tail++
  }

  // delete removes the item stored under index; unknown indexes are ignored.
  func (l *LinkedQueue) delete(index int) {
  	delete(l.Data, index)
  }

  // reset rewinds Tail to 0. It does not touch Data, so callers are expected
  // to invoke it only when the queue is empty.
  func (l *LinkedQueue) reset() {
  	l.Tail = 0
  }

  // length returns the number of items currently held.
  func (l *LinkedQueue) length() int {
  	return len(l.Data)
  }

  // clone returns a copy with its own map, so the copy can be used from
  // another goroutine while the original keeps changing. The stored items
  // themselves are not deep-copied.
  func (l *LinkedQueue) clone() *LinkedQueue {
  	result := &LinkedQueue{
  		Data: make(map[int]interface{}, len(l.Data)),
  		Tail: l.Tail,
  	}
  	for k, v := range l.Data {
  		result.Data[k] = v
  	}
  	return result
  }

  // String renders the queue for log output.
  func (l *LinkedQueue) String() string {
  	return fmt.Sprintf("tail: %d, data: %v", l.Tail, l.Data)
  }
  60. type Cache struct {
  61. //Data and control channels
  62. in <-chan interface{}
  63. Out chan *CacheTuple
  64. Complete chan int
  65. errorCh chan<- error
  66. //states
  67. pending *LinkedQueue
  68. changed bool
  69. //serialize
  70. key string //the key for current cache
  71. store kv.KeyValue
  72. }
  73. func NewTimebasedCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache {
  74. c := &Cache{
  75. in: in,
  76. Out: make(chan *CacheTuple, limit),
  77. Complete: make(chan int),
  78. errorCh: errCh,
  79. }
  80. go c.timebasedRun(ctx, saveInterval)
  81. return c
  82. }
  83. func (c *Cache) initStore(ctx api.StreamContext) {
  84. logger := ctx.GetLogger()
  85. c.pending = &LinkedQueue{
  86. Data: make(map[int]interface{}),
  87. Tail: 0,
  88. }
  89. dbDir, err := conf.GetDataLoc()
  90. logger.Debugf("cache saved to %s", dbDir)
  91. if err != nil {
  92. c.drainError(err)
  93. }
  94. err, c.store = kv.GetKVStore(path.Join("sink", ctx.GetRuleId()))
  95. if err != nil {
  96. c.drainError(err)
  97. }
  98. c.key = ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
  99. logger.Debugf("cache saved to key %s", c.key)
  100. //load cache
  101. if err := c.loadCache(); err != nil {
  102. go c.drainError(err)
  103. return
  104. }
  105. }
  106. func (c *Cache) timebasedRun(ctx api.StreamContext, saveInterval int) {
  107. logger := ctx.GetLogger()
  108. c.initStore(ctx)
  109. ticker := conf.GetTicker(saveInterval)
  110. defer ticker.Stop()
  111. var tcount = 0
  112. for {
  113. select {
  114. case item := <-c.in:
  115. index := c.pending.Tail
  116. c.pending.append(item)
  117. //non blocking until limit exceeded
  118. c.Out <- &CacheTuple{
  119. index: index,
  120. data: item,
  121. }
  122. c.changed = true
  123. case index := <-c.Complete:
  124. c.pending.delete(index)
  125. c.changed = true
  126. case <-ticker.C:
  127. tcount++
  128. l := c.pending.length()
  129. if l == 0 {
  130. c.pending.reset()
  131. }
  132. //If the data is still changing, only do a save when the cache has more than threshold to prevent too much file IO
  133. //If the data is not changing in the time slot and have not saved before, save it. This is to prevent the
  134. //data won't be saved as the cache never pass the threshold
  135. //logger.Infof("ticker %t, l=%d\n", c.changed, l)
  136. if (c.changed && l > conf.Config.Sink.CacheThreshold) || (tcount == conf.Config.Sink.CacheTriggerCount && c.changed) {
  137. logger.Infof("save cache for rule %s, %s", ctx.GetRuleId(), c.pending.String())
  138. clone := c.pending.clone()
  139. c.changed = false
  140. go func() {
  141. if err := c.saveCache(logger, clone); err != nil {
  142. logger.Debugf("%v", err)
  143. c.drainError(err)
  144. }
  145. }()
  146. }
  147. if tcount >= conf.Config.Sink.CacheThreshold {
  148. tcount = 0
  149. }
  150. case <-ctx.Done():
  151. err := c.saveCache(logger, c.pending)
  152. if err != nil {
  153. logger.Warnf("Error found during saving cache: %s \n ", err)
  154. }
  155. logger.Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
  156. return
  157. }
  158. }
  159. }
  160. func (c *Cache) loadCache() error {
  161. gob.Register(c.pending)
  162. mt := new(LinkedQueue)
  163. if f, err := c.store.Get(c.key, &mt); f {
  164. if nil != err {
  165. return fmt.Errorf("load malform cache, found %v(%v)", c.key, mt)
  166. }
  167. c.pending = mt
  168. c.changed = true
  169. // To store the keys in slice in sorted order
  170. var keys []int
  171. for k := range mt.Data {
  172. keys = append(keys, k)
  173. }
  174. sort.Ints(keys)
  175. for _, k := range keys {
  176. t := &CacheTuple{
  177. index: k,
  178. data: mt.Data[k],
  179. }
  180. c.Out <- t
  181. }
  182. return nil
  183. }
  184. return nil
  185. }
  186. func (c *Cache) saveCache(logger api.Logger, p *LinkedQueue) error {
  187. logger.Infof("clean the cache and reopen")
  188. c.store.Clean()
  189. return c.store.Set(c.key, p)
  190. }
  191. func (c *Cache) drainError(err error) {
  192. c.errorCh <- err
  193. }
  194. func (c *Cache) Length() int {
  195. return c.pending.length()
  196. }
  197. func NewCheckpointbasedCache(in <-chan interface{}, limit int, tch <-chan struct{}, errCh chan<- error, ctx api.StreamContext) *Cache {
  198. c := &Cache{
  199. in: in,
  200. Out: make(chan *CacheTuple, limit),
  201. Complete: make(chan int),
  202. errorCh: errCh,
  203. }
  204. go c.checkpointbasedRun(ctx, tch)
  205. return c
  206. }
  207. func (c *Cache) checkpointbasedRun(ctx api.StreamContext, tch <-chan struct{}) {
  208. logger := ctx.GetLogger()
  209. c.initStore(ctx)
  210. for {
  211. select {
  212. case item := <-c.in:
  213. // possibility of barrier, ignore if found
  214. if boe, ok := item.(*checkpoint.BufferOrEvent); ok {
  215. if _, ok := boe.Data.(*checkpoint.Barrier); ok {
  216. c.Out <- &CacheTuple{
  217. data: item,
  218. }
  219. logger.Debugf("sink cache send out barrier %v", boe.Data)
  220. break
  221. }
  222. }
  223. index := c.pending.Tail
  224. c.pending.append(item)
  225. //non blocking until limit exceeded
  226. c.Out <- &CacheTuple{
  227. index: index,
  228. data: item,
  229. }
  230. logger.Debugf("sink cache send out tuple %v", item)
  231. c.changed = true
  232. case index := <-c.Complete:
  233. c.pending.delete(index)
  234. c.changed = true
  235. case <-tch:
  236. logger.Infof("save cache for rule %s, %s", ctx.GetRuleId(), c.pending.String())
  237. clone := c.pending.clone()
  238. if c.changed {
  239. go func() {
  240. if err := c.saveCache(logger, clone); err != nil {
  241. logger.Debugf("%v", err)
  242. c.drainError(err)
  243. }
  244. }()
  245. }
  246. c.changed = false
  247. case <-ctx.Done():
  248. logger.Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
  249. return
  250. }
  251. }
  252. }