sink_test.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package redis
  15. import (
  16. econf "github.com/lf-edge/ekuiper/internal/conf"
  17. "github.com/lf-edge/ekuiper/internal/topo/context"
  18. "github.com/lf-edge/ekuiper/pkg/cast"
  19. "reflect"
  20. "testing"
  21. )
  22. func TestSink(t *testing.T) {
  23. s := &RedisSink{}
  24. err := s.Configure(map[string]interface{}{
  25. "addr": addr,
  26. "key": "test",
  27. })
  28. if err != nil {
  29. t.Error(err)
  30. return
  31. }
  32. contextLogger := econf.Log.WithField("rule", "test")
  33. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  34. err = s.Open(ctx)
  35. if err != nil {
  36. t.Error(err)
  37. return
  38. }
  39. var tests = []struct {
  40. c map[string]interface{}
  41. d interface{}
  42. k string
  43. v interface{}
  44. }{
  45. {
  46. c: map[string]interface{}{"key": "1"},
  47. d: map[string]interface{}{"id": 1, "name": "John", "address": 34, "mobile": "334433"},
  48. k: "1",
  49. v: `{"address":34,"id":1,"mobile":"334433","name":"John"}`,
  50. },
  51. {
  52. c: map[string]interface{}{"field": "id"},
  53. d: map[string]interface{}{"id": 2, "name": "Susan", "address": 34, "mobile": "334433"},
  54. k: "2",
  55. v: `{"address":34,"id":2,"mobile":"334433","name":"Susan"}`,
  56. },
  57. {
  58. c: map[string]interface{}{"field": "name", "datatype": "list"},
  59. d: map[string]interface{}{"id": 3, "name": "Susan"},
  60. k: "Susan",
  61. v: `{"id":3,"name":"Susan"}`,
  62. },
  63. {
  64. c: map[string]interface{}{"field": "id", "datatype": "list"},
  65. d: []map[string]interface{}{
  66. {"id": 4, "name": "Susan"},
  67. {"id": 4, "name": "Bob"},
  68. {"id": 4, "name": "John"},
  69. },
  70. k: "4",
  71. v: `{"id":4,"name":"John"}`,
  72. },
  73. {
  74. c: map[string]interface{}{"field": "id", "datatype": "string"},
  75. d: []map[string]interface{}{
  76. {"id": 25, "name": "Susan"},
  77. {"id": 25, "name": "Bob"},
  78. {"id": 25, "name": "John"},
  79. },
  80. k: "25",
  81. v: `{"id":25,"name":"John"}`,
  82. },
  83. }
  84. for i, tt := range tests {
  85. cast.MapToStruct(tt.c, s.c)
  86. err = s.Collect(ctx, tt.d)
  87. if err != nil {
  88. t.Error(err)
  89. return
  90. }
  91. var (
  92. r string
  93. err error
  94. )
  95. switch tt.c["datatype"] {
  96. case "list":
  97. r, err = mr.Lpop(tt.k)
  98. default:
  99. r, err = mr.Get(tt.k)
  100. }
  101. if err != nil {
  102. t.Errorf("case %d err %v", i, err)
  103. return
  104. }
  105. if !reflect.DeepEqual(r, tt.v) {
  106. t.Errorf("case %d expect %v, but got %v", i, tt.v, r)
  107. }
  108. }
  109. }
  110. func TestUpdateString(t *testing.T) {
  111. s := &RedisSink{}
  112. err := s.Configure(map[string]interface{}{
  113. "addr": addr,
  114. "field": "id",
  115. "rowkindField": "action",
  116. })
  117. if err != nil {
  118. t.Error(err)
  119. return
  120. }
  121. contextLogger := econf.Log.WithField("rule", "test")
  122. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  123. err = s.Open(ctx)
  124. if err != nil {
  125. t.Error(err)
  126. return
  127. }
  128. var tests = []struct {
  129. d interface{}
  130. k string
  131. v interface{}
  132. }{
  133. {
  134. d: map[string]interface{}{ // add without action
  135. "id": "testUpdate1", "name": "Susan",
  136. },
  137. k: "testUpdate1",
  138. v: `{"id":"testUpdate1","name":"Susan"}`,
  139. },
  140. {
  141. d: map[string]interface{}{ // update with action
  142. "action": "update", "id": "testUpdate1", "name": "John",
  143. },
  144. k: "testUpdate1",
  145. v: `{"action":"update","id":"testUpdate1","name":"John"}`,
  146. },
  147. {
  148. d: map[string]interface{}{ // delete
  149. "action": "delete", "id": "testUpdate1",
  150. },
  151. k: "testUpdate1",
  152. v: ``,
  153. },
  154. {
  155. d: []map[string]interface{}{ // multiple actions
  156. {"action": "delete", "id": "testUpdate1"},
  157. {"action": "insert", "id": "testUpdate1", "name": "Susan"},
  158. },
  159. k: "testUpdate1",
  160. v: `{"action":"insert","id":"testUpdate1","name":"Susan"}`,
  161. },
  162. }
  163. for i, tt := range tests {
  164. err = s.Collect(ctx, tt.d)
  165. if err != nil {
  166. t.Error(err)
  167. return
  168. }
  169. r, err := mr.Get(tt.k)
  170. if tt.v == "" {
  171. if err == nil || err.Error() != "ERR no such key" {
  172. t.Errorf("case %d err %v", i, err)
  173. return
  174. }
  175. } else {
  176. if err != nil {
  177. t.Errorf("case %d err %v", i, err)
  178. return
  179. }
  180. if !reflect.DeepEqual(r, tt.v) {
  181. t.Errorf("case %d expect %v, but got %v", i, tt.v, r)
  182. }
  183. }
  184. }
  185. }
  186. func TestUpdateList(t *testing.T) {
  187. s := &RedisSink{}
  188. err := s.Configure(map[string]interface{}{
  189. "addr": addr,
  190. "field": "id",
  191. "datatype": "list",
  192. "rowkindField": "action",
  193. })
  194. if err != nil {
  195. t.Error(err)
  196. return
  197. }
  198. contextLogger := econf.Log.WithField("rule", "test")
  199. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  200. err = s.Open(ctx)
  201. if err != nil {
  202. t.Error(err)
  203. return
  204. }
  205. var tests = []struct {
  206. d interface{}
  207. k string
  208. v []string
  209. }{
  210. {
  211. d: map[string]interface{}{ // add without action
  212. "id": "testUpdateList", "name": "Susan",
  213. },
  214. k: "testUpdateList",
  215. v: []string{`{"id":"testUpdateList","name":"Susan"}`},
  216. },
  217. {
  218. d: map[string]interface{}{ // update with action
  219. "action": "update", "id": "testUpdateList", "name": "John",
  220. },
  221. k: "testUpdateList",
  222. v: []string{`{"action":"update","id":"testUpdateList","name":"John"}`, `{"id":"testUpdateList","name":"Susan"}`},
  223. },
  224. {
  225. d: map[string]interface{}{ // delete
  226. "action": "delete", "id": "testUpdateList",
  227. },
  228. k: "testUpdateList",
  229. v: []string{`{"id":"testUpdateList","name":"Susan"}`},
  230. },
  231. {
  232. d: []map[string]interface{}{ // multiple actions
  233. {"action": "delete", "id": "testUpdateList"},
  234. {"action": "insert", "id": "testUpdateList", "name": "Susan"},
  235. },
  236. k: "testUpdateList",
  237. v: []string{`{"action":"insert","id":"testUpdateList","name":"Susan"}`},
  238. },
  239. {
  240. d: map[string]interface{}{ // delete
  241. "action": "delete", "id": "testUpdateList",
  242. },
  243. k: "testUpdateList",
  244. v: nil,
  245. },
  246. }
  247. for i, tt := range tests {
  248. err = s.Collect(ctx, tt.d)
  249. if err != nil {
  250. t.Error(err)
  251. return
  252. }
  253. r, err := mr.List(tt.k)
  254. if tt.v == nil {
  255. if err == nil || err.Error() != "ERR no such key" {
  256. t.Errorf("case %d err %v", i, err)
  257. return
  258. }
  259. } else {
  260. if err != nil {
  261. t.Errorf("case %d err %v", i, err)
  262. return
  263. }
  264. if !reflect.DeepEqual(r, tt.v) {
  265. t.Errorf("case %d expect %v, but got %v", i, tt.v, r)
  266. }
  267. }
  268. }
  269. }