sink_cache.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "encoding/gob"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/pkg/store"
  20. "github.com/lf-edge/ekuiper/internal/topo/checkpoint"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/infra"
  23. "github.com/lf-edge/ekuiper/pkg/kv"
  24. "path"
  25. "sort"
  26. "strconv"
  27. )
// CacheTuple pairs a buffered data item with its index in the pending
// queue so that downstream can acknowledge it back via Cache.Complete.
type CacheTuple struct {
	index int         // position in the LinkedQueue; left zero for checkpoint barriers
	data  interface{} // the cached item itself
}
  32. type LinkedQueue struct {
  33. Data map[int]interface{}
  34. Tail int
  35. }
  36. func (l *LinkedQueue) append(item interface{}) {
  37. l.Data[l.Tail] = item
  38. l.Tail++
  39. }
  40. func (l *LinkedQueue) delete(index int) {
  41. delete(l.Data, index)
  42. }
  43. func (l *LinkedQueue) reset() {
  44. l.Tail = 0
  45. }
  46. func (l *LinkedQueue) length() int {
  47. return len(l.Data)
  48. }
  49. func (l *LinkedQueue) clone() *LinkedQueue {
  50. result := &LinkedQueue{
  51. Data: make(map[int]interface{}),
  52. Tail: l.Tail,
  53. }
  54. for k, v := range l.Data {
  55. result.Data[k] = v
  56. }
  57. return result
  58. }
  59. func (l *LinkedQueue) String() string {
  60. return fmt.Sprintf("tail: %d, data: %v", l.Tail, l.Data)
  61. }
// Cache buffers sink tuples until downstream acknowledges them, and can
// persist the pending queue to a KV store so it survives restarts.
type Cache struct {
	//Data and control channels
	in       <-chan interface{} // incoming items from the sink node
	Out      chan *CacheTuple   // indexed items forwarded downstream (buffered to `limit`)
	Complete chan int           // indices acknowledged by downstream; removed from pending
	errorCh  chan<- error       // fatal errors are drained here
	//states
	pending *LinkedQueue // unacknowledged items keyed by index
	changed bool         // set when pending was modified since the last save
	//serialize
	key   string //the key for current cache
	store kv.KeyValue
}
  75. func NewTimebasedCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache {
  76. c := &Cache{
  77. in: in,
  78. Out: make(chan *CacheTuple, limit),
  79. Complete: make(chan int),
  80. errorCh: errCh,
  81. }
  82. go func() {
  83. err := infra.SafeRun(func() error {
  84. c.timebasedRun(ctx, saveInterval)
  85. return nil
  86. })
  87. if err != nil {
  88. infra.DrainError(ctx, err, errCh)
  89. }
  90. }()
  91. return c
  92. }
  93. func (c *Cache) initStore(ctx api.StreamContext) {
  94. logger := ctx.GetLogger()
  95. c.pending = &LinkedQueue{
  96. Data: make(map[int]interface{}),
  97. Tail: 0,
  98. }
  99. var err error
  100. err, c.store = store.GetKV(path.Join("sink", ctx.GetRuleId()))
  101. if err != nil {
  102. infra.DrainError(ctx, err, c.errorCh)
  103. return
  104. }
  105. c.key = ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
  106. logger.Debugf("cache saved to key %s", c.key)
  107. //load cache
  108. if err := c.loadCache(); err != nil {
  109. infra.DrainError(ctx, err, c.errorCh)
  110. return
  111. }
  112. }
  113. func (c *Cache) timebasedRun(ctx api.StreamContext, saveInterval int) {
  114. logger := ctx.GetLogger()
  115. c.initStore(ctx)
  116. ticker := conf.GetTicker(saveInterval)
  117. defer ticker.Stop()
  118. var tcount = 0
  119. for {
  120. select {
  121. case item := <-c.in:
  122. index := c.pending.Tail
  123. c.pending.append(item)
  124. //non blocking until limit exceeded
  125. c.Out <- &CacheTuple{
  126. index: index,
  127. data: item,
  128. }
  129. c.changed = true
  130. case index := <-c.Complete:
  131. c.pending.delete(index)
  132. c.changed = true
  133. case <-ticker.C:
  134. tcount++
  135. l := c.pending.length()
  136. if l == 0 {
  137. c.pending.reset()
  138. }
  139. //If the data is still changing, only do a save when the cache has more than threshold to prevent too much file IO
  140. //If the data is not changing in the time slot and have not saved before, save it. This is to prevent the
  141. //data won't be saved as the cache never pass the threshold
  142. //logger.Infof("ticker %t, l=%d\n", c.changed, l)
  143. if (c.changed && l > conf.Config.Sink.CacheThreshold) || (tcount == conf.Config.Sink.CacheTriggerCount && c.changed) {
  144. logger.Infof("save cache for rule %s, %s", ctx.GetRuleId(), c.pending.String())
  145. clone := c.pending.clone()
  146. c.changed = false
  147. go func() {
  148. err := infra.SafeRun(func() error {
  149. return c.saveCache(logger, clone)
  150. })
  151. if err != nil {
  152. logger.Debugf("%v", err)
  153. infra.DrainError(ctx, err, c.errorCh)
  154. return
  155. }
  156. }()
  157. }
  158. if tcount >= conf.Config.Sink.CacheThreshold {
  159. tcount = 0
  160. }
  161. case <-ctx.Done():
  162. err := c.saveCache(logger, c.pending)
  163. if err != nil {
  164. logger.Warnf("Error found during saving cache: %s \n ", err)
  165. }
  166. logger.Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
  167. return
  168. }
  169. }
  170. }
  171. func (c *Cache) loadCache() error {
  172. gob.Register(c.pending)
  173. mt := new(LinkedQueue)
  174. if f, err := c.store.Get(c.key, &mt); f {
  175. if nil != err {
  176. return fmt.Errorf("load malform cache, found %v(%v)", c.key, mt)
  177. }
  178. c.pending = mt
  179. c.changed = true
  180. // To store the keys in slice in sorted order
  181. var keys []int
  182. for k := range mt.Data {
  183. keys = append(keys, k)
  184. }
  185. sort.Ints(keys)
  186. for _, k := range keys {
  187. t := &CacheTuple{
  188. index: k,
  189. data: mt.Data[k],
  190. }
  191. c.Out <- t
  192. }
  193. return nil
  194. }
  195. return nil
  196. }
  197. func (c *Cache) saveCache(logger api.Logger, p *LinkedQueue) error {
  198. logger.Infof("clean the cache and reopen")
  199. _ = c.store.Delete(c.key)
  200. return c.store.Set(c.key, p)
  201. }
  202. func (c *Cache) Length() int {
  203. return c.pending.length()
  204. }
  205. func NewCheckpointbasedCache(in <-chan interface{}, limit int, tch <-chan struct{}, errCh chan<- error, ctx api.StreamContext) *Cache {
  206. c := &Cache{
  207. in: in,
  208. Out: make(chan *CacheTuple, limit),
  209. Complete: make(chan int),
  210. errorCh: errCh,
  211. }
  212. go func() {
  213. err := infra.SafeRun(func() error {
  214. c.checkpointbasedRun(ctx, tch)
  215. return nil
  216. })
  217. if err != nil {
  218. infra.DrainError(ctx, err, c.errorCh)
  219. }
  220. }()
  221. return c
  222. }
// checkpointbasedRun caches incoming items and persists the pending queue
// whenever a checkpoint signal arrives on tch, instead of on a timer.
func (c *Cache) checkpointbasedRun(ctx api.StreamContext, tch <-chan struct{}) {
	logger := ctx.GetLogger()
	c.initStore(ctx)
	for {
		select {
		case item := <-c.in:
			// possibility of barrier, ignore if found
			if boe, ok := item.(*checkpoint.BufferOrEvent); ok {
				if _, ok := boe.Data.(*checkpoint.Barrier); ok {
					// Barriers are forwarded downstream but never cached,
					// so the tuple carries no index (zero value).
					c.Out <- &CacheTuple{
						data: item,
					}
					logger.Debugf("sink cache send out barrier %v", boe.Data)
					// break exits only the select; the for loop continues.
					break
				}
			}
			index := c.pending.Tail
			c.pending.append(item)
			//non blocking until limit exceeded
			c.Out <- &CacheTuple{
				index: index,
				data:  item,
			}
			logger.Debugf("sink cache send out tuple %v", item)
			c.changed = true
		case index := <-c.Complete:
			// Downstream acknowledged the tuple at index; drop it from pending.
			c.pending.delete(index)
			c.changed = true
		case <-tch:
			// Checkpoint signal: snapshot the queue, then save asynchronously.
			// The clone is taken before the changed check so the goroutine
			// works on an immutable snapshot.
			logger.Infof("save cache for rule %s, %s", ctx.GetRuleId(), c.pending.String())
			clone := c.pending.clone()
			if c.changed {
				go func() {
					err := infra.SafeRun(func() error {
						return c.saveCache(logger, clone)
					})
					if err != nil {
						logger.Debugf("%v", err)
						infra.DrainError(ctx, err, c.errorCh)
					}
				}()
			}
			c.changed = false
		case <-ctx.Done():
			logger.Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
			return
		}
	}
}