Browse Source

address the comment

Signed-off-by: yisaer <disxiaofei@163.com>
yisaer 2 years ago
parent
commit
ab2934811a

+ 14 - 1
internal/topo/node/cache/sync_cache.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -116,6 +116,14 @@ type SyncCache struct {
 	sendStatus   int // 0: idle, 1: sending and waiting for ack, 2: stopped for error
 	//serialize
 	store kv.KeyValue
+
+	exitCh chan<- struct{}
+}
+
+func NewSyncCacheWithExitChanel(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int, exitCh chan<- struct{}) *SyncCache {
+	c := NewSyncCache(ctx, in, errCh, stats, cacheConf, bufferLength)
+	c.exitCh = exitCh
+	return c
 }
 
 func NewSyncCache(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int) *SyncCache {
@@ -420,6 +428,11 @@ func (c *SyncCache) initStore(ctx api.StreamContext) {
 
 // save memory states to disk
 func (c *SyncCache) onClose(ctx api.StreamContext) {
+	defer func() {
+		if c.exitCh != nil {
+			c.exitCh <- struct{}{}
+		}
+	}()
 	ctx.GetLogger().Infof("sink node %s instance cache %d closing", ctx.GetOpId(), ctx.GetInstanceId())
 	if c.cacheConf.CleanCacheAtStop {
 		kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))

+ 3 - 2
internal/topo/node/cache/sync_cache_test.go

@@ -205,15 +205,16 @@ func TestRun(t *testing.T) {
 			t.Fatal(err)
 			return
 		}()
+		exitCh := make(chan struct{})
 		// send data
-		sc := NewSyncCache(ctx, in, errCh, stats, tt.sconf, 100)
+		sc := NewSyncCacheWithExitChanel(ctx, in, errCh, stats, tt.sconf, 100, exitCh)
 		for i := 0; i < tt.stopPt; i++ {
 			in <- tt.dataIn[i]
 			time.Sleep(1 * time.Millisecond)
 		}
 		cancel()
 		// wait cleanup job done
-		time.Sleep(1 * time.Second)
+		<-exitCh
 
 		// send the second half data
 		ctx, cancel = context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta(fmt.Sprintf("rule%d", i), fmt.Sprintf("op%d", i), tempStore).WithCancel()