sync_cache_test.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package cache
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/testx"
  19. "github.com/lf-edge/ekuiper/internal/topo/context"
  20. "github.com/lf-edge/ekuiper/internal/topo/node/metric"
  21. "github.com/lf-edge/ekuiper/internal/topo/state"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "os"
  24. "path/filepath"
  25. "reflect"
  26. "testing"
  27. "time"
  28. )
  29. func TestPage(t *testing.T) {
  30. p := newPage(2)
  31. if !p.isEmpty() {
  32. t.Errorf("page is not empty")
  33. }
  34. if !p.append(1) {
  35. t.Fatal("append failed")
  36. }
  37. if !p.append(2) {
  38. t.Fatal("append failed")
  39. }
  40. if p.append(3) {
  41. t.Fatal("should append fail")
  42. }
  43. v, ok := p.peak()
  44. if !ok {
  45. t.Fatal("peak failed")
  46. }
  47. if v != 1 {
  48. t.Fatalf("peak value mismatch, expect 3 but got %v", v)
  49. }
  50. if p.append(4) {
  51. t.Fatal("should append failed")
  52. }
  53. if !p.delete() {
  54. t.Fatal("delete failed")
  55. }
  56. v, ok = p.peak()
  57. if !ok {
  58. t.Fatal("peak failed")
  59. }
  60. if v != 2 {
  61. t.Fatalf("peak value mismatch, expect 2 but got %v", v)
  62. }
  63. p.reset()
  64. if !p.append(5) {
  65. t.Fatal("append failed")
  66. }
  67. if p.isEmpty() {
  68. t.Fatal("page should not empty")
  69. }
  70. if !p.delete() {
  71. t.Fatal("delete failed")
  72. }
  73. if !p.append(5) {
  74. t.Fatal("append failed")
  75. }
  76. if !p.append(6) {
  77. t.Fatal("append failed")
  78. }
  79. if !p.delete() {
  80. t.Fatal("delete failed")
  81. }
  82. if !p.delete() {
  83. t.Fatal("delete failed")
  84. }
  85. if p.delete() {
  86. t.Fatal("should delete failed")
  87. }
  88. if !p.isEmpty() {
  89. t.Fatal("page should be empty")
  90. }
  91. }
  92. // TestRun test for
  93. // 1. cache in memory only
  94. // 2. cache in memory and disk buffer only
  95. // 3. cache in memory and disk
  96. // 4. cache in memory and disk buffer and overflow
  97. // Each flow test rule restart
  98. // Each flow use slightly different config like bufferPageSize
  99. func TestRun(t *testing.T) {
  100. var tests = []struct {
  101. sconf *conf.SinkConf
  102. dataIn []interface{}
  103. dataOut []interface{}
  104. stopPt int // restart the rule in this point
  105. }{
  106. { // 0
  107. sconf: &conf.SinkConf{
  108. MemoryCacheThreshold: 6,
  109. MaxDiskCache: 12,
  110. BufferPageSize: 3,
  111. EnableCache: true,
  112. ResendInterval: 0,
  113. CleanCacheAtStop: false,
  114. },
  115. dataIn: []interface{}{
  116. 1, 2, 3, 4, 5,
  117. },
  118. stopPt: 4,
  119. },
  120. { // 1
  121. sconf: &conf.SinkConf{
  122. MemoryCacheThreshold: 4,
  123. MaxDiskCache: 8,
  124. BufferPageSize: 2,
  125. EnableCache: true,
  126. ResendInterval: 0,
  127. CleanCacheAtStop: false,
  128. },
  129. dataIn: []interface{}{
  130. 1, 2, 3, 4, 5, 6,
  131. },
  132. stopPt: 5,
  133. },
  134. { // 2
  135. sconf: &conf.SinkConf{
  136. MemoryCacheThreshold: 1,
  137. MaxDiskCache: 8,
  138. BufferPageSize: 1,
  139. EnableCache: true,
  140. ResendInterval: 0,
  141. CleanCacheAtStop: false,
  142. },
  143. dataIn: []interface{}{
  144. 1, 2, 3, 4, 5, 6,
  145. },
  146. stopPt: 4,
  147. },
  148. { // 3
  149. sconf: &conf.SinkConf{
  150. MemoryCacheThreshold: 2,
  151. MaxDiskCache: 4,
  152. BufferPageSize: 2,
  153. EnableCache: true,
  154. ResendInterval: 0,
  155. CleanCacheAtStop: false,
  156. },
  157. dataIn: []interface{}{
  158. 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
  159. },
  160. dataOut: []interface{}{
  161. 1, 6, 7, 8, 9, 10, 11, 12, 13,
  162. },
  163. stopPt: 4,
  164. },
  165. }
  166. testx.InitEnv()
  167. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  168. tempStore, _ := state.CreateStore("mock", api.AtMostOnce)
  169. deleteCachedb()
  170. for i, tt := range tests {
  171. contextLogger := conf.Log.WithField("rule", fmt.Sprintf("TestRun-%d", i))
  172. ctx, cancel := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta(fmt.Sprintf("rule%d", i), fmt.Sprintf("op%d", i), tempStore).WithCancel()
  173. stats, err := metric.NewStatManager(ctx, "sink")
  174. if err != nil {
  175. t.Fatal(err)
  176. return
  177. }
  178. in := make(chan interface{})
  179. errCh := make(chan error)
  180. var result []interface{}
  181. go func() {
  182. err := <-errCh
  183. t.Fatal(err)
  184. return
  185. }()
  186. // send data
  187. sc := NewSyncCache(ctx, in, errCh, stats, tt.sconf, 100)
  188. for i := 0; i < tt.stopPt; i++ {
  189. in <- tt.dataIn[i]
  190. time.Sleep(1 * time.Millisecond)
  191. }
  192. cancel()
  193. // send the second half data
  194. ctx, cancel = context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta(fmt.Sprintf("rule%d", i), fmt.Sprintf("op%d", i), tempStore).WithCancel()
  195. sc = NewSyncCache(ctx, in, errCh, stats, tt.sconf, 100)
  196. for i := tt.stopPt; i < len(tt.dataIn); i++ {
  197. in <- tt.dataIn[i]
  198. time.Sleep(1 * time.Millisecond)
  199. }
  200. loop:
  201. for range tt.dataIn {
  202. sc.Ack <- true
  203. select {
  204. case r := <-sc.Out:
  205. result = append(result, r)
  206. case <-time.After(1 * time.Second):
  207. t.Log(fmt.Sprintf("test %d no data", i))
  208. break loop
  209. }
  210. }
  211. cancel()
  212. if tt.dataOut == nil {
  213. tt.dataOut = tt.dataIn
  214. }
  215. if !reflect.DeepEqual(tt.dataOut, result) {
  216. t.Errorf("test %d data mismatch\nexpect\t%v\nbut got\t%v", i, tt.dataOut, result)
  217. }
  218. }
  219. }
  220. func deleteCachedb() {
  221. loc, err := conf.GetDataLoc()
  222. if err != nil {
  223. fmt.Println(err)
  224. }
  225. err = os.RemoveAll(filepath.Join(loc, "cache.db"))
  226. if err != nil {
  227. fmt.Println(err)
  228. }
  229. }