kv_store.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package state
  15. import (
  16. "encoding/gob"
  17. "fmt"
  18. "sync"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. ts "github.com/lf-edge/ekuiper/internal/pkg/store"
  21. "github.com/lf-edge/ekuiper/internal/topo/checkpoint"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. ts2 "github.com/lf-edge/ekuiper/pkg/kv"
  24. )
  25. func init() {
  26. gob.Register(map[string]interface{}{})
  27. gob.Register(checkpoint.BufferOrEvent{})
  28. }
  29. // KVStore The manager for checkpoint storage.
  30. //
  31. // mapStore keys
  32. //
  33. // { "checkpoint1", "checkpoint2" ... "checkpointn" : The complete or incomplete snapshot
  34. type KVStore struct {
  35. db ts2.Tskv
  36. mapStore *sync.Map // The current root store of a rule
  37. checkpoints []int64
  38. max int
  39. ruleId string
  40. }
  41. // Store in path ./data/checkpoint/$ruleId
  42. // Store 2 things:
  43. // "checkpoints":A queue for completed checkpoint id
  44. // "$checkpointId":A map with key of checkpoint id and value of snapshot(gob serialized)
  45. // Assume each operator only has one instance
  46. func getKVStore(ruleId string) (*KVStore, error) {
  47. db, err := ts.GetTS(ruleId)
  48. if err != nil {
  49. return nil, err
  50. }
  51. s := &KVStore{db: db, max: 3, mapStore: &sync.Map{}, ruleId: ruleId}
  52. // read data from badger db
  53. if err := s.restore(); err != nil {
  54. return nil, err
  55. }
  56. return s, nil
  57. }
  58. func (s *KVStore) restore() error {
  59. var m map[string]interface{}
  60. k, err := s.db.Last(&m)
  61. if err != nil {
  62. return err
  63. }
  64. if k > 0 {
  65. s.checkpoints = []int64{k}
  66. s.mapStore.Store(k, cast.MapToSyncMap(m))
  67. }
  68. return nil
  69. }
  70. func (s *KVStore) SaveState(checkpointId int64, opId string, state map[string]interface{}) error {
  71. logger := conf.Log
  72. logger.Debugf("Save state for checkpoint %d, op %s, value %v", checkpointId, opId, state)
  73. var cstore *sync.Map
  74. if v, ok := s.mapStore.Load(checkpointId); !ok {
  75. cstore = &sync.Map{}
  76. s.mapStore.Store(checkpointId, cstore)
  77. } else {
  78. if cstore, ok = v.(*sync.Map); !ok {
  79. return fmt.Errorf("invalid KVStore for checkpointId %d with value %v: should be *sync.Map type", checkpointId, v)
  80. }
  81. }
  82. cstore.Store(opId, state)
  83. return nil
  84. }
  85. func (s *KVStore) SaveCheckpoint(checkpointId int64) error {
  86. if v, ok := s.mapStore.Load(checkpointId); !ok {
  87. return fmt.Errorf("store for checkpoint %d not found", checkpointId)
  88. } else {
  89. if m, ok := v.(*sync.Map); !ok {
  90. return fmt.Errorf("invalid KVStore for checkpointId %d with value %v: should be *sync.Map type", checkpointId, v)
  91. } else {
  92. s.checkpoints = append(s.checkpoints, checkpointId)
  93. // TODO is the order promised?
  94. for len(s.checkpoints) > s.max {
  95. cp := s.checkpoints[0]
  96. s.checkpoints = s.checkpoints[1:]
  97. s.mapStore.Delete(cp)
  98. }
  99. _, err := s.db.Set(checkpointId, cast.SyncMapToMap(m))
  100. if err != nil {
  101. return fmt.Errorf("save checkpoint err: %v", err)
  102. }
  103. }
  104. }
  105. return nil
  106. }
  107. // GetOpState Only run in the initialization
  108. func (s *KVStore) GetOpState(opId string) (*sync.Map, error) {
  109. if len(s.checkpoints) > 0 {
  110. if v, ok := s.mapStore.Load(s.checkpoints[len(s.checkpoints)-1]); ok {
  111. if cstore, ok := v.(*sync.Map); !ok {
  112. return nil, fmt.Errorf("invalid state %v stored for op %s: data type is not *sync.Map", v, opId)
  113. } else {
  114. if sm, ok := cstore.Load(opId); ok {
  115. switch m := sm.(type) {
  116. case *sync.Map:
  117. return m, nil
  118. case map[string]interface{}:
  119. return cast.MapToSyncMap(m), nil
  120. default:
  121. return nil, fmt.Errorf("invalid state %v stored for op %s: data type is not *sync.Map", sm, opId)
  122. }
  123. }
  124. }
  125. } else {
  126. return nil, fmt.Errorf("store for checkpoint %d not found", s.checkpoints[len(s.checkpoints)-1])
  127. }
  128. }
  129. return &sync.Map{}, nil
  130. }
  131. func (s *KVStore) Clean() error {
  132. return s.db.DeleteBefore(s.checkpoints[0])
  133. }