kv_store.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package state
  2. import (
  3. "encoding/gob"
  4. "fmt"
  5. "github.com/lf-edge/ekuiper/internal/conf"
  6. "github.com/lf-edge/ekuiper/internal/pkg/tskv"
  7. "github.com/lf-edge/ekuiper/internal/topo/checkpoint"
  8. "github.com/lf-edge/ekuiper/pkg/cast"
  9. "sync"
  10. )
  11. func init() {
  12. gob.Register(map[string]interface{}{})
  13. gob.Register(checkpoint.BufferOrEvent{})
  14. }
  15. // KVStore The manager for checkpoint storage.
  16. //
  17. // mapStore keys
  18. // { "checkpoint1", "checkpoint2" ... "checkpointn" : The complete or incomplete snapshot
  19. //
  20. type KVStore struct {
  21. db tskv.Tskv
  22. mapStore *sync.Map //The current root store of a rule
  23. checkpoints []int64
  24. max int
  25. ruleId string
  26. }
  27. //Store in path ./data/checkpoint/$ruleId
  28. //Store 2 things:
  29. //"checkpoints":A queue for completed checkpoint id
  30. //"$checkpointId":A map with key of checkpoint id and value of snapshot(gob serialized)
  31. //Assume each operator only has one instance
  32. func getKVStore(ruleId string) (*KVStore, error) {
  33. db, err := tskv.NewSqlite(ruleId)
  34. if err != nil {
  35. return nil, err
  36. }
  37. s := &KVStore{db: db, max: 3, mapStore: &sync.Map{}, ruleId: ruleId}
  38. //read data from badger db
  39. if err := s.restore(); err != nil {
  40. return nil, err
  41. }
  42. return s, nil
  43. }
  44. func (s *KVStore) restore() error {
  45. var m map[string]interface{}
  46. k, err := s.db.Last(&m)
  47. if err != nil {
  48. return err
  49. }
  50. s.checkpoints = []int64{k}
  51. s.mapStore.Store(k, cast.MapToSyncMap(m))
  52. return nil
  53. }
  54. func (s *KVStore) SaveState(checkpointId int64, opId string, state map[string]interface{}) error {
  55. logger := conf.Log
  56. logger.Debugf("Save state for checkpoint %d, op %s, value %v", checkpointId, opId, state)
  57. var cstore *sync.Map
  58. if v, ok := s.mapStore.Load(checkpointId); !ok {
  59. cstore = &sync.Map{}
  60. s.mapStore.Store(checkpointId, cstore)
  61. } else {
  62. if cstore, ok = v.(*sync.Map); !ok {
  63. return fmt.Errorf("invalid KVStore for checkpointId %d with value %v: should be *sync.Map type", checkpointId, v)
  64. }
  65. }
  66. cstore.Store(opId, state)
  67. return nil
  68. }
  69. func (s *KVStore) SaveCheckpoint(checkpointId int64) error {
  70. if v, ok := s.mapStore.Load(checkpointId); !ok {
  71. return fmt.Errorf("store for checkpoint %d not found", checkpointId)
  72. } else {
  73. if m, ok := v.(*sync.Map); !ok {
  74. return fmt.Errorf("invalid KVStore for checkpointId %d with value %v: should be *sync.Map type", checkpointId, v)
  75. } else {
  76. s.checkpoints = append(s.checkpoints, checkpointId)
  77. //TODO is the order promised?
  78. for len(s.checkpoints) > s.max {
  79. cp := s.checkpoints[0]
  80. s.checkpoints = s.checkpoints[1:]
  81. s.mapStore.Delete(cp)
  82. }
  83. _, err := s.db.Set(checkpointId, cast.SyncMapToMap(m))
  84. if err != nil {
  85. return fmt.Errorf("save checkpoint err: %v", err)
  86. }
  87. }
  88. }
  89. return nil
  90. }
  91. // GetOpState Only run in the initialization
  92. func (s *KVStore) GetOpState(opId string) (*sync.Map, error) {
  93. if len(s.checkpoints) > 0 {
  94. if v, ok := s.mapStore.Load(s.checkpoints[len(s.checkpoints)-1]); ok {
  95. if cstore, ok := v.(*sync.Map); !ok {
  96. return nil, fmt.Errorf("invalid state %v stored for op %s: data type is not *sync.Map", v, opId)
  97. } else {
  98. if sm, ok := cstore.Load(opId); ok {
  99. switch m := sm.(type) {
  100. case *sync.Map:
  101. return m, nil
  102. case map[string]interface{}:
  103. return cast.MapToSyncMap(m), nil
  104. default:
  105. return nil, fmt.Errorf("invalid state %v stored for op %s: data type is not *sync.Map", sm, opId)
  106. }
  107. }
  108. }
  109. } else {
  110. return nil, fmt.Errorf("store for checkpoint %d not found", s.checkpoints[len(s.checkpoints)-1])
  111. }
  112. }
  113. return &sync.Map{}, nil
  114. }
  115. func (s *KVStore) Clean() error {
  116. return s.db.DeleteBefore(s.checkpoints[0])
  117. }