migration.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package util
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common/kv"
  5. "github.com/patrickmn/go-cache"
  6. "io/ioutil"
  7. "os"
  8. "path"
  9. )
  10. func migration(dir string) error {
  11. fpath := path.Join(dir, "stores.data")
  12. c := cache.New(cache.NoExpiration, 0)
  13. if err := gocacheOpen(c, fpath); nil != err {
  14. return err
  15. }
  16. defer gocacheClose(c, fpath)
  17. keys, err := gocacheKeys(c)
  18. if nil != err {
  19. return err
  20. }
  21. store := kv.GetDefaultKVStore(dir)
  22. if err := store.Open(); nil != err {
  23. return err
  24. }
  25. defer store.Close()
  26. for _, k := range keys {
  27. if value, ok := c.Get(k); !ok {
  28. return fmt.Errorf("not found %s from %s", k, fpath)
  29. } else {
  30. if err := store.Setnx(k, value); nil != err {
  31. return err
  32. }
  33. if err := gocacheDel(c, k); nil != err {
  34. return err
  35. }
  36. }
  37. }
  38. return os.Remove(fpath)
  39. }
  40. func DataMigration(dir string) error {
  41. var dirs []string
  42. dirs = append(dirs, dir)
  43. for i := 0; i < len(dirs); i++ {
  44. files, err := ioutil.ReadDir(dirs[i])
  45. if nil != err {
  46. return err
  47. }
  48. for _, file := range files {
  49. fname := file.Name()
  50. if file.IsDir() {
  51. dirs = append(dirs, path.Join(dirs[i], fname))
  52. } else if "stores.data" == fname {
  53. return migration(dirs[i])
  54. }
  55. }
  56. }
  57. return nil
  58. }
  59. func gocacheClose(c *cache.Cache, path string) error {
  60. if e := c.SaveFile(path); e != nil {
  61. return e
  62. }
  63. c.Flush()
  64. return nil
  65. }
  66. func gocacheOpen(c *cache.Cache, path string) error {
  67. if _, err := os.Stat(path); os.IsNotExist(err) {
  68. return nil
  69. }
  70. if e := c.LoadFile(path); e != nil {
  71. return e
  72. }
  73. return nil
  74. }
  75. func gocacheKeys(c *cache.Cache) (keys []string, err error) {
  76. if c == nil {
  77. return nil, fmt.Errorf("cache has not been initialized yet.")
  78. }
  79. its := c.Items()
  80. keys = make([]string, 0, len(its))
  81. for k := range its {
  82. keys = append(keys, k)
  83. }
  84. return keys, nil
  85. }
  86. func gocacheSet(c *cache.Cache, path, key string, value interface{}) error {
  87. if c == nil {
  88. return fmt.Errorf("cache has not been initialized yet.")
  89. }
  90. return c.Add(key, value, cache.NoExpiration)
  91. }
  92. func gocacheDel(c *cache.Cache, key string) error {
  93. if c == nil {
  94. return fmt.Errorf("cache has not been initialized yet.")
  95. }
  96. c.Delete(key)
  97. return nil
  98. }