redisTs.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build redisdb || !core
  15. // +build redisdb !core
  16. package redis
  17. import (
  18. "bytes"
  19. "encoding/gob"
  20. "fmt"
  21. "github.com/gomodule/redigo/redis"
  22. kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
  23. "strconv"
  24. )
  25. const (
  26. TsPrefix = "KV:TS"
  27. AddToSortedSet = "ZADD"
  28. ReversedRangeByScore = "ZREVRANGEBYSCORE"
  29. RemoveRangeByScore = "ZREMRANGEBYSCORE"
  30. Delete = "DEL"
  31. ReversedRange = "ZREVRANGE"
  32. )
// ts is a time-series key/value table stored under a single Redis key and
// accessed with sorted-set commands, using the int64 entry key as the score.
type ts struct {
	// redis is the instance whose Apply method runs every command.
	redis *Instance
	// table is the table name passed to createRedisTs.
	table string
	// last is seeded from getLast at creation and updated by Set after a
	// successful insert; Set rejects keys that are <= last.
	last int64
	// key is the Redis key of the table, formatted as "<TsPrefix>:<table>".
	key string
}
  39. func init() {
  40. gob.Register(make(map[string]interface{}))
  41. }
  42. func createRedisTs(redis *Instance, table string) (error, *ts) {
  43. key := fmt.Sprintf("%s:%s", TsPrefix, table)
  44. err, lastTs := getLast(redis, key)
  45. if err != nil {
  46. return err, nil
  47. }
  48. s := &ts{
  49. redis: redis,
  50. table: table,
  51. last: lastTs,
  52. key: key,
  53. }
  54. return nil, s
  55. }
  56. func (t *ts) Set(key int64, value interface{}) (bool, error) {
  57. if key <= t.last {
  58. return false, nil
  59. }
  60. err, b := kvEncoding.Encode(value)
  61. if err != nil {
  62. return false, err
  63. }
  64. err = t.redis.Apply(func(conn redis.Conn) error {
  65. reply, err := conn.Do(AddToSortedSet, t.key, key, b)
  66. if err != nil {
  67. return err
  68. }
  69. length, err := redis.Int(reply, err)
  70. if err != nil {
  71. return err
  72. }
  73. if length == 0 {
  74. return fmt.Errorf("list at %s key should be non empty", t.key)
  75. }
  76. t.last = key
  77. return nil
  78. })
  79. if err != nil {
  80. return false, err
  81. }
  82. return true, nil
  83. }
  84. func (t ts) Get(key int64, value interface{}) (bool, error) {
  85. err := t.redis.Apply(func(conn redis.Conn) error {
  86. reply, err := conn.Do(ReversedRangeByScore, t.key, key, key)
  87. if err != nil {
  88. return err
  89. }
  90. var tmp [][]byte
  91. tmp, err = redis.ByteSlices(reply, err)
  92. if err != nil {
  93. return err
  94. }
  95. if len(tmp) == 0 {
  96. return fmt.Errorf("record under %s key and %d score not found", t.key, key)
  97. }
  98. dec := gob.NewDecoder(bytes.NewBuffer(tmp[0]))
  99. err = dec.Decode(value)
  100. return err
  101. })
  102. if err != nil {
  103. return false, err
  104. }
  105. return true, nil
  106. }
  107. func (t ts) Last(value interface{}) (int64, error) {
  108. var last int64 = 0
  109. err := t.redis.Apply(func(conn redis.Conn) error {
  110. reply, err := conn.Do(ReversedRange, t.key, 0, 0, "WITHSCORES")
  111. if err != nil {
  112. return err
  113. }
  114. var tmp [][]byte
  115. tmp, err = redis.ByteSlices(reply, err)
  116. if err != nil {
  117. return err
  118. }
  119. if len(tmp) > 0 {
  120. dec := gob.NewDecoder(bytes.NewBuffer(tmp[0]))
  121. if err = dec.Decode(value); err != nil {
  122. return err
  123. }
  124. last, err = strconv.ParseInt(string(tmp[1]), 10, 64)
  125. }
  126. return err
  127. })
  128. if err != nil {
  129. return 0, err
  130. }
  131. return last, nil
  132. }
  133. func (t ts) Delete(key int64) error {
  134. return t.redis.Apply(func(conn redis.Conn) error {
  135. _, err := conn.Do(RemoveRangeByScore, t.key, key, key)
  136. return err
  137. })
  138. }
  139. func (t ts) DeleteBefore(key int64) error {
  140. return t.redis.Apply(func(conn redis.Conn) error {
  141. bound := fmt.Sprintf("(%d", key)
  142. _, err := conn.Do(RemoveRangeByScore, t.key, "-INF", bound)
  143. return err
  144. })
  145. }
  146. func (t ts) Close() error {
  147. return nil
  148. }
  149. func (t ts) Drop() error {
  150. return t.redis.Apply(func(conn redis.Conn) error {
  151. _, err := conn.Do(Delete, t.key)
  152. return err
  153. })
  154. }
  155. func getLast(db *Instance, key string) (error, int64) {
  156. var lastTs int64
  157. err := db.Apply(func(conn redis.Conn) error {
  158. reply, err := conn.Do(ReversedRange, key, 0, 0, "WITHSCORES")
  159. if err != nil {
  160. return err
  161. }
  162. var tmp [][]byte
  163. tmp, err = redis.ByteSlices(reply, err)
  164. if err != nil {
  165. return err
  166. }
  167. if len(tmp) == 0 {
  168. return nil
  169. }
  170. lastTs, err = strconv.ParseInt(string(tmp[1]), 10, 64)
  171. return err
  172. })
  173. if err != nil {
  174. return err, 0
  175. }
  176. return nil, lastTs
  177. }