kv.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package common
  2. import (
  3. "fmt"
  4. "github.com/patrickmn/go-cache"
  5. "os"
  6. "sync"
  7. )
  8. type KeyValue interface {
  9. Open() error
  10. Close() error
  11. Set(key string, value interface{}) error
  12. Replace(key string, value interface{}) error
  13. Get(key string) (interface{}, bool)
  14. //Must return *common.Error with NOT_FOUND error
  15. Delete(key string) error
  16. Keys() (keys []string, err error)
  17. }
  18. type SyncKVMap struct {
  19. sync.RWMutex
  20. internal map[string]*SimpleKVStore
  21. }
  22. func (sm *SyncKVMap) Load(path string) (result *SimpleKVStore) {
  23. sm.Lock()
  24. defer sm.Unlock()
  25. if s, ok := sm.internal[path]; ok {
  26. result = s
  27. } else {
  28. c := cache.New(cache.NoExpiration, 0)
  29. if _, err := os.Stat(path); os.IsNotExist(err) {
  30. os.MkdirAll(path, os.ModePerm)
  31. }
  32. result = NewSimpleKVStore(path+"/stores.data", c)
  33. sm.internal[path] = result
  34. }
  35. return
  36. }
  37. var stores = &SyncKVMap{
  38. internal: make(map[string]*SimpleKVStore),
  39. }
  40. func GetSimpleKVStore(path string) *SimpleKVStore {
  41. return stores.Load(path)
  42. }
  43. type CtrlType int
  44. const (
  45. OPEN CtrlType = iota
  46. SAVE
  47. CLOSE
  48. )
  49. type SimpleKVStore struct {
  50. path string
  51. c *cache.Cache
  52. /* These 2 channels must be mapping one by one*/
  53. ctrlCh chan CtrlType
  54. errCh chan error
  55. }
  56. func NewSimpleKVStore(path string, c *cache.Cache) *SimpleKVStore {
  57. r := &SimpleKVStore{
  58. path: path,
  59. c: c,
  60. ctrlCh: make(chan CtrlType),
  61. errCh: make(chan error),
  62. }
  63. go r.run()
  64. return r
  65. }
  66. func (m *SimpleKVStore) run() {
  67. count := 0
  68. opened := false
  69. for c := range m.ctrlCh {
  70. switch c {
  71. case OPEN:
  72. count++
  73. if !opened {
  74. if _, err := os.Stat(m.path); os.IsNotExist(err) {
  75. m.errCh <- nil
  76. break
  77. }
  78. if e := m.c.LoadFile(m.path); e != nil {
  79. m.errCh <- e
  80. break
  81. }
  82. }
  83. m.errCh <- nil
  84. opened = true
  85. case CLOSE:
  86. count--
  87. if count == 0 {
  88. opened = false
  89. err := m.doClose()
  90. if err != nil {
  91. Log.Error(err)
  92. m.errCh <- err
  93. break
  94. }
  95. }
  96. m.errCh <- nil
  97. case SAVE:
  98. //swallow duplicate requests
  99. if len(m.ctrlCh) > 0 {
  100. m.errCh <- nil
  101. break
  102. }
  103. if e := m.c.SaveFile(m.path); e != nil {
  104. Log.Error(e)
  105. m.errCh <- e
  106. break
  107. }
  108. m.errCh <- nil
  109. }
  110. }
  111. }
  112. func (m *SimpleKVStore) Open() error {
  113. m.ctrlCh <- OPEN
  114. return <-m.errCh
  115. }
  116. func (m *SimpleKVStore) Close() error {
  117. m.ctrlCh <- CLOSE
  118. return <-m.errCh
  119. }
  120. func (m *SimpleKVStore) doClose() error {
  121. //e := m.c.SaveFile(m.path)
  122. m.c.Flush() //Delete all of the values from memory.
  123. return nil
  124. }
  125. func (m *SimpleKVStore) saveToFile() error {
  126. m.ctrlCh <- SAVE
  127. return <-m.errCh
  128. }
  129. func (m *SimpleKVStore) Set(key string, value interface{}) error {
  130. if m.c == nil {
  131. return fmt.Errorf("cache %s has not been initialized yet", m.path)
  132. }
  133. if err := m.c.Add(key, value, cache.NoExpiration); err != nil {
  134. return err
  135. }
  136. return m.saveToFile()
  137. }
  138. func (m *SimpleKVStore) Replace(key string, value interface{}) error {
  139. if m.c == nil {
  140. return fmt.Errorf("cache %s has not been initialized yet", m.path)
  141. }
  142. m.c.Set(key, value, cache.NoExpiration)
  143. return m.saveToFile()
  144. }
  145. func (m *SimpleKVStore) Get(key string) (interface{}, bool) {
  146. return m.c.Get(key)
  147. }
  148. func (m *SimpleKVStore) Delete(key string) error {
  149. if m.c == nil {
  150. return fmt.Errorf("cache %s has not been initialized yet", m.path)
  151. }
  152. if _, found := m.c.Get(key); found {
  153. m.c.Delete(key)
  154. } else {
  155. return NewErrorWithCode(NOT_FOUND, fmt.Sprintf("%s is not found", key))
  156. }
  157. return m.saveToFile()
  158. }
  159. func (m *SimpleKVStore) Keys() (keys []string, err error) {
  160. if m.c == nil {
  161. return nil, fmt.Errorf("Cache %s has not been initialized yet.", m.path)
  162. }
  163. its := m.c.Items()
  164. keys = make([]string, 0, len(its))
  165. for k := range its {
  166. keys = append(keys, k)
  167. }
  168. return keys, nil
  169. }