sync_cache.go

// Copyright 2022 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
	"github.com/lf-edge/ekuiper/internal/conf"
	"github.com/lf-edge/ekuiper/internal/pkg/store"
	"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
	"github.com/lf-edge/ekuiper/pkg/api"
	"github.com/lf-edge/ekuiper/pkg/infra"
	"github.com/lf-edge/ekuiper/pkg/kv"
	"path"
	"sort"
	"strconv"
	"time"
)

type AckResult bool

// page is the rotating (ring buffer) storage unit of the in-memory cache
// Not thread safe!
type page struct {
	Data []interface{}
	H    int // head index: points to the oldest item
	T    int // tail index: points to the next free slot
	L    int // current number of items in the page
	Size int // capacity of the page
}

// newPage creates a new cache page
// TODO the page is created even when not used, need dynamic?
func newPage(size int) *page {
	return &page{
		Data: make([]interface{}, size),
		H:    0, // When deleting, head++, if tail == head, it is empty
		T:    0, // When appending, tail++, if tail == head, it is full
		Size: size,
	}
}

// append adds the item if the page is not full and returns true; otherwise it returns false
func (p *page) append(item interface{}) bool {
	if p.L == p.Size { // full
		return false
	}
	p.Data[p.T] = item
	p.T++
	if p.T == p.Size {
		p.T = 0
	}
	p.L++
	return true
}

// peak gets the first (oldest) item in the page without removing it
func (p *page) peak() (interface{}, bool) {
	if p.L == 0 {
		return nil, false
	}
	return p.Data[p.H], true
}

// delete removes the first (oldest) item in the page and reports whether anything was removed
func (p *page) delete() bool {
	if p.L == 0 {
		return false
	}
	p.H++
	if p.H == p.Size {
		p.H = 0
	}
	p.L--
	return true
}

func (p *page) isEmpty() bool {
	return p.L == 0
}

func (p *page) reset() {
	p.H = 0
	p.T = 0
	p.L = 0
}
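
// Example (illustrative only): a page of size 2 behaves as a fixed-size ring buffer:
//
//	p := newPage(2)
//	p.append("a")     // true
//	p.append("b")     // true
//	p.append("c")     // false, the page is full
//	v, ok := p.peak() // v == "a", ok == true
//	p.delete()        // removes "a"; the next peak returns "b"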

type SyncCache struct {
	// The input data to the cache
	in        <-chan interface{}
	Out       chan interface{}
	Ack       chan bool
	cacheCtrl chan interface{} // cacheCtrl is the only place to control the cache; sync in and ack result
	errorCh   chan<- error
	stats     metric.StatManager
	// cache config
	cacheConf   *conf.SinkConf
	maxDiskPage int
	// cache storage
	memCache       []*page
	diskBufferPage *page
	// status
	memSize      int // how many pages have been stored in memory
	diskSize     int // how many pages have been stored on disk
	cacheLength  int // readonly, kept for metrics only to save calculation
	diskPageTail int // init from the database
	diskPageHead int
	pending      bool
	// serialize
	store kv.KeyValue
}

func NewSyncCache(ctx api.StreamContext, in <-chan interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int) *SyncCache {
	c := &SyncCache{
		cacheConf: cacheConf,
		in:        in,
		Out:       make(chan interface{}, bufferLength),
		Ack:       make(chan bool, 10),
		cacheCtrl: make(chan interface{}, 10),
		errorCh:   errCh,
		memCache:  make([]*page, 0, cacheConf.MemoryCacheThreshold/cacheConf.BufferPageSize),
		// add one more slot so that there is always at least one free slot between head and tail, which is how the head/tail ids are told apart
		maxDiskPage: (cacheConf.MaxDiskCache / cacheConf.BufferPageSize) + 1,
		stats:       stats,
	}
	go func() {
		err := infra.SafeRun(func() error {
			c.run(ctx)
			return nil
		})
		if err != nil {
			infra.DrainError(ctx, err, errCh)
		}
	}()
	return c
}
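
// Usage sketch (illustrative; sinkCtx, input, errCh, statsMgr, sconf and doSend are placeholders):
// the sink node feeds items into the `in` channel, reads the next item to send from Out,
// and reports every send result on Ack so the cache can either drop or resend the item.
//
//	sc := NewSyncCache(sinkCtx, input, errCh, statsMgr, sconf, 1024)
//	for data := range sc.Out {
//		err := doSend(data)    // hypothetical send to the external system
//		sc.Ack <- (err == nil) // true deletes the cached item, false keeps it for resending
//	}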

func (c *SyncCache) run(ctx api.StreamContext) {
	c.initStore(ctx)
	defer c.onClose(ctx)
	for {
		select {
		case item := <-c.in:
			// possibility of barrier, ignore if found
			if boe, ok := item.(*checkpoint.BufferOrEvent); ok {
				if _, ok := boe.Data.(*checkpoint.Barrier); ok {
					c.Out <- item
					ctx.GetLogger().Debugf("sink cache send out barrier %v", boe.Data)
					break
				}
			}
			c.stats.IncTotalRecordsIn()
			ctx.GetLogger().Debugf("send to cache")
			c.cacheCtrl <- item
			ctx.GetLogger().Debugf("cache done")
		case isSuccess := <-c.Ack:
			// only send the next item to the sink after receiving an ack
			ctx.GetLogger().Debugf("cache ack")
			c.cacheCtrl <- AckResult(isSuccess)
			ctx.GetLogger().Debugf("cache ack done")
		case data := <-c.cacheCtrl: // The only place to manipulate cache
			switch r := data.(type) {
			case AckResult:
				if r {
					ctx.GetLogger().Debugf("deleting cache")
					c.deleteCache(ctx)
				} else if c.cacheConf.ResendInterval > 0 {
					// the last send failed, so wait for the resend interval before the head item is sent again below
					time.Sleep(time.Duration(c.cacheConf.ResendInterval) * time.Millisecond)
				}
				c.pending = false
			default:
				ctx.GetLogger().Debugf("adding cache %v", data)
				c.addCache(ctx, data)
			}
			c.stats.SetBufferLength(int64(len(c.in) + c.cacheLength))
			if !c.pending {
				d, ok := c.peakMemCache(ctx)
				if ok {
					ctx.GetLogger().Debugf("sending cache item %v", d)
					c.pending = true
					select {
					case c.Out <- d:
						ctx.GetLogger().Debugf("sink cache send out %v", d)
					case <-ctx.Done():
						ctx.GetLogger().Debugf("stop sink cache send")
					}
				} else {
					c.pending = false
				}
			}
		case <-ctx.Done():
			ctx.GetLogger().Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
			return
		}
	}
}

// addCache not thread safe!
func (c *SyncCache) addCache(ctx api.StreamContext, item interface{}) {
	isNotFull := c.appendMemCache(item)
	if !isNotFull {
		if c.diskBufferPage == nil {
			c.diskBufferPage = newPage(c.cacheConf.BufferPageSize)
		}
		isBufferNotFull := c.diskBufferPage.append(item)
		if !isBufferNotFull { // cool page full, save to disk
			if c.diskSize == c.maxDiskPage {
				// disk full, read the oldest page to the hot page
				c.loadFromDisk(ctx)
				c.cacheLength -= c.cacheConf.BufferPageSize
			}
			err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
			if err != nil {
				ctx.GetLogger().Errorf("fail to store disk cache %v", err)
				return
			} else {
				c.diskPageTail++
				c.diskSize++
				// rotate
				if c.diskPageTail == c.maxDiskPage {
					c.diskPageTail = 0
				}
			}
			c.diskBufferPage.reset()
			c.diskBufferPage.append(item)
		}
	}
	c.cacheLength++
	ctx.GetLogger().Debugf("added cache")
}

// deleteCache not thread safe!
func (c *SyncCache) deleteCache(ctx api.StreamContext) {
	if len(c.memCache) == 0 {
		return
	}
	isNotEmpty := c.memCache[0].delete()
	if isNotEmpty {
		c.cacheLength--
		ctx.GetLogger().Debugf("deleted cache: %d", c.cacheLength)
	}
	if c.memCache[0].isEmpty() { // read from disk or cool list
		c.memCache = c.memCache[1:]
		if c.diskSize > 0 {
			c.loadFromDisk(ctx)
		} else if c.diskBufferPage != nil { // use cool page as the new page
			c.memCache = append(c.memCache, c.diskBufferPage)
			c.diskBufferPage = nil
		}
	}
}
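
// loadFromDisk loads the oldest disk page into the in-memory cache and advances the disk head. Not thread safe!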
func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
	// load page from the disk
	hotPage := newPage(c.cacheConf.BufferPageSize)
	ok, err := c.store.Get(strconv.Itoa(c.diskPageHead), hotPage)
	if err != nil {
		ctx.GetLogger().Errorf("fail to load disk cache %v", err)
	} else if !ok {
		ctx.GetLogger().Errorf("nothing in the disk, should not happen")
	} else {
		if len(c.memCache) > 0 {
			ctx.GetLogger().Debugf("drop a page in memory")
			c.memCache = c.memCache[1:]
		}
		c.memCache = append(c.memCache, hotPage)
		c.diskPageHead++
		c.diskSize--
		if c.diskPageHead == c.maxDiskPage {
			c.diskPageHead = 0
		}
	}
}
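
// appendMemCache appends the item to the last in-memory page, allocating a new page when needed.
// It returns false when the memory cache is full and the item must go to the disk buffer instead.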
func (c *SyncCache) appendMemCache(item interface{}) bool {
	if c.memSize == cap(c.memCache) {
		return false
	}
	if len(c.memCache) <= c.memSize {
		c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
	}
	isNotFull := c.memCache[c.memSize].append(item)
	if !isNotFull {
		c.memSize++
		if c.memSize == cap(c.memCache) {
			return false
		}
		c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
		return c.memCache[c.memSize].append(item)
	}
	return true
}

// peakMemCache gets the oldest item in the cache without removing it
func (c *SyncCache) peakMemCache(ctx api.StreamContext) (interface{}, bool) {
	if len(c.memCache) == 0 {
		return nil, false
	}
	return c.memCache[0].peak()
}
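
// initStore opens the kv store for this sink instance (dropping it first when CleanCacheAtStop is set)
// and, otherwise, restores the memory and disk cache that a previous run persisted.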
func (c *SyncCache) initStore(ctx api.StreamContext) {
	kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
	if c.cacheConf.CleanCacheAtStop {
		ctx.GetLogger().Infof("cleaning cache store %s", kvTable)
		store.DropCacheKV(kvTable)
	}
	var err error
	err, c.store = store.GetCacheKV(kvTable)
	if err != nil {
		infra.DrainError(ctx, err, c.errorCh)
	}
	// restore the sink cache from disk
	if !c.cacheConf.CleanCacheAtStop {
		// storeSig is saved as 0 on init and as 1 on close. A newly started sink node waits for the previous close to finish.
		var set int
		ok, err := c.store.Get("storeSig", &set)
		if ok && set == 0 { // the previous instance may still be saving
			var i = 0
			for ; i < 100; i++ {
				time.Sleep(time.Millisecond * 10)
				_, err = c.store.Get("storeSig", &set)
				if set == 1 {
					ctx.GetLogger().Infof("waited %d times for the previous cache to be saved", i)
					break
				}
			}
			if i == 100 {
				ctx.GetLogger().Errorf("waited %d times for the previous cache to be saved, exit and drop", i)
			}
		}
		c.store.Set("storeSig", 0)
		ctx.GetLogger().Infof("start to restore cache from disk")
		// restore the memCache
		_, err = c.store.Get("memcache", &c.memCache)
		if err != nil {
			ctx.GetLogger().Errorf("fail to restore mem cache %v", err)
		}
		c.memSize = len(c.memCache)
		for _, p := range c.memCache {
			c.cacheLength += p.L
		}
		err = c.store.Delete("memcache")
		if err != nil {
			ctx.GetLogger().Errorf("fail to delete mem cache %v", err)
		}
		ctx.GetLogger().Infof("restored mem cache %d", c.cacheLength)
		// restore the disk cache
		dks, err := c.store.Keys()
		if err != nil {
			ctx.GetLogger().Errorf("fail to restore disk cache %v", err)
			return
		}
		if len(dks) == 0 {
			return
		}
		dk := make([]int, 0, len(dks))
		for _, d := range dks {
			aint, err := strconv.Atoi(d)
			if err == nil { // filter out non-numeric keys such as memcache and storeSig
				dk = append(dk, aint)
			}
		}
		if len(dk) == 0 {
			return
		}
		// the newest disk page is loaded into the buffer page below, so it is not counted in diskSize
		c.diskSize = len(dk) - 1
		c.cacheLength += c.diskSize * c.cacheConf.BufferPageSize
		sort.Ints(dk)
		// default: no wrap-around, so the smallest key is the head and the largest is the tail
		c.diskPageHead = dk[0]
		c.diskPageTail = dk[len(dk)-1]
		for i, k := range dk {
			if i-1 >= 0 {
				if k-dk[i-1] > 1 {
					// a gap means the ring has wrapped around: the key before the gap is the tail and the key after it is the head
					c.diskPageTail = dk[i-1]
					c.diskPageHead = k
				}
			}
		}
		// load buffer page
		hotPage := newPage(c.cacheConf.BufferPageSize)
		ok, err = c.store.Get(strconv.Itoa(c.diskPageTail), hotPage)
		if err != nil {
			ctx.GetLogger().Errorf("fail to load disk cache to buffer %v", err)
		} else if !ok {
			ctx.GetLogger().Errorf("nothing in the disk, should not happen")
		} else {
			c.diskBufferPage = hotPage
			c.cacheLength += c.diskBufferPage.L
		}
		ctx.GetLogger().Infof("restored all cache %d", c.cacheLength)
	}
}

// save memory states to disk
func (c *SyncCache) onClose(ctx api.StreamContext) {
	ctx.GetLogger().Infof("sink node %s instance cache %d closing", ctx.GetOpId(), ctx.GetInstanceId())
	if c.cacheConf.CleanCacheAtStop {
		kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
		ctx.GetLogger().Infof("cleaning cache store %s", kvTable)
		store.DropCacheKV(kvTable)
	} else {
		if c.diskBufferPage != nil {
			err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
			if err != nil {
				ctx.GetLogger().Errorf("fail to store disk cache %v", err)
			}
			ctx.GetLogger().Debug("store disk cache")
		}
		// store the memory states
		if len(c.memCache) > 0 {
			err := c.store.Set("memcache", c.memCache)
			if err != nil {
				ctx.GetLogger().Errorf("fail to store memory cache to disk %v", err)
			}
			ctx.GetLogger().Debugf("store memory cache %d", c.memSize)
		}
		c.store.Set("storeSig", 1)
	}
}