redisTs.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. // Copyright 2021 INTECH Process Automation Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package redis
  15. import (
  16. "bytes"
  17. "encoding/gob"
  18. "fmt"
  19. "github.com/gomodule/redigo/redis"
  20. dbRedis "github.com/lf-edge/ekuiper/internal/pkg/db/redis"
  21. kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
  22. "strconv"
  23. )
  24. const (
  25. TsPrefix = "KV:TS"
  26. AddToSortedSet = "ZADD"
  27. ReversedRangeByScore = "ZREVRANGEBYSCORE"
  28. RemoveRangeByScore = "ZREMRANGEBYSCORE"
  29. Delete = "DEL"
  30. ReversedRange = "ZREVRANGE"
  31. )
  32. type ts struct {
  33. redis dbRedis.Instance
  34. table string
  35. last int64
  36. key string
  37. }
  38. func init() {
  39. gob.Register(make(map[string]interface{}))
  40. }
  41. func createRedisTs(redis dbRedis.Instance, table string) (error, *ts) {
  42. key := fmt.Sprintf("%s:%s", TsPrefix, table)
  43. err, lastTs := getLast(redis, key)
  44. if err != nil {
  45. return err, nil
  46. }
  47. s := &ts{
  48. redis: redis,
  49. table: table,
  50. last: lastTs,
  51. key: key,
  52. }
  53. return nil, s
  54. }
  55. func (t *ts) Set(key int64, value interface{}) (bool, error) {
  56. if key <= t.last {
  57. return false, nil
  58. }
  59. err, b := kvEncoding.Encode(value)
  60. if err != nil {
  61. return false, err
  62. }
  63. err = t.redis.Apply(func(conn redis.Conn) error {
  64. reply, err := conn.Do(AddToSortedSet, t.key, key, b)
  65. if err != nil {
  66. return err
  67. }
  68. length, err := redis.Int(reply, err)
  69. if err != nil {
  70. return err
  71. }
  72. if length == 0 {
  73. return fmt.Errorf("list at %s key should be non empty", t.key)
  74. }
  75. t.last = key
  76. return nil
  77. })
  78. if err != nil {
  79. return false, err
  80. }
  81. return true, nil
  82. }
  83. func (t ts) Get(key int64, value interface{}) (bool, error) {
  84. err := t.redis.Apply(func(conn redis.Conn) error {
  85. reply, err := conn.Do(ReversedRangeByScore, t.key, key, key)
  86. if err != nil {
  87. return err
  88. }
  89. var tmp [][]byte
  90. tmp, err = redis.ByteSlices(reply, err)
  91. if err != nil {
  92. return err
  93. }
  94. if len(tmp) == 0 {
  95. return fmt.Errorf("record under %s key and %d score not found", t.key, key)
  96. }
  97. dec := gob.NewDecoder(bytes.NewBuffer(tmp[0]))
  98. err = dec.Decode(value)
  99. return err
  100. })
  101. if err != nil {
  102. return false, err
  103. }
  104. return true, nil
  105. }
  106. func (t ts) Last(value interface{}) (int64, error) {
  107. var last int64 = 0
  108. err := t.redis.Apply(func(conn redis.Conn) error {
  109. reply, err := conn.Do(ReversedRange, t.key, 0, 0, "WITHSCORES")
  110. if err != nil {
  111. return err
  112. }
  113. var tmp [][]byte
  114. tmp, err = redis.ByteSlices(reply, err)
  115. if err != nil {
  116. return err
  117. }
  118. dec := gob.NewDecoder(bytes.NewBuffer(tmp[0]))
  119. if err = dec.Decode(value); err != nil {
  120. return err
  121. }
  122. last, err = strconv.ParseInt(string(tmp[1]), 10, 64)
  123. return err
  124. })
  125. if err != nil {
  126. return 0, err
  127. }
  128. return last, nil
  129. }
  130. func (t ts) Delete(key int64) error {
  131. return t.redis.Apply(func(conn redis.Conn) error {
  132. _, err := conn.Do(RemoveRangeByScore, t.key, key, key)
  133. return err
  134. })
  135. }
  136. func (t ts) DeleteBefore(key int64) error {
  137. return t.redis.Apply(func(conn redis.Conn) error {
  138. bound := fmt.Sprintf("(%d", key)
  139. _, err := conn.Do(RemoveRangeByScore, t.key, "-INF", bound)
  140. return err
  141. })
  142. }
  143. func (t ts) Close() error {
  144. return nil
  145. }
  146. func (t ts) Drop() error {
  147. return t.redis.Apply(func(conn redis.Conn) error {
  148. _, err := conn.Do(Delete, t.key)
  149. return err
  150. })
  151. }
  152. func getLast(db dbRedis.Instance, key string) (error, int64) {
  153. var lastTs int64
  154. err := db.Apply(func(conn redis.Conn) error {
  155. reply, err := conn.Do(ReversedRange, key, 0, 0, "WITHSCORES")
  156. if err != nil {
  157. return err
  158. }
  159. var tmp [][]byte
  160. tmp, err = redis.ByteSlices(reply, err)
  161. if err != nil {
  162. return err
  163. }
  164. if len(tmp) == 0 {
  165. return nil
  166. }
  167. lastTs, err = strconv.ParseInt(string(tmp[1]), 10, 64)
  168. return err
  169. })
  170. if err != nil {
  171. return err, 0
  172. }
  173. return nil, lastTs
  174. }