redisKv.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build redisdb || !core
  15. // +build redisdb !core
  16. package redis
  17. import (
  18. "bytes"
  19. "encoding/gob"
  20. "fmt"
  21. "github.com/gomodule/redigo/redis"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
  24. "strings"
  25. )
  26. const KvPrefix = "KV:STORE"
  27. type redisKvStore struct {
  28. database *Instance
  29. table string
  30. keyPrefix string
  31. }
  32. func CreateRedisKvStore(redis *Instance, table string) (*redisKvStore, error) {
  33. store := &redisKvStore{
  34. database: redis,
  35. table: table,
  36. keyPrefix: fmt.Sprintf("%s:%s", KvPrefix, table),
  37. }
  38. return store, nil
  39. }
  40. func (kv redisKvStore) Setnx(key string, value interface{}) error {
  41. return kv.database.Apply(func(conn redis.Conn) error {
  42. err, b := kvEncoding.Encode(value)
  43. if nil != err {
  44. return err
  45. }
  46. tKey := kv.tableKey(key)
  47. reply, err := conn.Do("SETNX", tKey, b)
  48. if err != nil {
  49. return err
  50. }
  51. code, err := redis.Int(reply, err)
  52. if code == 0 {
  53. return fmt.Errorf("item %s already exists under %s key because of %s", key, tKey, err)
  54. }
  55. return nil
  56. })
  57. }
  58. func (kv redisKvStore) Set(key string, value interface{}) error {
  59. err, b := kvEncoding.Encode(value)
  60. if nil != err {
  61. return err
  62. }
  63. err = kv.database.Apply(func(conn redis.Conn) error {
  64. tKey := kv.tableKey(key)
  65. reply, err := conn.Do("SET", tKey, b)
  66. code, err := redis.String(reply, err)
  67. if err != nil {
  68. return err
  69. }
  70. if code != "OK" {
  71. return fmt.Errorf("item %s (under key %s) not set because of %s", key, tKey, err)
  72. }
  73. return nil
  74. })
  75. return err
  76. }
  77. func (kv redisKvStore) Get(key string, value interface{}) (bool, error) {
  78. result := false
  79. err := kv.database.Apply(func(conn redis.Conn) error {
  80. tKey := kv.tableKey(key)
  81. reply, err := conn.Do("GET", tKey)
  82. if err != nil {
  83. return err
  84. }
  85. buff, err := redis.Bytes(reply, err)
  86. if err != nil {
  87. result = false
  88. return nil
  89. }
  90. dec := gob.NewDecoder(bytes.NewBuffer(buff))
  91. if err := dec.Decode(value); err != nil {
  92. return err
  93. }
  94. result = true
  95. return nil
  96. })
  97. return result, err
  98. }
  99. func (kv redisKvStore) Delete(key string) error {
  100. return kv.database.Apply(func(conn redis.Conn) error {
  101. tKey := kv.tableKey(key)
  102. _, err := conn.Do("DEL", tKey)
  103. return err
  104. })
  105. }
  106. func (kv redisKvStore) Keys() ([]string, error) {
  107. keys, err := kv.metaKeys()
  108. if err != nil {
  109. return nil, err
  110. }
  111. result := make([]string, 0)
  112. for _, k := range keys {
  113. result = append(result, kv.trimPrefix(k))
  114. }
  115. return result, nil
  116. }
  117. func (kv redisKvStore) All() (map[string]string, error) {
  118. keys, err := kv.metaKeys()
  119. if err != nil {
  120. return nil, err
  121. }
  122. var (
  123. value string
  124. result = make(map[string]string)
  125. )
  126. for _, k := range keys {
  127. key := kv.trimPrefix(k)
  128. ok, err := kv.Get(key, &value)
  129. if err != nil {
  130. conf.Log.Errorf("get %s fail during get all in redi: %v", key, err)
  131. }
  132. if ok {
  133. result[key] = value
  134. }
  135. }
  136. return result, nil
  137. }
  138. func (kv redisKvStore) metaKeys() ([]string, error) {
  139. keys := make([]string, 0)
  140. err := kv.database.Apply(func(conn redis.Conn) error {
  141. pattern := fmt.Sprintf("%s:*", kv.keyPrefix)
  142. reply, err := conn.Do("KEYS", pattern)
  143. keys, err = redis.Strings(reply, err)
  144. return err
  145. })
  146. return keys, err
  147. }
  148. func (kv redisKvStore) Clean() error {
  149. keys, err := kv.metaKeys()
  150. if err != nil {
  151. return err
  152. }
  153. keysToRemove := make([]interface{}, len(keys))
  154. for i, v := range keys {
  155. keysToRemove[i] = v
  156. }
  157. err = kv.database.Apply(func(conn redis.Conn) error {
  158. _, err := conn.Do("DEL", keysToRemove...)
  159. return err
  160. })
  161. return err
  162. }
  163. func (kv redisKvStore) Drop() error {
  164. keys, err := kv.metaKeys()
  165. if err != nil {
  166. return err
  167. }
  168. keysToRemove := make([]interface{}, len(keys))
  169. for i, v := range keys {
  170. keysToRemove[i] = v
  171. }
  172. err = kv.database.Apply(func(conn redis.Conn) error {
  173. _, err := conn.Do("DEL", keysToRemove...)
  174. return err
  175. })
  176. return err
  177. }
  178. func (kv redisKvStore) tableKey(key string) string {
  179. return fmt.Sprintf("%s:%s:%s", KvPrefix, kv.table, key)
  180. }
  181. func (kv redisKvStore) trimPrefix(fullKey string) string {
  182. prefixToTrim := fmt.Sprintf("%s:%s:", KvPrefix, kv.table)
  183. return strings.TrimPrefix(fullKey, prefixToTrim)
  184. }