kv_store.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package states
  import (
  	"bytes"
  	"encoding/gob"
  	"fmt"
  	"path"
  	"strconv"
  	"sync"

  	"github.com/emqx/kuiper/common"
  	"github.com/emqx/kuiper/xstream/checkpoints"
  )
  11. func init() {
  12. gob.Register(map[string]interface{}{})
  13. gob.Register(checkpoints.BufferOrEvent{})
  14. }
  15. //The manager for checkpoint storage.
  16. /**
  17. *** mapStore keys
  18. *** { "checkpoint1", "checkpoint2" ... "checkpointn" : The complete or incomplete snapshot
  19. *** "op1", "op2" ... "opn": the current state for all ops
  20. */
  21. type KVStore struct {
  22. db common.KeyValue
  23. mapStore *sync.Map //The current root store of a rule
  24. checkpoints []int64
  25. max int
  26. }
  27. //Store in path ./data/checkpoint/$ruleId
  28. //Store 2 things:
  29. //"checkpoints":A queue for completed checkpoint id
  30. //"$checkpointId":A map with key of checkpoint id and value of snapshot(gob serialized)
  31. //The snapshot is a map also with key of opId and value of map
  32. //Assume each operator only has one instance
  33. func getKVStore(ruleId string) (*KVStore, error) {
  34. dr, _ := common.GetDataLoc()
  35. db := common.GetSimpleKVStore(path.Join(dr, "checkpoints", ruleId))
  36. s := &KVStore{db: db, max: 3, mapStore: &sync.Map{}}
  37. //read data from badger db
  38. if err := s.restore(); err != nil {
  39. return nil, err
  40. }
  41. return s, nil
  42. }
  43. func (s *KVStore) restore() error {
  44. err := s.db.Open()
  45. if err != nil {
  46. return err
  47. }
  48. defer s.db.Close()
  49. if bytes, ok := s.db.Get(CheckpointListKey); ok {
  50. if cs, err := bytesToSlice(bytes.([]byte)); err != nil {
  51. return fmt.Errorf("invalid checkpoint data: %s", err)
  52. } else {
  53. s.checkpoints = cs
  54. for _, c := range cs {
  55. if bytes, ok := s.db.Get(string(c)); ok {
  56. if m, err := bytesToMap(bytes.([]byte)); err != nil {
  57. return fmt.Errorf("invalid checkpoint data: %s", err)
  58. } else {
  59. s.mapStore.Store(c, common.MapToSyncMap(m))
  60. }
  61. }
  62. }
  63. }
  64. }
  65. return nil
  66. }
  67. func (s *KVStore) SaveState(checkpointId int64, opId string, state map[string]interface{}) error {
  68. logger := common.Log
  69. logger.Debugf("Save state for checkpoint %d, op %s, value %v", checkpointId, opId, state)
  70. var cstore *sync.Map
  71. if v, ok := s.mapStore.Load(checkpointId); !ok {
  72. cstore = &sync.Map{}
  73. s.mapStore.Store(checkpointId, cstore)
  74. } else {
  75. if cstore, ok = v.(*sync.Map); !ok {
  76. return fmt.Errorf("invalid KVStore for checkpointId %d with value %v: should be *sync.Map type", checkpointId, v)
  77. }
  78. }
  79. cstore.Store(opId, state)
  80. return nil
  81. }
  82. func (s *KVStore) SaveCheckpoint(checkpointId int64) error {
  83. if v, ok := s.mapStore.Load(checkpointId); !ok {
  84. return fmt.Errorf("store for checkpoint %d not found", checkpointId)
  85. } else {
  86. if m, ok := v.(*sync.Map); !ok {
  87. return fmt.Errorf("invalid KVStore for checkpointId %d with value %v: should be *sync.Map type", checkpointId, v)
  88. } else {
  89. err := s.db.Open()
  90. if err != nil {
  91. return fmt.Errorf("save checkpoint err: %v", err)
  92. }
  93. defer s.db.Close()
  94. b, err := mapToBytes(m)
  95. if err != nil {
  96. return fmt.Errorf("save checkpoint err, fail to encode states: %s", err)
  97. }
  98. err = s.db.Replace(string(checkpointId), b)
  99. if err != nil {
  100. return fmt.Errorf("save checkpoint err: %v", err)
  101. }
  102. m.Delete(checkpointId)
  103. s.checkpoints = append(s.checkpoints, checkpointId)
  104. //TODO is the order promised?
  105. if len(s.checkpoints) > s.max {
  106. cp := s.checkpoints[0]
  107. s.checkpoints = s.checkpoints[1:]
  108. go func() {
  109. s.db.Delete(string(cp))
  110. }()
  111. }
  112. cs, ok := sliceToBytes(s.checkpoints)
  113. if !ok {
  114. return fmt.Errorf("save checkpoint err: fail to encode checkpoint counts")
  115. }
  116. err = s.db.Replace(CheckpointListKey, cs)
  117. if err != nil {
  118. return fmt.Errorf("save checkpoint err: %v", err)
  119. }
  120. }
  121. }
  122. return nil
  123. }
  124. //Only run in the initialization
  125. func (s *KVStore) GetOpState(opId string) (*sync.Map, error) {
  126. if len(s.checkpoints) > 0 {
  127. if v, ok := s.mapStore.Load(s.checkpoints[len(s.checkpoints)-1]); ok {
  128. if cstore, ok := v.(*sync.Map); !ok {
  129. return nil, fmt.Errorf("invalid state %v stored for op %s: data type is not *sync.Map", v, opId)
  130. } else {
  131. if sm, ok := cstore.Load(opId); ok {
  132. switch m := sm.(type) {
  133. case *sync.Map:
  134. return m, nil
  135. case map[string]interface{}:
  136. return common.MapToSyncMap(m), nil
  137. default:
  138. return nil, fmt.Errorf("invalid state %v stored for op %s: data type is not *sync.Map", sm, opId)
  139. }
  140. }
  141. }
  142. } else {
  143. return nil, fmt.Errorf("store for checkpoint %d not found", s.checkpoints[len(s.checkpoints)-1])
  144. }
  145. }
  146. return &sync.Map{}, nil
  147. }
  148. func mapToBytes(sm *sync.Map) ([]byte, error) {
  149. m := common.SyncMapToMap(sm)
  150. var buf bytes.Buffer
  151. enc := gob.NewEncoder(&buf)
  152. if err := enc.Encode(m); err != nil {
  153. return nil, err
  154. }
  155. return buf.Bytes(), nil
  156. }
  // bytesToMap decodes a gob encoded map[string]interface{} from input.
  // It returns a nil map together with the decoding error if input is not a
  // valid encoding of such a map.
  func bytesToMap(input []byte) (map[string]interface{}, error) {
  	var result map[string]interface{}
  	dec := gob.NewDecoder(bytes.NewBuffer(input))
  	if err := dec.Decode(&result); err != nil {
  		return nil, err
  	}
  	return result, nil
  }
  // sliceToBytes returns the gob encoding of s. The second result reports
  // whether the encoding succeeded; it is false, with nil bytes, on failure.
  func sliceToBytes(s []int64) ([]byte, bool) {
  	var buf bytes.Buffer
  	if err := gob.NewEncoder(&buf).Encode(s); err != nil {
  		return nil, false
  	}
  	return buf.Bytes(), true
  }
  // bytesToSlice decodes a gob encoded []int64 from input.
  // It returns a nil slice together with the decoding error if input is not a
  // valid encoding of such a slice.
  func bytesToSlice(input []byte) ([]int64, error) {
  	// No pre-allocation: gob sizes the destination to the decoded length.
  	var result []int64
  	dec := gob.NewDecoder(bytes.NewBuffer(input))
  	if err := dec.Decode(&result); err != nil {
  		return nil, err
  	}
  	return result, nil
  }