redisTs.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package redis
  15. import (
  16. "bytes"
  17. "encoding/gob"
  18. "fmt"
  19. "github.com/gomodule/redigo/redis"
  20. dbRedis "github.com/lf-edge/ekuiper/internal/pkg/db/redis"
  21. kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
  22. "strconv"
  23. )
  24. const (
  25. TsPrefix = "KV:TS"
  26. AddToSortedSet = "ZADD"
  27. ReversedRangeByScore = "ZREVRANGEBYSCORE"
  28. RemoveRangeByScore = "ZREMRANGEBYSCORE"
  29. Delete = "DEL"
  30. ReversedRange = "ZREVRANGE"
  31. )
  32. type ts struct {
  33. redis dbRedis.Instance
  34. table string
  35. last int64
  36. key string
  37. }
  38. func init() {
  39. gob.Register(make(map[string]interface{}))
  40. }
  41. func createRedisTs(redis dbRedis.Instance, table string) (error, *ts) {
  42. key := fmt.Sprintf("%s:%s", TsPrefix, table)
  43. err, lastTs := getLast(redis, key)
  44. if err != nil {
  45. return err, nil
  46. }
  47. s := &ts{
  48. redis: redis,
  49. table: table,
  50. last: lastTs,
  51. key: key,
  52. }
  53. return nil, s
  54. }
  55. func (t *ts) Set(key int64, value interface{}) (bool, error) {
  56. if key <= t.last {
  57. return false, nil
  58. }
  59. err, b := kvEncoding.Encode(value)
  60. if err != nil {
  61. return false, err
  62. }
  63. err = t.redis.Apply(func(conn redis.Conn) error {
  64. reply, err := conn.Do(AddToSortedSet, t.key, key, b)
  65. if err != nil {
  66. return err
  67. }
  68. length, err := redis.Int(reply, err)
  69. if err != nil {
  70. return err
  71. }
  72. if length == 0 {
  73. return fmt.Errorf("list at %s key should be non empty", t.key)
  74. }
  75. t.last = key
  76. return nil
  77. })
  78. if err != nil {
  79. return false, err
  80. }
  81. return true, nil
  82. }
  83. func (t ts) Get(key int64, value interface{}) (bool, error) {
  84. err := t.redis.Apply(func(conn redis.Conn) error {
  85. reply, err := conn.Do(ReversedRangeByScore, t.key, key, key)
  86. if err != nil {
  87. return err
  88. }
  89. var tmp [][]byte
  90. tmp, err = redis.ByteSlices(reply, err)
  91. if err != nil {
  92. return err
  93. }
  94. if len(tmp) == 0 {
  95. return fmt.Errorf("record under %s key and %d score not found", t.key, key)
  96. }
  97. dec := gob.NewDecoder(bytes.NewBuffer(tmp[0]))
  98. err = dec.Decode(value)
  99. return err
  100. })
  101. if err != nil {
  102. return false, err
  103. }
  104. return true, nil
  105. }
  106. func (t ts) Last(value interface{}) (int64, error) {
  107. var last int64 = 0
  108. err := t.redis.Apply(func(conn redis.Conn) error {
  109. reply, err := conn.Do(ReversedRange, t.key, 0, 0, "WITHSCORES")
  110. if err != nil {
  111. return err
  112. }
  113. var tmp [][]byte
  114. tmp, err = redis.ByteSlices(reply, err)
  115. if err != nil {
  116. return err
  117. }
  118. if len(tmp) > 0 {
  119. dec := gob.NewDecoder(bytes.NewBuffer(tmp[0]))
  120. if err = dec.Decode(value); err != nil {
  121. return err
  122. }
  123. last, err = strconv.ParseInt(string(tmp[1]), 10, 64)
  124. }
  125. return err
  126. })
  127. if err != nil {
  128. return 0, err
  129. }
  130. return last, nil
  131. }
  132. func (t ts) Delete(key int64) error {
  133. return t.redis.Apply(func(conn redis.Conn) error {
  134. _, err := conn.Do(RemoveRangeByScore, t.key, key, key)
  135. return err
  136. })
  137. }
  138. func (t ts) DeleteBefore(key int64) error {
  139. return t.redis.Apply(func(conn redis.Conn) error {
  140. bound := fmt.Sprintf("(%d", key)
  141. _, err := conn.Do(RemoveRangeByScore, t.key, "-INF", bound)
  142. return err
  143. })
  144. }
  145. func (t ts) Close() error {
  146. return nil
  147. }
  148. func (t ts) Drop() error {
  149. return t.redis.Apply(func(conn redis.Conn) error {
  150. _, err := conn.Do(Delete, t.key)
  151. return err
  152. })
  153. }
  154. func getLast(db dbRedis.Instance, key string) (error, int64) {
  155. var lastTs int64
  156. err := db.Apply(func(conn redis.Conn) error {
  157. reply, err := conn.Do(ReversedRange, key, 0, 0, "WITHSCORES")
  158. if err != nil {
  159. return err
  160. }
  161. var tmp [][]byte
  162. tmp, err = redis.ByteSlices(reply, err)
  163. if err != nil {
  164. return err
  165. }
  166. if len(tmp) == 0 {
  167. return nil
  168. }
  169. lastTs, err = strconv.ParseInt(string(tmp[1]), 10, 64)
  170. return err
  171. })
  172. if err != nil {
  173. return err, 0
  174. }
  175. return nil, lastTs
  176. }