stores.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package store
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/pkg/store/definition"
  18. "github.com/lf-edge/ekuiper/internal/pkg/store/sql"
  19. "github.com/lf-edge/ekuiper/pkg/kv"
  20. "path"
  21. "strings"
  22. "sync"
  23. )
  24. type StoreCreator func(conf definition.Config, name string) (definition.StoreBuilder, definition.TsBuilder, error)
  25. var (
  26. storeBuilders = map[string]StoreCreator{
  27. "sqlite": sql.BuildStores,
  28. }
  29. globalStores *stores = nil
  30. cacheStores *stores = nil
  31. )
  32. type stores struct {
  33. kv map[string]kv.KeyValue
  34. ts map[string]kv.Tskv
  35. mu sync.Mutex
  36. kvBuilder definition.StoreBuilder
  37. tsBuilder definition.TsBuilder
  38. }
  39. func newStores(c definition.Config, name string) (*stores, error) {
  40. databaseType := c.Type
  41. if builder, ok := storeBuilders[databaseType]; ok {
  42. kvBuilder, tsBuilder, err := builder(c, name)
  43. if err != nil {
  44. return nil, err
  45. } else {
  46. return &stores{
  47. kv: make(map[string]kv.KeyValue),
  48. ts: make(map[string]kv.Tskv),
  49. mu: sync.Mutex{},
  50. kvBuilder: kvBuilder,
  51. tsBuilder: tsBuilder,
  52. }, nil
  53. }
  54. } else {
  55. return nil, fmt.Errorf("unknown database type: %s", databaseType)
  56. }
  57. }
  58. func (s *stores) GetKV(table string) (kv.KeyValue, error) {
  59. s.mu.Lock()
  60. defer s.mu.Unlock()
  61. if ks, contains := s.kv[table]; contains {
  62. return ks, nil
  63. }
  64. ks, err := s.kvBuilder.CreateStore(table)
  65. if err != nil {
  66. return nil, err
  67. }
  68. s.kv[table] = ks
  69. return ks, nil
  70. }
  71. func (s *stores) DropKV(table string) {
  72. s.mu.Lock()
  73. defer s.mu.Unlock()
  74. if ks, contains := s.kv[table]; contains {
  75. _ = ks.Drop()
  76. delete(s.ts, table)
  77. }
  78. }
  79. func (s *stores) DropRefKVs(tablePrefix string) {
  80. s.mu.Lock()
  81. defer s.mu.Unlock()
  82. for table, ks := range s.kv {
  83. if strings.HasPrefix(table, tablePrefix) {
  84. _ = ks.Drop()
  85. delete(s.kv, table)
  86. }
  87. }
  88. }
  89. func (s *stores) GetTS(table string) (kv.Tskv, error) {
  90. s.mu.Lock()
  91. defer s.mu.Unlock()
  92. if tts, contains := s.ts[table]; contains {
  93. return tts, nil
  94. }
  95. tts, err := s.tsBuilder.CreateTs(table)
  96. if err != nil {
  97. return nil, err
  98. }
  99. s.ts[table] = tts
  100. return tts, nil
  101. }
  102. func (s *stores) DropTS(table string) {
  103. s.mu.Lock()
  104. defer s.mu.Unlock()
  105. if tts, contains := s.ts[table]; contains {
  106. _ = tts.Drop()
  107. delete(s.ts, table)
  108. }
  109. }
  110. func GetKV(table string) (kv.KeyValue, error) {
  111. if globalStores == nil {
  112. return nil, fmt.Errorf("global stores are not initialized")
  113. }
  114. return globalStores.GetKV(table)
  115. }
  116. func GetTS(table string) (kv.Tskv, error) {
  117. if globalStores == nil {
  118. return nil, fmt.Errorf("global stores are not initialized")
  119. }
  120. return globalStores.GetTS(table)
  121. }
  122. func DropTS(table string) error {
  123. if globalStores == nil {
  124. return fmt.Errorf("global stores are not initialized")
  125. }
  126. globalStores.DropTS(table)
  127. return nil
  128. }
  129. func DropKV(table string) error {
  130. if globalStores == nil {
  131. return fmt.Errorf("global stores are not initialized")
  132. }
  133. globalStores.DropKV(table)
  134. return nil
  135. }
  136. func GetCacheKV(table string) (kv.KeyValue, error) {
  137. if cacheStores == nil {
  138. return nil, fmt.Errorf("cache stores are not initialized")
  139. }
  140. return cacheStores.GetKV(table)
  141. }
  142. func DropCacheKV(table string) error {
  143. if cacheStores == nil {
  144. return fmt.Errorf("cache stores are not initialized")
  145. }
  146. cacheStores.DropKV(table)
  147. return nil
  148. }
  149. func DropCacheKVForRule(rule string) error {
  150. if cacheStores == nil {
  151. return fmt.Errorf("cache stores are not initialized")
  152. }
  153. cacheStores.DropRefKVs(path.Join("sink", rule))
  154. return nil
  155. }