sync_cache.go

// Copyright 2022-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
    "path"
    "strconv"
    "time"

    "github.com/lf-edge/ekuiper/internal/conf"
    "github.com/lf-edge/ekuiper/internal/pkg/store"
    "github.com/lf-edge/ekuiper/pkg/api"
    "github.com/lf-edge/ekuiper/pkg/infra"
    "github.com/lf-edge/ekuiper/pkg/kv"
)

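// AckResult is the ack result forwarded through cacheCtrl: true means the last
// send succeeded, false means it failed.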
type AckResult bool

// page is a rotating (ring buffer) storage unit for the in-memory cache.
// Not thread safe!
type page struct {
    Data [][]map[string]interface{}
    H    int // head index
    T    int // tail index
    L    int // current number of items
    Size int // capacity of the page
}

// newPage creates a new cache page of the given size.
// TODO: the page is allocated even if it is never used; make it dynamic?
func newPage(size int) *page {
    return &page{
        Data: make([][]map[string]interface{}, size),
        H:    0, // when deleting, head++; if tail == head, the page is empty
        T:    0, // when appending, tail++; if tail == head, the page is full
        Size: size,
    }
}

// append adds the item if the page is not full and returns true; otherwise it returns false.
func (p *page) append(item []map[string]interface{}) bool {
    if p.L == p.Size { // full
        return false
    }
    p.Data[p.T] = item
    p.T++
    if p.T == p.Size {
        p.T = 0
    }
    p.L++
    return true
}

// peak returns the first (oldest) item in the page without removing it.
func (p *page) peak() ([]map[string]interface{}, bool) {
    if p.L == 0 {
        return nil, false
    }
    return p.Data[p.H], true
}

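// delete removes the first (oldest) item in the page; it returns false if the page is empty.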
func (p *page) delete() bool {
    if p.L == 0 {
        return false
    }
    p.H++
    if p.H == p.Size {
        p.H = 0
    }
    p.L--
    return true
}

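// isEmpty reports whether the page holds no items.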
func (p *page) isEmpty() bool {
    return p.L == 0
}

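// reset clears the page indexes so the page can be reused.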
func (p *page) reset() {
    p.H = 0
    p.T = 0
    p.L = 0
}

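// SyncCache is a two-level (memory + disk) FIFO cache for sink data. It delivers
// cached items to Out one at a time and waits for an Ack before releasing the next one.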
type SyncCache struct {
    // The input data to the cache
    in        <-chan []map[string]interface{}
    Out       chan []map[string]interface{}
    Ack       chan bool
    cacheCtrl chan interface{} // cacheCtrl is the only channel through which the cache is controlled; it carries both incoming data and ack results
    errorCh   chan<- error
    // cache config
    cacheConf   *conf.SinkConf
    maxDiskPage int
    maxMemPage  int
    // cache storage
    memCache       []*page
    diskBufferPage *page
    // status
    diskSize     int // the number of pages saved to disk
    CacheLength  int // readonly, kept for metrics only to avoid recalculation
    diskPageTail int // initialized from the database
    diskPageHead int
    sendStatus   int // 0: idle, 1: sending and waiting for ack, 2: stopped on error
    // serialize
    store  kv.KeyValue
    exitCh chan<- struct{}
}

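// NewSyncCacheWithExitChanel creates a SyncCache that signals exitCh after its
// run loop has finished and the cache state has been persisted.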
func NewSyncCacheWithExitChanel(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, cacheConf *conf.SinkConf, bufferLength int, exitCh chan<- struct{}) *SyncCache {
    c := NewSyncCache(ctx, in, errCh, cacheConf, bufferLength)
    c.exitCh = exitCh
    return c
}

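// NewSyncCache creates the cache channels, derives the page limits from the sink
// configuration, and starts the run loop in a goroutine guarded by infra.SafeRun.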
func NewSyncCache(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, cacheConf *conf.SinkConf, bufferLength int) *SyncCache {
    c := &SyncCache{
        cacheConf:  cacheConf,
        in:         in,
        Out:        make(chan []map[string]interface{}, bufferLength),
        Ack:        make(chan bool, 10),
        cacheCtrl:  make(chan interface{}, 10),
        errorCh:    errCh,
        maxMemPage: cacheConf.MemoryCacheThreshold / cacheConf.BufferPageSize,
        memCache:   make([]*page, 0),
        // add one extra slot so that there is always at least one free slot between head and tail, which makes it possible to tell the head/tail ids apart
        maxDiskPage: (cacheConf.MaxDiskCache / cacheConf.BufferPageSize) + 1,
    }
    go func() {
        err := infra.SafeRun(func() error {
            c.run(ctx)
            return nil
        })
        if err != nil {
            infra.DrainError(ctx, err, errCh)
        }
    }()
    return c
}

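// run is the single control loop of the cache: incoming data and ack results are
// funneled into cacheCtrl so that the cache is only mutated from this goroutine.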
func (c *SyncCache) run(ctx api.StreamContext) {
    c.initStore(ctx)
    defer c.onClose(ctx)
    if c.CacheLength > 0 { // start to send the cache
        c.send(ctx)
    }
    for {
        select {
        case item := <-c.in:
            ctx.GetLogger().Debugf("send to cache")
            go func() { // avoid deadlock when cacheCtrl is full
                c.cacheCtrl <- item
            }()
        case isSuccess := <-c.Ack:
            // only send the next item after receiving an ack for the previous one
            ctx.GetLogger().Debugf("cache ack")
            go func() {
                c.cacheCtrl <- AckResult(isSuccess)
            }()
        case data := <-c.cacheCtrl: // The only place to manipulate cache
            switch r := data.(type) {
            case AckResult:
                if r {
                    ctx.GetLogger().Debugf("deleting cache")
                    c.deleteCache(ctx)
                    c.sendStatus = 0
                    ctx.GetLogger().Debug("send status to 0 after true ack")
                } else {
                    c.sendStatus = 2
                    ctx.GetLogger().Debug("send status to 2 after false ack")
                }
            case []map[string]interface{}:
                ctx.GetLogger().Debugf("adding cache %v", data)
                // hack here: nil is a signal to continue sending, so do not add nil to the cache
                if r != nil {
                    c.addCache(ctx, r)
                } else {
                    ctx.GetLogger().Debug("nil cache, continue sending")
                }
                if c.sendStatus == 2 {
                    c.sendStatus = 0
                    ctx.GetLogger().Debug("send status to 0 after adding cache in error state")
                }
            default:
                ctx.GetLogger().Errorf("unknown cache control command %v", data)
            }
            ctx.GetLogger().Debugf("cache status %d", c.sendStatus)
            if c.sendStatus == 0 {
                c.send(ctx)
            }
        case <-ctx.Done():
            ctx.GetLogger().Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
            return
        }
    }
}

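// send peeks the oldest cached item, optionally waits for ResendInterval, pushes
// the item to Out, and marks the cache as waiting for an ack.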
func (c *SyncCache) send(ctx api.StreamContext) {
    if c.CacheLength > 1 && c.cacheConf.ResendInterval > 0 {
        time.Sleep(time.Duration(c.cacheConf.ResendInterval) * time.Millisecond)
    }
    d, ok := c.peakMemCache(ctx)
    if ok {
        ctx.GetLogger().Debugf("sending cache item %v", d)
        c.sendStatus = 1
        ctx.GetLogger().Debug("send status to 1 after sending tuple")
        select {
        case c.Out <- d:
            ctx.GetLogger().Debugf("sink cache send out %v", d)
        case <-ctx.Done():
            ctx.GetLogger().Debugf("stop sink cache send")
        }
    } else {
        ctx.GetLogger().Debug("no cache to send")
    }
}

// addCache adds an item to the memory cache first and overflows to the disk buffer page,
// which is flushed to the disk store when it fills up. Not thread safe!
func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{}) {
    isNotFull := c.appendMemCache(item)
    if !isNotFull {
        if c.diskBufferPage == nil {
            c.diskBufferPage = newPage(c.cacheConf.BufferPageSize)
        }
        isBufferNotFull := c.diskBufferPage.append(item)
        if !isBufferNotFull { // cool page is full, save it to disk
            if c.diskSize == c.maxDiskPage {
                // disk is full, read the oldest page into the hot page
                c.loadFromDisk(ctx)
                ctx.GetLogger().Debug("disk full, remove the last page")
            }
            err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
            if err != nil {
                ctx.GetLogger().Errorf("fail to store disk cache %v", err)
                return
            } else {
                ctx.GetLogger().Debugf("add cache to disk. the new disk buffer page is %v", c.diskBufferPage)
                c.diskPageTail++
                c.diskSize++
                err := c.store.Set("size", c.diskSize)
                if err != nil {
                    ctx.GetLogger().Warnf("fail to store disk cache size %v", err)
                }
                // rotate
                if c.diskPageTail == c.maxDiskPage {
                    c.diskPageTail = 0
                }
            }
            c.diskBufferPage.reset()
            c.diskBufferPage.append(item)
        } else {
            ctx.GetLogger().Debugf("added cache to disk buffer page %v", c.diskBufferPage)
        }
    } else {
        ctx.GetLogger().Debugf("added cache to mem cache %v", c.memCache)
    }
    c.CacheLength++
    ctx.GetLogger().Debugf("added cache %d", c.CacheLength)
}

// deleteCache removes the oldest item from the cache after a successful ack. Not thread safe!
func (c *SyncCache) deleteCache(ctx api.StreamContext) {
    ctx.GetLogger().Debugf("deleting cache. CacheLength: %d, diskSize: %d", c.CacheLength, c.diskSize)
    if len(c.memCache) == 0 {
        ctx.GetLogger().Debug("mem cache is empty")
        return
    }
    isNotEmpty := c.memCache[0].delete()
    if isNotEmpty {
        c.CacheLength--
        ctx.GetLogger().Debugf("deleted cache: %d", c.CacheLength)
    }
    if c.memCache[0].isEmpty() { // load the next page from disk or from the cool page
        c.memCache = c.memCache[1:]
        if c.diskSize > 0 {
            c.loadFromDisk(ctx)
        } else if c.diskBufferPage != nil { // use the cool page as the new page
            ctx.GetLogger().Debugf("reading from diskBufferPage: %d", c.CacheLength)
            c.memCache = append(c.memCache, c.diskBufferPage)
            c.diskBufferPage = nil
        }
    }
    ctx.GetLogger().Debugf("deleted cache. CacheLength: %d, diskSize: %d, memCache: %v", c.CacheLength, c.diskSize, c.memCache)
}

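// loadFromDisk moves the oldest page from the disk store into the memory cache,
// dropping the oldest in-memory page if memory is already full, and updates the
// persisted size and head.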
func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
    // load the page from the disk
    ctx.GetLogger().Debugf("loading from disk %d. CacheLength: %d, diskSize: %d", c.diskPageTail, c.CacheLength, c.diskSize)
    hotPage := newPage(c.cacheConf.BufferPageSize)
    ok, err := c.store.Get(strconv.Itoa(c.diskPageHead), hotPage)
    if err != nil {
        ctx.GetLogger().Errorf("fail to load disk cache %v", err)
    } else if !ok {
        ctx.GetLogger().Errorf("nothing in the disk, should not happen")
    } else {
        _ = c.store.Delete(strconv.Itoa(c.diskPageHead))
        if len(c.memCache) >= c.maxMemPage {
            ctx.GetLogger().Warnf("drop a page of %d items in memory", c.memCache[0].L)
            c.CacheLength -= c.memCache[0].L
            c.memCache = c.memCache[1:]
        }
        c.memCache = append(c.memCache, hotPage)
        c.diskPageHead++
        c.diskSize--
        err := c.store.Set("size", c.diskSize)
        if err != nil {
            ctx.GetLogger().Warnf("fail to store disk cache size %v", err)
        }
        if c.diskPageHead == c.maxDiskPage {
            c.diskPageHead = 0
        }
        err = c.store.Set("head", c.diskPageHead)
        if err != nil {
            ctx.GetLogger().Warnf("fail to store disk cache head %v", err)
        }
    }
    ctx.GetLogger().Debugf("loaded from disk %d. CacheLength: %d, diskSize: %d", c.diskPageTail, c.CacheLength, c.diskSize)
}

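// appendMemCache appends the item to the last memory page, allocating a new page
// when needed; it returns false when the memory cache is full.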
func (c *SyncCache) appendMemCache(item []map[string]interface{}) bool {
    if len(c.memCache) > c.maxMemPage {
        return false
    }
    if len(c.memCache) == 0 {
        c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
    }
    isNotFull := c.memCache[len(c.memCache)-1].append(item)
    if !isNotFull {
        if len(c.memCache) == c.maxMemPage {
            return false
        }
        c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
        return c.memCache[len(c.memCache)-1].append(item)
    }
    return true
}

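// peakMemCache returns the oldest item in the memory cache without removing it.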
func (c *SyncCache) peakMemCache(_ api.StreamContext) ([]map[string]interface{}, bool) {
    if len(c.memCache) == 0 {
        return nil, false
    }
    return c.memCache[0].peak()
}

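// initStore opens the per-sink cache store and, unless CleanCacheAtStop is set,
// restores the memory and disk cache persisted by a previous run in onClose.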
func (c *SyncCache) initStore(ctx api.StreamContext) {
    kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
    if c.cacheConf.CleanCacheAtStop {
        ctx.GetLogger().Infof("cleaning cache store %s", kvTable)
        _ = store.DropCacheKV(kvTable)
    }
    var err error
    c.store, err = store.GetCacheKV(kvTable)
    if err != nil {
        infra.DrainError(ctx, err, c.errorCh)
    }
    // restore the sink cache from disk
    if !c.cacheConf.CleanCacheAtStop {
        // storeSig is saved as 0 on init and 1 on close. A newly started sink node waits here for the previous instance to finish closing.
        var set int
        ok, _ := c.store.Get("storeSig", &set)
        if ok && set == 0 { // the previous instance may still be saving
            i := 0
            for ; i < 100; i++ {
                time.Sleep(time.Millisecond * 10)
                _, err := c.store.Get("storeSig", &set)
                if err == nil && set == 1 {
                    ctx.GetLogger().Infof("waited for previous cache for %d times", i)
                    break
                }
            }
            if i == 100 {
                ctx.GetLogger().Errorf("waited for previous cache for %d times, exit and drop", i)
            }
        }
        _ = c.store.Set("storeSig", 0)
        ctx.GetLogger().Infof("start to restore cache from disk")
        // restore the memCache
        _, err = c.store.Get("memcache", &c.memCache)
        if err != nil {
            ctx.GetLogger().Errorf("fail to restore mem cache %v", err)
        }
        for _, p := range c.memCache {
            c.CacheLength += p.L
        }
        err = c.store.Delete("memcache")
        if err != nil {
            ctx.GetLogger().Errorf("fail to delete mem cache %v", err)
        }
        ctx.GetLogger().Infof("restored mem cache %d", c.CacheLength)
        // restore the disk cache
        var size int
        ok, _ = c.store.Get("size", &size)
        if !ok || size == 0 { // no disk cache
            return
        }
        c.diskSize = size
        var head int
        ok, _ = c.store.Get("head", &head)
        if ok {
            c.diskPageHead = head
        }
        c.CacheLength += (c.diskSize - 1) * c.cacheConf.BufferPageSize
        c.diskPageTail = (c.diskPageHead + c.diskSize - 1) % c.maxDiskPage
        // load the buffer page
        hotPage := newPage(c.cacheConf.BufferPageSize)
        ok, err = c.store.Get(strconv.Itoa(c.diskPageTail), hotPage)
        if err != nil {
            ctx.GetLogger().Errorf("fail to load disk cache to buffer %v", err)
        } else if !ok {
            ctx.GetLogger().Errorf("nothing in the disk, should not happen")
        } else {
            c.diskBufferPage = hotPage
            c.CacheLength += c.diskBufferPage.L
            c.diskSize--
        }
        ctx.GetLogger().Infof("restored all cache %d. diskSize %d", c.CacheLength, c.diskSize)
    }
}

// onClose persists the in-memory state to disk on shutdown (unless CleanCacheAtStop is set) and signals exitCh.
func (c *SyncCache) onClose(ctx api.StreamContext) {
    defer func() {
        if c.exitCh != nil {
            c.exitCh <- struct{}{}
        }
    }()
    ctx.GetLogger().Infof("sink node %s instance cache %d closing", ctx.GetOpId(), ctx.GetInstanceId())
    if c.cacheConf.CleanCacheAtStop {
        kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
        ctx.GetLogger().Infof("cleaning cache store %s", kvTable)
        _ = store.DropCacheKV(kvTable)
    } else {
        if c.diskBufferPage != nil {
            err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
            if err != nil {
                ctx.GetLogger().Errorf("fail to store disk cache %v", err)
            }
            err = c.store.Set("size", c.diskSize+1)
            if err != nil {
                ctx.GetLogger().Errorf("fail to store disk size %v", err)
            }
            ctx.GetLogger().Debug("store disk cache")
        }
        // store the memory states
        if len(c.memCache) > 0 {
            err := c.store.Set("memcache", c.memCache)
            if err != nil {
                ctx.GetLogger().Errorf("fail to store memory cache to disk %v", err)
            }
            ctx.GetLogger().Debugf("store memory cache %d", len(c.memCache))
        }
        _ = c.store.Set("storeSig", 1)
    }
}