|
@@ -16,11 +16,11 @@ func init() {
|
|
|
gob.Register(checkpoint.BufferOrEvent{})
|
|
|
}
|
|
|
|
|
|
-//The manager for checkpoint storage.
|
|
|
-/**
|
|
|
-*** mapStore keys
|
|
|
-*** { "checkpoint1", "checkpoint2" ... "checkpointn" : The complete or incomplete snapshot
|
|
|
- */
|
|
|
+// KVStore The manager for checkpoint storage.
|
|
|
+//
|
|
|
+// mapStore keys
|
|
|
+// { "checkpoint1", "checkpoint2" ... "checkpointn" : The complete or incomplete snapshot
|
|
|
+//
|
|
|
type KVStore struct {
|
|
|
db kv.KeyValue
|
|
|
mapStore *sync.Map //The current root store of a rule
|
|
@@ -93,20 +93,20 @@ func (s *KVStore) SaveCheckpoint(checkpointId int64) error {
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("save checkpoint err: %v", err)
|
|
|
}
|
|
|
- defer s.db.Close()
|
|
|
err = s.db.Set(fmt.Sprintf("%d", checkpointId), cast.SyncMapToMap(m))
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("save checkpoint err: %v", err)
|
|
|
}
|
|
|
- m.Delete(checkpointId)
|
|
|
s.checkpoints = append(s.checkpoints, checkpointId)
|
|
|
//TODO is the order promised?
|
|
|
- if len(s.checkpoints) > s.max {
|
|
|
+ for len(s.checkpoints) > s.max {
|
|
|
cp := s.checkpoints[0]
|
|
|
s.checkpoints = s.checkpoints[1:]
|
|
|
- go func() {
|
|
|
- _ = s.db.Delete(fmt.Sprintf("%d", cp))
|
|
|
- }()
|
|
|
+ s.mapStore.Delete(cp)
|
|
|
+ err = s.db.Delete(fmt.Sprintf("%d", cp))
|
|
|
+ if err != nil {
|
|
|
+ fmt.Printf("delete checkpoint %d db store error %s", cp, err)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
err = s.db.Set(CheckpointListKey, s.checkpoints)
|
|
@@ -118,7 +118,7 @@ func (s *KVStore) SaveCheckpoint(checkpointId int64) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-//Only run in the initialization
|
|
|
+// GetOpState Only run in the initialization
|
|
|
func (s *KVStore) GetOpState(opId string) (*sync.Map, error) {
|
|
|
if len(s.checkpoints) > 0 {
|
|
|
if v, ok := s.mapStore.Load(s.checkpoints[len(s.checkpoints)-1]); ok {
|