sink_test.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package redis
  15. import (
  16. "reflect"
  17. "testing"
  18. econf "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/context"
  20. "github.com/lf-edge/ekuiper/pkg/cast"
  21. )
  22. func TestSink(t *testing.T) {
  23. s := &RedisSink{}
  24. err := s.Configure(map[string]interface{}{
  25. "addr": addr,
  26. "key": "test",
  27. })
  28. if err != nil {
  29. t.Error(err)
  30. return
  31. }
  32. contextLogger := econf.Log.WithField("rule", "test")
  33. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  34. err = s.Open(ctx)
  35. if err != nil {
  36. t.Error(err)
  37. return
  38. }
  39. tests := []struct {
  40. c map[string]interface{}
  41. d interface{}
  42. k string
  43. v interface{}
  44. }{
  45. {
  46. c: map[string]interface{}{"key": "1"},
  47. d: map[string]interface{}{"id": 1, "name": "John", "address": 34, "mobile": "334433"},
  48. k: "1",
  49. v: `{"address":34,"id":1,"mobile":"334433","name":"John"}`,
  50. },
  51. {
  52. c: map[string]interface{}{"field": "id"},
  53. d: map[string]interface{}{"id": 2, "name": "Susan", "address": 34, "mobile": "334433"},
  54. k: "2",
  55. v: `{"address":34,"id":2,"mobile":"334433","name":"Susan"}`,
  56. },
  57. {
  58. c: map[string]interface{}{"field": "name", "datatype": "list"},
  59. d: map[string]interface{}{"id": 3, "name": "Susan"},
  60. k: "Susan",
  61. v: `{"id":3,"name":"Susan"}`,
  62. },
  63. {
  64. c: map[string]interface{}{"field": "id", "datatype": "list"},
  65. d: []map[string]interface{}{
  66. {"id": 4, "name": "Susan"},
  67. {"id": 4, "name": "Bob"},
  68. {"id": 4, "name": "John"},
  69. },
  70. k: "4",
  71. v: `{"id":4,"name":"John"}`,
  72. },
  73. {
  74. c: map[string]interface{}{"field": "id", "datatype": "string"},
  75. d: []map[string]interface{}{
  76. {"id": 25, "name": "Susan"},
  77. {"id": 25, "name": "Bob"},
  78. {"id": 25, "name": "John"},
  79. },
  80. k: "25",
  81. v: `{"id":25,"name":"John"}`,
  82. },
  83. }
  84. for i, tt := range tests {
  85. cast.MapToStruct(tt.c, s.c)
  86. err = s.Collect(ctx, tt.d)
  87. if err != nil {
  88. t.Error(err)
  89. return
  90. }
  91. var (
  92. r string
  93. err error
  94. )
  95. switch tt.c["datatype"] {
  96. case "list":
  97. r, err = mr.Lpop(tt.k)
  98. default:
  99. r, err = mr.Get(tt.k)
  100. }
  101. if err != nil {
  102. t.Errorf("case %d err %v", i, err)
  103. return
  104. }
  105. if !reflect.DeepEqual(r, tt.v) {
  106. t.Errorf("case %d expect %v, but got %v", i, tt.v, r)
  107. }
  108. }
  109. }
  110. func TestSinkMultipleFields(t *testing.T) {
  111. s := &RedisSink{}
  112. err := s.Configure(map[string]interface{}{
  113. "addr": addr,
  114. "key": "test",
  115. })
  116. if err != nil {
  117. t.Error(err)
  118. return
  119. }
  120. contextLogger := econf.Log.WithField("rule", "test")
  121. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  122. err = s.Open(ctx)
  123. if err != nil {
  124. t.Error(err)
  125. return
  126. }
  127. tests := []struct {
  128. c map[string]interface{}
  129. d interface{}
  130. kvPair map[string]interface{}
  131. }{
  132. {
  133. c: map[string]interface{}{"keyType": "multiple"},
  134. d: map[string]interface{}{"id": 1, "name": "John", "address": 34, "mobile": "334433"},
  135. kvPair: map[string]interface{}{"id": "1", "name": "John", "address": "34", "mobile": "334433"},
  136. },
  137. {
  138. c: map[string]interface{}{"keyType": "multiple", "datatype": "string"},
  139. d: []map[string]interface{}{
  140. {"id": 24, "name": "Susan"},
  141. {"id": 25, "name": "Bob"},
  142. {"id": 26, "name": "John"},
  143. },
  144. kvPair: map[string]interface{}{"id": "26", "name": "John"},
  145. },
  146. {
  147. c: map[string]interface{}{"datatype": "list", "keyType": "multiple"},
  148. d: map[string]interface{}{
  149. "listId": 4, "listName": "Susan",
  150. },
  151. kvPair: map[string]interface{}{"listId": "4", "listName": "Susan"},
  152. },
  153. {
  154. c: map[string]interface{}{"datatype": "list", "keyType": "multiple"},
  155. d: []map[string]interface{}{
  156. {"listId": 4, "listName": "Susan"},
  157. {"listId": 5, "listName": "Bob"},
  158. {"listId": 6, "listName": "John"},
  159. },
  160. kvPair: map[string]interface{}{"listId": "6", "listName": "John"},
  161. },
  162. }
  163. for i, tt := range tests {
  164. cast.MapToStruct(tt.c, s.c)
  165. err = s.Collect(ctx, tt.d)
  166. if err != nil {
  167. t.Error(err)
  168. return
  169. }
  170. var (
  171. r string
  172. err error
  173. )
  174. for k, v := range tt.kvPair {
  175. switch tt.c["datatype"] {
  176. case "list":
  177. r, err = mr.Lpop(k)
  178. default:
  179. r, err = mr.Get(k)
  180. }
  181. if err != nil {
  182. t.Errorf("case %d err %v", i, err)
  183. return
  184. }
  185. if !reflect.DeepEqual(r, v) {
  186. t.Errorf("case %d expect %v, but got %v", i, v, r)
  187. }
  188. }
  189. }
  190. }
  191. func TestUpdateString(t *testing.T) {
  192. s := &RedisSink{}
  193. err := s.Configure(map[string]interface{}{
  194. "addr": addr,
  195. "field": "id",
  196. "rowkindField": "action",
  197. })
  198. if err != nil {
  199. t.Error(err)
  200. return
  201. }
  202. contextLogger := econf.Log.WithField("rule", "test")
  203. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  204. err = s.Open(ctx)
  205. if err != nil {
  206. t.Error(err)
  207. return
  208. }
  209. tests := []struct {
  210. d interface{}
  211. k string
  212. v interface{}
  213. }{
  214. {
  215. d: map[string]interface{}{ // add without action
  216. "id": "testUpdate1", "name": "Susan",
  217. },
  218. k: "testUpdate1",
  219. v: `{"id":"testUpdate1","name":"Susan"}`,
  220. },
  221. {
  222. d: map[string]interface{}{ // update with action
  223. "action": "update", "id": "testUpdate1", "name": "John",
  224. },
  225. k: "testUpdate1",
  226. v: `{"action":"update","id":"testUpdate1","name":"John"}`,
  227. },
  228. {
  229. d: map[string]interface{}{ // delete
  230. "action": "delete", "id": "testUpdate1",
  231. },
  232. k: "testUpdate1",
  233. v: ``,
  234. },
  235. {
  236. d: []map[string]interface{}{ // multiple actions
  237. {"action": "delete", "id": "testUpdate1"},
  238. {"action": "insert", "id": "testUpdate1", "name": "Susan"},
  239. },
  240. k: "testUpdate1",
  241. v: `{"action":"insert","id":"testUpdate1","name":"Susan"}`,
  242. },
  243. }
  244. for i, tt := range tests {
  245. err = s.Collect(ctx, tt.d)
  246. if err != nil {
  247. t.Error(err)
  248. return
  249. }
  250. r, err := mr.Get(tt.k)
  251. if tt.v == "" {
  252. if err == nil || err.Error() != "ERR no such key" {
  253. t.Errorf("case %d err %v", i, err)
  254. return
  255. }
  256. } else {
  257. if err != nil {
  258. t.Errorf("case %d err %v", i, err)
  259. return
  260. }
  261. if !reflect.DeepEqual(r, tt.v) {
  262. t.Errorf("case %d expect %v, but got %v", i, tt.v, r)
  263. }
  264. }
  265. }
  266. }
  267. func TestUpdateList(t *testing.T) {
  268. s := &RedisSink{}
  269. err := s.Configure(map[string]interface{}{
  270. "addr": addr,
  271. "field": "id",
  272. "datatype": "list",
  273. "rowkindField": "action",
  274. })
  275. if err != nil {
  276. t.Error(err)
  277. return
  278. }
  279. contextLogger := econf.Log.WithField("rule", "test")
  280. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  281. err = s.Open(ctx)
  282. if err != nil {
  283. t.Error(err)
  284. return
  285. }
  286. tests := []struct {
  287. d interface{}
  288. k string
  289. v []string
  290. }{
  291. {
  292. d: map[string]interface{}{ // add without action
  293. "id": "testUpdateList", "name": "Susan",
  294. },
  295. k: "testUpdateList",
  296. v: []string{`{"id":"testUpdateList","name":"Susan"}`},
  297. },
  298. {
  299. d: map[string]interface{}{ // update with action
  300. "action": "update", "id": "testUpdateList", "name": "John",
  301. },
  302. k: "testUpdateList",
  303. v: []string{`{"action":"update","id":"testUpdateList","name":"John"}`, `{"id":"testUpdateList","name":"Susan"}`},
  304. },
  305. {
  306. d: map[string]interface{}{ // delete
  307. "action": "delete", "id": "testUpdateList",
  308. },
  309. k: "testUpdateList",
  310. v: []string{`{"id":"testUpdateList","name":"Susan"}`},
  311. },
  312. {
  313. d: []map[string]interface{}{ // multiple actions
  314. {"action": "delete", "id": "testUpdateList"},
  315. {"action": "insert", "id": "testUpdateList", "name": "Susan"},
  316. },
  317. k: "testUpdateList",
  318. v: []string{`{"action":"insert","id":"testUpdateList","name":"Susan"}`},
  319. },
  320. {
  321. d: map[string]interface{}{ // delete
  322. "action": "delete", "id": "testUpdateList",
  323. },
  324. k: "testUpdateList",
  325. v: nil,
  326. },
  327. }
  328. for i, tt := range tests {
  329. err = s.Collect(ctx, tt.d)
  330. if err != nil {
  331. t.Error(err)
  332. return
  333. }
  334. r, err := mr.List(tt.k)
  335. if tt.v == nil {
  336. if err == nil || err.Error() != "ERR no such key" {
  337. t.Errorf("case %d err %v", i, err)
  338. return
  339. }
  340. } else {
  341. if err != nil {
  342. t.Errorf("case %d err %v", i, err)
  343. return
  344. }
  345. if !reflect.DeepEqual(r, tt.v) {
  346. t.Errorf("case %d expect %v, but got %v", i, tt.v, r)
  347. }
  348. }
  349. }
  350. }
  351. func TestRedisSink_Configure(t *testing.T) {
  352. type args struct {
  353. props map[string]interface{}
  354. }
  355. tests := []struct {
  356. name string
  357. args args
  358. wantErr bool
  359. }{
  360. {
  361. name: "missing key and field and default keyType is single",
  362. args: args{map[string]interface{}{
  363. "addr": addr,
  364. "datatype": "list",
  365. }},
  366. wantErr: true,
  367. },
  368. {
  369. name: "missing key and field and keyType is multiple",
  370. args: args{map[string]interface{}{
  371. "addr": addr,
  372. "datatype": "list",
  373. "keyType": "multiple",
  374. }},
  375. wantErr: false,
  376. },
  377. {
  378. name: "key type do not support",
  379. args: args{map[string]interface{}{
  380. "addr": addr,
  381. "datatype": "list",
  382. "keyType": "ttt",
  383. }},
  384. wantErr: true,
  385. },
  386. {
  387. name: "data type do not support",
  388. args: args{map[string]interface{}{
  389. "addr": addr,
  390. "datatype": "stream",
  391. "keyType": "multiple",
  392. }},
  393. wantErr: true,
  394. },
  395. }
  396. for _, tt := range tests {
  397. t.Run(tt.name, func(t *testing.T) {
  398. r := &RedisSink{
  399. c: nil,
  400. }
  401. if err := r.Configure(tt.args.props); (err != nil) != tt.wantErr {
  402. t.Errorf("Configure() error = %v, wantErr %v", err, tt.wantErr)
  403. }
  404. })
  405. }
  406. }