redisKv.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build redisdb || !core
  15. // +build redisdb !core
  16. package redis
  17. import (
  18. "bytes"
  19. "encoding/gob"
  20. "fmt"
  21. "github.com/gomodule/redigo/redis"
  22. kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
  23. "strings"
  24. )
  25. const KvPrefix = "KV:STORE"
  26. type redisKvStore struct {
  27. database *Instance
  28. table string
  29. keyPrefix string
  30. }
  31. func CreateRedisKvStore(redis *Instance, table string) (*redisKvStore, error) {
  32. store := &redisKvStore{
  33. database: redis,
  34. table: table,
  35. keyPrefix: fmt.Sprintf("%s:%s", KvPrefix, table),
  36. }
  37. return store, nil
  38. }
  39. func (kv redisKvStore) Setnx(key string, value interface{}) error {
  40. return kv.database.Apply(func(conn redis.Conn) error {
  41. err, b := kvEncoding.Encode(value)
  42. if nil != err {
  43. return err
  44. }
  45. tKey := kv.tableKey(key)
  46. reply, err := conn.Do("SETNX", tKey, b)
  47. if err != nil {
  48. return err
  49. }
  50. code, err := redis.Int(reply, err)
  51. if code == 0 {
  52. return fmt.Errorf("item %s already exists under %s key because of %s", key, tKey, err)
  53. }
  54. return nil
  55. })
  56. }
  57. func (kv redisKvStore) Set(key string, value interface{}) error {
  58. err, b := kvEncoding.Encode(value)
  59. if nil != err {
  60. return err
  61. }
  62. err = kv.database.Apply(func(conn redis.Conn) error {
  63. tKey := kv.tableKey(key)
  64. reply, err := conn.Do("SET", tKey, b)
  65. code, err := redis.String(reply, err)
  66. if err != nil {
  67. return err
  68. }
  69. if code != "OK" {
  70. return fmt.Errorf("item %s (under key %s) not set because of %s", key, tKey, err)
  71. }
  72. return nil
  73. })
  74. return err
  75. }
  76. func (kv redisKvStore) Get(key string, value interface{}) (bool, error) {
  77. result := false
  78. err := kv.database.Apply(func(conn redis.Conn) error {
  79. tKey := kv.tableKey(key)
  80. reply, err := conn.Do("GET", tKey)
  81. if err != nil {
  82. return err
  83. }
  84. buff, err := redis.Bytes(reply, err)
  85. if err != nil {
  86. result = false
  87. return nil
  88. }
  89. dec := gob.NewDecoder(bytes.NewBuffer(buff))
  90. if err := dec.Decode(value); err != nil {
  91. return err
  92. }
  93. result = true
  94. return nil
  95. })
  96. return result, err
  97. }
  98. func (kv redisKvStore) Delete(key string) error {
  99. return kv.database.Apply(func(conn redis.Conn) error {
  100. tKey := kv.tableKey(key)
  101. _, err := conn.Do("DEL", tKey)
  102. return err
  103. })
  104. }
  105. func (kv redisKvStore) Keys() ([]string, error) {
  106. keys, err := kv.metaKeys()
  107. if err != nil {
  108. return nil, err
  109. }
  110. result := make([]string, 0)
  111. for _, k := range keys {
  112. result = append(result, kv.trimPrefix(k))
  113. }
  114. return result, nil
  115. }
  116. func (kv redisKvStore) metaKeys() ([]string, error) {
  117. keys := make([]string, 0)
  118. err := kv.database.Apply(func(conn redis.Conn) error {
  119. pattern := fmt.Sprintf("%s:*", kv.keyPrefix)
  120. reply, err := conn.Do("KEYS", pattern)
  121. keys, err = redis.Strings(reply, err)
  122. return err
  123. })
  124. return keys, err
  125. }
  126. func (kv redisKvStore) Clean() error {
  127. keys, err := kv.metaKeys()
  128. if err != nil {
  129. return err
  130. }
  131. keysToRemove := make([]interface{}, len(keys))
  132. for i, v := range keys {
  133. keysToRemove[i] = v
  134. }
  135. err = kv.database.Apply(func(conn redis.Conn) error {
  136. _, err := conn.Do("DEL", keysToRemove...)
  137. return err
  138. })
  139. return err
  140. }
  141. func (kv redisKvStore) Drop() error {
  142. keys, err := kv.metaKeys()
  143. if err != nil {
  144. return err
  145. }
  146. keysToRemove := make([]interface{}, len(keys))
  147. for i, v := range keys {
  148. keysToRemove[i] = v
  149. }
  150. err = kv.database.Apply(func(conn redis.Conn) error {
  151. _, err := conn.Do("DEL", keysToRemove...)
  152. return err
  153. })
  154. return err
  155. }
  156. func (kv redisKvStore) tableKey(key string) string {
  157. return fmt.Sprintf("%s:%s:%s", KvPrefix, kv.table, key)
  158. }
  159. func (kv redisKvStore) trimPrefix(fullKey string) string {
  160. prefixToTrim := fmt.Sprintf("%s:%s:", KvPrefix, kv.table)
  161. return strings.TrimPrefix(fullKey, prefixToTrim)
  162. }