redis.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. // Copyright 2021 INTECH Process Automation Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package redis
  15. import (
  16. "fmt"
  17. "github.com/gomodule/redigo/redis"
  18. "sync"
  19. "time"
  20. )
  21. type Instance struct {
  22. ConnectionString string
  23. pool *redis.Pool
  24. mu *sync.Mutex
  25. config Config
  26. }
  27. func NewRedisFromConf(conf Config) Instance {
  28. host := conf.Host
  29. port := conf.Port
  30. return Instance{
  31. ConnectionString: connectionString(host, port),
  32. pool: nil,
  33. mu: &sync.Mutex{},
  34. config: conf,
  35. }
  36. }
  37. func NewRedis(host string, port int) Instance {
  38. return Instance{
  39. ConnectionString: connectionString(host, port),
  40. pool: nil,
  41. mu: &sync.Mutex{},
  42. }
  43. }
  44. func (r *Instance) Connect() error {
  45. r.mu.Lock()
  46. defer r.mu.Unlock()
  47. if r.ConnectionString == "" {
  48. return fmt.Errorf("connection string for redis not initalized")
  49. }
  50. err, pool := r.connectRedis()
  51. if err != nil {
  52. return err
  53. }
  54. conn := pool.Get()
  55. defer conn.Close()
  56. reply, err := conn.Do("PING")
  57. if err != nil {
  58. return err
  59. }
  60. response, err := redis.String(reply, err)
  61. if err != nil {
  62. return err
  63. }
  64. if response != "PONG" {
  65. return fmt.Errorf("failed to connect to redis")
  66. }
  67. r.pool = pool
  68. return nil
  69. }
  70. func (r *Instance) connectRedis() (error, *redis.Pool) {
  71. opts := []redis.DialOption{
  72. redis.DialConnectTimeout(time.Duration(r.config.Timeout) * time.Millisecond),
  73. }
  74. if r.config.Password != "" {
  75. opts = append(opts, redis.DialPassword(r.config.Password))
  76. }
  77. dialFunction := func() (redis.Conn, error) {
  78. conn, err := redis.Dial("tcp", r.ConnectionString, opts...)
  79. if err == nil {
  80. _, err = conn.Do("PING")
  81. if err == nil {
  82. return conn, nil
  83. }
  84. }
  85. return nil, fmt.Errorf("could not dial redis: %s", err)
  86. }
  87. pool := &redis.Pool{
  88. IdleTimeout: 0,
  89. MaxIdle: 10,
  90. Dial: dialFunction,
  91. }
  92. return nil, pool
  93. }
  94. func connectionString(host string, port int) string {
  95. return fmt.Sprintf("%s:%d", host, port)
  96. }
  97. func (r *Instance) Disconnect() error {
  98. r.mu.Lock()
  99. defer r.mu.Unlock()
  100. if r == nil {
  101. return nil
  102. }
  103. err := r.pool.Close()
  104. r.pool = nil
  105. return err
  106. }
  107. func (r *Instance) Apply(f func(conn redis.Conn) error) error {
  108. connection := r.pool.Get()
  109. defer connection.Close()
  110. return f(connection)
  111. }