Преглед изворни кода

fix(redis): upgrade redis client from v7 to v9

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran пре 2 година
родитељ
комит
f20f53a535

+ 2 - 1
go.mod

@@ -12,7 +12,6 @@ require (
 	github.com/edgexfoundry/go-mod-messaging/v2 v2.3.0
 	github.com/fxamacker/cbor/v2 v2.4.0
 	github.com/gdexlab/go-render v1.0.1
-	github.com/go-redis/redis/v7 v7.4.1
 	github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
 	github.com/golang-jwt/jwt v3.2.2+incompatible
 	github.com/golang/protobuf v1.5.2
@@ -28,6 +27,7 @@ require (
 	github.com/msgpack-rpc/msgpack-rpc-go v0.0.0-20131026060856-c76397e1782b
 	github.com/pebbe/zmq4 v1.2.9
 	github.com/prometheus/client_golang v1.14.0
+	github.com/redis/go-redis/v9 v9.0.2
 	github.com/second-state/WasmEdge-go v0.11.2
 	github.com/sirupsen/logrus v1.9.0
 	github.com/ugorji/go/codec v1.2.10
@@ -49,6 +49,7 @@ require (
 	github.com/bufbuild/protocompile v0.3.0 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
+	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
 	github.com/dlclark/regexp2 v1.8.1 // indirect
 	github.com/felixge/httpsnoop v1.0.3 // indirect
 	github.com/go-playground/locales v0.14.1 // indirect

+ 8 - 2
go.sum

@@ -23,6 +23,8 @@ github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT
 github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
+github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
 github.com/bufbuild/protocompile v0.3.0 h1:u9AWw2p4cYFx2H82Deds/kZx3drqTxX38bwSkWhUtV0=
 github.com/bufbuild/protocompile v0.3.0/go.mod h1:tleDrpPTlLUVmgnEoN6qBliKWqJaZFJXqZdFjTd+ocU=
 github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
@@ -36,6 +38,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc=
 github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
 github.com/dlclark/regexp2 v1.8.1 h1:6Lcdwya6GjPUNsBct8Lg/yRPwMhABj269AAzdGSiR+0=
@@ -68,8 +72,8 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn
 github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
 github.com/go-playground/validator/v10 v10.11.2 h1:q3SHpufmypg+erIExEKUmsgmhDTyhcJ38oeKGACXohU=
 github.com/go-playground/validator/v10 v10.11.2/go.mod h1:NieE624vt4SCTJtD87arVLvdmjPAeV8BQlHtMnw9D7s=
-github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI=
-github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
+github.com/go-redis/redis/v7 v7.3.0 h1:3oHqd0W7f/VLKBxeYTEpqdMUsmMectngjM9OtoRoIgg=
+github.com/go-redis/redis/v7 v7.3.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
 github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU=
 github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
 github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 h1:zN2lZNZRflqFyxVaTIU61KNKQ9C0055u9CAfpmqUvo4=
@@ -179,6 +183,8 @@ github.com/prometheus/common v0.40.0 h1:Afz7EVRqGg2Mqqf4JuF9vdvp1pi220m55Pi9T2Jn
 github.com/prometheus/common v0.40.0/go.mod h1:L65ZJPSmfn/UBWLQIHV7dBrKFidB/wPlF1y5TlSt9OE=
 github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=
 github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
+github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
+github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
 github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
 github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
 github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=

+ 1 - 0
go.work.sum

@@ -376,6 +376,7 @@ github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e h1:aoZm08cpOy4WuID//EZDgc
 github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
 github.com/pkg/profile v1.6.0 h1:hUDfIISABYI59DyeB3OTay/HxSRwTQ8rB/H83k6r5dM=
 github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
+github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
 github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s=
 github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
 github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=

+ 5 - 4
internal/io/redis/lookup.go

@@ -17,12 +17,13 @@
 package redis
 
 import (
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/go-redis/redis/v7"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/redis/go-redis/v9"
 )
 
 type conf struct {
@@ -68,7 +69,7 @@ func (s *lookupSource) Configure(datasource string, props map[string]interface{}
 		Password: s.c.Password,
 		DB:       s.db,
 	})
-	_, err = s.cli.Ping().Result()
+	_, err = s.cli.Ping(context.Background()).Result()
 	return err
 }
 
@@ -84,7 +85,7 @@ func (s *lookupSource) Lookup(ctx api.StreamContext, _ []string, keys []string,
 	}
 	v := fmt.Sprintf("%v", values[0])
 	if s.c.DataType == "string" {
-		res, err := s.cli.Get(v).Result()
+		res, err := s.cli.Get(ctx, v).Result()
 		if err != nil {
 			if err == redis.Nil {
 				return []api.SourceTuple{}, nil
@@ -98,7 +99,7 @@ func (s *lookupSource) Lookup(ctx api.StreamContext, _ []string, keys []string,
 		}
 		return []api.SourceTuple{api.NewDefaultSourceTuple(m, nil)}, nil
 	} else {
-		res, err := s.cli.LRange(v, 0, -1).Result()
+		res, err := s.cli.LRange(ctx, v, 0, -1).Result()
 		if err != nil {
 			if err == redis.Nil {
 				return []api.SourceTuple{}, nil

+ 8 - 10
internal/io/redis/sink.go

@@ -20,13 +20,11 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/ast"
-	"time"
-
-	"github.com/go-redis/redis/v7"
-
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/redis/go-redis/v9"
+	"time"
 )
 
 type config struct {
@@ -78,7 +76,7 @@ func (r *RedisSink) Open(ctx api.StreamContext) (err error) {
 		Password: r.c.Password,
 		DB:       r.c.Db, // use default DB
 	})
-	_, err = r.cli.Ping().Result()
+	_, err = r.cli.Ping(ctx).Result()
 	return err
 }
 
@@ -162,13 +160,13 @@ func (r *RedisSink) save(ctx api.StreamContext, data map[string]interface{}, val
 	switch rowkind {
 	case ast.RowkindInsert, ast.RowkindUpdate, ast.RowkindUpsert:
 		if r.c.DataType == "list" {
-			err = r.cli.LPush(key, val).Err()
+			err = r.cli.LPush(ctx, key, val).Err()
 			if err != nil {
 				return fmt.Errorf("lpush %s:%s error, %v", key, val, err)
 			}
 			logger.Debugf("push redis list success, key:%s data: %v", key, val)
 		} else {
-			err = r.cli.Set(key, val, r.c.Expiration*time.Second).Err()
+			err = r.cli.Set(ctx, key, val, r.c.Expiration*time.Second).Err()
 			if err != nil {
 				return fmt.Errorf("set %s:%s error, %v", key, val, err)
 			}
@@ -176,13 +174,13 @@ func (r *RedisSink) save(ctx api.StreamContext, data map[string]interface{}, val
 		}
 	case ast.RowkindDelete:
 		if r.c.DataType == "list" {
-			err = r.cli.LPop(key).Err()
+			err = r.cli.LPop(ctx, key).Err()
 			if err != nil {
 				return fmt.Errorf("lpop %s error, %v", key, err)
 			}
 			logger.Debugf("pop redis list success, key:%s data: %v", key, val)
 		} else {
-			err = r.cli.Del(key).Err()
+			err = r.cli.Del(ctx, key).Err()
 			if err != nil {
 				logger.Error(err)
 				return err

+ 1 - 1
internal/pkg/store/redis/redis.go

@@ -19,8 +19,8 @@ package redis
 
 import (
 	"fmt"
-	"github.com/go-redis/redis/v7"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/definition"
+	"github.com/redis/go-redis/v9"
 	"time"
 )
 

+ 8 - 7
internal/pkg/store/redis/redisKv.go

@@ -18,11 +18,12 @@ package redis
 
 import (
 	"bytes"
+	"context"
 	"encoding/gob"
 	"fmt"
-	"github.com/go-redis/redis/v7"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
+	"github.com/redis/go-redis/v9"
 	"strings"
 )
 
@@ -48,7 +49,7 @@ func (kv redisKvStore) Setnx(key string, value interface{}) error {
 	if nil != err {
 		return err
 	}
-	done, err := kv.database.SetNX(kv.tableKey(key), b, 0).Result()
+	done, err := kv.database.SetNX(context.Background(), kv.tableKey(key), b, 0).Result()
 	if err != nil {
 		return err
 	}
@@ -63,11 +64,11 @@ func (kv redisKvStore) Set(key string, value interface{}) error {
 	if nil != err {
 		return err
 	}
-	return kv.database.SetNX(kv.tableKey(key), b, 0).Err()
+	return kv.database.SetNX(context.Background(), kv.tableKey(key), b, 0).Err()
 }
 
 func (kv redisKvStore) Get(key string, value interface{}) (bool, error) {
-	val, err := kv.database.Get(kv.tableKey(key)).Result()
+	val, err := kv.database.Get(context.Background(), kv.tableKey(key)).Result()
 	if err != nil {
 		return false, err
 	}
@@ -79,7 +80,7 @@ func (kv redisKvStore) Get(key string, value interface{}) (bool, error) {
 }
 
 func (kv redisKvStore) Delete(key string) error {
-	return kv.database.Del(kv.tableKey(key)).Err()
+	return kv.database.Del(context.Background(), kv.tableKey(key)).Err()
 }
 
 func (kv redisKvStore) Keys() ([]string, error) {
@@ -117,7 +118,7 @@ func (kv redisKvStore) All() (map[string]string, error) {
 }
 
 func (kv redisKvStore) metaKeys() ([]string, error) {
-	return kv.database.Keys(fmt.Sprintf("%s:*", kv.keyPrefix)).Result()
+	return kv.database.Keys(context.Background(), fmt.Sprintf("%s:*", kv.keyPrefix)).Result()
 }
 
 func (kv redisKvStore) Clean() error {
@@ -129,7 +130,7 @@ func (kv redisKvStore) Clean() error {
 	for i, v := range keys {
 		keysToRemove[i] = v
 	}
-	return kv.database.Del(keysToRemove...).Err()
+	return kv.database.Del(context.Background(), keysToRemove...).Err()
 }
 
 func (kv redisKvStore) Drop() error {

+ 1 - 1
internal/pkg/store/redis/redisKv_test.go

@@ -18,9 +18,9 @@ package redis
 
 import (
 	"github.com/alicebob/miniredis/v2"
-	"github.com/go-redis/redis/v7"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/test/common"
 	"github.com/lf-edge/ekuiper/pkg/kv"
+	"github.com/redis/go-redis/v9"
 	"strconv"
 	"testing"
 )

+ 1 - 1
internal/pkg/store/redis/redisStoreBuilder.go

@@ -18,8 +18,8 @@
 package redis
 
 import (
-	"github.com/go-redis/redis/v7"
 	"github.com/lf-edge/ekuiper/pkg/kv"
+	"github.com/redis/go-redis/v9"
 )
 
 type StoreBuilder struct {

+ 8 - 7
internal/pkg/store/redis/redisTs.go

@@ -19,10 +19,11 @@ package redis
 
 import (
 	"bytes"
+	"context"
 	"encoding/gob"
 	"fmt"
-	"github.com/go-redis/redis/v7"
 	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
+	"github.com/redis/go-redis/v9"
 	"strconv"
 )
 
@@ -69,7 +70,7 @@ func (t *ts) Set(key int64, value interface{}) (bool, error) {
 	if err != nil {
 		return false, err
 	}
-	length, err := t.db.ZAdd(t.key, &redis.Z{Score: float64(key), Member: b}).Result()
+	length, err := t.db.ZAdd(context.Background(), t.key, redis.Z{Score: float64(key), Member: b}).Result()
 	if err != nil {
 		return false, err
 	}
@@ -81,7 +82,7 @@ func (t *ts) Set(key int64, value interface{}) (bool, error) {
 }
 
 func (t *ts) Get(key int64, value interface{}) (bool, error) {
-	reply, err := t.db.ZRevRangeByScore(t.key, &redis.ZRangeBy{Min: strconv.FormatInt(key, 10), Max: strconv.FormatInt(key, 10)}).Result()
+	reply, err := t.db.ZRevRangeByScore(context.Background(), t.key, &redis.ZRangeBy{Min: strconv.FormatInt(key, 10), Max: strconv.FormatInt(key, 10)}).Result()
 	if len(reply) == 0 {
 		return false, fmt.Errorf("record under %s key and %d score not found", t.key, key)
 	}
@@ -98,11 +99,11 @@ func (t *ts) Last(value interface{}) (int64, error) {
 }
 
 func (t *ts) Delete(key int64) error {
-	return t.db.ZRemRangeByScore(t.key, strconv.FormatInt(key, 10), strconv.FormatInt(key, 10)).Err()
+	return t.db.ZRemRangeByScore(context.Background(), t.key, strconv.FormatInt(key, 10), strconv.FormatInt(key, 10)).Err()
 }
 
 func (t *ts) DeleteBefore(key int64) error {
-	return t.db.ZRemRangeByScore(t.key, "-inf", strconv.FormatInt(key, 10)).Err()
+	return t.db.ZRemRangeByScore(context.Background(), t.key, "-inf", strconv.FormatInt(key, 10)).Err()
 }
 
 func (t *ts) Close() error {
@@ -110,12 +111,12 @@ func (t *ts) Close() error {
 }
 
 func (t *ts) Drop() error {
-	return t.db.Del(t.key).Err()
+	return t.db.Del(context.Background(), t.key).Err()
 }
 
 func getLast(db *redis.Client, key string, value interface{}) (int64, error) {
 	var last int64 = 0
-	reply, err := db.ZRevRangeWithScores(key, 0, 0).Result()
+	reply, err := db.ZRevRangeWithScores(context.Background(), key, 0, 0).Result()
 	if len(reply) > 0 {
 		if value != nil {
 			v := reply[0].Member.(string)

+ 1 - 1
internal/pkg/store/redis/redisTsBuilder.go

@@ -18,8 +18,8 @@
 package redis
 
 import (
-	"github.com/go-redis/redis/v7"
 	st "github.com/lf-edge/ekuiper/pkg/kv"
+	"github.com/redis/go-redis/v9"
 )
 
 type TsBuilder struct {

+ 1 - 1
internal/pkg/store/redis/redisTs_test.go

@@ -18,9 +18,9 @@ package redis
 
 import (
 	"github.com/alicebob/miniredis/v2"
-	"github.com/go-redis/redis/v7"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/test/common"
 	ts2 "github.com/lf-edge/ekuiper/pkg/kv"
+	"github.com/redis/go-redis/v9"
 	"testing"
 )