redis.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. // Copyright 2022-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build redisdb || !core
  15. // +build redisdb !core
  16. package redis
  17. import (
  18. "fmt"
  19. "github.com/gomodule/redigo/redis"
  20. "github.com/lf-edge/ekuiper/internal/pkg/store/definition"
  21. "sync"
  22. "time"
  23. )
  24. type Instance struct {
  25. ConnectionString string
  26. pool *redis.Pool
  27. mu *sync.Mutex
  28. config definition.RedisConfig
  29. }
  30. func NewRedisFromConf(c definition.Config) (definition.Database, error) {
  31. conf := c.Redis
  32. host := conf.Host
  33. port := conf.Port
  34. return &Instance{
  35. ConnectionString: connectionString(host, port),
  36. pool: nil,
  37. mu: &sync.Mutex{},
  38. config: conf,
  39. }, nil
  40. }
  41. func NewRedis(host string, port int) *Instance {
  42. return &Instance{
  43. ConnectionString: connectionString(host, port),
  44. pool: nil,
  45. mu: &sync.Mutex{},
  46. }
  47. }
  48. func (r *Instance) Connect() error {
  49. r.mu.Lock()
  50. defer r.mu.Unlock()
  51. if r.ConnectionString == "" {
  52. return fmt.Errorf("connection string for redis not initalized")
  53. }
  54. err, pool := r.connectRedis()
  55. if err != nil {
  56. return err
  57. }
  58. conn := pool.Get()
  59. defer conn.Close()
  60. reply, err := conn.Do("PING")
  61. if err != nil {
  62. return err
  63. }
  64. response, err := redis.String(reply, err)
  65. if err != nil {
  66. return err
  67. }
  68. if response != "PONG" {
  69. return fmt.Errorf("failed to connect to redis")
  70. }
  71. r.pool = pool
  72. return nil
  73. }
  74. func (r *Instance) connectRedis() (error, *redis.Pool) {
  75. opts := []redis.DialOption{
  76. redis.DialConnectTimeout(time.Duration(r.config.Timeout) * time.Millisecond),
  77. }
  78. if r.config.Password != "" {
  79. opts = append(opts, redis.DialPassword(r.config.Password))
  80. }
  81. dialFunction := func() (redis.Conn, error) {
  82. conn, err := redis.Dial("tcp", r.ConnectionString, opts...)
  83. if err == nil {
  84. _, err = conn.Do("PING")
  85. if err == nil {
  86. return conn, nil
  87. }
  88. }
  89. return nil, fmt.Errorf("could not dial redis: %s", err)
  90. }
  91. pool := &redis.Pool{
  92. IdleTimeout: 0,
  93. MaxIdle: 10,
  94. Dial: dialFunction,
  95. }
  96. return nil, pool
  97. }
  98. func connectionString(host string, port int) string {
  99. return fmt.Sprintf("%s:%d", host, port)
  100. }
  101. func (r *Instance) Disconnect() error {
  102. r.mu.Lock()
  103. defer r.mu.Unlock()
  104. if r == nil {
  105. return nil
  106. }
  107. err := r.pool.Close()
  108. r.pool = nil
  109. return err
  110. }
  111. func (r *Instance) Apply(f func(conn redis.Conn) error) error {
  112. connection := r.pool.Get()
  113. defer connection.Close()
  114. return f(connection)
  115. }