瀏覽代碼

refactor(redis): switch all redis inference to goredis

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 年之前
父節點
當前提交
91703f9ecb

+ 0 - 1
extensions.mod

@@ -26,7 +26,6 @@ require (
 	github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
 	github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
 	github.com/golang-jwt/jwt v3.2.2+incompatible
 	github.com/golang-jwt/jwt v3.2.2+incompatible
 	github.com/golang/protobuf v1.5.2
 	github.com/golang/protobuf v1.5.2
-	github.com/gomodule/redigo v2.0.0+incompatible
 	github.com/google/uuid v1.3.0
 	github.com/google/uuid v1.3.0
 	github.com/googleapis/go-sql-spanner v0.0.0-20220321120010-12780e57be1c
 	github.com/googleapis/go-sql-spanner v0.0.0-20220321120010-12780e57be1c
 	github.com/gorilla/handlers v1.4.2
 	github.com/gorilla/handlers v1.4.2

+ 0 - 2
extensions.sum

@@ -407,8 +407,6 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
 github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
 github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
 github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
 github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
-github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
-github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
 github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/flatbuffers v2.0.0+incompatible h1:dicJ2oXwypfwUGnB2/TYWYEKiuk9eYQlQO/AnOHl5mI=
 github.com/google/flatbuffers v2.0.0+incompatible h1:dicJ2oXwypfwUGnB2/TYWYEKiuk9eYQlQO/AnOHl5mI=

+ 1 - 2
go.mod

@@ -9,12 +9,12 @@ require (
 	github.com/eclipse/paho.mqtt.golang v1.4.2-0.20220810043731-079a117b4614
 	github.com/eclipse/paho.mqtt.golang v1.4.2-0.20220810043731-079a117b4614
 	github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0
 	github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0
 	github.com/edgexfoundry/go-mod-messaging/v2 v2.2.0
 	github.com/edgexfoundry/go-mod-messaging/v2 v2.2.0
+	github.com/fxamacker/cbor/v2 v2.4.0
 	github.com/gdexlab/go-render v1.0.1
 	github.com/gdexlab/go-render v1.0.1
 	github.com/go-redis/redis/v7 v7.3.0
 	github.com/go-redis/redis/v7 v7.3.0
 	github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
 	github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
 	github.com/golang-jwt/jwt v3.2.2+incompatible
 	github.com/golang-jwt/jwt v3.2.2+incompatible
 	github.com/golang/protobuf v1.5.2
 	github.com/golang/protobuf v1.5.2
-	github.com/gomodule/redigo v2.0.0+incompatible
 	github.com/google/uuid v1.3.0
 	github.com/google/uuid v1.3.0
 	github.com/gorilla/handlers v1.4.2
 	github.com/gorilla/handlers v1.4.2
 	github.com/gorilla/mux v1.7.3
 	github.com/gorilla/mux v1.7.3
@@ -46,7 +46,6 @@ require (
 	github.com/cespare/xxhash/v2 v2.1.1 // indirect
 	github.com/cespare/xxhash/v2 v2.1.1 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect
 	github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
 	github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
-	github.com/fxamacker/cbor/v2 v2.4.0 // indirect
 	github.com/go-playground/locales v0.14.0 // indirect
 	github.com/go-playground/locales v0.14.0 // indirect
 	github.com/go-playground/universal-translator v0.18.0 // indirect
 	github.com/go-playground/universal-translator v0.18.0 // indirect
 	github.com/go-playground/validator/v10 v10.10.1 // indirect
 	github.com/go-playground/validator/v10 v10.10.1 // indirect

+ 0 - 3
go.sum

@@ -114,8 +114,6 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
 github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
 github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
 github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
-github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
-github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -438,7 +436,6 @@ gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA=
 gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 11 - 97
internal/pkg/store/redis/redis.go

@@ -19,108 +19,22 @@ package redis
 
 
 import (
 import (
 	"fmt"
 	"fmt"
-	"github.com/gomodule/redigo/redis"
+	"github.com/go-redis/redis/v7"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/definition"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/definition"
-	"sync"
 	"time"
 	"time"
 )
 )
 
 
-type Instance struct {
-	ConnectionString string
-	pool             *redis.Pool
-	mu               *sync.Mutex
-	config           definition.RedisConfig
-}
-
-func NewRedisFromConf(c definition.Config) (definition.Database, error) {
+func NewRedisFromConf(c definition.Config) *redis.Client {
 	conf := c.Redis
 	conf := c.Redis
-	host := conf.Host
-	port := conf.Port
-	return &Instance{
-		ConnectionString: connectionString(host, port),
-		pool:             nil,
-		mu:               &sync.Mutex{},
-		config:           conf,
-	}, nil
-}
-
-func NewRedis(host string, port int) *Instance {
-	return &Instance{
-		ConnectionString: connectionString(host, port),
-		pool:             nil,
-		mu:               &sync.Mutex{},
-	}
-}
-
-func (r *Instance) Connect() error {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	if r.ConnectionString == "" {
-		return fmt.Errorf("connection string for redis not initalized")
-	}
-	err, pool := r.connectRedis()
-	if err != nil {
-		return err
-	}
-	conn := pool.Get()
-	defer conn.Close()
-	reply, err := conn.Do("PING")
-	if err != nil {
-		return err
-	}
-	response, err := redis.String(reply, err)
-	if err != nil {
-		return err
-	}
-	if response != "PONG" {
-		return fmt.Errorf("failed to connect to redis")
-	}
-	r.pool = pool
-	return nil
-}
-
-func (r *Instance) connectRedis() (error, *redis.Pool) {
-	opts := []redis.DialOption{
-		redis.DialConnectTimeout(time.Duration(r.config.Timeout) * time.Millisecond),
-	}
-	if r.config.Password != "" {
-		opts = append(opts, redis.DialPassword(r.config.Password))
-	}
-	dialFunction := func() (redis.Conn, error) {
-		conn, err := redis.Dial("tcp", r.ConnectionString, opts...)
-		if err == nil {
-			_, err = conn.Do("PING")
-			if err == nil {
-				return conn, nil
-			}
-		}
-		return nil, fmt.Errorf("could not dial redis: %s", err)
-	}
-	pool := &redis.Pool{
-		IdleTimeout: 0,
-		MaxIdle:     10,
-		Dial:        dialFunction,
-	}
-	return nil, pool
-}
-
-func connectionString(host string, port int) string {
-	return fmt.Sprintf("%s:%d", host, port)
-}
-
-func (r *Instance) Disconnect() error {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	if r == nil {
-		return nil
-	}
-	err := r.pool.Close()
-	r.pool = nil
-	return err
+	return redis.NewClient(&redis.Options{
+		Addr:        fmt.Sprintf("%s:%d", conf.Host, conf.Port),
+		Password:    conf.Password,
+		DialTimeout: time.Duration(conf.Timeout) * time.Millisecond,
+	})
 }
 }
 
 
-func (r *Instance) Apply(f func(conn redis.Conn) error) error {
-	connection := r.pool.Get()
-	defer connection.Close()
-	return f(connection)
+func NewRedis(host string, port int) *redis.Client {
+	return redis.NewClient(&redis.Options{
+		Addr: fmt.Sprintf("%s:%d", host, port),
+	})
 }
 }

+ 30 - 85
internal/pkg/store/redis/redisKv.go

@@ -13,7 +13,6 @@
 // limitations under the License.
 // limitations under the License.
 
 
 //go:build redisdb || !core
 //go:build redisdb || !core
-// +build redisdb !core
 
 
 package redis
 package redis
 
 
@@ -21,7 +20,7 @@ import (
 	"bytes"
 	"bytes"
 	"encoding/gob"
 	"encoding/gob"
 	"fmt"
 	"fmt"
-	"github.com/gomodule/redigo/redis"
+	"github.com/go-redis/redis/v7"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
 	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
 	"strings"
 	"strings"
@@ -30,12 +29,12 @@ import (
 const KvPrefix = "KV:STORE"
 const KvPrefix = "KV:STORE"
 
 
 type redisKvStore struct {
 type redisKvStore struct {
-	database  *Instance
+	database  *redis.Client
 	table     string
 	table     string
 	keyPrefix string
 	keyPrefix string
 }
 }
 
 
-func CreateRedisKvStore(redis *Instance, table string) (*redisKvStore, error) {
+func createRedisKvStore(redis *redis.Client, table string) (*redisKvStore, error) {
 	store := &redisKvStore{
 	store := &redisKvStore{
 		database:  redis,
 		database:  redis,
 		table:     table,
 		table:     table,
@@ -45,22 +44,18 @@ func CreateRedisKvStore(redis *Instance, table string) (*redisKvStore, error) {
 }
 }
 
 
 func (kv redisKvStore) Setnx(key string, value interface{}) error {
 func (kv redisKvStore) Setnx(key string, value interface{}) error {
-	return kv.database.Apply(func(conn redis.Conn) error {
-		err, b := kvEncoding.Encode(value)
-		if nil != err {
-			return err
-		}
-		tKey := kv.tableKey(key)
-		reply, err := conn.Do("SETNX", tKey, b)
-		if err != nil {
-			return err
-		}
-		code, err := redis.Int(reply, err)
-		if code == 0 {
-			return fmt.Errorf("item %s already exists under %s key because of %s", key, tKey, err)
-		}
-		return nil
-	})
+	err, b := kvEncoding.Encode(value)
+	if nil != err {
+		return err
+	}
+	done, err := kv.database.SetNX(kv.tableKey(key), b, 0).Result()
+	if err != nil {
+		return err
+	}
+	if !done {
+		return fmt.Errorf("key %s already exists", key)
+	}
+	return nil
 }
 }
 
 
 func (kv redisKvStore) Set(key string, value interface{}) error {
 func (kv redisKvStore) Set(key string, value interface{}) error {
@@ -68,50 +63,23 @@ func (kv redisKvStore) Set(key string, value interface{}) error {
 	if nil != err {
 	if nil != err {
 		return err
 		return err
 	}
 	}
-	err = kv.database.Apply(func(conn redis.Conn) error {
-		tKey := kv.tableKey(key)
-		reply, err := conn.Do("SET", tKey, b)
-		code, err := redis.String(reply, err)
-		if err != nil {
-			return err
-		}
-		if code != "OK" {
-			return fmt.Errorf("item %s (under key %s) not set because of %s", key, tKey, err)
-		}
-		return nil
-	})
-	return err
+	return kv.database.SetNX(kv.tableKey(key), b, 0).Err()
 }
 }
 
 
 func (kv redisKvStore) Get(key string, value interface{}) (bool, error) {
 func (kv redisKvStore) Get(key string, value interface{}) (bool, error) {
-	result := false
-	err := kv.database.Apply(func(conn redis.Conn) error {
-		tKey := kv.tableKey(key)
-		reply, err := conn.Do("GET", tKey)
-		if err != nil {
-			return err
-		}
-		buff, err := redis.Bytes(reply, err)
-		if err != nil {
-			result = false
-			return nil
-		}
-		dec := gob.NewDecoder(bytes.NewBuffer(buff))
-		if err := dec.Decode(value); err != nil {
-			return err
-		}
-		result = true
-		return nil
-	})
-	return result, err
+	val, err := kv.database.Get(kv.tableKey(key)).Result()
+	if err != nil {
+		return false, err
+	}
+	dec := gob.NewDecoder(bytes.NewBuffer([]byte(val)))
+	if err := dec.Decode(value); err != nil {
+		return false, err
+	}
+	return true, nil
 }
 }
 
 
 func (kv redisKvStore) Delete(key string) error {
 func (kv redisKvStore) Delete(key string) error {
-	return kv.database.Apply(func(conn redis.Conn) error {
-		tKey := kv.tableKey(key)
-		_, err := conn.Do("DEL", tKey)
-		return err
-	})
+	return kv.database.Del(kv.tableKey(key)).Err()
 }
 }
 
 
 func (kv redisKvStore) Keys() ([]string, error) {
 func (kv redisKvStore) Keys() ([]string, error) {
@@ -149,14 +117,7 @@ func (kv redisKvStore) All() (map[string]string, error) {
 }
 }
 
 
 func (kv redisKvStore) metaKeys() ([]string, error) {
 func (kv redisKvStore) metaKeys() ([]string, error) {
-	keys := make([]string, 0)
-	err := kv.database.Apply(func(conn redis.Conn) error {
-		pattern := fmt.Sprintf("%s:*", kv.keyPrefix)
-		reply, err := conn.Do("KEYS", pattern)
-		keys, err = redis.Strings(reply, err)
-		return err
-	})
-	return keys, err
+	return kv.database.Keys(fmt.Sprintf("%s:*", kv.keyPrefix)).Result()
 }
 }
 
 
 func (kv redisKvStore) Clean() error {
 func (kv redisKvStore) Clean() error {
@@ -164,31 +125,15 @@ func (kv redisKvStore) Clean() error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	keysToRemove := make([]interface{}, len(keys))
+	keysToRemove := make([]string, len(keys))
 	for i, v := range keys {
 	for i, v := range keys {
 		keysToRemove[i] = v
 		keysToRemove[i] = v
 	}
 	}
-	err = kv.database.Apply(func(conn redis.Conn) error {
-		_, err := conn.Do("DEL", keysToRemove...)
-		return err
-	})
-	return err
+	return kv.database.Del(keysToRemove...).Err()
 }
 }
 
 
 func (kv redisKvStore) Drop() error {
 func (kv redisKvStore) Drop() error {
-	keys, err := kv.metaKeys()
-	if err != nil {
-		return err
-	}
-	keysToRemove := make([]interface{}, len(keys))
-	for i, v := range keys {
-		keysToRemove[i] = v
-	}
-	err = kv.database.Apply(func(conn redis.Conn) error {
-		_, err := conn.Do("DEL", keysToRemove...)
-		return err
-	})
-	return err
+	return kv.Clean()
 }
 }
 
 
 func (kv redisKvStore) tableKey(key string) string {
 func (kv redisKvStore) tableKey(key string) string {

+ 7 - 9
internal/pkg/store/redis/redisKv_test.go

@@ -13,12 +13,12 @@
 // limitations under the License.
 // limitations under the License.
 
 
 //go:build redisdb || !core
 //go:build redisdb || !core
-// +build redisdb !core
 
 
 package redis
 package redis
 
 
 import (
 import (
 	"github.com/alicebob/miniredis/v2"
 	"github.com/alicebob/miniredis/v2"
+	"github.com/go-redis/redis/v7"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/test/common"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/test/common"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 	"strconv"
 	"strconv"
@@ -59,16 +59,14 @@ func TestRedisKvAll(t *testing.T) {
 	common.TestKvAll(length, ks, t)
 	common.TestKvAll(length, ks, t)
 }
 }
 
 
-func setupRedisKv() (kv.KeyValue, *Instance, *miniredis.Miniredis) {
+func setupRedisKv() (kv.KeyValue, *redis.Client, *miniredis.Miniredis) {
 	minRedis, err := miniredis.Run()
 	minRedis, err := miniredis.Run()
 	if err != nil {
 	if err != nil {
 		panic(err)
 		panic(err)
 	}
 	}
-	redisDB := NewRedis("localhost", stringToInt(minRedis.Port()))
-	err = redisDB.Connect()
-	if err != nil {
-		panic(err)
-	}
+	redisDB := redis.NewClient(&redis.Options{
+		Addr: minRedis.Addr(),
+	})
 	builder := NewStoreBuilder(redisDB)
 	builder := NewStoreBuilder(redisDB)
 	var ks kv.KeyValue
 	var ks kv.KeyValue
 	ks, err = builder.CreateStore("test")
 	ks, err = builder.CreateStore("test")
@@ -78,8 +76,8 @@ func setupRedisKv() (kv.KeyValue, *Instance, *miniredis.Miniredis) {
 	return ks, redisDB, minRedis
 	return ks, redisDB, minRedis
 }
 }
 
 
-func cleanRedisKv(instance *Instance, minRedis *miniredis.Miniredis) {
-	instance.Disconnect()
+func cleanRedisKv(instance *redis.Client, minRedis *miniredis.Miniredis) {
+	instance.Close()
 	minRedis.Close()
 	minRedis.Close()
 }
 }
 
 

+ 4 - 3
internal/pkg/store/redis/redisStoreBuilder.go

@@ -18,19 +18,20 @@
 package redis
 package redis
 
 
 import (
 import (
+	"github.com/go-redis/redis/v7"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 )
 )
 
 
 type StoreBuilder struct {
 type StoreBuilder struct {
-	database *Instance
+	database *redis.Client
 }
 }
 
 
-func NewStoreBuilder(redis *Instance) StoreBuilder {
+func NewStoreBuilder(redis *redis.Client) StoreBuilder {
 	return StoreBuilder{
 	return StoreBuilder{
 		database: redis,
 		database: redis,
 	}
 	}
 }
 }
 
 
 func (b StoreBuilder) CreateStore(table string) (kv.KeyValue, error) {
 func (b StoreBuilder) CreateStore(table string) (kv.KeyValue, error) {
-	return CreateRedisKvStore(b.database, table)
+	return createRedisKvStore(b.database, table)
 }
 }

+ 38 - 100
internal/pkg/store/redis/redisTs.go

@@ -21,7 +21,7 @@ import (
 	"bytes"
 	"bytes"
 	"encoding/gob"
 	"encoding/gob"
 	"fmt"
 	"fmt"
-	"github.com/gomodule/redigo/redis"
+	"github.com/go-redis/redis/v7"
 	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
 	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
 	"strconv"
 	"strconv"
 )
 )
@@ -36,7 +36,7 @@ const (
 )
 )
 
 
 type ts struct {
 type ts struct {
-	redis *Instance
+	db    *redis.Client
 	table string
 	table string
 	last  int64
 	last  int64
 	key   string
 	key   string
@@ -46,14 +46,14 @@ func init() {
 	gob.Register(make(map[string]interface{}))
 	gob.Register(make(map[string]interface{}))
 }
 }
 
 
-func createRedisTs(redis *Instance, table string) (error, *ts) {
+func createRedisTs(redis *redis.Client, table string) (error, *ts) {
 	key := fmt.Sprintf("%s:%s", TsPrefix, table)
 	key := fmt.Sprintf("%s:%s", TsPrefix, table)
-	err, lastTs := getLast(redis, key)
+	lastTs, err := getLast(redis, key, nil)
 	if err != nil {
 	if err != nil {
 		return err, nil
 		return err, nil
 	}
 	}
 	s := &ts{
 	s := &ts{
-		redis: redis,
+		db:    redis,
 		table: table,
 		table: table,
 		last:  lastTs,
 		last:  lastTs,
 		key:   key,
 		key:   key,
@@ -69,124 +69,62 @@ func (t *ts) Set(key int64, value interface{}) (bool, error) {
 	if err != nil {
 	if err != nil {
 		return false, err
 		return false, err
 	}
 	}
-	err = t.redis.Apply(func(conn redis.Conn) error {
-		reply, err := conn.Do(AddToSortedSet, t.key, key, b)
-		if err != nil {
-			return err
-		}
-		length, err := redis.Int(reply, err)
-		if err != nil {
-			return err
-		}
-		if length == 0 {
-			return fmt.Errorf("list at %s key should be non empty", t.key)
-		}
-		t.last = key
-		return nil
-	})
+	length, err := t.db.ZAdd(t.key, &redis.Z{Score: float64(key), Member: b}).Result()
 	if err != nil {
 	if err != nil {
 		return false, err
 		return false, err
 	}
 	}
+	if length == 0 {
+		return false, fmt.Errorf("list at %s key should be non empty", t.key)
+	}
+	t.last = key
 	return true, nil
 	return true, nil
 }
 }
 
 
-func (t ts) Get(key int64, value interface{}) (bool, error) {
-	err := t.redis.Apply(func(conn redis.Conn) error {
-		reply, err := conn.Do(ReversedRangeByScore, t.key, key, key)
-		if err != nil {
-			return err
-		}
-		var tmp [][]byte
-		tmp, err = redis.ByteSlices(reply, err)
-		if err != nil {
-			return err
-		}
-		if len(tmp) == 0 {
-			return fmt.Errorf("record under %s key and %d score not found", t.key, key)
-		}
-		dec := gob.NewDecoder(bytes.NewBuffer(tmp[0]))
-		err = dec.Decode(value)
-		return err
-	})
+func (t *ts) Get(key int64, value interface{}) (bool, error) {
+	reply, err := t.db.ZRevRangeByScore(t.key, &redis.ZRangeBy{Min: strconv.FormatInt(key, 10), Max: strconv.FormatInt(key, 10)}).Result()
+	if len(reply) == 0 {
+		return false, fmt.Errorf("record under %s key and %d score not found", t.key, key)
+	}
+	dec := gob.NewDecoder(bytes.NewBuffer([]byte(reply[0])))
+	err = dec.Decode(value)
 	if err != nil {
 	if err != nil {
 		return false, err
 		return false, err
 	}
 	}
 	return true, nil
 	return true, nil
 }
 }
 
 
-func (t ts) Last(value interface{}) (int64, error) {
-	var last int64 = 0
-	err := t.redis.Apply(func(conn redis.Conn) error {
-		reply, err := conn.Do(ReversedRange, t.key, 0, 0, "WITHSCORES")
-		if err != nil {
-			return err
-		}
-		var tmp [][]byte
-		tmp, err = redis.ByteSlices(reply, err)
-		if err != nil {
-			return err
-		}
-		if len(tmp) > 0 {
-			dec := gob.NewDecoder(bytes.NewBuffer(tmp[0]))
-			if err = dec.Decode(value); err != nil {
-				return err
-			}
-			last, err = strconv.ParseInt(string(tmp[1]), 10, 64)
-		}
-		return err
-	})
-	if err != nil {
-		return 0, err
-	}
-	return last, nil
+func (t *ts) Last(value interface{}) (int64, error) {
+	return getLast(t.db, t.key, value)
 }
 }
 
 
-func (t ts) Delete(key int64) error {
-	return t.redis.Apply(func(conn redis.Conn) error {
-		_, err := conn.Do(RemoveRangeByScore, t.key, key, key)
-		return err
-	})
+func (t *ts) Delete(key int64) error {
+	return t.db.ZRemRangeByScore(t.key, strconv.FormatInt(key, 10), strconv.FormatInt(key, 10)).Err()
 }
 }
 
 
-func (t ts) DeleteBefore(key int64) error {
-	return t.redis.Apply(func(conn redis.Conn) error {
-		bound := fmt.Sprintf("(%d", key)
-		_, err := conn.Do(RemoveRangeByScore, t.key, "-INF", bound)
-		return err
-	})
+func (t *ts) DeleteBefore(key int64) error {
+	return t.db.ZRemRangeByScore(t.key, "-inf", strconv.FormatInt(key, 10)).Err()
 }
 }
 
 
-func (t ts) Close() error {
+func (t *ts) Close() error {
 	return nil
 	return nil
 }
 }
 
 
-func (t ts) Drop() error {
-	return t.redis.Apply(func(conn redis.Conn) error {
-		_, err := conn.Do(Delete, t.key)
-		return err
-	})
+func (t *ts) Drop() error {
+	return t.db.Del(t.key).Err()
 }
 }
 
 
-func getLast(db *Instance, key string) (error, int64) {
-	var lastTs int64
-	err := db.Apply(func(conn redis.Conn) error {
-		reply, err := conn.Do(ReversedRange, key, 0, 0, "WITHSCORES")
-		if err != nil {
-			return err
-		}
-		var tmp [][]byte
-		tmp, err = redis.ByteSlices(reply, err)
-		if err != nil {
-			return err
-		}
-		if len(tmp) == 0 {
-			return nil
+func getLast(db *redis.Client, key string, value interface{}) (int64, error) {
+	var last int64 = 0
+	reply, err := db.ZRevRangeWithScores(key, 0, 0).Result()
+	if len(reply) > 0 {
+		if value != nil {
+			v := reply[0].Member.(string)
+			dec := gob.NewDecoder(bytes.NewBuffer([]byte(v)))
+			if err = dec.Decode(value); err != nil {
+				return 0, err
+			}
 		}
 		}
-		lastTs, err = strconv.ParseInt(string(tmp[1]), 10, 64)
-		return err
-	})
-	if err != nil {
-		return err, 0
+		last = int64(reply[0].Score)
 	}
 	}
-	return nil, lastTs
+	return last, nil
 }
 }

+ 3 - 2
internal/pkg/store/redis/redisTsBuilder.go

@@ -18,14 +18,15 @@
 package redis
 package redis
 
 
 import (
 import (
+	"github.com/go-redis/redis/v7"
 	st "github.com/lf-edge/ekuiper/pkg/kv"
 	st "github.com/lf-edge/ekuiper/pkg/kv"
 )
 )
 
 
 type TsBuilder struct {
 type TsBuilder struct {
-	redis *Instance
+	redis *redis.Client
 }
 }
 
 
-func NewTsBuilder(d *Instance) TsBuilder {
+func NewTsBuilder(d *redis.Client) TsBuilder {
 	return TsBuilder{
 	return TsBuilder{
 		redis: d,
 		redis: d,
 	}
 	}

+ 5 - 7
internal/pkg/store/redis/redisTs_test.go

@@ -13,12 +13,12 @@
 // limitations under the License.
 // limitations under the License.
 
 
 //go:build redisdb || !core
 //go:build redisdb || !core
-// +build redisdb !core
 
 
 package redis
 package redis
 
 
 import (
 import (
 	"github.com/alicebob/miniredis/v2"
 	"github.com/alicebob/miniredis/v2"
+	"github.com/go-redis/redis/v7"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/test/common"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/test/common"
 	ts2 "github.com/lf-edge/ekuiper/pkg/kv"
 	ts2 "github.com/lf-edge/ekuiper/pkg/kv"
 	"testing"
 	"testing"
@@ -59,16 +59,14 @@ func TestRedisTsDeleteBefore(t *testing.T) {
 	common.TestTsDeleteBefore(ks, t)
 	common.TestTsDeleteBefore(ks, t)
 }
 }
 
 
-func setupTRedisKv() (ts2.Tskv, *Instance, *miniredis.Miniredis) {
+func setupTRedisKv() (ts2.Tskv, *redis.Client, *miniredis.Miniredis) {
 	minRedis, err := miniredis.Run()
 	minRedis, err := miniredis.Run()
 	if err != nil {
 	if err != nil {
 		panic(err)
 		panic(err)
 	}
 	}
-	redisDB := NewRedis("localhost", stringToInt(minRedis.Port()))
-	err = redisDB.Connect()
-	if err != nil {
-		panic(err)
-	}
+	redisDB := redis.NewClient(&redis.Options{
+		Addr: minRedis.Addr(),
+	})
 
 
 	builder := NewTsBuilder(redisDB)
 	builder := NewTsBuilder(redisDB)
 	var ks ts2.Tskv
 	var ks ts2.Tskv

+ 1 - 13
internal/pkg/store/redis/stores_builder.go

@@ -18,23 +18,11 @@
 package redis
 package redis
 
 
 import (
 import (
-	"fmt"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/definition"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/definition"
 )
 )
 
 
 func BuildStores(c definition.Config, _ string) (definition.StoreBuilder, definition.TsBuilder, error) {
 func BuildStores(c definition.Config, _ string) (definition.StoreBuilder, definition.TsBuilder, error) {
-	db, err := NewRedisFromConf(c)
-	if err != nil {
-		return nil, nil, err
-	}
-	err = db.Connect()
-	if err != nil {
-		return nil, nil, err
-	}
-	d, ok := db.(*Instance)
-	if !ok {
-		return nil, nil, fmt.Errorf("unrecognized database type")
-	}
+	d := NewRedisFromConf(c)
 	kvBuilder := NewStoreBuilder(d)
 	kvBuilder := NewStoreBuilder(d)
 	tsBuilder := NewTsBuilder(d)
 	tsBuilder := NewTsBuilder(d)
 	return kvBuilder, tsBuilder, nil
 	return kvBuilder, tsBuilder, nil

+ 1 - 1
internal/pkg/store/test/common/test.go

@@ -28,7 +28,7 @@ func TestKvSetnx(ks kv.KeyValue, t *testing.T) {
 	}
 	}
 
 
 	if err := ks.Setnx("foo", "bar1"); nil == err {
 	if err := ks.Setnx("foo", "bar1"); nil == err {
-		t.Errorf("Can't overwrite an existing intem")
+		t.Errorf("Can't overwrite an existing intem: %v", err)
 	}
 	}
 }
 }
 
 

+ 1 - 1
internal/pkg/store/test/common/tstest.go

@@ -81,7 +81,7 @@ func TestTsDeleteBefore(ks kv.Tskv, t *testing.T) {
 		t.Error("should allow key 3500")
 		t.Error("should allow key 3500")
 	}
 	}
 
 
-	if err := ks.DeleteBefore(3000); nil != err {
+	if err := ks.DeleteBefore(2999); nil != err {
 		t.Error(err)
 		t.Error(err)
 	}
 	}