kv_store.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package states
  2. import (
  3. "bytes"
  4. "encoding/gob"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xstream/checkpoints"
  8. "path"
  9. "sync"
  10. )
  11. func init() {
  12. gob.Register(map[string]interface{}{})
  13. gob.Register(checkpoints.BufferOrEvent{})
  14. }
  15. //The manager for checkpoint storage.
  16. /**
  17. *** mapStore keys
  18. *** { "checkpoint1", "checkpoint2" ... "checkpointn" : The complete or incomplete snapshot
  19. */
  20. type KVStore struct {
  21. db common.KeyValue
  22. mapStore *sync.Map //The current root store of a rule
  23. checkpoints []int64
  24. max int
  25. }
  26. //Store in path ./data/checkpoint/$ruleId
  27. //Store 2 things:
  28. //"checkpoints":A queue for completed checkpoint id
  29. //"$checkpointId":A map with key of checkpoint id and value of snapshot(gob serialized)
  30. //Assume each operator only has one instance
  31. func getKVStore(ruleId string) (*KVStore, error) {
  32. dr, _ := common.GetDataLoc()
  33. db := common.GetSimpleKVStore(path.Join(dr, "checkpoints", ruleId))
  34. s := &KVStore{db: db, max: 3, mapStore: &sync.Map{}}
  35. //read data from badger db
  36. if err := s.restore(); err != nil {
  37. return nil, err
  38. }
  39. return s, nil
  40. }
  41. func (s *KVStore) restore() error {
  42. err := s.db.Open()
  43. if err != nil {
  44. return err
  45. }
  46. defer s.db.Close()
  47. if b, ok := s.db.Get(CheckpointListKey); ok {
  48. if cs, err := bytesToSlice(b.([]byte)); err != nil {
  49. return fmt.Errorf("invalid checkpoint data: %s", err)
  50. } else {
  51. s.checkpoints = cs
  52. for _, c := range cs {
  53. if b2, ok := s.db.Get(fmt.Sprintf("%d", c)); ok {
  54. if m, err := bytesToMap(b2.([]byte)); err != nil {
  55. return fmt.Errorf("invalid checkpoint data: %s", err)
  56. } else {
  57. s.mapStore.Store(c, common.MapToSyncMap(m))
  58. }
  59. }
  60. }
  61. }
  62. }
  63. return nil
  64. }
  65. func (s *KVStore) SaveState(checkpointId int64, opId string, state map[string]interface{}) error {
  66. logger := common.Log
  67. logger.Debugf("Save state for checkpoint %d, op %s, value %v", checkpointId, opId, state)
  68. var cstore *sync.Map
  69. if v, ok := s.mapStore.Load(checkpointId); !ok {
  70. cstore = &sync.Map{}
  71. s.mapStore.Store(checkpointId, cstore)
  72. } else {
  73. if cstore, ok = v.(*sync.Map); !ok {
  74. return fmt.Errorf("invalid KVStore for checkpointId %d with value %v: should be *sync.Map type", checkpointId, v)
  75. }
  76. }
  77. cstore.Store(opId, state)
  78. return nil
  79. }
  80. func (s *KVStore) SaveCheckpoint(checkpointId int64) error {
  81. if v, ok := s.mapStore.Load(checkpointId); !ok {
  82. return fmt.Errorf("store for checkpoint %d not found", checkpointId)
  83. } else {
  84. if m, ok := v.(*sync.Map); !ok {
  85. return fmt.Errorf("invalid KVStore for checkpointId %d with value %v: should be *sync.Map type", checkpointId, v)
  86. } else {
  87. err := s.db.Open()
  88. if err != nil {
  89. return fmt.Errorf("save checkpoint err: %v", err)
  90. }
  91. defer s.db.Close()
  92. b, err := mapToBytes(m)
  93. if err != nil {
  94. return fmt.Errorf("save checkpoint err, fail to encode states: %s", err)
  95. }
  96. err = s.db.Replace(fmt.Sprintf("%d", checkpointId), b)
  97. if err != nil {
  98. return fmt.Errorf("save checkpoint err: %v", err)
  99. }
  100. m.Delete(checkpointId)
  101. s.checkpoints = append(s.checkpoints, checkpointId)
  102. //TODO is the order promised?
  103. if len(s.checkpoints) > s.max {
  104. cp := s.checkpoints[0]
  105. s.checkpoints = s.checkpoints[1:]
  106. go func() {
  107. _ = s.db.Delete(fmt.Sprintf("%d", cp))
  108. }()
  109. }
  110. cs, ok := sliceToBytes(s.checkpoints)
  111. if !ok {
  112. return fmt.Errorf("save checkpoint err: fail to encode checkpoint counts")
  113. }
  114. err = s.db.Replace(CheckpointListKey, cs)
  115. if err != nil {
  116. return fmt.Errorf("save checkpoint err: %v", err)
  117. }
  118. }
  119. }
  120. return nil
  121. }
  122. //Only run in the initialization
  123. func (s *KVStore) GetOpState(opId string) (*sync.Map, error) {
  124. if len(s.checkpoints) > 0 {
  125. if v, ok := s.mapStore.Load(s.checkpoints[len(s.checkpoints)-1]); ok {
  126. if cstore, ok := v.(*sync.Map); !ok {
  127. return nil, fmt.Errorf("invalid state %v stored for op %s: data type is not *sync.Map", v, opId)
  128. } else {
  129. if sm, ok := cstore.Load(opId); ok {
  130. switch m := sm.(type) {
  131. case *sync.Map:
  132. return m, nil
  133. case map[string]interface{}:
  134. return common.MapToSyncMap(m), nil
  135. default:
  136. return nil, fmt.Errorf("invalid state %v stored for op %s: data type is not *sync.Map", sm, opId)
  137. }
  138. }
  139. }
  140. } else {
  141. return nil, fmt.Errorf("store for checkpoint %d not found", s.checkpoints[len(s.checkpoints)-1])
  142. }
  143. }
  144. return &sync.Map{}, nil
  145. }
  146. func mapToBytes(sm *sync.Map) ([]byte, error) {
  147. m := common.SyncMapToMap(sm)
  148. var buf bytes.Buffer
  149. enc := gob.NewEncoder(&buf)
  150. if err := enc.Encode(m); err != nil {
  151. return nil, err
  152. }
  153. return buf.Bytes(), nil
  154. }
  155. func bytesToMap(input []byte) (map[string]interface{}, error) {
  156. var result map[string]interface{}
  157. buf := bytes.NewBuffer(input)
  158. dec := gob.NewDecoder(buf)
  159. if err := dec.Decode(&result); err != nil {
  160. return nil, err
  161. }
  162. return result, nil
  163. }
  164. func sliceToBytes(s []int64) ([]byte, bool) {
  165. var buf bytes.Buffer
  166. enc := gob.NewEncoder(&buf)
  167. if err := enc.Encode(s); err != nil {
  168. return nil, false
  169. }
  170. return buf.Bytes(), true
  171. }
  172. func bytesToSlice(input []byte) ([]int64, error) {
  173. result := make([]int64, 3)
  174. buf := bytes.NewBuffer(input)
  175. dec := gob.NewDecoder(buf)
  176. if err := dec.Decode(&result); err != nil {
  177. return nil, err
  178. }
  179. return result, nil
  180. }