redis.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "errors"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/pkg/errorx"
  19. "time"
  20. "github.com/go-redis/redis/v7"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. )
  24. type RedisSink struct {
  25. // host:port address.
  26. addr string
  27. username string
  28. // Optional password. Must match the password specified in the
  29. password string
  30. // Database to be selected after connecting to the server.
  31. db int
  32. // key of field
  33. field string
  34. // key define
  35. key string
  36. dataType string
  37. expiration time.Duration
  38. sendSingle bool
  39. cli *redis.Client
  40. }
  41. func (r *RedisSink) Configure(props map[string]interface{}) error {
  42. if i, ok := props["addr"]; ok {
  43. if i, ok := i.(string); ok {
  44. r.addr = i
  45. }
  46. } else {
  47. return errors.New("redis addr is null")
  48. }
  49. if i, ok := props["password"]; ok {
  50. if i, ok := i.(string); ok {
  51. r.password = i
  52. }
  53. }
  54. r.db = 0
  55. if i, ok := props["db"]; ok {
  56. if t, err := cast.ToInt(i, cast.STRICT); err == nil {
  57. r.db = t
  58. }
  59. }
  60. if i, ok := props["key"]; ok {
  61. if i, ok := i.(string); ok {
  62. r.key = i
  63. }
  64. } else {
  65. return errors.New("not config data key for redis")
  66. }
  67. if i, ok := props["field"]; ok {
  68. if i, ok := i.(string); ok {
  69. r.field = i
  70. }
  71. }
  72. r.sendSingle = true
  73. if i, ok := props["sendSingle"]; ok {
  74. if i, ok := i.(bool); ok {
  75. r.sendSingle = i
  76. }
  77. }
  78. r.dataType = "string"
  79. if i, ok := props["dataType"]; ok {
  80. if i, ok := i.(string); ok {
  81. r.dataType = i
  82. }
  83. }
  84. r.expiration = -1
  85. if i, ok := props["expiration"]; ok {
  86. if t, err := cast.ToInt(i, cast.STRICT); err == nil {
  87. r.expiration = time.Duration(t)
  88. }
  89. }
  90. return nil
  91. }
  92. func (r *RedisSink) Open(ctx api.StreamContext) (err error) {
  93. logger := ctx.GetLogger()
  94. logger.Debug("Opening redis sink")
  95. r.cli = redis.NewClient(&redis.Options{
  96. Addr: r.addr,
  97. Username: r.username,
  98. Password: r.password,
  99. DB: r.db, // use default DB
  100. })
  101. return nil
  102. }
  103. func (r *RedisSink) Collect(ctx api.StreamContext, data interface{}) error {
  104. logger := ctx.GetLogger()
  105. v, _, err := ctx.TransformOutput(data)
  106. if err != nil {
  107. logger.Error(err)
  108. return err
  109. }
  110. if r.field != "" {
  111. switch out := data.(type) {
  112. case []map[string]interface{}:
  113. for _, m := range out {
  114. key := r.field
  115. field, ok := m[key].(string)
  116. if ok {
  117. key = field
  118. }
  119. if r.dataType == "list" {
  120. err := r.cli.LPush(key, v).Err()
  121. if err != nil {
  122. logger.Error(err)
  123. return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
  124. }
  125. logger.Debugf("send redis list success, key:%s data: %s", key, string(v))
  126. } else {
  127. err := r.cli.Set(key, v, r.expiration*time.Second).Err()
  128. if err != nil {
  129. logger.Error(err)
  130. return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
  131. }
  132. logger.Debugf("send redis string success, key:%s data: %s", key, string(v))
  133. }
  134. }
  135. case map[string]interface{}:
  136. key := r.field
  137. field, ok := out[key].(string)
  138. if ok {
  139. key = field
  140. }
  141. if r.dataType == "list" {
  142. err := r.cli.LPush(key, v).Err()
  143. if err != nil {
  144. logger.Error(err)
  145. return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
  146. }
  147. logger.Debugf("send redis list success, key:%s data: %s", key, string(v))
  148. } else {
  149. err := r.cli.Set(key, v, r.expiration*time.Second).Err()
  150. if err != nil {
  151. logger.Error(err)
  152. return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
  153. }
  154. logger.Debugf("send redis string success, key:%s data: %s", key, string(v))
  155. }
  156. }
  157. } else if r.key != "" {
  158. if r.dataType == "list" {
  159. err := r.cli.LPush(r.key, v).Err()
  160. if err != nil {
  161. logger.Error(err)
  162. return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
  163. }
  164. logger.Debugf("send redis list success, key:%s data: %s", r.key, string(v))
  165. } else {
  166. err := r.cli.Set(r.key, v, r.expiration*time.Second).Err()
  167. if err != nil {
  168. logger.Error(err)
  169. return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
  170. }
  171. logger.Debugf("send redis string success, key:%s data: %s", r.key, string(v))
  172. }
  173. }
  174. logger.Debug("insert success %v", data)
  175. return nil
  176. }
  177. func (r *RedisSink) Close(ctx api.StreamContext) error {
  178. err := r.cli.Close()
  179. return err
  180. }
  181. func Redis() api.Sink {
  182. return &RedisSink{}
  183. }