kv_store.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package state
  15. import (
  16. "encoding/gob"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/pkg/tskv"
  20. "github.com/lf-edge/ekuiper/internal/topo/checkpoint"
  21. "github.com/lf-edge/ekuiper/pkg/cast"
  22. "sync"
  23. )
  24. func init() {
  25. gob.Register(map[string]interface{}{})
  26. gob.Register(checkpoint.BufferOrEvent{})
  27. }
  28. // KVStore The manager for checkpoint storage.
  29. //
  30. // mapStore keys
  31. // { "checkpoint1", "checkpoint2" ... "checkpointn" : The complete or incomplete snapshot
  32. //
  33. type KVStore struct {
  34. db tskv.Tskv
  35. mapStore *sync.Map //The current root store of a rule
  36. checkpoints []int64
  37. max int
  38. ruleId string
  39. }
  40. //Store in path ./data/checkpoint/$ruleId
  41. //Store 2 things:
  42. //"checkpoints":A queue for completed checkpoint id
  43. //"$checkpointId":A map with key of checkpoint id and value of snapshot(gob serialized)
  44. //Assume each operator only has one instance
  45. func getKVStore(ruleId string) (*KVStore, error) {
  46. db, err := tskv.NewSqlite(ruleId)
  47. if err != nil {
  48. return nil, err
  49. }
  50. s := &KVStore{db: db, max: 3, mapStore: &sync.Map{}, ruleId: ruleId}
  51. //read data from badger db
  52. if err := s.restore(); err != nil {
  53. return nil, err
  54. }
  55. return s, nil
  56. }
  57. func (s *KVStore) restore() error {
  58. var m map[string]interface{}
  59. k, err := s.db.Last(&m)
  60. if err != nil {
  61. return err
  62. }
  63. s.checkpoints = []int64{k}
  64. s.mapStore.Store(k, cast.MapToSyncMap(m))
  65. return nil
  66. }
  67. func (s *KVStore) SaveState(checkpointId int64, opId string, state map[string]interface{}) error {
  68. logger := conf.Log
  69. logger.Debugf("Save state for checkpoint %d, op %s, value %v", checkpointId, opId, state)
  70. var cstore *sync.Map
  71. if v, ok := s.mapStore.Load(checkpointId); !ok {
  72. cstore = &sync.Map{}
  73. s.mapStore.Store(checkpointId, cstore)
  74. } else {
  75. if cstore, ok = v.(*sync.Map); !ok {
  76. return fmt.Errorf("invalid KVStore for checkpointId %d with value %v: should be *sync.Map type", checkpointId, v)
  77. }
  78. }
  79. cstore.Store(opId, state)
  80. return nil
  81. }
  82. func (s *KVStore) SaveCheckpoint(checkpointId int64) error {
  83. if v, ok := s.mapStore.Load(checkpointId); !ok {
  84. return fmt.Errorf("store for checkpoint %d not found", checkpointId)
  85. } else {
  86. if m, ok := v.(*sync.Map); !ok {
  87. return fmt.Errorf("invalid KVStore for checkpointId %d with value %v: should be *sync.Map type", checkpointId, v)
  88. } else {
  89. s.checkpoints = append(s.checkpoints, checkpointId)
  90. //TODO is the order promised?
  91. for len(s.checkpoints) > s.max {
  92. cp := s.checkpoints[0]
  93. s.checkpoints = s.checkpoints[1:]
  94. s.mapStore.Delete(cp)
  95. }
  96. _, err := s.db.Set(checkpointId, cast.SyncMapToMap(m))
  97. if err != nil {
  98. return fmt.Errorf("save checkpoint err: %v", err)
  99. }
  100. }
  101. }
  102. return nil
  103. }
  104. // GetOpState Only run in the initialization
  105. func (s *KVStore) GetOpState(opId string) (*sync.Map, error) {
  106. if len(s.checkpoints) > 0 {
  107. if v, ok := s.mapStore.Load(s.checkpoints[len(s.checkpoints)-1]); ok {
  108. if cstore, ok := v.(*sync.Map); !ok {
  109. return nil, fmt.Errorf("invalid state %v stored for op %s: data type is not *sync.Map", v, opId)
  110. } else {
  111. if sm, ok := cstore.Load(opId); ok {
  112. switch m := sm.(type) {
  113. case *sync.Map:
  114. return m, nil
  115. case map[string]interface{}:
  116. return cast.MapToSyncMap(m), nil
  117. default:
  118. return nil, fmt.Errorf("invalid state %v stored for op %s: data type is not *sync.Map", sm, opId)
  119. }
  120. }
  121. }
  122. } else {
  123. return nil, fmt.Errorf("store for checkpoint %d not found", s.checkpoints[len(s.checkpoints)-1])
  124. }
  125. }
  126. return &sync.Map{}, nil
  127. }
  128. func (s *KVStore) Clean() error {
  129. return s.db.DeleteBefore(s.checkpoints[0])
  130. }