sync_cache.go

// Copyright 2022 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
    "path"
    "strconv"
    "time"

    "github.com/lf-edge/ekuiper/internal/conf"
    "github.com/lf-edge/ekuiper/internal/pkg/store"
    "github.com/lf-edge/ekuiper/internal/topo/checkpoint"
    "github.com/lf-edge/ekuiper/internal/topo/node/metric"
    "github.com/lf-edge/ekuiper/pkg/api"
    "github.com/lf-edge/ekuiper/pkg/infra"
    "github.com/lf-edge/ekuiper/pkg/kv"
)

type AckResult bool

// page Rotate storage for in memory cache
// Not thread safe!
type page struct {
    Data []interface{}
    H    int
    T    int
    L    int
    Size int
}
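
// The page fields form a fixed-size ring buffer: H is the head index read by
// peak/delete, T is the tail index written by append, L is the number of items
// currently stored, and Size is the capacity. H and T wrap back to 0 when they
// reach Size.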

// newPage create a new cache page
// TODO the page is created even not used, need dynamic?
func newPage(size int) *page {
    return &page{
        Data: make([]interface{}, size),
        H:    0, // When deleting, head++, if tail == head, it is empty
        T:    0, // When appending, tail++, if tail == head, it is full
        Size: size,
    }
}

// append adds an item if the page is not full and returns true; otherwise it returns false
func (p *page) append(item interface{}) bool {
    if p.L == p.Size { // full
        return false
    }
    p.Data[p.T] = item
    p.T++
    if p.T == p.Size {
        p.T = 0
    }
    p.L++
    return true
}

// peak returns the first item in the cache without removing it
func (p *page) peak() (interface{}, bool) {
    if p.L == 0 {
        return nil, false
    }
    return p.Data[p.H], true
}

func (p *page) delete() bool {
    if p.L == 0 {
        return false
    }
    p.H++
    if p.H == p.Size {
        p.H = 0
    }
    p.L--
    return true
}

func (p *page) isEmpty() bool {
    return p.L == 0
}

func (p *page) reset() {
    p.H = 0
    p.T = 0
    p.L = 0
}

type SyncCache struct {
    // The input data to the cache
    in        <-chan interface{}
    Out       chan interface{}
    Ack       chan bool
    cacheCtrl chan interface{} // cacheCtrl is the only place to control the cache; sync in and ack result
    errorCh   chan<- error
    stats     metric.StatManager
    // cache config
    cacheConf   *conf.SinkConf
    maxDiskPage int
    maxMemPage  int
    // cache storage
    memCache       []*page
    diskBufferPage *page
    // status
    diskSize     int // how many pages have been saved
    cacheLength  int // readonly, for metrics only to save calculation
    diskPageTail int // init from the database
    diskPageHead int
    sendStatus   int // 0: idle, 1: sending and waiting for ack, 2: stopped for error
    // serialize
    store kv.KeyValue
}
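
// Storage layout: incoming items are appended to the pages in memCache first. Once
// memCache holds maxMemPage full pages, items overflow into diskBufferPage; when that
// page fills up it is written to the KV store as one numbered page in a ring delimited
// by diskPageHead and diskPageTail. cacheLength tracks the total number of cached
// items across memory and disk.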

func NewSyncCache(ctx api.StreamContext, in <-chan interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int) *SyncCache {
    c := &SyncCache{
        cacheConf:  cacheConf,
        in:         in,
        Out:        make(chan interface{}, bufferLength),
        Ack:        make(chan bool, 10),
        cacheCtrl:  make(chan interface{}, 10),
        errorCh:    errCh,
        maxMemPage: cacheConf.MemoryCacheThreshold / cacheConf.BufferPageSize,
        memCache:   make([]*page, 0),
        // add one more slot so that there will be at least one slot between head and tail to find out the head/tail id
        maxDiskPage: (cacheConf.MaxDiskCache / cacheConf.BufferPageSize) + 1,
        stats:       stats,
    }
    go func() {
        err := infra.SafeRun(func() error {
            c.run(ctx)
            return nil
        })
        if err != nil {
            infra.DrainError(ctx, err, errCh)
        }
    }()
    return c
}
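
// Expected interaction, as implemented in run below: the upstream writes tuples to the
// in channel, the sink worker reads one tuple at a time from Out and reports the send
// result on Ack. Only one tuple is in flight at any time; the next tuple is emitted
// after a successful ack, while a failed ack pauses sending until new data arrives.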

func (c *SyncCache) run(ctx api.StreamContext) {
    c.initStore(ctx)
    defer c.onClose(ctx)
    if c.cacheLength > 0 { // start to send the cache
        c.send(ctx)
    }
    for {
        select {
        case item := <-c.in:
            // possibility of barrier, ignore if found
            if boe, ok := item.(*checkpoint.BufferOrEvent); ok {
                if _, ok := boe.Data.(*checkpoint.Barrier); ok {
                    c.Out <- item
                    ctx.GetLogger().Debugf("sink cache send out barrier %v", boe.Data)
                    break
                }
            }
            c.stats.IncTotalRecordsIn()
            ctx.GetLogger().Debugf("send to cache")
            c.cacheCtrl <- item
        case isSuccess := <-c.Ack:
            // only send the next item to the sink after receiving an ack
            ctx.GetLogger().Debugf("cache ack")
            c.cacheCtrl <- AckResult(isSuccess)
        case data := <-c.cacheCtrl: // The only place to manipulate cache
            switch r := data.(type) {
            case AckResult:
                if r {
                    ctx.GetLogger().Debugf("deleting cache")
                    c.deleteCache(ctx)
                    c.sendStatus = 0
                    ctx.GetLogger().Debug("send status to 0 after true ack")
                } else {
                    c.sendStatus = 2
                    ctx.GetLogger().Debug("send status to 2 after false ack")
                }
            default:
                ctx.GetLogger().Debugf("adding cache %v", data)
                c.addCache(ctx, data)
                if c.sendStatus == 2 {
                    c.sendStatus = 0
                    ctx.GetLogger().Debug("send status to 0 after adding cache in error state")
                }
            }
            c.stats.SetBufferLength(int64(len(c.in) + c.cacheLength))
            if c.sendStatus == 0 {
                c.send(ctx)
            }
        case <-ctx.Done():
            ctx.GetLogger().Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
            return
        }
    }
}

func (c *SyncCache) send(ctx api.StreamContext) {
    if c.cacheLength > 1 && c.cacheConf.ResendInterval > 0 {
        time.Sleep(time.Duration(c.cacheConf.ResendInterval) * time.Millisecond)
    }
    d, ok := c.peakMemCache(ctx)
    if ok {
        ctx.GetLogger().Debugf("sending cache item %v", d)
        c.sendStatus = 1
        ctx.GetLogger().Debug("send status to 1 after sending tuple")
        select {
        case c.Out <- d:
            ctx.GetLogger().Debugf("sink cache send out %v", d)
        case <-ctx.Done():
            ctx.GetLogger().Debugf("stop sink cache send")
        }
    } else {
        ctx.GetLogger().Debug("no cache to send")
    }
}
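
// An item is cached in three tiers: the in-memory pages, then the disk buffer page
// once memory is full, and finally the KV store once the disk buffer page is full.
// When the disk ring itself is full, the oldest disk page is loaded back into memory
// to make room, which may drop a page of items if memory is also at its limit.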

// addCache not thread safe!
func (c *SyncCache) addCache(ctx api.StreamContext, item interface{}) {
    isNotFull := c.appendMemCache(item)
    if !isNotFull {
        if c.diskBufferPage == nil {
            c.diskBufferPage = newPage(c.cacheConf.BufferPageSize)
        }
        isBufferNotFull := c.diskBufferPage.append(item)
        if !isBufferNotFull { // cool page full, save to disk
            if c.diskSize == c.maxDiskPage {
                // disk full, read the oldest page to the hot page
                c.loadFromDisk(ctx)
                ctx.GetLogger().Debug("disk full, remove the last page")
            }
            err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
            if err != nil {
                ctx.GetLogger().Errorf("fail to store disk cache %v", err)
                return
            } else {
                ctx.GetLogger().Debugf("add cache to disk. the new disk buffer page is %v", c.diskBufferPage)
                c.diskPageTail++
                c.diskSize++
                err := c.store.Set("size", c.diskSize)
                if err != nil {
                    ctx.GetLogger().Warnf("fail to store disk cache size %v", err)
                }
                // rotate
                if c.diskPageTail == c.maxDiskPage {
                    c.diskPageTail = 0
                }
            }
            c.diskBufferPage.reset()
            c.diskBufferPage.append(item)
        } else {
            ctx.GetLogger().Debugf("added cache to disk buffer page %v", c.diskBufferPage)
        }
    } else {
        ctx.GetLogger().Debugf("added cache to mem cache %v", c.memCache)
    }
    c.cacheLength++
}
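
// Deletion mirrors the add path: the acked item is removed from the head of the first
// memory page, and once that page drains it is replaced with the next page loaded from
// disk, or with the disk buffer page when nothing is left on disk.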

// deleteCache not thread safe!
func (c *SyncCache) deleteCache(ctx api.StreamContext) {
    ctx.GetLogger().Debugf("deleting cache. cacheLength: %d, diskSize: %d", c.cacheLength, c.diskSize)
    if len(c.memCache) == 0 {
        ctx.GetLogger().Debug("mem cache is empty")
        return
    }
    isNotEmpty := c.memCache[0].delete()
    if isNotEmpty {
        c.cacheLength--
        ctx.GetLogger().Debugf("deleted cache: %d", c.cacheLength)
    }
    if c.memCache[0].isEmpty() { // read from disk or cool list
        c.memCache = c.memCache[1:]
        if c.diskSize > 0 {
            c.loadFromDisk(ctx)
        } else if c.diskBufferPage != nil { // use cool page as the new page
            ctx.GetLogger().Debugf("reading from diskBufferPage: %d", c.cacheLength)
            c.memCache = append(c.memCache, c.diskBufferPage)
            c.diskBufferPage = nil
        }
    }
    ctx.GetLogger().Debugf("deleted cache. cacheLength: %d, diskSize: %d, memCache: %v", c.cacheLength, c.diskSize, c.memCache)
}
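
// loadFromDisk moves the page at diskPageHead from the KV store into memCache,
// dropping the oldest in-memory page first if memory is already at maxMemPage,
// then advances the head with wraparound and persists the updated size and head.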

func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
    // load page from the disk
    ctx.GetLogger().Debugf("loading from disk %d. cacheLength: %d, diskSize: %d", c.diskPageHead, c.cacheLength, c.diskSize)
    hotPage := newPage(c.cacheConf.BufferPageSize)
    ok, err := c.store.Get(strconv.Itoa(c.diskPageHead), hotPage)
    if err != nil {
        ctx.GetLogger().Errorf("fail to load disk cache %v", err)
    } else if !ok {
        ctx.GetLogger().Errorf("nothing in the disk, should not happen")
    } else {
        if len(c.memCache) >= c.maxMemPage {
            ctx.GetLogger().Warnf("drop a page of %d items in memory", c.memCache[0].L)
            c.cacheLength -= c.memCache[0].L
            c.memCache = c.memCache[1:]
        }
        c.memCache = append(c.memCache, hotPage)
        c.diskPageHead++
        c.diskSize--
        err := c.store.Set("size", c.diskSize)
        if err != nil {
            ctx.GetLogger().Warnf("fail to store disk cache size %v", err)
        }
        if c.diskPageHead == c.maxDiskPage {
            c.diskPageHead = 0
        }
        err = c.store.Set("head", c.diskPageHead)
        if err != nil {
            ctx.GetLogger().Warnf("fail to store disk cache head %v", err)
        }
    }
    ctx.GetLogger().Debugf("loaded from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
}

func (c *SyncCache) appendMemCache(item interface{}) bool {
    if len(c.memCache) > c.maxMemPage {
        return false
    }
    if len(c.memCache) == 0 {
        c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
    }
    isNotFull := c.memCache[len(c.memCache)-1].append(item)
    if !isNotFull {
        if len(c.memCache) == c.maxMemPage {
            return false
        }
        c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
        return c.memCache[len(c.memCache)-1].append(item)
    }
    return true
}

func (c *SyncCache) peakMemCache(_ api.StreamContext) (interface{}, bool) {
    if len(c.memCache) == 0 {
        return nil, false
    }
    return c.memCache[0].peak()
}
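
// initStore opens (or recreates) the per-sink KV table and restores any cache persisted
// by a previous run. The "storeSig" key is a simple handshake: it is set to 0 while the
// cache is live and to 1 once onClose has finished saving, so a restarting sink waits
// briefly for an in-progress save before reading the "memcache", "size" and "head" keys
// back.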

func (c *SyncCache) initStore(ctx api.StreamContext) {
    kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
    if c.cacheConf.CleanCacheAtStop {
        ctx.GetLogger().Infof("dropping cache store %s", kvTable)
        store.DropCacheKV(kvTable)
    }
    var err error
    err, c.store = store.GetCacheKV(kvTable)
    if err != nil {
        infra.DrainError(ctx, err, c.errorCh)
    }
    // restore the sink cache from disk
    if !c.cacheConf.CleanCacheAtStop {
        // Save 0 when init and save 1 when close. Wait for close for newly started sink node
        var set int
        ok, err := c.store.Get("storeSig", &set)
        if ok && set == 0 { // may be saving
            var i = 0
            for ; i < 100; i++ {
                time.Sleep(time.Millisecond * 10)
                _, err = c.store.Get("storeSig", &set)
                if set == 1 {
                    ctx.GetLogger().Infof("waited for previous cache to be saved after %d tries", i)
                    break
                }
            }
            if i == 100 {
                ctx.GetLogger().Errorf("waited for previous cache for %d tries, exit and drop", i)
            }
        }
        c.store.Set("storeSig", 0)
        ctx.GetLogger().Infof("start to restore cache from disk")
        // restore the memCache
        _, err = c.store.Get("memcache", &c.memCache)
        if err != nil {
            ctx.GetLogger().Errorf("fail to restore mem cache %v", err)
        }
        for _, p := range c.memCache {
            c.cacheLength += p.L
        }
        err = c.store.Delete("memcache")
        if err != nil {
            ctx.GetLogger().Errorf("fail to delete mem cache %v", err)
        }
        ctx.GetLogger().Infof("restored mem cache %d", c.cacheLength)
        // restore the disk cache
        var size int
        ok, _ = c.store.Get("size", &size)
        if !ok || size == 0 { // no disk cache
            return
        }
        c.diskSize = size
        var head int
        ok, _ = c.store.Get("head", &head)
        if ok {
            c.diskPageHead = head
        }
        c.cacheLength += (c.diskSize - 1) * c.cacheConf.BufferPageSize
        c.diskPageTail = (c.diskPageHead + c.diskSize - 1) % c.maxDiskPage
        // load buffer page
        hotPage := newPage(c.cacheConf.BufferPageSize)
        ok, err = c.store.Get(strconv.Itoa(c.diskPageTail), hotPage)
        if err != nil {
            ctx.GetLogger().Errorf("fail to load disk cache to buffer %v", err)
        } else if !ok {
            ctx.GetLogger().Errorf("nothing in the disk, should not happen")
        } else {
            c.diskBufferPage = hotPage
            c.cacheLength += c.diskBufferPage.L
            c.diskSize--
        }
        ctx.GetLogger().Infof("restored all cache %d. diskSize %d", c.cacheLength, c.diskSize)
    }
}

// save memory states to disk
func (c *SyncCache) onClose(ctx api.StreamContext) {
    ctx.GetLogger().Infof("sink node %s instance cache %d closing", ctx.GetOpId(), ctx.GetInstanceId())
    if c.cacheConf.CleanCacheAtStop {
        kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
        ctx.GetLogger().Infof("cleaning cache store %s", kvTable)
        store.DropCacheKV(kvTable)
    } else {
        if c.diskBufferPage != nil {
            err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
            if err != nil {
                ctx.GetLogger().Errorf("fail to store disk cache %v", err)
            }
            err = c.store.Set("size", c.diskSize+1)
            if err != nil {
                ctx.GetLogger().Errorf("fail to store disk size %v", err)
            }
            ctx.GetLogger().Debug("store disk cache")
        }
        // store the memory states
        if len(c.memCache) > 0 {
            err := c.store.Set("memcache", c.memCache)
            if err != nil {
                ctx.GetLogger().Errorf("fail to store memory cache to disk %v", err)
            }
            ctx.GetLogger().Debugf("store memory cache %d", len(c.memCache))
        }
        c.store.Set("storeSig", 1)
    }
}