sink_test.go 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package memory
  15. import (
  16. "fmt"
  17. "reflect"
  18. "testing"
  19. "github.com/benbjohnson/clock"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
  22. "github.com/lf-edge/ekuiper/internal/topo/context"
  23. "github.com/lf-edge/ekuiper/pkg/api"
  24. )
  25. func TestUpdate(t *testing.T) {
  26. contextLogger := conf.Log.WithField("rule", "test2")
  27. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  28. ms := GetSink()
  29. err := ms.Configure(map[string]interface{}{"topic": "testupdate", "rowkindField": "verb", "keyField": "id"})
  30. if err != nil {
  31. t.Error(err)
  32. return
  33. }
  34. err = ms.Open(ctx)
  35. if err != nil {
  36. t.Error(err)
  37. return
  38. }
  39. data := []map[string]interface{}{
  40. {"id": "1", "verb": "insert", "name": "test1"},
  41. {"id": "2", "verb": "insert", "name": "test2"},
  42. {"id": "1", "verb": "update", "name": "test1"},
  43. {"id": "2", "verb": "delete", "name": "test2"},
  44. }
  45. c := pubsub.CreateSub("testupdate", nil, "testSource", 100)
  46. go func() {
  47. for _, d := range data {
  48. ms.Collect(ctx, d)
  49. }
  50. }()
  51. var actual []api.SourceTuple
  52. for i := 0; i < 4; i++ {
  53. d := <-c
  54. fmt.Println(d)
  55. actual = append(actual, d)
  56. }
  57. mc := conf.Clock.(*clock.Mock)
  58. expects := []api.SourceTuple{
  59. &pubsub.UpdatableTuple{
  60. DefaultSourceTuple: api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": "1", "verb": "insert", "name": "test1"}, map[string]interface{}{"topic": "testupdate"}, mc.Now()),
  61. Rowkind: "insert",
  62. Keyval: "1",
  63. },
  64. &pubsub.UpdatableTuple{
  65. DefaultSourceTuple: api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": "2", "verb": "insert", "name": "test2"}, map[string]interface{}{"topic": "testupdate"}, mc.Now()),
  66. Rowkind: "insert",
  67. Keyval: "2",
  68. },
  69. &pubsub.UpdatableTuple{
  70. DefaultSourceTuple: api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": "1", "verb": "update", "name": "test1"}, map[string]interface{}{"topic": "testupdate"}, mc.Now()),
  71. Rowkind: "update",
  72. Keyval: "1",
  73. },
  74. &pubsub.UpdatableTuple{
  75. DefaultSourceTuple: api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": "2", "verb": "delete", "name": "test2"}, map[string]interface{}{"topic": "testupdate"}, mc.Now()),
  76. Rowkind: "delete",
  77. Keyval: "2",
  78. },
  79. }
  80. if !reflect.DeepEqual(actual, expects) {
  81. t.Errorf("expect %v but got %v", expects, actual)
  82. }
  83. }