send_manager_test.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package sink
  15. import (
  16. "context"
  17. "fmt"
  18. "testing"
  19. "time"
  20. "github.com/benbjohnson/clock"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. )
  23. func TestSendManager(t *testing.T) {
  24. testcases := []struct {
  25. sendCount int
  26. batchSize int
  27. lingerInterval int
  28. err string
  29. expectItems int
  30. }{
  31. {
  32. batchSize: 0,
  33. lingerInterval: 0,
  34. err: "either batchSize or lingerInterval should be larger than 0",
  35. },
  36. {
  37. sendCount: 3,
  38. batchSize: 3,
  39. lingerInterval: 0,
  40. expectItems: 3,
  41. },
  42. {
  43. sendCount: 4,
  44. batchSize: 10,
  45. lingerInterval: 100,
  46. expectItems: 4,
  47. },
  48. {
  49. sendCount: 4,
  50. batchSize: 0,
  51. lingerInterval: 100,
  52. expectItems: 4,
  53. },
  54. {
  55. sendCount: 6,
  56. batchSize: 3,
  57. lingerInterval: 3000,
  58. expectItems: 3,
  59. },
  60. }
  61. mc := conf.Clock.(*clock.Mock)
  62. for i, tc := range testcases {
  63. mc.Set(mc.Now())
  64. testF := func() error {
  65. sm, err := NewSendManager(tc.batchSize, tc.lingerInterval)
  66. if len(tc.err) > 0 {
  67. if err == nil || err.Error() != tc.err {
  68. return fmt.Errorf("expect err:%v, actual: %v", tc.err, err)
  69. }
  70. return nil
  71. }
  72. ctx, cancel := context.WithCancel(context.Background())
  73. defer cancel()
  74. go sm.Run(ctx)
  75. for i := 0; i < tc.sendCount; i++ {
  76. sm.RecvData(map[string]interface{}{})
  77. mc.Add(30 * time.Millisecond)
  78. }
  79. r := <-sm.GetOutputChan()
  80. if len(r) != tc.expectItems {
  81. return fmt.Errorf("testcase %v expect %v output data, actual %v", i, tc.expectItems, len(r))
  82. }
  83. return nil
  84. }
  85. if err := testF(); err != nil {
  86. t.Fatal(err)
  87. }
  88. }
  89. }
  90. func TestSendEmpty(t *testing.T) {
  91. sm, err := NewSendManager(1, 1)
  92. if err != nil {
  93. t.Fatal(err)
  94. }
  95. sm.outputCh = make(chan []map[string]interface{})
  96. // test shouldn't be blocked
  97. sm.send()
  98. }
  99. func TestCancelRun(t *testing.T) {
  100. testcases := []struct {
  101. batchSize int
  102. lingerInterval int
  103. }{
  104. {
  105. batchSize: 0,
  106. lingerInterval: 1,
  107. },
  108. {
  109. batchSize: 3,
  110. lingerInterval: 0,
  111. },
  112. {
  113. batchSize: 10,
  114. lingerInterval: 100,
  115. },
  116. }
  117. for _, tc := range testcases {
  118. sm, err := NewSendManager(tc.batchSize, tc.lingerInterval)
  119. if err != nil {
  120. t.Fatal(err)
  121. }
  122. ctx, cancel := context.WithCancel(context.Background())
  123. c := make(chan struct{})
  124. go func() {
  125. sm.Run(ctx)
  126. c <- struct{}{}
  127. }()
  128. cancel()
  129. <-c
  130. if !sm.finished {
  131. t.Fatal("send manager should be finished")
  132. }
  133. }
  134. }
  135. func TestEnlargeSendManagerCap(t *testing.T) {
  136. sm, err := NewSendManager(0, 1000)
  137. if err != nil {
  138. t.Fatal(err)
  139. }
  140. count := 1025
  141. for i := 0; i < count; i++ {
  142. go sm.RecvData(map[string]interface{}{})
  143. sm.appendDataInBuffer(<-sm.bufferCh, false)
  144. }
  145. if len(sm.buffer) != count {
  146. t.Fatal(fmt.Sprintf("sm buffer should be %v", count))
  147. }
  148. if sm.currIndex != count {
  149. t.Fatal(fmt.Sprintf("sm index should be %v", count))
  150. }
  151. originCap := cap(sm.buffer)
  152. originLen := len(sm.buffer)
  153. sm.send()
  154. if sm.currIndex != 0 || originCap != cap(sm.buffer) || originLen != len(sm.buffer) {
  155. t.Fatal("sm buffer capacity shouldn't be changed after send")
  156. }
  157. }