sync_cache.go

// Copyright 2022-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
	"path"
	"strconv"
	"time"

	"github.com/lf-edge/ekuiper/internal/conf"
	"github.com/lf-edge/ekuiper/internal/pkg/store"
	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
	"github.com/lf-edge/ekuiper/pkg/api"
	"github.com/lf-edge/ekuiper/pkg/infra"
	"github.com/lf-edge/ekuiper/pkg/kv"
)

type AckResult bool

// page is the rotating (ring buffer) storage unit of the in-memory cache.
// Not thread safe!
type page struct {
	Data [][]map[string]interface{}
	H    int
	T    int
	L    int
	Size int
}

// newPage creates a new cache page.
// TODO the page is created even when not used; make it dynamic?
func newPage(size int) *page {
	return &page{
		Data: make([][]map[string]interface{}, size),
		H:    0, // When deleting, head++; if tail == head, it is empty
		T:    0, // When appending, tail++; if tail == head, it is full
		Size: size,
	}
}

// append adds the item and returns true if the page is not full; otherwise it returns false
func (p *page) append(item []map[string]interface{}) bool {
	if p.L == p.Size { // full
		return false
	}
	p.Data[p.T] = item
	p.T++
	if p.T == p.Size {
		p.T = 0
	}
	p.L++
	return true
}

// peak returns the first item in the page without removing it
func (p *page) peak() ([]map[string]interface{}, bool) {
	if p.L == 0 {
		return nil, false
	}
	return p.Data[p.H], true
}

func (p *page) delete() bool {
	if p.L == 0 {
		return false
	}
	p.H++
	if p.H == p.Size {
		p.H = 0
	}
	p.L--
	return true
}

func (p *page) isEmpty() bool {
	return p.L == 0
}

func (p *page) reset() {
	p.H = 0
	p.T = 0
	p.L = 0
}
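
// examplePageUsage is an illustrative sketch (added for documentation, not part
// of the original file) of the ring-buffer semantics above: items are appended
// at the tail, read and deleted from the head, and the fixed-size slice is
// reused once the indices wrap around.
func examplePageUsage() {
	p := newPage(2)
	_ = p.append([]map[string]interface{}{{"a": 1}})          // true, L == 1
	_ = p.append([]map[string]interface{}{{"b": 2}})          // true, the page is now full
	appended := p.append([]map[string]interface{}{{"c": 3}})  // false, L == Size
	first, ok := p.peak()                                     // {"a": 1}, true; the head item is not removed
	_ = p.delete()                                            // drops {"a": 1}; the head index wraps so the slot is reused
	_, _, _, _ = appended, first, ok, p.isEmpty()
}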
type SyncCache struct {
	// The input data to the cache
	in        <-chan []map[string]interface{}
	Out       chan []map[string]interface{}
	Ack       chan bool
	cacheCtrl chan interface{} // cacheCtrl is the only place to control the cache; sync in and ack result
	errorCh   chan<- error
	stats     metric.StatManager
	// cache config
	cacheConf   *conf.SinkConf
	maxDiskPage int
	maxMemPage  int
	// cache storage
	memCache       []*page
	diskBufferPage *page
	// status
	diskSize     int // how many pages have been saved
	cacheLength  int // readonly, for metrics only, to save recalculation
	diskPageTail int // init from the database
	diskPageHead int
	sendStatus   int // 0: idle, 1: sending and waiting for ack, 2: stopped for error
	// serialize
	store  kv.KeyValue
	exitCh chan<- struct{}
}

func NewSyncCacheWithExitChanel(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int, exitCh chan<- struct{}) *SyncCache {
	c := NewSyncCache(ctx, in, errCh, stats, cacheConf, bufferLength)
	c.exitCh = exitCh
	return c
}

func NewSyncCache(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int) *SyncCache {
	c := &SyncCache{
		cacheConf:  cacheConf,
		in:         in,
		Out:        make(chan []map[string]interface{}, bufferLength),
		Ack:        make(chan bool, 10),
		cacheCtrl:  make(chan interface{}, 10),
		errorCh:    errCh,
		maxMemPage: cacheConf.MemoryCacheThreshold / cacheConf.BufferPageSize,
		memCache:   make([]*page, 0),
		// add one more slot so that there is always at least one slot between head and tail, which keeps the head/tail ids distinguishable
		maxDiskPage: (cacheConf.MaxDiskCache / cacheConf.BufferPageSize) + 1,
		stats:       stats,
	}
	go func() {
		err := infra.SafeRun(func() error {
			c.run(ctx)
			return nil
		})
		if err != nil {
			infra.DrainError(ctx, err, errCh)
		}
	}()
	return c
}
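
// exampleSyncCacheUsage is an illustrative sketch (added for documentation, not
// part of the original file) of the intended interaction pattern: the producer
// feeds tuples into the in channel, the sink worker drains Out, attempts the
// actual send, and reports the result on Ack so the cache knows whether to
// delete the tuple or keep it for resend. trySend is a hypothetical stub for a
// sink-specific send operation.
func exampleSyncCacheUsage(ctx api.StreamContext, errCh chan<- error, stats metric.StatManager, sc *conf.SinkConf) {
	in := make(chan []map[string]interface{}, 1024)
	c := NewSyncCache(ctx, in, errCh, stats, sc, 1024)
	// producer side: buffer a tuple
	in <- []map[string]interface{}{{"temperature": 25.5}}
	// consumer side: take the next cached tuple, try to send it, then ack the result
	trySend := func(_ []map[string]interface{}) bool { return true } // hypothetical sink send
	for {
		select {
		case data := <-c.Out:
			c.Ack <- trySend(data) // true deletes the tuple from the cache; false keeps it for resend
		case <-ctx.Done():
			return
		}
	}
}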
func (c *SyncCache) run(ctx api.StreamContext) {
	c.initStore(ctx)
	defer c.onClose(ctx)
	if c.cacheLength > 0 { // start to send the cache
		c.send(ctx)
	}
	for {
		select {
		case item := <-c.in:
			ctx.GetLogger().Debugf("send to cache")
			go func() { // avoid deadlock when cacheCtrl is full
				c.cacheCtrl <- item
			}()
		case isSuccess := <-c.Ack:
			// only send the next sink after receiving an ack
			ctx.GetLogger().Debugf("cache ack")
			go func() {
				c.cacheCtrl <- AckResult(isSuccess)
			}()
		case data := <-c.cacheCtrl: // The only place to manipulate cache
			switch r := data.(type) {
			case AckResult:
				if r {
					ctx.GetLogger().Debugf("deleting cache")
					c.deleteCache(ctx)
					c.sendStatus = 0
					ctx.GetLogger().Debug("send status to 0 after true ack")
				} else {
					c.sendStatus = 2
					ctx.GetLogger().Debug("send status to 2 after false ack")
				}
			case []map[string]interface{}:
				ctx.GetLogger().Debugf("adding cache %v", data)
				c.addCache(ctx, r)
				if c.sendStatus == 2 {
					c.sendStatus = 0
					ctx.GetLogger().Debug("send status to 0 after adding cache in error state")
				}
			default:
				ctx.GetLogger().Errorf("unknown cache control command %v", data)
			}
			c.stats.SetBufferLength(int64(len(c.in) + c.cacheLength))
			if c.sendStatus == 0 {
				c.send(ctx)
			}
		case <-ctx.Done():
			ctx.GetLogger().Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
			return
		}
	}
}
func (c *SyncCache) send(ctx api.StreamContext) {
	if c.cacheLength > 1 && c.cacheConf.ResendInterval > 0 {
		time.Sleep(time.Duration(c.cacheConf.ResendInterval) * time.Millisecond)
	}
	d, ok := c.peakMemCache(ctx)
	if ok {
		ctx.GetLogger().Debugf("sending cache item %v", d)
		c.sendStatus = 1
		ctx.GetLogger().Debug("send status to 1 after sending tuple")
		select {
		case c.Out <- d:
			ctx.GetLogger().Debugf("sink cache send out %v", d)
		case <-ctx.Done():
			ctx.GetLogger().Debugf("stop sink cache send")
		}
	} else {
		ctx.GetLogger().Debug("no cache to send")
	}
}
// addCache not thread safe!
func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{}) {
	isNotFull := c.appendMemCache(item)
	if !isNotFull {
		if c.diskBufferPage == nil {
			c.diskBufferPage = newPage(c.cacheConf.BufferPageSize)
		}
		isBufferNotFull := c.diskBufferPage.append(item)
		if !isBufferNotFull { // cool page full, save to disk
			if c.diskSize == c.maxDiskPage {
				// disk full, read the oldest page to the hot page
				c.loadFromDisk(ctx)
				ctx.GetLogger().Debug("disk full, move the oldest page to memory")
			}
			err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
			if err != nil {
				ctx.GetLogger().Errorf("fail to store disk cache %v", err)
				return
			} else {
				ctx.GetLogger().Debugf("add cache to disk. the new disk buffer page is %v", c.diskBufferPage)
				c.diskPageTail++
				c.diskSize++
				err := c.store.Set("size", c.diskSize)
				if err != nil {
					ctx.GetLogger().Warnf("fail to store disk cache size %v", err)
				}
				// rotate
				if c.diskPageTail == c.maxDiskPage {
					c.diskPageTail = 0
				}
			}
			c.diskBufferPage.reset()
			c.diskBufferPage.append(item)
		} else {
			ctx.GetLogger().Debugf("added cache to disk buffer page %v", c.diskBufferPage)
		}
	} else {
		ctx.GetLogger().Debugf("added cache to mem cache %v", c.memCache)
	}
	c.cacheLength++
	ctx.GetLogger().Debugf("added cache %d", c.cacheLength)
}
// deleteCache not thread safe!
func (c *SyncCache) deleteCache(ctx api.StreamContext) {
	ctx.GetLogger().Debugf("deleting cache. cacheLength: %d, diskSize: %d", c.cacheLength, c.diskSize)
	if len(c.memCache) == 0 {
		ctx.GetLogger().Debug("mem cache is empty")
		return
	}
	isNotEmpty := c.memCache[0].delete()
	if isNotEmpty {
		c.cacheLength--
		ctx.GetLogger().Debugf("deleted cache: %d", c.cacheLength)
	}
	if c.memCache[0].isEmpty() { // read from disk or cool list
		c.memCache = c.memCache[1:]
		if c.diskSize > 0 {
			c.loadFromDisk(ctx)
		} else if c.diskBufferPage != nil { // use cool page as the new page
			ctx.GetLogger().Debugf("reading from diskBufferPage: %d", c.cacheLength)
			c.memCache = append(c.memCache, c.diskBufferPage)
			c.diskBufferPage = nil
		}
	}
	ctx.GetLogger().Debugf("deleted cache. cacheLength: %d, diskSize: %d, memCache: %v", c.cacheLength, c.diskSize, c.memCache)
}
func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
	// load page from the disk
	ctx.GetLogger().Debugf("loading from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
	hotPage := newPage(c.cacheConf.BufferPageSize)
	ok, err := c.store.Get(strconv.Itoa(c.diskPageHead), hotPage)
	if err != nil {
		ctx.GetLogger().Errorf("fail to load disk cache %v", err)
	} else if !ok {
		ctx.GetLogger().Errorf("nothing in the disk, should not happen")
	} else {
		_ = c.store.Delete(strconv.Itoa(c.diskPageHead))
		if len(c.memCache) >= c.maxMemPage {
			ctx.GetLogger().Warnf("drop a page of %d items in memory", c.memCache[0].L)
			c.cacheLength -= c.memCache[0].L
			c.memCache = c.memCache[1:]
		}
		c.memCache = append(c.memCache, hotPage)
		c.diskPageHead++
		c.diskSize--
		err := c.store.Set("size", c.diskSize)
		if err != nil {
			ctx.GetLogger().Warnf("fail to store disk cache size %v", err)
		}
		if c.diskPageHead == c.maxDiskPage {
			c.diskPageHead = 0
		}
		err = c.store.Set("head", c.diskPageHead)
		if err != nil {
			ctx.GetLogger().Warnf("fail to store disk cache head %v", err)
		}
	}
	ctx.GetLogger().Debugf("loaded from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
}
func (c *SyncCache) appendMemCache(item []map[string]interface{}) bool {
	if len(c.memCache) > c.maxMemPage {
		return false
	}
	if len(c.memCache) == 0 {
		c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
	}
	isNotFull := c.memCache[len(c.memCache)-1].append(item)
	if !isNotFull {
		if len(c.memCache) == c.maxMemPage {
			return false
		}
		c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
		return c.memCache[len(c.memCache)-1].append(item)
	}
	return true
}
func (c *SyncCache) peakMemCache(_ api.StreamContext) ([]map[string]interface{}, bool) {
	if len(c.memCache) == 0 {
		return nil, false
	}
	return c.memCache[0].peak()
}
func (c *SyncCache) initStore(ctx api.StreamContext) {
	kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
	if c.cacheConf.CleanCacheAtStop {
		ctx.GetLogger().Infof("cleaning cache store %s", kvTable)
		store.DropCacheKV(kvTable)
	}
	var err error
	c.store, err = store.GetCacheKV(kvTable)
	if err != nil {
		infra.DrainError(ctx, err, c.errorCh)
	}
	// restore the sink cache from disk
	if !c.cacheConf.CleanCacheAtStop {
		// Save 0 when init and save 1 when close. Wait for close for newly started sink node
		var set int
		ok, _ := c.store.Get("storeSig", &set)
		if ok && set == 0 { // may be saving
			i := 0
			for ; i < 100; i++ {
				time.Sleep(time.Millisecond * 10)
				c.store.Get("storeSig", &set)
				if set == 1 {
					ctx.GetLogger().Infof("waited %d times for the previous cache to flush", i)
					break
				}
			}
			if i == 100 {
				ctx.GetLogger().Errorf("waited %d times for the previous cache to flush, exit and drop", i)
			}
		}
		c.store.Set("storeSig", 0)
		ctx.GetLogger().Infof("start to restore cache from disk")
		// restore the memCache
		_, err = c.store.Get("memcache", &c.memCache)
		if err != nil {
			ctx.GetLogger().Errorf("fail to restore mem cache %v", err)
		}
		for _, p := range c.memCache {
			c.cacheLength += p.L
		}
		err = c.store.Delete("memcache")
		if err != nil {
			ctx.GetLogger().Errorf("fail to delete mem cache %v", err)
		}
		ctx.GetLogger().Infof("restored mem cache %d", c.cacheLength)
		// restore the disk cache
		var size int
		ok, _ = c.store.Get("size", &size)
		if !ok || size == 0 { // no disk cache
			return
		}
		c.diskSize = size
		var head int
		ok, _ = c.store.Get("head", &head)
		if ok {
			c.diskPageHead = head
		}
		c.cacheLength += (c.diskSize - 1) * c.cacheConf.BufferPageSize
		c.diskPageTail = (c.diskPageHead + c.diskSize - 1) % c.maxDiskPage
		// load the buffer page
		hotPage := newPage(c.cacheConf.BufferPageSize)
		ok, err = c.store.Get(strconv.Itoa(c.diskPageTail), hotPage)
		if err != nil {
			ctx.GetLogger().Errorf("fail to load disk cache to buffer %v", err)
		} else if !ok {
			ctx.GetLogger().Errorf("nothing in the disk, should not happen")
		} else {
			c.diskBufferPage = hotPage
			c.cacheLength += c.diskBufferPage.L
			c.diskSize--
		}
		ctx.GetLogger().Infof("restored all cache %d. diskSize %d", c.cacheLength, c.diskSize)
	}
}
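
// exampleDiskIndexing is an illustrative sketch (added for documentation, not
// part of the original file) of the disk ring bookkeeping restored above: only
// "head" and "size" are persisted, and the tail slot (holding the most recently
// persisted page, which is reloaded as the disk buffer page) is recomputed from
// them modulo maxDiskPage, which is sized with one spare slot in NewSyncCache.
func exampleDiskIndexing(head, size, maxDiskPage int) int {
	// e.g. head=4, size=3, maxDiskPage=5: slots 4, 0 and 1 are occupied, so the tail slot is 1
	return (head + size - 1) % maxDiskPage
}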
// save memory states to disk
func (c *SyncCache) onClose(ctx api.StreamContext) {
	defer func() {
		if c.exitCh != nil {
			c.exitCh <- struct{}{}
		}
	}()
	ctx.GetLogger().Infof("sink node %s instance cache %d closing", ctx.GetOpId(), ctx.GetInstanceId())
	if c.cacheConf.CleanCacheAtStop {
		kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
		ctx.GetLogger().Infof("cleaning cache store %s", kvTable)
		store.DropCacheKV(kvTable)
	} else {
		if c.diskBufferPage != nil {
			err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
			if err != nil {
				ctx.GetLogger().Errorf("fail to store disk cache %v", err)
			}
			err = c.store.Set("size", c.diskSize+1)
			if err != nil {
				ctx.GetLogger().Errorf("fail to store disk size %v", err)
			}
			ctx.GetLogger().Debug("store disk cache")
		}
		// store the memory states
		if len(c.memCache) > 0 {
			err := c.store.Set("memcache", c.memCache)
			if err != nil {
				ctx.GetLogger().Errorf("fail to store memory cache to disk %v", err)
			}
			ctx.GetLogger().Debugf("store memory cache %d", len(c.memCache))
		}
		c.store.Set("storeSig", 1)
	}
}