
feat(sink): resend strategy (#2112)

* feat(sink): support resend priority

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* feat(sink): support resend indicator

If the tuple is being resent, the resend indicator is set to true

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* fix(cache): remove async send to keep order

Also refine the tests

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* docs(sink): resend policy

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

---------

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying, 1 year ago
Commit 7af9b26d29

+ 38 - 8
docs/en_US/guide/sinks/overview.md

@@ -115,20 +115,50 @@ The storage location of the offline cache is determined by the storage configura

 Each sink can configure its own caching mechanism. The caching process is the same for each sink. If caching is enabled, all sink's events go through two phases: first, saving all content to the cache; then deleting the cache after receiving an ack.

-- Error detection: After a failed send, sink should identify recoverable failures (network, etc.) by returning a specific error type, which will return a failed ack so that the cache can be retained. For successful sends or unrecoverable errors, a successful ack will be sent to delete the cache.
-- Cache mechanism: The cache will first be kept in memory. If the memory threshold is exceeded, the later cache will be saved to disk. Once the disk cache exceeds the disk storage threshold, the cache will start to rotate, i.e. the earliest cache in memory will be discarded and the earliest cache on disk will be loaded instead.
-- Resend policy: Currently the caching mechanism can only run in the default synchronous mode, where if a message is being sent, it will wait for the result of the send to continue sending the next cached data. Otherwise, when new data arrives, the first data in the cache is sent to detect network conditions. If the send is successful, all caches in memory and on disk are sent in a sequential chain. Chained sends can define a send interval to prevent message storms.
+- Error detection: After a failed send, the sink should identify recoverable failures (network issues, etc.) by
+  returning a specific error type, which triggers a failed ack so that the cache is retained. For successful sends or
+  unrecoverable errors, a successful ack is sent to delete the cache.
+- Cache mechanism: The cache is first kept in memory. If the memory threshold is exceeded, later data is saved to
+  disk. Once the disk cache exceeds the disk storage threshold, the cache starts to rotate, i.e. the earliest cache in
+  memory is discarded and the earliest cache on disk is loaded in its place.
+- Resend policy: Currently the caching mechanism can only run in the default synchronous mode: if a message is being
+  sent, it waits for the sending result before sending the next cached data. Otherwise, when new data arrives, the
+  first item in the cache is sent to probe network conditions. If that send succeeds, all caches in memory and on disk
+  are sent in a sequential chain. Chained sends can define a send interval to prevent message storms.
+- Separation of normal data and retransmission data: Users can configure retransmission data and normal data to be
+  sent separately to different destinations. It is also possible to configure the sending priority, for example to
+  send normal data with higher priority. The content of the retransmission data can even be changed, for example by
+  adding a field to the retransmission data so that it can be distinguished at the receiving end.
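
The separation described in the last bullet is driven by the resend options documented in the Configuration section below. As a minimal illustration (not the actual sink code; the field name `resend` is only an example value of `resendIndicatorField`), marking a resent payload could look like this:

```go
// Illustrative sketch: mark resent tuples with a boolean indicator field so
// the receiving end can tell them apart from normal data.
package main

import "fmt"

func markResend(data []map[string]interface{}, indicatorField string) {
	if indicatorField == "" {
		return // no indicator configured, leave the payload unchanged
	}
	for _, item := range data {
		item[indicatorField] = true
	}
}

func main() {
	payload := []map[string]interface{}{{"a": 1}}
	markResend(payload, "resend")
	fmt.Println(payload) // [map[a:1 resend:true]]
}
```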

 ### Configuration

 There are two levels of configuration for the Sink cache. A global configuration in `etc/kuiper.yaml` that defines the default behavior of all rules. There is also a rule sink level definition to override the default behavior.

-- enableCache: whether to enable sink cache. cache storage configuration follows the configuration of the metadata store defined in `etc/kuiper.yaml`.
-- memoryCacheThreshold: the number of messages to be cached in memory. For performance reasons, the earliest cached messages are stored in memory so that they can be resent immediately upon failure recovery. Data here can be lost due to failures such as power outages.
-- maxDiskCache: The maximum number of messages to be cached on disk. The disk cache is first-in, first-out. If the disk cache is full, the earliest page of information will be loaded into the memory cache, replacing the old memory cache.
-- bufferPageSize. buffer pages are units of bulk reads/writes to disk to prevent frequent IO. if the pages are not full and eKuiper crashes due to hardware or software errors, the last unwritten pages to disk will be lost.
+- enableCache: whether to enable the sink cache. The cache storage configuration follows the configuration of the
+  metadata store defined in `etc/kuiper.yaml`.
+- memoryCacheThreshold: the number of messages to be cached in memory. For performance reasons, the earliest cached
+  messages are stored in memory so that they can be resent immediately upon failure recovery. Data here can be lost
+  due to failures such as power outages.
+- maxDiskCache: the maximum number of messages to be cached on disk. The disk cache is first-in, first-out. If the
+  disk cache is full, the earliest page of messages will be loaded into the memory cache, replacing the old memory
+  cache.
+- bufferPageSize: buffer pages are the unit of bulk reads/writes to disk, used to prevent frequent IO. If a page is
+  not yet full and eKuiper crashes due to a hardware or software error, the pages not yet written to disk will be
+  lost.
 - resendInterval: The time interval to resend information after failure recovery to prevent message storms.
-- cleanCacheAtStop: whether to clean all caches when the rule is stopped, to prevent mass resending of expired messages when the rule is restarted. If not set to true, the in-memory cache will be stored to disk once the rule is stopped. Otherwise, the memory and disk rules will be cleared out.
+- cleanCacheAtStop: whether to clean all caches when the rule is stopped, to prevent mass resending of expired
+  messages when the rule is restarted. If not set to true, the in-memory cache will be flushed to disk once the rule
+  is stopped. Otherwise, the memory and disk caches will be cleared out.
+- resendAlterQueue: whether to use the alternate queue when resending the cache. If set to true, cached data will be
+  sent to the alternate queue instead of the original queue. As a result, real-time messages and resend messages are
+  sent through different queues and the original message order may change. The following resend-related configurations
+  only take effect if this option is set to true.
+- resendPriority: the priority of resending cached data, int type, default 0. -1 means real-time data is sent first;
+  0 means equal priority; 1 means cached data is resent first.
+- resendIndicatorField: the name of a field that marks resent data; the field must be of bool type. If this option is
+  set, the field will be set to true when the cache is resent. For example, if resendIndicatorField is `resend`, the
+  `resend` field will be set to true whenever cached data is resent.
+- resendDestination: the destination to resend the cache to, whose meaning and support may differ by sink. For
+  example, the MQTT sink can send the resend data to a different topic.
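
As an illustration only, the cache and resend options above (except the sink-specific `resendDestination`) correspond to the `SinkConf` struct changed in `internal/conf/conf.go` further down. The values below are assumed example settings, not recommendations:

```go
package conf

// Example values only: a sink cache configuration with the alternate resend
// queue enabled, real-time data prioritized, and resent tuples marked with a
// "resend" field. resendDestination is a sink-level property (e.g. an MQTT
// topic), so it is not part of SinkConf.
var exampleSinkConf = SinkConf{
	EnableCache:          true,
	MemoryCacheThreshold: 1024,
	MaxDiskCache:         1024000,
	BufferPageSize:       256,
	ResendInterval:       10,
	CleanCacheAtStop:     false,
	ResendAlterQueue:     true,
	ResendPriority:       -1,       // -1: send real-time data first
	ResendIndicatorField: "resend", // resent tuples get {"resend": true}
}
```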

 In the following example configuration of the rule, log sink has no cache-related options configured, so the global default configuration will be used; whereas mqtt sink performs its own caching policy configuration.


File diff suppressed because it is too large
+ 68 - 52
docs/zh_CN/guide/sinks/overview.md


+ 14 - 7
internal/conf/conf.go

@@ -43,13 +43,15 @@ type tlsConf struct {
 }

 type SinkConf struct {
-	MemoryCacheThreshold int  `json:"memoryCacheThreshold" yaml:"memoryCacheThreshold"`
-	MaxDiskCache         int  `json:"maxDiskCache" yaml:"maxDiskCache"`
-	BufferPageSize       int  `json:"bufferPageSize" yaml:"bufferPageSize"`
-	EnableCache          bool `json:"enableCache" yaml:"enableCache"`
-	ResendInterval       int  `json:"resendInterval" yaml:"resendInterval"`
-	CleanCacheAtStop     bool `json:"cleanCacheAtStop" yaml:"cleanCacheAtStop"`
-	ResendAlterQueue     bool `json:"resendAlterQueue" yaml:"resendAlterQueue"`
+	MemoryCacheThreshold int    `json:"memoryCacheThreshold" yaml:"memoryCacheThreshold"`
+	MaxDiskCache         int    `json:"maxDiskCache" yaml:"maxDiskCache"`
+	BufferPageSize       int    `json:"bufferPageSize" yaml:"bufferPageSize"`
+	EnableCache          bool   `json:"enableCache" yaml:"enableCache"`
+	ResendInterval       int    `json:"resendInterval" yaml:"resendInterval"`
+	CleanCacheAtStop     bool   `json:"cleanCacheAtStop" yaml:"cleanCacheAtStop"`
+	ResendAlterQueue     bool   `json:"resendAlterQueue" yaml:"resendAlterQueue"`
+	ResendPriority       int    `json:"resendPriority" yaml:"resendPriority"`
+	ResendIndicatorField string `json:"resendIndicatorField" yaml:"resendIndicatorField"`
 }

 // Validate the configuration and reset to the default value for invalid values.
@@ -95,6 +97,11 @@ func (sc *SinkConf) Validate() error {
 		Log.Warnf("maxDiskCache is not a multiple of bufferPageSize, set to %d", sc.MaxDiskCache)
 		errs = errors.Join(errs, errors.New("maxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize"))
 	}
+	if sc.ResendPriority < -1 || sc.ResendPriority > 1 {
+		sc.ResendPriority = 0
+		Log.Warnf("resendPriority is not in [-1, 1], set to 0")
+		errs = errors.Join(errs, errors.New("resendPriority:resendPriority must be -1, 0 or 1"))
+	}
 	return errs
 }

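A hedged sketch of the new validation rule, written as a Go example that could sit next to `conf_test.go` (same package): a `resendPriority` outside [-1, 1] is reset to 0 and surfaced through the joined error returned by `Validate`.

```go
package conf

import "fmt"

// Illustrative example, not part of the commit.
func ExampleSinkConf_Validate() {
	sc := SinkConf{
		MemoryCacheThreshold: 1024,
		MaxDiskCache:         1024000,
		BufferPageSize:       256,
		ResendPriority:       2, // invalid: only -1, 0 and 1 are accepted
	}
	err := sc.Validate()
	fmt.Println(sc.ResendPriority, err != nil)
	// Output: 0 true
}
```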

+ 160 - 1
internal/conf/conf_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -14,10 +14,13 @@
 package conf

 import (
+	"errors"
 	"fmt"
 	"fmt"
 	"reflect"
 	"reflect"
 	"testing"
 	"testing"
 
 
+	"github.com/stretchr/testify/assert"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 )
 )
 
 
@@ -225,3 +228,159 @@ func TestRuleOptionValidate(t *testing.T) {
 		}
 	}
 }
+
+func TestSinkConf_Validate(t *testing.T) {
+	tests := []struct {
+		name    string
+		sc      SinkConf
+		wantErr error
+	}{
+		{
+			name: "valid config",
+			sc: SinkConf{
+				MemoryCacheThreshold: 1024,
+				MaxDiskCache:         1024000,
+				BufferPageSize:       256,
+				EnableCache:          true,
+				ResendInterval:       0,
+				CleanCacheAtStop:     true,
+				ResendAlterQueue:     true,
+				ResendPriority:       0,
+			},
+			wantErr: nil,
+		},
+		{
+			name: "invalid memoryCacheThreshold",
+			sc: SinkConf{
+				MemoryCacheThreshold: -1,
+				MaxDiskCache:         1024000,
+				BufferPageSize:       256,
+				EnableCache:          true,
+				ResendInterval:       0,
+				CleanCacheAtStop:     true,
+				ResendAlterQueue:     true,
+				ResendPriority:       0,
+			},
+			wantErr: errors.Join(errors.New("memoryCacheThreshold:memoryCacheThreshold must be positive")),
+		},
+		{
+			name: "invalid maxDiskCache",
+			sc: SinkConf{
+				MemoryCacheThreshold: 1024,
+				MaxDiskCache:         -1,
+				BufferPageSize:       256,
+				EnableCache:          true,
+				ResendInterval:       0,
+				CleanCacheAtStop:     true,
+				ResendAlterQueue:     true,
+				ResendPriority:       0,
+			},
+			wantErr: errors.Join(errors.New("maxDiskCache:maxDiskCache must be positive")),
+		},
+		{
+			name: "invalid bufferPageSize",
+			sc: SinkConf{
+				MemoryCacheThreshold: 1024,
+				MaxDiskCache:         1024000,
+				BufferPageSize:       0,
+				EnableCache:          true,
+				ResendInterval:       0,
+				CleanCacheAtStop:     true,
+				ResendAlterQueue:     true,
+				ResendPriority:       0,
+			},
+			wantErr: errors.Join(errors.New("bufferPageSize:bufferPageSize must be positive")),
+		},
+		{
+			name: "invalid resendInterval",
+			sc: SinkConf{
+				MemoryCacheThreshold: 1024,
+				MaxDiskCache:         1024000,
+				BufferPageSize:       256,
+				EnableCache:          true,
+				ResendInterval:       -1,
+				CleanCacheAtStop:     true,
+				ResendAlterQueue:     true,
+				ResendPriority:       0,
+			},
+			wantErr: errors.Join(errors.New("resendInterval:resendInterval must be positive")),
+		},
+		{
+			name: "memoryCacheThresholdTooSmall",
+			sc: SinkConf{
+				MemoryCacheThreshold: 128,
+				MaxDiskCache:         1024000,
+				BufferPageSize:       256,
+				EnableCache:          true,
+				ResendInterval:       0,
+				CleanCacheAtStop:     true,
+				ResendAlterQueue:     true,
+				ResendPriority:       0,
+			},
+			wantErr: errors.Join(errors.New("memoryCacheThresholdTooSmall:memoryCacheThreshold must be greater than or equal to bufferPageSize")),
+		},
+		{
+			name: "memoryCacheThresholdNotMultiple",
+			sc: SinkConf{
+				MemoryCacheThreshold: 300,
+				MaxDiskCache:         1024000,
+				BufferPageSize:       256,
+				EnableCache:          true,
+				ResendInterval:       0,
+				CleanCacheAtStop:     true,
+				ResendAlterQueue:     true,
+				ResendPriority:       0,
+			},
+			wantErr: errors.Join(errors.New("memoryCacheThresholdNotMultiple:memoryCacheThreshold must be a multiple of bufferPageSize")),
+		},
+		{
+			name: "maxDiskCacheTooSmall",
+			sc: SinkConf{
+				MemoryCacheThreshold: 1024,
+				MaxDiskCache:         128,
+				BufferPageSize:       256,
+				EnableCache:          true,
+				ResendInterval:       0,
+				CleanCacheAtStop:     true,
+				ResendAlterQueue:     true,
+				ResendPriority:       0,
+			},
+			wantErr: errors.Join(errors.New("maxDiskCacheTooSmall:maxDiskCache must be greater than bufferPageSize")),
+		},
+		{
+			name: "maxDiskCacheNotMultiple",
+			sc: SinkConf{
+				MemoryCacheThreshold: 1024,
+				MaxDiskCache:         300,
+				BufferPageSize:       256,
+				EnableCache:          true,
+				ResendInterval:       0,
+				CleanCacheAtStop:     true,
+				ResendAlterQueue:     true,
+				ResendPriority:       0,
+			},
+			wantErr: errors.Join(errors.New("maxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize")),
+		},
+		{
+			name: "invalid resendPriority",
+			sc: SinkConf{
+				MemoryCacheThreshold: 1024,
+				MaxDiskCache:         1024000,
+				BufferPageSize:       256,
+				EnableCache:          true,
+				ResendInterval:       0,
+				CleanCacheAtStop:     true,
+				ResendAlterQueue:     true,
+				ResendPriority:       2,
+			},
+			wantErr: errors.Join(errors.New("resendPriority:resendPriority must be -1, 0 or 1")),
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := tt.sc.Validate()
+			assert.Equal(t, tt.wantErr, err)
+		})
+	}
+}

+ 4 - 8
internal/topo/node/cache/sync_cache.go

@@ -160,15 +160,11 @@ func (c *SyncCache) run(ctx api.StreamContext) {
 		select {
 		case item := <-c.in:
 			ctx.GetLogger().Debugf("send to cache")
-			go func() { // avoid deadlock when cacheCtrl is full
-				c.cacheCtrl <- item
-			}()
+			c.cacheCtrl <- item
 		case isSuccess := <-c.Ack:
 			// only send the next sink after receiving an ack
 			ctx.GetLogger().Debugf("cache ack")
-			go func() {
-				c.cacheCtrl <- AckResult(isSuccess)
-			}()
+			c.cacheCtrl <- AckResult(isSuccess)
 		case data := <-c.cacheCtrl: // The only place to manipulate cache
 			switch r := data.(type) {
 			case AckResult:
@@ -213,7 +209,7 @@ func (c *SyncCache) send(ctx api.StreamContext) {
 	}
 	d, ok := c.peakMemCache(ctx)
 	if ok {
-		ctx.GetLogger().Debugf("sending cache item %v", d)
+		ctx.GetLogger().Infof("sending cache item %v", d)
 		c.sendStatus = 1
 		ctx.GetLogger().Debug("send status to 0 after sending tuple")
 		select {
@@ -264,7 +260,7 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{
 			ctx.GetLogger().Debugf("added cache to disk buffer page %v", c.diskBufferPage)
 			ctx.GetLogger().Debugf("added cache to disk buffer page %v", c.diskBufferPage)
 		}
 		}
 	} else {
 	} else {
-		ctx.GetLogger().Debugf("added cache to mem cache %v", c.memCache)
+		ctx.GetLogger().Infof("added cache to mem cache %v", item)
 	}
 	c.CacheLength++
 	ctx.GetLogger().Debugf("added cache %d", c.CacheLength)

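Why dropping the `go func() { ... }()` wrappers keeps order: each spawned goroutine races the others before writing to `cacheCtrl`, so items can be enqueued out of order, whereas direct sends from the single run loop are strictly FIFO. A standalone illustration of the difference (not eKuiper code):

```go
// Standalone sketch: ordering of channel sends from separate goroutines is
// not guaranteed, while sends from a single goroutine are.
package main

import "fmt"

func main() {
	ch := make(chan int, 4)

	// Asynchronous sends: the two goroutines race, so 1 and 2 may be enqueued
	// in either order, and possibly after 3 and 4.
	go func() { ch <- 1 }()
	go func() { ch <- 2 }()

	// Synchronous sends from this goroutine: 3 is always enqueued before 4.
	ch <- 3
	ch <- 4

	for i := 0; i < 4; i++ {
		fmt.Println(<-ch)
	}
}
```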
+ 102 - 43
internal/topo/node/sink_node.go

@@ -217,7 +217,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 										outs := itemToMap(data)
 										if sconf.Omitempty && (data == nil || len(outs) == 0) {
 											ctx.GetLogger().Debugf("receive empty in sink")
-											return nil
+											break
 										}
 										select {
 										case dataCh <- outs:
@@ -244,64 +244,123 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 							} else {
 								resendCh := make(chan []map[string]interface{}, sconf.BufferLength)
 								rq := cache.NewSyncCache(ctx, resendCh, result, &sconf.SinkConf, sconf.BufferLength)
-								for {
+								receiveQ := func(data interface{}) {
+									processed := false
+									if data, processed = m.preprocess(data); processed {
+										return
+									}
+									stats.IncTotalRecordsIn()
+									stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength))
+									outs := itemToMap(data)
+									if sconf.Omitempty && (data == nil || len(outs) == 0) {
+										ctx.GetLogger().Debugf("receive empty in sink")
+										return
+									}
 									select {
 									select {
-									case data := <-m.input:
-										processed := false
-										if data, processed = m.preprocess(data); processed {
-											break
+									case dataCh <- outs:
+									case <-ctx.Done():
+									}
+									select {
+									case resendCh <- nil:
+									case <-ctx.Done():
+									}
+								}
+								normalQ := func(data []map[string]interface{}) {
+									stats.ProcessTimeStart()
+									stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength))
+									ctx.GetLogger().Debugf("sending data: %v", data)
+									err := doCollectMaps(ctx, sink, sconf, data, sendManager, stats, false)
+									ack := checkAck(ctx, data, err)
+									// If ack is false, add it to the resend queue
+									if !ack {
+										select {
+										case resendCh <- data:
+										case <-ctx.Done():
 										}
 										}
-										stats.IncTotalRecordsIn()
-										stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength))
-										outs := itemToMap(data)
-										if sconf.Omitempty && (data == nil || len(outs) == 0) {
-											ctx.GetLogger().Debugf("receive empty in sink")
-											return nil
+									}
+									// Always ack for the normal queue as fail items are handled by the resend queue
+									select {
+									case c.Ack <- true:
+									case <-ctx.Done():
+									}
+									stats.ProcessTimeEnd()
+								}
+								resendQ := func(data []map[string]interface{}) {
+									ctx.GetLogger().Debugf("resend data: %v", data)
+									stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength))
+									if sconf.ResendIndicatorField != "" {
+										for _, item := range data {
+											item[sconf.ResendIndicatorField] = true
 										}
 										}
+									}
+									err := doCollectMaps(ctx, sink, sconf, data, sendManager, stats, true)
+									ack := checkAck(ctx, data, err)
+									select {
+									case rq.Ack <- ack:
+									case <-ctx.Done():
+									}
+								}
+								doneQ := func() {
+									logger.Infof("sink node %s instance %d done", m.name, instance)
+									if err := sink.Close(ctx); err != nil {
+										logger.Warnf("close sink node %s instance %d fails: %v", m.name, instance, err)
+									}
+								}
+
+								if sconf.ResendPriority == 0 {
+									for {
 										select {
 										select {
-										case dataCh <- outs:
+										case data := <-m.input:
+											receiveQ(data)
+										case data := <-c.Out:
+											normalQ(data)
+										case data := <-rq.Out:
+											resendQ(data)
 										case <-ctx.Done():
 										case <-ctx.Done():
+											doneQ()
+											return nil
 										}
 										}
+									}
+								} else if sconf.ResendPriority < 0 { // normal queue has higher priority
+									for {
 										select {
 										select {
-										case resendCh <- nil:
+										case data := <-m.input:
+											receiveQ(data)
+										case data := <-c.Out:
+											normalQ(data)
 										case <-ctx.Done():
 										case <-ctx.Done():
-										}
-									case data := <-c.Out:
-										stats.ProcessTimeStart()
-										stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength))
-										ctx.GetLogger().Debugf("sending data: %v", data)
-										err := doCollectMaps(ctx, sink, sconf, data, sendManager, stats, false)
-										ack := checkAck(ctx, data, err)
-										// If ack is false, add it to the resend queue
-										if !ack {
+											doneQ()
+											return nil
+										default:
 											select {
 											select {
-											case resendCh <- data:
-											case <-ctx.Done():
+											case data := <-c.Out:
+												normalQ(data)
+											case data := <-rq.Out:
+												resendQ(data)
 											}
 											}
 										}
-										select {
-										case c.Ack <- true:
-										case <-ctx.Done():
-										}
-										stats.ProcessTimeEnd()
-									case data := <-rq.Out:
-										ctx.GetLogger().Debugf("resend data: %v", data)
-										stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength))
-										err := doCollectMaps(ctx, sink, sconf, data, sendManager, stats, true)
-										ack := checkAck(ctx, data, err)
+									}
+								} else {
+									for {
 										select {
 										select {
-										case rq.Ack <- ack:
+										case data := <-m.input:
+											receiveQ(data)
+										case data := <-rq.Out:
+											resendQ(data)
 										case <-ctx.Done():
 										case <-ctx.Done():
+											doneQ()
+											return nil
+										default:
+											select {
+											case data := <-c.Out:
+												normalQ(data)
+											case data := <-rq.Out:
+												resendQ(data)
+											}
 										}
 										}
-									case <-ctx.Done():
-										logger.Infof("sink node %s instance %d done", m.name, instance)
-										if err := sink.Close(ctx); err != nil {
-											logger.Warnf("close sink node %s instance %d fails: %v", m.name, instance, err)
-										}
-										return nil
 									}
 								}
+
 							}
 						}
 					})

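Both non-zero `resendPriority` branches above rely on the same Go pattern: an outer `select` that serves the preferred queue (plus the input channel and shutdown), and a `default` branch that falls through to a nested `select` serving both queues once the preferred one has nothing ready. A standalone sketch of that pattern with assumed channel names (not the sink's actual types):

```go
// Standalone sketch of the priority-select pattern. `preferred` stands for
// the queue favored by resendPriority (the normal queue when it is -1, the
// resend queue when it is 1); `other` is the remaining queue.
package main

import "fmt"

func main() {
	preferred := make(chan int, 3)
	other := make(chan int, 3)

	for i := 1; i <= 3; i++ {
		preferred <- i
		other <- 10 + i
	}

	// Drain six items. The outer select only reaches `default` when nothing
	// is ready on `preferred`, so 1, 2, 3 print before 11, 12, 13.
	for i := 0; i < 6; i++ {
		select {
		case v := <-preferred:
			fmt.Println("preferred:", v)
		default:
			select {
			case v := <-preferred:
				fmt.Println("preferred:", v)
			case v := <-other:
				fmt.Println("other:", v)
			}
		}
	}
}
```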
+ 132 - 75
internal/topo/node/sink_node_test.go

@@ -634,83 +634,140 @@ func TestSinkCache(t *testing.T) {

 	contextLogger := conf.Log.WithField("rule", "TestSinkCache")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
-	data := [][]map[string]interface{}{
-		{{"a": 1}},
-		{{"a": 2}},
-		{{"a": 3}},
-		{{"a": 4}},
-		{{"a": 5}},
-		{{"a": 6}},
-		{{"a": 7}},
-		{{"a": 8}},
-		{{"a": 9}},
-		{{"a": 10}},
+
+	tests := []struct {
+		name         string
+		config       map[string]interface{}
+		result       [][]byte
+		resendResult [][]byte
+	}{
+		{
+			name: "test cache",
+			config: map[string]interface{}{
+				"enableCache": true,
+			},
+			result: [][]byte{
+				[]byte(`[{"a":1}]`),
+				[]byte(`[{"a":2}]`),
+				[]byte(`[{"a":3}]`),
+				[]byte(`[{"a":4}]`),
+				[]byte(`[{"a":5}]`),
+				[]byte(`[{"a":6}]`),
+				[]byte(`[{"a":7}]`),
+				[]byte(`[{"a":8}]`),
+				[]byte(`[{"a":9}]`),
+				[]byte(`[{"a":10}]`),
+			},
+		},
+		{
+			name: "test resend",
+			config: map[string]interface{}{
+				"enableCache":      true,
+				"resendAlterQueue": true,
+			},
+			result: [][]byte{
+				[]byte(`[{"a":2}]`),
+				[]byte(`[{"a":4}]`),
+				[]byte(`[{"a":6}]`),
+				[]byte(`[{"a":8}]`),
+				[]byte(`[{"a":10}]`),
+			},
+			resendResult: [][]byte{
+				[]byte(`[{"a":1}]`),
+				[]byte(`[{"a":3}]`),
+				[]byte(`[{"a":5}]`),
+			},
+		},
+		{
+			name: "test resend priority low",
+			config: map[string]interface{}{
+				"enableCache":          true,
+				"resendAlterQueue":     true,
+				"resendPriority":       -1,
+				"resendIndicatorField": "isResend",
+			},
+			result: [][]byte{
+				[]byte(`[{"a":2}]`),
+				[]byte(`[{"a":4}]`),
+				[]byte(`[{"a":6}]`),
+				[]byte(`[{"a":8}]`),
+				[]byte(`[{"a":10}]`),
+			},
+			resendResult: [][]byte{
+				[]byte(`[{"a":1,"isResend":true}]`),
+				[]byte(`[{"a":3,"isResend":true}]`),
+				[]byte(`[{"a":5,"isResend":true}]`),
+			},
+		},
+		{
+			name: "test resend priority high",
+			config: map[string]interface{}{
+				"enableCache":      true,
+				"resendAlterQueue": true,
+				"resendPriority":   1,
+			},
+			result: [][]byte{
+				[]byte(`[{"a":2}]`),
+				[]byte(`[{"a":4}]`),
+				[]byte(`[{"a":6}]`),
+				[]byte(`[{"a":8}]`),
+				[]byte(`[{"a":10}]`),
+			},
+			resendResult: [][]byte{
+				[]byte(`[{"a":1}]`),
+				[]byte(`[{"a":3}]`),
+				[]byte(`[{"a":5}]`),
+			},
+		},
 	}
 	}
 
-	t.Run("test cache", func(t *testing.T) {
-		hitch := make(chan int, 10)
-		config := map[string]interface{}{
-			"enableCache": true,
-		}
-		result := [][]byte{
-			[]byte(`[{"a":1}]`),
-			[]byte(`[{"a":2}]`),
-			[]byte(`[{"a":3}]`),
-			[]byte(`[{"a":4}]`),
-			[]byte(`[{"a":5}]`),
-			[]byte(`[{"a":6}]`),
-			[]byte(`[{"a":7}]`),
-			[]byte(`[{"a":8}]`),
-			[]byte(`[{"a":9}]`),
-			[]byte(`[{"a":10}]`),
-		}
-		mockSink := mocknode.NewMockResendSink(hitch)
-		s := NewSinkNodeWithSink("mockSink", mockSink, config)
-		s.Open(ctx, make(chan error))
-		for i := 0; i < 200; i++ {
-			s.input <- data[i%10]
-			select {
-			case count := <-hitch:
-				if count == len(data)*2 {
-					goto end
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			data := [][]map[string]interface{}{
+				{{"a": 1}},
+				{{"a": 2}},
+				{{"a": 3}},
+				{{"a": 4}},
+				{{"a": 5}},
+				{{"a": 6}},
+				{{"a": 7}},
+				{{"a": 8}},
+				{{"a": 9}},
+				{{"a": 10}},
+			}
+			hitch := make(chan int, 10)
+			mockSink := mocknode.NewMockResendSink(hitch)
+			fmt.Printf("mockSink: %+v\n", tt.config)
+			s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
+			s.Open(ctx, make(chan error))
+			for i := 0; i < 200; i++ {
+				s.input <- data[i%len(data)]
+				select {
+				case <-hitch:
+					done := true
+					results := mockSink.GetResults()
+					if len(results) != len(tt.result) {
+						done = false
+					}
+					if done && tt.resendResult != nil {
+						resentResults := mockSink.GetResendResults()
+						if len(resentResults) != len(tt.resendResult) {
+							done = false
+						}
+					}
+					if done {
+						goto end
+					}
+				case <-time.After(1 * time.Second):
 				}
 				}
-			case <-time.After(1 * time.Second):
 			}
 			}
-		}
-	end:
-		results := mockSink.GetResults()
-		assert.Equal(t, result, results)
-	})
-
-	t.Run("test resend cache", func(t *testing.T) {
-		hitch := make(chan int, 10)
-		config := map[string]interface{}{
-			"enableCache":      true,
-			"resendAlterQueue": true,
-		}
-		result := [][]byte{
-			[]byte(`[{"a":2}]`),
-			[]byte(`[{"a":4}]`),
-			[]byte(`[{"a":6}]`),
-			[]byte(`[{"a":8}]`),
-			[]byte(`[{"a":10}]`),
-		}
-		resendResult := [][]byte{
-			[]byte(`[{"a":1}]`),
-			[]byte(`[{"a":3}]`),
-			[]byte(`[{"a":5}]`),
-		}
-		mockSink := mocknode.NewMockResendSink(hitch)
-		s := NewSinkNodeWithSink("mockSink", mockSink, config)
-		s.Open(ctx, make(chan error))
-		for _, d := range data {
-			s.input <- d
-			<-hitch
-		}
-		time.Sleep(1 * time.Second)
-		results := mockSink.GetResults()
-		assert.Equal(t, results, result)
-		resentResults := mockSink.GetResendResults()
-		assert.Equal(t, resendResult, resentResults[:3])
-	})
+		end:
+			results := mockSink.GetResults()
+			assert.Equal(t, results[:len(tt.result)], tt.result)
+			if tt.resendResult != nil {
+				resentResults := mockSink.GetResendResults()
+				assert.Equal(t, resentResults[:len(tt.resendResult)], tt.resendResult)
+			}
+		})
+	}
 }