send_manager_test.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package sink
  15. import (
  16. "context"
  17. "fmt"
  18. "testing"
  19. "time"
  20. "github.com/benbjohnson/clock"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. )
  23. func TestSendManager(t *testing.T) {
  24. testcases := []struct {
  25. sendCount int
  26. batchSize int
  27. lingerInterval int
  28. err string
  29. expectItems int
  30. }{
  31. {
  32. batchSize: 0,
  33. lingerInterval: 0,
  34. err: "either batchSize or lingerInterval should be larger than 0",
  35. },
  36. {
  37. sendCount: 3,
  38. batchSize: 3,
  39. lingerInterval: 0,
  40. expectItems: 3,
  41. },
  42. {
  43. sendCount: 4,
  44. batchSize: 10,
  45. lingerInterval: 100,
  46. expectItems: 4,
  47. },
  48. {
  49. sendCount: 4,
  50. batchSize: 0,
  51. lingerInterval: 100,
  52. expectItems: 4,
  53. },
  54. }
  55. mc := conf.Clock.(*clock.Mock)
  56. for i, tc := range testcases {
  57. mc.Set(mc.Now())
  58. testF := func() error {
  59. sm, err := NewSendManager(tc.batchSize, tc.lingerInterval)
  60. if len(tc.err) > 0 {
  61. if err == nil || err.Error() != tc.err {
  62. return fmt.Errorf("expect err:%v, actual: %v", tc.err, err)
  63. }
  64. return nil
  65. }
  66. ctx, cancel := context.WithCancel(context.Background())
  67. defer cancel()
  68. go sm.Run(ctx)
  69. for i := 0; i < tc.sendCount; i++ {
  70. sm.RecvData(map[string]interface{}{})
  71. mc.Add(30 * time.Millisecond)
  72. }
  73. r := <-sm.GetOutputChan()
  74. if len(r) != tc.expectItems {
  75. return fmt.Errorf("testcase %v expect %v output data, actual %v", i, tc.expectItems, len(r))
  76. }
  77. return nil
  78. }
  79. if err := testF(); err != nil {
  80. t.Fatal(err)
  81. }
  82. }
  83. }
  84. func TestSendEmpty(t *testing.T) {
  85. sm, err := NewSendManager(1, 1)
  86. if err != nil {
  87. t.Fatal(err)
  88. }
  89. sm.outputCh = make(chan []map[string]interface{})
  90. // test shouldn't be blocked
  91. sm.send()
  92. }
  93. func TestCancelRun(t *testing.T) {
  94. testcases := []struct {
  95. batchSize int
  96. lingerInterval int
  97. }{
  98. {
  99. batchSize: 0,
  100. lingerInterval: 1,
  101. },
  102. {
  103. batchSize: 3,
  104. lingerInterval: 0,
  105. },
  106. {
  107. batchSize: 10,
  108. lingerInterval: 100,
  109. },
  110. }
  111. for _, tc := range testcases {
  112. sm, err := NewSendManager(tc.batchSize, tc.lingerInterval)
  113. if err != nil {
  114. t.Fatal(err)
  115. }
  116. ctx, cancel := context.WithCancel(context.Background())
  117. c := make(chan struct{})
  118. go func() {
  119. sm.Run(ctx)
  120. c <- struct{}{}
  121. }()
  122. cancel()
  123. <-c
  124. if !sm.finished {
  125. t.Fatal("send manager should be finished")
  126. }
  127. }
  128. }