kv.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package common
  2. import (
  3. "fmt"
  4. "github.com/patrickmn/go-cache"
  5. "os"
  6. "sync"
  7. )
// KeyValue is the contract for a string-keyed store of arbitrary values.
// SimpleKVStore in this file is one implementation.
type KeyValue interface {
	// Open makes the store ready for use.
	Open() error
	// Close signals that the caller is done with the store.
	Close() error
	// Set stores value under key.
	Set(key string, value interface{}) error
	// Replace stores value under key, overwriting any existing value.
	Replace(key string, value interface{}) error
	// Get returns the value stored under key and whether it was found.
	Get(key string) (interface{}, bool)
	// Delete removes key. When the key does not exist, implementations must
	// return a *common.Error carrying the NOT_FOUND code.
	Delete(key string) error
	// Keys lists every key currently held by the store.
	Keys() (keys []string, err error)
	// Clean removes the store's persisted data.
	Clean() error
}
// SyncKVMap is a lock-guarded registry of SimpleKVStore instances, keyed by
// the path handed to Load.
type SyncKVMap struct {
	sync.RWMutex
	// internal maps a path to its store; Load accesses it while holding the
	// embedded lock.
	internal map[string]*SimpleKVStore
}
  23. func (sm *SyncKVMap) Load(path string) (result *SimpleKVStore) {
  24. sm.Lock()
  25. defer sm.Unlock()
  26. if s, ok := sm.internal[path]; ok {
  27. result = s
  28. } else {
  29. c := cache.New(cache.NoExpiration, 0)
  30. if _, err := os.Stat(path); os.IsNotExist(err) {
  31. os.MkdirAll(path, os.ModePerm)
  32. }
  33. result = NewSimpleKVStore(path+"/stores.data", c)
  34. sm.internal[path] = result
  35. }
  36. return
  37. }
// stores is the package-level registry that GetSimpleKVStore reads from.
var stores = &SyncKVMap{
	internal: make(map[string]*SimpleKVStore),
}
  41. func GetSimpleKVStore(path string) *SimpleKVStore {
  42. return stores.Load(path)
  43. }
// CtrlType identifies a request sent over a SimpleKVStore's control channel.
type CtrlType int

const (
	// OPEN is sent by SimpleKVStore.Open.
	OPEN CtrlType = iota
	// SAVE is sent by SimpleKVStore.saveToFile.
	SAVE
	// CLOSE is sent by SimpleKVStore.Close.
	CLOSE
)
// SimpleKVStore is a KeyValue implementation that keeps its entries in a
// go-cache instance and saves them to the file at path.
type SimpleKVStore struct {
	path string       // file the cache is loaded from and saved to
	c    *cache.Cache // in-memory entries
	// The two channels below form a request/response pair: every value sent
	// on ctrlCh must be answered by exactly one value on errCh.
	ctrlCh chan CtrlType
	errCh  chan error
}
  57. func NewSimpleKVStore(path string, c *cache.Cache) *SimpleKVStore {
  58. r := &SimpleKVStore{
  59. path: path,
  60. c: c,
  61. ctrlCh: make(chan CtrlType),
  62. errCh: make(chan error),
  63. }
  64. go r.run()
  65. return r
  66. }
  67. func (m *SimpleKVStore) run() {
  68. count := 0
  69. opened := false
  70. for c := range m.ctrlCh {
  71. switch c {
  72. case OPEN:
  73. count++
  74. if !opened {
  75. if _, err := os.Stat(m.path); os.IsNotExist(err) {
  76. m.errCh <- nil
  77. break
  78. }
  79. if e := m.c.LoadFile(m.path); e != nil {
  80. m.errCh <- e
  81. break
  82. }
  83. }
  84. m.errCh <- nil
  85. opened = true
  86. case CLOSE:
  87. count--
  88. if count == 0 {
  89. opened = false
  90. err := m.doClose()
  91. if err != nil {
  92. Log.Error(err)
  93. m.errCh <- err
  94. break
  95. }
  96. }
  97. m.errCh <- nil
  98. case SAVE:
  99. //swallow duplicate requests
  100. if len(m.ctrlCh) > 0 {
  101. m.errCh <- nil
  102. break
  103. }
  104. if e := m.c.SaveFile(m.path); e != nil {
  105. Log.Error(e)
  106. m.errCh <- e
  107. break
  108. }
  109. m.errCh <- nil
  110. }
  111. }
  112. }
  113. func (m *SimpleKVStore) Open() error {
  114. m.ctrlCh <- OPEN
  115. return <-m.errCh
  116. }
  117. func (m *SimpleKVStore) Close() error {
  118. m.ctrlCh <- CLOSE
  119. return <-m.errCh
  120. }
// doClose empties the in-memory cache. It does not write to the file (the
// SaveFile call below is intentionally left commented out) and always returns
// nil.
func (m *SimpleKVStore) doClose() error {
	//e := m.c.SaveFile(m.path)
	m.c.Flush() // Delete all of the values from memory.
	return nil
}
  126. func (m *SimpleKVStore) saveToFile() error {
  127. m.ctrlCh <- SAVE
  128. return <-m.errCh
  129. }
  130. func (m *SimpleKVStore) Set(key string, value interface{}) error {
  131. if m.c == nil {
  132. return fmt.Errorf("cache %s has not been initialized yet", m.path)
  133. }
  134. if err := m.c.Add(key, value, cache.NoExpiration); err != nil {
  135. return err
  136. }
  137. return m.saveToFile()
  138. }
  139. func (m *SimpleKVStore) Replace(key string, value interface{}) error {
  140. if m.c == nil {
  141. return fmt.Errorf("cache %s has not been initialized yet", m.path)
  142. }
  143. m.c.Set(key, value, cache.NoExpiration)
  144. return m.saveToFile()
  145. }
  146. func (m *SimpleKVStore) Get(key string) (interface{}, bool) {
  147. return m.c.Get(key)
  148. }
  149. func (m *SimpleKVStore) Delete(key string) error {
  150. if m.c == nil {
  151. return fmt.Errorf("cache %s has not been initialized yet", m.path)
  152. }
  153. if _, found := m.c.Get(key); found {
  154. m.c.Delete(key)
  155. } else {
  156. return NewErrorWithCode(NOT_FOUND, fmt.Sprintf("%s is not found", key))
  157. }
  158. return m.saveToFile()
  159. }
  160. func (m *SimpleKVStore) Keys() (keys []string, err error) {
  161. if m.c == nil {
  162. return nil, fmt.Errorf("Cache %s has not been initialized yet.", m.path)
  163. }
  164. its := m.c.Items()
  165. keys = make([]string, 0, len(its))
  166. for k := range its {
  167. keys = append(keys, k)
  168. }
  169. return keys, nil
  170. }
  171. func (m *SimpleKVStore) Clean() error {
  172. return os.RemoveAll(m.path)
  173. }