// Copyright 2022-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
	"fmt"
	"os"
	"path/filepath"
	"reflect"
	"testing"
	"time"

	"github.com/lf-edge/ekuiper/internal/conf"
	"github.com/lf-edge/ekuiper/internal/testx"
	"github.com/lf-edge/ekuiper/internal/topo/context"
	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
	"github.com/lf-edge/ekuiper/internal/topo/state"
	"github.com/lf-edge/ekuiper/pkg/api"
)
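
// TestPage exercises the in-memory page buffer used by the sink cache. Based on
// the assertions below, a page behaves as a fixed-capacity FIFO of tuple batches:
// append fails once the page is full, peak returns the head batch without
// removing it, delete pops the head, and reset clears the page for reuse.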
func TestPage(t *testing.T) {
	p := newPage(2)
	if !p.isEmpty() {
		t.Errorf("a new page should be empty")
	}
	if !p.append([]map[string]interface{}{
		{"a": 1},
	}) {
		t.Fatal("append failed")
	}
	if !p.append([]map[string]interface{}{
		{"a": 2},
	}) {
		t.Fatal("append failed")
	}
	if p.append([]map[string]interface{}{
		{"a": 3},
	}) {
		t.Fatal("append should fail when the page is full")
	}
	v, ok := p.peak()
	if !ok {
		t.Fatal("peak failed")
	}
	if !reflect.DeepEqual(v, []map[string]interface{}{
		{"a": 1},
	}) {
		t.Fatalf("peak value mismatch, expect 1 but got %v", v)
	}
	if p.append([]map[string]interface{}{
		{"a": 4},
	}) {
		t.Fatal("append should fail when the page is full")
	}
	if !p.delete() {
		t.Fatal("delete failed")
	}
	v, ok = p.peak()
	if !ok {
		t.Fatal("peak failed")
	}
	if !reflect.DeepEqual(v, []map[string]interface{}{
		{"a": 2},
	}) {
		t.Fatalf("peak value mismatch, expect 2 but got %v", v)
	}
	p.reset()
	if !p.append([]map[string]interface{}{
		{"a": 5},
	}) {
		t.Fatal("append failed")
	}
	if p.isEmpty() {
		t.Fatal("page should not be empty")
	}
	if !p.delete() {
		t.Fatal("delete failed")
	}
	if !p.append([]map[string]interface{}{
		{"a": 5},
	}) {
		t.Fatal("append failed")
	}
	if !p.append([]map[string]interface{}{
		{"a": 6},
	}) {
		t.Fatal("append failed")
	}
	if !p.delete() {
		t.Fatal("delete failed")
	}
	if !p.delete() {
		t.Fatal("delete failed")
	}
	if p.delete() {
		t.Fatal("delete should fail when the page is empty")
	}
	if !p.isEmpty() {
		t.Fatal("page should be empty")
	}
}

// TestRun covers the following flows:
// 1. cache in memory only
// 2. cache in memory and the disk buffer only
// 3. cache in memory and on disk
// 4. cache in memory and on disk with the disk buffer overflowing
// Each flow restarts the rule in the middle of the run.
// Each flow uses a slightly different configuration, e.g. bufferPageSize.
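// In the overflow flow (case 3), dataOut skips {"a":2} through {"a":5}: the
// tuples cached while the disk buffer overflows are expected to be dropped
// rather than resent after the restart.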
func TestRun(t *testing.T) {
	tests := []struct {
		sconf   *conf.SinkConf
		dataIn  [][]map[string]interface{}
		dataOut [][]map[string]interface{}
		stopPt  int // restart the rule at this point
	}{
		{ // 0
			sconf: &conf.SinkConf{
				MemoryCacheThreshold: 4,
				MaxDiskCache:         12,
				BufferPageSize:       2,
				EnableCache:          true,
				ResendInterval:       0,
				CleanCacheAtStop:     false,
			},
			dataIn: [][]map[string]interface{}{
				{{"a": 1}}, {{"a": 2}}, {{"a": 3}}, {{"a": 4}}, {{"a": 5}},
			},
			stopPt: 4,
		},
		{ // 1
			sconf: &conf.SinkConf{
				MemoryCacheThreshold: 4,
				MaxDiskCache:         8,
				BufferPageSize:       2,
				EnableCache:          true,
				ResendInterval:       0,
				CleanCacheAtStop:     false,
			},
			dataIn: [][]map[string]interface{}{
				{{"a": 1}}, {{"a": 2}}, {{"a": 3}}, {{"a": 4}}, {{"a": 5}}, {{"a": 6}},
			},
			stopPt: 5,
		},
		{ // 2
			sconf: &conf.SinkConf{
				MemoryCacheThreshold: 1,
				MaxDiskCache:         8,
				BufferPageSize:       1,
				EnableCache:          true,
				ResendInterval:       0,
				CleanCacheAtStop:     false,
			},
			dataIn: [][]map[string]interface{}{
				{{"a": 1}}, {{"a": 2}}, {{"a": 3}}, {{"a": 4}}, {{"a": 5}}, {{"a": 6}},
			},
			stopPt: 4,
		},
		{ // 3
			sconf: &conf.SinkConf{
				MemoryCacheThreshold: 2,
				MaxDiskCache:         4,
				BufferPageSize:       2,
				EnableCache:          true,
				ResendInterval:       0,
				CleanCacheAtStop:     false,
			},
			dataIn: [][]map[string]interface{}{
				{{"a": 1}}, {{"a": 2}}, {{"a": 3}}, {{"a": 4}}, {{"a": 5}}, {{"a": 6}}, {{"a": 7}}, {{"a": 8}}, {{"a": 9}}, {{"a": 10}}, {{"a": 11}}, {{"a": 12}}, {{"a": 13}},
			},
			dataOut: [][]map[string]interface{}{
				{{"a": 1}}, {{"a": 6}}, {{"a": 7}}, {{"a": 8}}, {{"a": 9}}, {{"a": 10}}, {{"a": 11}}, {{"a": 12}}, {{"a": 13}},
			},
			stopPt: 4,
		},
	}
	testx.InitEnv()
	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
	tempStore, _ := state.CreateStore("mock", api.AtMostOnce)
	deleteCachedb()
	for i, tt := range tests {
		contextLogger := conf.Log.WithField("rule", fmt.Sprintf("TestRun-%d", i))
		ctx, cancel := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta(fmt.Sprintf("rule%d", i), fmt.Sprintf("op%d", i), tempStore).WithCancel()
		stats, err := metric.NewStatManager(ctx, "sink")
		if err != nil {
			t.Fatal(err)
			return
		}
		in := make(chan []map[string]interface{})
		errCh := make(chan error)
		var result []interface{}
		// drain the error channel so a cache error does not block the test
		go func() {
			err := <-errCh
			t.Log(err)
		}()
		exitCh := make(chan struct{})
		// send the first half of the data, then cancel to simulate a rule stop
		_ = NewSyncCacheWithExitChanel(ctx, in, errCh, stats, tt.sconf, 100, exitCh)
		for i := 0; i < tt.stopPt; i++ {
			in <- tt.dataIn[i]
			time.Sleep(1 * time.Millisecond)
		}
		cancel()
		// wait for the cleanup job to finish
		<-exitCh
		// restart the cache and send the second half of the data
		ctx, cancel = context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta(fmt.Sprintf("rule%d", i), fmt.Sprintf("op%d", i), tempStore).WithCancel()
		sc := NewSyncCache(ctx, in, errCh, stats, tt.sconf, 100)
		for i := tt.stopPt; i < len(tt.dataIn); i++ {
			in <- tt.dataIn[i]
			time.Sleep(1 * time.Millisecond)
		}
		// ack and drain the cache output until no more data arrives
	loop:
		for range tt.dataIn {
			sc.Ack <- true
			select {
			case r := <-sc.Out:
				result = append(result, r)
			case <-time.After(1 * time.Second):
				t.Logf("test %d no data", i)
				break loop
			}
		}
		cancel()
		if tt.dataOut == nil {
			tt.dataOut = tt.dataIn
		}
		if len(tt.dataOut) != len(result) {
			t.Errorf("test %d data mismatch\nexpect\t%v\nbut got\t%v", i, tt.dataOut, result)
			continue
		}
		for i, v := range result {
			if !reflect.DeepEqual(tt.dataOut[i], v) {
				t.Errorf("test %d data mismatch\nexpect\t%v\nbut got\t%v", i, tt.dataOut, result)
				break
			}
		}
	}
}
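
// deleteCachedb removes any cache.db left under the data directory so that
// every test run starts from a clean cache state.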
func deleteCachedb() {
	loc, err := conf.GetDataLoc()
	if err != nil {
		fmt.Println(err)
	}
	err = os.RemoveAll(filepath.Join(loc, "cache.db"))
	if err != nil {
		fmt.Println(err)
	}
}