kv_store.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package state
  2. import (
  3. "encoding/gob"
  4. "fmt"
  5. "github.com/lf-edge/ekuiper/internal/conf"
  6. "github.com/lf-edge/ekuiper/internal/topo/checkpoint"
  7. "github.com/lf-edge/ekuiper/pkg/cast"
  8. "github.com/lf-edge/ekuiper/pkg/kv"
  9. "path"
  10. "sync"
  11. )
  12. func init() {
  13. gob.Register(map[string]interface{}{})
  14. gob.Register(checkpoint.BufferOrEvent{})
  15. }
  16. // KVStore The manager for checkpoint storage.
  17. //
  18. // mapStore keys
  19. // { "checkpoint1", "checkpoint2" ... "checkpointn" : The complete or incomplete snapshot
  20. //
  21. type KVStore struct {
  22. db kv.KeyValue
  23. mapStore *sync.Map //The current root store of a rule
  24. checkpoints []int64
  25. max int
  26. }
  27. //Store in path ./data/checkpoint/$ruleId
  28. //Store 2 things:
  29. //"checkpoints":A queue for completed checkpoint id
  30. //"$checkpointId":A map with key of checkpoint id and value of snapshot(gob serialized)
  31. //Assume each operator only has one instance
  32. func getKVStore(ruleId string) (*KVStore, error) {
  33. dr, _ := conf.GetDataLoc()
  34. db := kv.GetDefaultKVStore(path.Join(dr, ruleId, CheckpointListKey))
  35. s := &KVStore{db: db, max: 3, mapStore: &sync.Map{}}
  36. //read data from badger db
  37. if err := s.restore(); err != nil {
  38. return nil, err
  39. }
  40. return s, nil
  41. }
  42. func (s *KVStore) restore() error {
  43. err := s.db.Open()
  44. if err != nil {
  45. return err
  46. }
  47. defer s.db.Close()
  48. var cs []int64
  49. if ok, _ := s.db.Get(CheckpointListKey, &cs); ok {
  50. s.checkpoints = cs
  51. for _, c := range cs {
  52. var m map[string]interface{}
  53. if ok, _ := s.db.Get(fmt.Sprintf("%d", c), &m); ok {
  54. s.mapStore.Store(c, cast.MapToSyncMap(m))
  55. } else {
  56. return fmt.Errorf("invalid checkpoint data: %v", c)
  57. }
  58. }
  59. }
  60. return nil
  61. }
  62. func (s *KVStore) SaveState(checkpointId int64, opId string, state map[string]interface{}) error {
  63. logger := conf.Log
  64. logger.Debugf("Save state for checkpoint %d, op %s, value %v", checkpointId, opId, state)
  65. var cstore *sync.Map
  66. if v, ok := s.mapStore.Load(checkpointId); !ok {
  67. cstore = &sync.Map{}
  68. s.mapStore.Store(checkpointId, cstore)
  69. } else {
  70. if cstore, ok = v.(*sync.Map); !ok {
  71. return fmt.Errorf("invalid KVStore for checkpointId %d with value %v: should be *sync.Map type", checkpointId, v)
  72. }
  73. }
  74. cstore.Store(opId, state)
  75. return nil
  76. }
  77. func (s *KVStore) SaveCheckpoint(checkpointId int64) error {
  78. if v, ok := s.mapStore.Load(checkpointId); !ok {
  79. return fmt.Errorf("store for checkpoint %d not found", checkpointId)
  80. } else {
  81. if m, ok := v.(*sync.Map); !ok {
  82. return fmt.Errorf("invalid KVStore for checkpointId %d with value %v: should be *sync.Map type", checkpointId, v)
  83. } else {
  84. err := s.db.Open()
  85. if err != nil {
  86. return fmt.Errorf("save checkpoint err: %v", err)
  87. }
  88. err = s.db.Set(fmt.Sprintf("%d", checkpointId), cast.SyncMapToMap(m))
  89. if err != nil {
  90. return fmt.Errorf("save checkpoint err: %v", err)
  91. }
  92. s.checkpoints = append(s.checkpoints, checkpointId)
  93. //TODO is the order promised?
  94. for len(s.checkpoints) > s.max {
  95. cp := s.checkpoints[0]
  96. s.checkpoints = s.checkpoints[1:]
  97. s.mapStore.Delete(cp)
  98. err = s.db.Delete(fmt.Sprintf("%d", cp))
  99. if err != nil {
  100. fmt.Printf("delete checkpoint %d db store error %s", cp, err)
  101. }
  102. }
  103. err = s.db.Set(CheckpointListKey, s.checkpoints)
  104. if err != nil {
  105. return fmt.Errorf("save checkpoint err: %v", err)
  106. }
  107. }
  108. }
  109. return nil
  110. }
  111. // GetOpState Only run in the initialization
  112. func (s *KVStore) GetOpState(opId string) (*sync.Map, error) {
  113. if len(s.checkpoints) > 0 {
  114. if v, ok := s.mapStore.Load(s.checkpoints[len(s.checkpoints)-1]); ok {
  115. if cstore, ok := v.(*sync.Map); !ok {
  116. return nil, fmt.Errorf("invalid state %v stored for op %s: data type is not *sync.Map", v, opId)
  117. } else {
  118. if sm, ok := cstore.Load(opId); ok {
  119. switch m := sm.(type) {
  120. case *sync.Map:
  121. return m, nil
  122. case map[string]interface{}:
  123. return cast.MapToSyncMap(m), nil
  124. default:
  125. return nil, fmt.Errorf("invalid state %v stored for op %s: data type is not *sync.Map", sm, opId)
  126. }
  127. }
  128. }
  129. } else {
  130. return nil, fmt.Errorf("store for checkpoint %d not found", s.checkpoints[len(s.checkpoints)-1])
  131. }
  132. }
  133. return &sync.Map{}, nil
  134. }