
feat(sink): support resend queue (#2107)

Allow sending failed data through a separate queue to keep it apart from the normal sink data

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 1 year ago
parent
commit
cbc5996e41

+ 1 - 0
internal/conf/conf.go

@@ -49,6 +49,7 @@ type SinkConf struct {
 	EnableCache          bool `json:"enableCache" yaml:"enableCache"`
 	ResendInterval       int  `json:"resendInterval" yaml:"resendInterval"`
 	CleanCacheAtStop     bool `json:"cleanCacheAtStop" yaml:"cleanCacheAtStop"`
+	ResendAlterQueue     bool `json:"resendAlterQueue" yaml:"resendAlterQueue"`
 }
 
 // Validate the configuration and reset to the default value for invalid values.
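
The new resendAlterQueue flag sits alongside the existing cache options, so a rule's sink action opts in through its property map just like enableCache. A minimal sketch of such a configuration, in the same Go map form the unit tests below use (values are illustrative):

	config := map[string]interface{}{
		"enableCache":      true, // keep failed data in the sink cache
		"resendAlterQueue": true, // route resends through a separate queue instead of the main one
	}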

+ 14 - 3
internal/io/mqtt/mqtt_sink.go

@@ -31,6 +31,7 @@ type AdConf struct {
 	Qos         byte   `json:"qos"`
 	Retained    bool   `json:"retained"`
 	Compression string `json:"compression"`
+	ResendTopic string `json:"resendDestination"`
 }
 
 type MQTTSink struct {
@@ -51,7 +52,10 @@ func (ms *MQTTSink) hasKeys(str []string, ps map[string]interface{}) bool {
 
 func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 	adconf := &AdConf{}
-	cast.MapToStruct(ps, adconf)
+	err := cast.MapToStruct(ps, adconf)
+	if err != nil {
+		return err
+	}
 
 	if adconf.Tpc == "" {
 		return fmt.Errorf("mqtt sink is missing property topic")
@@ -59,7 +63,6 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 	if adconf.Qos != 0 && adconf.Qos != 1 && adconf.Qos != 2 {
 		return fmt.Errorf("invalid qos value %v, the value could be only int 0 or 1 or 2", adconf.Qos)
 	}
-	var err error
 	if adconf.Compression != "" {
 		ms.compressor, err = compressor.GetCompressor(adconf.Compression)
 		if err != nil {
@@ -84,6 +87,14 @@ func (ms *MQTTSink) Open(ctx api.StreamContext) error {
 }
 
 func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
+	return ms.collectWithTopic(ctx, item, ms.adconf.Tpc)
+}
+
+func (ms *MQTTSink) CollectResend(ctx api.StreamContext, item interface{}) error {
+	return ms.collectWithTopic(ctx, item, ms.adconf.ResendTopic)
+}
+
+func (ms *MQTTSink) collectWithTopic(ctx api.StreamContext, item interface{}, topic string) error {
 	logger := ctx.GetLogger()
 	jsonBytes, _, err := ctx.TransformOutput(item)
 	if err != nil {
@@ -97,7 +108,7 @@ func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
 		}
 	}
 
-	tpc, err := ctx.ParseTemplate(ms.adconf.Tpc, item)
+	tpc, err := ctx.ParseTemplate(topic, item)
 	if err != nil {
 		return err
 	}
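
For the MQTT sink, the new resendDestination property names the topic used when cached data is resent, while topic keeps serving normal sends. A hedged sketch of the relevant sink properties as they reach Configure (the topic values are hypothetical):

	props := map[string]interface{}{
		"topic":             "result/normal", // published by Collect
		"resendDestination": "result/resend", // published by CollectResend
		"qos":               1,
	}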

+ 36 - 34
internal/topo/node/cache/sync_cache.go

@@ -21,7 +21,6 @@ import (
 
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
-	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 	"github.com/lf-edge/ekuiper/pkg/kv"
@@ -29,7 +28,7 @@ import (
 
 type AckResult bool
 
-// page Rotate storage for in memory cache
+// page is a rotating storage unit for the in-memory cache
 // Not thread safe!
 type page struct {
 	Data [][]map[string]interface{}
@@ -101,7 +100,6 @@ type SyncCache struct {
 	Ack       chan bool
 	cacheCtrl chan interface{} // CacheCtrl is the only place to control the cache; sync in and ack result
 	errorCh   chan<- error
-	stats     metric.StatManager
 	// cache config
 	cacheConf   *conf.SinkConf
 	maxDiskPage int
@@ -110,8 +108,8 @@ type SyncCache struct {
 	memCache       []*page
 	diskBufferPage *page
 	// status
-	diskSize     int // how many pages has been saved
-	cacheLength  int // readonly, for metrics only to save calculation
+	diskSize     int // the count of pages that have been saved
+	CacheLength  int // readonly, for metrics only to save calculation
 	diskPageTail int // init from the database
 	diskPageHead int
 	sendStatus   int // 0: idle, 1: sending and waiting for ack, 2: stopped for error
@@ -121,13 +119,13 @@ type SyncCache struct {
 	exitCh chan<- struct{}
 }
 
-func NewSyncCacheWithExitChanel(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int, exitCh chan<- struct{}) *SyncCache {
-	c := NewSyncCache(ctx, in, errCh, stats, cacheConf, bufferLength)
+func NewSyncCacheWithExitChanel(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, cacheConf *conf.SinkConf, bufferLength int, exitCh chan<- struct{}) *SyncCache {
+	c := NewSyncCache(ctx, in, errCh, cacheConf, bufferLength)
 	c.exitCh = exitCh
 	return c
 }
 
-func NewSyncCache(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int) *SyncCache {
+func NewSyncCache(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, cacheConf *conf.SinkConf, bufferLength int) *SyncCache {
 	c := &SyncCache{
 		cacheConf:  cacheConf,
 		in:         in,
@@ -139,7 +137,6 @@ func NewSyncCache(ctx api.StreamContext, in <-chan []map[string]interface{}, err
 		memCache:   make([]*page, 0),
 		// add one more slot so that there will be at least one slot between head and tail to find out the head/tail id
 		maxDiskPage: (cacheConf.MaxDiskCache / cacheConf.BufferPageSize) + 1,
-		stats:       stats,
 	}
 	go func() {
 		err := infra.SafeRun(func() error {
@@ -156,7 +153,7 @@ func NewSyncCache(ctx api.StreamContext, in <-chan []map[string]interface{}, err
 func (c *SyncCache) run(ctx api.StreamContext) {
 	c.initStore(ctx)
 	defer c.onClose(ctx)
-	if c.cacheLength > 0 { // start to send the cache
+	if c.CacheLength > 0 { // start to send the cache
 		c.send(ctx)
 	}
 	for {
@@ -186,7 +183,12 @@ func (c *SyncCache) run(ctx api.StreamContext) {
 				}
 			case []map[string]interface{}:
 				ctx.GetLogger().Debugf("adding cache %v", data)
-				c.addCache(ctx, r)
+				// hack here: nil is a signal to continue sending, so do not add nil to the cache
+				if r != nil {
+					c.addCache(ctx, r)
+				} else {
+					ctx.GetLogger().Debug("nil cache, continue sending")
+				}
 				if c.sendStatus == 2 {
 					c.sendStatus = 0
 					ctx.GetLogger().Debug("send status to 0 after adding cache in error state")
@@ -194,7 +196,7 @@ func (c *SyncCache) run(ctx api.StreamContext) {
 			default:
 				ctx.GetLogger().Errorf("unknown cache control command %v", data)
 			}
-			c.stats.SetBufferLength(int64(len(c.in) + c.cacheLength))
+			ctx.GetLogger().Debugf("cache status %d", c.sendStatus)
 			if c.sendStatus == 0 {
 				c.send(ctx)
 			}
@@ -206,7 +208,7 @@ func (c *SyncCache) run(ctx api.StreamContext) {
 }
 
 func (c *SyncCache) send(ctx api.StreamContext) {
-	if c.cacheLength > 1 && c.cacheConf.ResendInterval > 0 {
+	if c.CacheLength > 1 && c.cacheConf.ResendInterval > 0 {
 		time.Sleep(time.Duration(c.cacheConf.ResendInterval) * time.Millisecond)
 	}
 	d, ok := c.peakMemCache(ctx)
@@ -264,38 +266,38 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{
 	} else {
 		ctx.GetLogger().Debugf("added cache to mem cache %v", c.memCache)
 	}
-	c.cacheLength++
-	ctx.GetLogger().Debugf("added cache %d", c.cacheLength)
+	c.CacheLength++
+	ctx.GetLogger().Debugf("added cache %d", c.CacheLength)
 }
 
 // deleteCache not thread safe!
 func (c *SyncCache) deleteCache(ctx api.StreamContext) {
-	ctx.GetLogger().Debugf("deleting cache. cacheLength: %d, diskSize: %d", c.cacheLength, c.diskSize)
+	ctx.GetLogger().Debugf("deleting cache. CacheLength: %d, diskSize: %d", c.CacheLength, c.diskSize)
 	if len(c.memCache) == 0 {
 		ctx.GetLogger().Debug("mem cache is empty")
 		return
 	}
 	isNotEmpty := c.memCache[0].delete()
 	if isNotEmpty {
-		c.cacheLength--
-		ctx.GetLogger().Debugf("deleted cache: %d", c.cacheLength)
+		c.CacheLength--
+		ctx.GetLogger().Debugf("deleted cache: %d", c.CacheLength)
 	}
 	if c.memCache[0].isEmpty() { // read from disk or cool list
 		c.memCache = c.memCache[1:]
 		if c.diskSize > 0 {
 			c.loadFromDisk(ctx)
 		} else if c.diskBufferPage != nil { // use cool page as the new page
-			ctx.GetLogger().Debugf("reading from diskBufferPage: %d", c.cacheLength)
+			ctx.GetLogger().Debugf("reading from diskBufferPage: %d", c.CacheLength)
 			c.memCache = append(c.memCache, c.diskBufferPage)
 			c.diskBufferPage = nil
 		}
 	}
-	ctx.GetLogger().Debugf("deleted cache. cacheLength: %d, diskSize: %d, memCache: %v", c.cacheLength, c.diskSize, c.memCache)
+	ctx.GetLogger().Debugf("deleted cache. CacheLength: %d, diskSize: %d, memCache: %v", c.CacheLength, c.diskSize, c.memCache)
 }
 
 func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
 	// load page from the disk
-	ctx.GetLogger().Debugf("loading from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
+	ctx.GetLogger().Debugf("loading from disk %d. CacheLength: %d, diskSize: %d", c.diskPageTail, c.CacheLength, c.diskSize)
 	hotPage := newPage(c.cacheConf.BufferPageSize)
 	ok, err := c.store.Get(strconv.Itoa(c.diskPageHead), hotPage)
 	if err != nil {
@@ -306,7 +308,7 @@ func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
 		_ = c.store.Delete(strconv.Itoa(c.diskPageHead))
 		if len(c.memCache) >= c.maxMemPage {
 			ctx.GetLogger().Warnf("drop a page of %d items in memory", c.memCache[0].L)
-			c.cacheLength -= c.memCache[0].L
+			c.CacheLength -= c.memCache[0].L
 			c.memCache = c.memCache[1:]
 		}
 		c.memCache = append(c.memCache, hotPage)
@@ -324,7 +326,7 @@ func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
 			ctx.GetLogger().Warnf("fail to store disk cache size %v", err)
 		}
 	}
-	ctx.GetLogger().Debugf("loaded from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
+	ctx.GetLogger().Debugf("loaded from disk %d. CacheLength: %d, diskSize: %d", c.diskPageTail, c.CacheLength, c.diskSize)
 }
 
 func (c *SyncCache) appendMemCache(item []map[string]interface{}) bool {
@@ -356,7 +358,7 @@ func (c *SyncCache) initStore(ctx api.StreamContext) {
 	kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
 	if c.cacheConf.CleanCacheAtStop {
 		ctx.GetLogger().Infof("creating cache store %s", kvTable)
-		store.DropCacheKV(kvTable)
+		_ = store.DropCacheKV(kvTable)
 	}
 	var err error
 	c.store, err = store.GetCacheKV(kvTable)
@@ -372,8 +374,8 @@ func (c *SyncCache) initStore(ctx api.StreamContext) {
 			i := 0
 			for ; i < 100; i++ {
 				time.Sleep(time.Millisecond * 10)
-				c.store.Get("storeSig", &set)
-				if set == 1 {
+				_, err := c.store.Get("storeSig", &set)
+				if err == nil && set == 1 {
 					ctx.GetLogger().Infof("waiting for previous cache for %d times", i)
 					break
 				}
@@ -382,7 +384,7 @@ func (c *SyncCache) initStore(ctx api.StreamContext) {
 				ctx.GetLogger().Errorf("waiting for previous cache for %d times, exit and drop", i)
 			}
 		}
-		c.store.Set("storeSig", 0)
+		_ = c.store.Set("storeSig", 0)
 		ctx.GetLogger().Infof("start to restore cache from disk")
 		// restore the memCache
 		_, err = c.store.Get("memcache", &c.memCache)
@@ -390,13 +392,13 @@ func (c *SyncCache) initStore(ctx api.StreamContext) {
 			ctx.GetLogger().Errorf("fail to restore mem cache %v", err)
 		}
 		for _, p := range c.memCache {
-			c.cacheLength += p.L
+			c.CacheLength += p.L
 		}
 		err = c.store.Delete("memcache")
 		if err != nil {
 			ctx.GetLogger().Errorf("fail to delete mem cache %v", err)
 		}
-		ctx.GetLogger().Infof("restored mem cache %d", c.cacheLength)
+		ctx.GetLogger().Infof("restored mem cache %d", c.CacheLength)
 		// restore the disk cache
 		var size int
 		ok, _ = c.store.Get("size", &size)
@@ -409,7 +411,7 @@ func (c *SyncCache) initStore(ctx api.StreamContext) {
 		if ok {
 			c.diskPageHead = head
 		}
-		c.cacheLength += (c.diskSize - 1) * c.cacheConf.BufferPageSize
+		c.CacheLength += (c.diskSize - 1) * c.cacheConf.BufferPageSize
 		c.diskPageTail = (c.diskPageHead + c.diskSize - 1) % c.maxDiskPage
 		// load buffer page
 		hotPage := newPage(c.cacheConf.BufferPageSize)
@@ -420,10 +422,10 @@ func (c *SyncCache) initStore(ctx api.StreamContext) {
 			ctx.GetLogger().Errorf("nothing in the disk, should not happen")
 		} else {
 			c.diskBufferPage = hotPage
-			c.cacheLength += c.diskBufferPage.L
+			c.CacheLength += c.diskBufferPage.L
 			c.diskSize--
 		}
-		ctx.GetLogger().Infof("restored all cache %d. diskSize %d", c.cacheLength, c.diskSize)
+		ctx.GetLogger().Infof("restored all cache %d. diskSize %d", c.CacheLength, c.diskSize)
 	}
 }
 
@@ -438,7 +440,7 @@ func (c *SyncCache) onClose(ctx api.StreamContext) {
 	if c.cacheConf.CleanCacheAtStop {
 		kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
 		ctx.GetLogger().Infof("cleaning cache store %s", kvTable)
-		store.DropCacheKV(kvTable)
+		_ = store.DropCacheKV(kvTable)
 	} else {
 		if c.diskBufferPage != nil {
 			err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
@@ -459,6 +461,6 @@ func (c *SyncCache) onClose(ctx api.StreamContext) {
 			}
 			ctx.GetLogger().Debugf("store memory cache %d", len(c.memCache))
 		}
-		c.store.Set("storeSig", 1)
+		_ = c.store.Set("storeSig", 1)
 	}
 }
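
With the metric dependency removed, the caller contract of SyncCache is just its channels: push batches into the input channel, read them back from Out, and reply on Ack. A condensed sketch of that driving loop, roughly as the sink node uses it (trySend is a hypothetical send helper):

	func driveCache(ctx api.StreamContext, cconf *conf.SinkConf, errCh chan error, bufferLength int) {
		in := make(chan []map[string]interface{}, bufferLength) // upstream pushes batches here
		c := cache.NewSyncCache(ctx, in, errCh, cconf, bufferLength)
		for {
			select {
			case data := <-c.Out:
				err := trySend(ctx, data) // hypothetical send helper
				// a false ack keeps the item cached; the real node only nacks recoverable IO errors
				ack := err == nil
				c.Ack <- ack
			case <-ctx.Done():
				return
			}
		}
	}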

+ 3 - 9
internal/topo/node/cache/sync_cache_test.go

@@ -25,7 +25,6 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
-	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
 )
@@ -193,11 +192,6 @@ func TestRun(t *testing.T) {
 	for i, tt := range tests {
 		contextLogger := conf.Log.WithField("rule", fmt.Sprintf("TestRun-%d", i))
 		ctx, cancel := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta(fmt.Sprintf("rule%d", i), fmt.Sprintf("op%d", i), tempStore).WithCancel()
-		stats, err := metric.NewStatManager(ctx, "sink")
-		if err != nil {
-			t.Fatal(err)
-			return
-		}
 		in := make(chan []map[string]interface{})
 		errCh := make(chan error)
 		var result []interface{}
@@ -208,18 +202,18 @@ func TestRun(t *testing.T) {
 		}()
 		exitCh := make(chan struct{})
 		// send data
-		_ = NewSyncCacheWithExitChanel(ctx, in, errCh, stats, tt.sconf, 100, exitCh)
+		_ = NewSyncCacheWithExitChanel(ctx, in, errCh, tt.sconf, 100, exitCh)
 		for i := 0; i < tt.stopPt; i++ {
 			in <- tt.dataIn[i]
 			time.Sleep(1 * time.Millisecond)
 		}
 		cancel()
-		// wait cleanup job done
+		// wait for the cleanup job to finish
 		<-exitCh
 
 		// send the second half data
 		ctx, cancel = context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta(fmt.Sprintf("rule%d", i), fmt.Sprintf("op%d", i), tempStore).WithCancel()
-		sc := NewSyncCache(ctx, in, errCh, stats, tt.sconf, 100)
+		sc := NewSyncCache(ctx, in, errCh, tt.sconf, 100)
 		for i := tt.stopPt; i < len(tt.dataIn); i++ {
 			in <- tt.dataIn[i]
 			time.Sleep(1 * time.Millisecond)

+ 110 - 20
internal/topo/node/sink_node.go

@@ -199,9 +199,12 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 							}
 						} else {
 							logger.Infof("Creating sink cache")
-							{ // sync mode, the ack is already in order
-								dataCh := make(chan []map[string]interface{}, sconf.BufferLength)
-								c := cache.NewSyncCache(ctx, dataCh, result, stats, &sconf.SinkConf, sconf.BufferLength)
+							dataCh := make(chan []map[string]interface{}, sconf.BufferLength)
+							// cache for normal sink data
+							c := cache.NewSyncCache(ctx, dataCh, result, &sconf.SinkConf, sconf.BufferLength)
+							// A single cache queue mixes live data and resend data; items are sent in time order
+							if !sconf.ResendAlterQueue {
+								// sync mode, the ack is already in order
 								for {
 									select {
 									case data := <-m.input:
@@ -210,6 +213,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 											break
 										}
 										stats.IncTotalRecordsIn()
+										stats.SetBufferLength(int64(len(dataCh) + c.CacheLength))
 										outs := itemToMap(data)
 										if sconf.Omitempty && (data == nil || len(outs) == 0) {
 											ctx.GetLogger().Debugf("receive empty in sink")
@@ -221,23 +225,75 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 										}
 									case data := <-c.Out:
 										stats.ProcessTimeStart()
-										ack := true
-										err := doCollectMaps(ctx, sink, sconf, data, sendManager, stats)
-										// Only recoverable error should be cached
-										if err != nil {
-											if strings.HasPrefix(err.Error(), errorx.IOErr) { // do not log to prevent a lot of logs!
-												ack = false
-											} else {
-												ctx.GetLogger().Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), data, err)
+										stats.SetBufferLength(int64(len(dataCh) + c.CacheLength))
+										err := doCollectMaps(ctx, sink, sconf, data, sendManager, stats, false)
+										ack := checkAck(ctx, data, err)
+										select {
+										case c.Ack <- ack:
+										case <-ctx.Done():
+										}
+										stats.ProcessTimeEnd()
+									case <-ctx.Done():
+										logger.Infof("sink node %s instance %d done", m.name, instance)
+										if err := sink.Close(ctx); err != nil {
+											logger.Warnf("close sink node %s instance %d fails: %v", m.name, instance, err)
+										}
+										return nil
+									}
+								}
+							} else {
+								resendCh := make(chan []map[string]interface{}, sconf.BufferLength)
+								rq := cache.NewSyncCache(ctx, resendCh, result, &sconf.SinkConf, sconf.BufferLength)
+								for {
+									select {
+									case data := <-m.input:
+										processed := false
+										if data, processed = m.preprocess(data); processed {
+											break
+										}
+										stats.IncTotalRecordsIn()
+										stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength))
+										outs := itemToMap(data)
+										if sconf.Omitempty && (data == nil || len(outs) == 0) {
+											ctx.GetLogger().Debugf("receive empty in sink")
+											return nil
+										}
+										select {
+										case dataCh <- outs:
+										case <-ctx.Done():
+										}
+										select {
+										case resendCh <- nil:
+										case <-ctx.Done():
+										}
+									case data := <-c.Out:
+										stats.ProcessTimeStart()
+										stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength))
+										ctx.GetLogger().Debugf("sending data: %v", data)
+										err := doCollectMaps(ctx, sink, sconf, data, sendManager, stats, false)
+										ack := checkAck(ctx, data, err)
+										// If ack is false, add it to the resend queue
+										if !ack {
+											select {
+											case resendCh <- data:
+											case <-ctx.Done():
 											}
-										} else {
-											ctx.GetLogger().Debugf("sent data to MQTT: %v", data)
 										}
+										// Always ack the normal queue since failed items are handled by the resend queue
 										select {
-										case c.Ack <- ack:
+										case c.Ack <- true:
 										case <-ctx.Done():
 										}
 										stats.ProcessTimeEnd()
+									case data := <-rq.Out:
+										ctx.GetLogger().Debugf("resend data: %v", data)
+										stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength))
+										err := doCollectMaps(ctx, sink, sconf, data, sendManager, stats, true)
+										ack := checkAck(ctx, data, err)
+										select {
+										case rq.Ack <- ack:
+										case <-ctx.Done():
+										}
 									case <-ctx.Done():
 										logger.Infof("sink node %s instance %d done", m.name, instance)
 										if err := sink.Close(ctx); err != nil {
@@ -262,6 +318,19 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 	}()
 }
 
+func checkAck(ctx api.StreamContext, data interface{}, err error) bool {
+	if err != nil {
+		if strings.HasPrefix(err.Error(), errorx.IOErr) { // do not log to prevent a lot of logs!
+			return false
+		} else {
+			ctx.GetLogger().Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), data, err)
+		}
+	} else {
+		ctx.GetLogger().Debugf("sent data: %v", data)
+	}
+	return true
+}
+
 func (m *SinkNode) parseConf(logger api.Logger) (*SinkConf, error) {
 	sconf := &SinkConf{
 		Concurrency:  1,
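
checkAck withholds the ack only for errors prefixed with errorx.IOErr, treating them as recoverable; anything else is logged and acked. The mock sink below uses the same convention. A hedged sketch of how a custom sink could mark a failure as retryable (the sink type, connected flag and publish helper are hypothetical):

	func (s *mySink) Collect(ctx api.StreamContext, item interface{}) error {
		if !s.connected {
			// the errorx.IOErr prefix tells the cache to keep the item and resend it later
			return fmt.Errorf("%s: connection lost", errorx.IOErr)
		}
		return s.publish(ctx, item)
	}
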
@@ -317,12 +386,12 @@ func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, sendManag
 		ctx.GetLogger().Debugf("receive empty in sink")
 		return nil
 	}
-	return doCollectMaps(ctx, sink, sconf, outs, sendManager, stats)
+	return doCollectMaps(ctx, sink, sconf, outs, sendManager, stats, false)
 }
 
-func doCollectMaps(ctx api.StreamContext, sink api.Sink, sconf *SinkConf, outs []map[string]interface{}, sendManager *sinkUtil.SendManager, stats metric.StatManager) error {
+func doCollectMaps(ctx api.StreamContext, sink api.Sink, sconf *SinkConf, outs []map[string]interface{}, sendManager *sinkUtil.SendManager, stats metric.StatManager, isResend bool) error {
 	if !sconf.SendSingle {
-		return doCollectData(ctx, sink, outs, sendManager, stats)
+		return doCollectData(ctx, sink, outs, sendManager, stats, isResend)
 	} else {
 		var err error
 		for _, d := range outs {
@@ -330,7 +399,7 @@ func doCollectMaps(ctx api.StreamContext, sink api.Sink, sconf *SinkConf, outs [
 				ctx.GetLogger().Debugf("receive empty in sink")
 				continue
 			}
-			newErr := doCollectData(ctx, sink, d, sendManager, stats)
+			newErr := doCollectData(ctx, sink, d, sendManager, stats, isResend)
 			if newErr != nil {
 				err = newErr
 			}
@@ -369,7 +438,7 @@ func itemToMap(item interface{}) []map[string]interface{} {
 }
 
 // doCollectData outData must be map or []map
-func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, sendManager *sinkUtil.SendManager, stats metric.StatManager) error {
+func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, sendManager *sinkUtil.SendManager, stats metric.StatManager, isResend bool) error {
 	if sendManager != nil {
 		switch v := outData.(type) {
 		case map[string]interface{}:
@@ -386,7 +455,11 @@ func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, se
 		ctx.GetLogger().Infof("sink node %s instance %d stops data resending", ctx.GetOpId(), ctx.GetInstanceId())
 		return nil
 	default:
-		return sendDataToSink(ctx, sink, outData, stats)
+		if isResend {
+			return resendDataToSink(ctx, sink, outData, stats)
+		} else {
+			return sendDataToSink(ctx, sink, outData, stats)
+		}
 	}
 }
 
@@ -415,6 +488,23 @@ func sendDataToSink(ctx api.StreamContext, sink api.Sink, outData interface{}, s
 	}
 }
 
+func resendDataToSink(ctx api.StreamContext, sink api.Sink, outData interface{}, stats metric.StatManager) error {
+	var err error
+	switch st := sink.(type) {
+	case api.ResendSink:
+		err = st.CollectResend(ctx, outData)
+	default:
+		err = st.Collect(ctx, outData)
+	}
+	if err != nil {
+		stats.IncTotalExceptions(err.Error())
+		return err
+	} else {
+		ctx.GetLogger().Debugf("success resend")
+		return nil
+	}
+}
+
 func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 	var (
 		s   api.Sink

+ 89 - 2
internal/topo/node/sink_node_test.go

@@ -13,7 +13,6 @@
 // limitations under the License.
 
 //go:build template || !core
-// +build template !core
 
 package node
 
@@ -27,6 +26,7 @@ import (
 	"time"
 
 	"github.com/benbjohnson/clock"
+	"github.com/stretchr/testify/assert"
 
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/schema"
@@ -619,7 +619,7 @@ func TestSinkFields_Apply(t *testing.T) {
 		tf, _ := transform.GenTransform(tt.dt, tt.format, tt.schemaId, tt.delimiter, tt.dataField, tt.fields)
 		vCtx := context.WithValue(ctx, context.TransKey, tf)
 		mockSink := mocknode.NewMockSink()
-		mockSink.Collect(vCtx, tt.data)
+		_ = mockSink.Collect(vCtx, tt.data)
 		time.Sleep(1 * time.Second)
 		results := mockSink.GetResults()
 		if !reflect.DeepEqual(tt.result, results) {
@@ -627,3 +627,90 @@ func TestSinkFields_Apply(t *testing.T) {
 		}
 	}
 }
+
+func TestSinkCache(t *testing.T) {
+	conf.InitConf()
+	transform.RegisterAdditionalFuncs()
+
+	contextLogger := conf.Log.WithField("rule", "TestSinkCache")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+	data := [][]map[string]interface{}{
+		{{"a": 1}},
+		{{"a": 2}},
+		{{"a": 3}},
+		{{"a": 4}},
+		{{"a": 5}},
+		{{"a": 6}},
+		{{"a": 7}},
+		{{"a": 8}},
+		{{"a": 9}},
+		{{"a": 10}},
+	}
+
+	t.Run("test cache", func(t *testing.T) {
+		hitch := make(chan int, 10)
+		config := map[string]interface{}{
+			"enableCache": true,
+		}
+		result := [][]byte{
+			[]byte(`[{"a":1}]`),
+			[]byte(`[{"a":2}]`),
+			[]byte(`[{"a":3}]`),
+			[]byte(`[{"a":4}]`),
+			[]byte(`[{"a":5}]`),
+			[]byte(`[{"a":6}]`),
+			[]byte(`[{"a":7}]`),
+			[]byte(`[{"a":8}]`),
+			[]byte(`[{"a":9}]`),
+			[]byte(`[{"a":10}]`),
+		}
+		mockSink := mocknode.NewMockResendSink(hitch)
+		s := NewSinkNodeWithSink("mockSink", mockSink, config)
+		s.Open(ctx, make(chan error))
+		for i := 0; i < 200; i++ {
+			s.input <- data[i%10]
+			select {
+			case count := <-hitch:
+				if count == len(data)*2 {
+					goto end
+				}
+			case <-time.After(1 * time.Second):
+			}
+		}
+	end:
+		results := mockSink.GetResults()
+		assert.Equal(t, result, results)
+	})
+
+	t.Run("test resend cache", func(t *testing.T) {
+		hitch := make(chan int, 10)
+		config := map[string]interface{}{
+			"enableCache":      true,
+			"resendAlterQueue": true,
+		}
+		result := [][]byte{
+			[]byte(`[{"a":2}]`),
+			[]byte(`[{"a":4}]`),
+			[]byte(`[{"a":6}]`),
+			[]byte(`[{"a":8}]`),
+			[]byte(`[{"a":10}]`),
+		}
+		resendResult := [][]byte{
+			[]byte(`[{"a":1}]`),
+			[]byte(`[{"a":3}]`),
+			[]byte(`[{"a":5}]`),
+		}
+		mockSink := mocknode.NewMockResendSink(hitch)
+		s := NewSinkNodeWithSink("mockSink", mockSink, config)
+		s.Open(ctx, make(chan error))
+		for _, d := range data {
+			s.input <- d
+			<-hitch
+		}
+		time.Sleep(1 * time.Second)
+		results := mockSink.GetResults()
+		assert.Equal(t, results, result)
+		resentResults := mockSink.GetResendResults()
+		assert.Equal(t, resendResult, resentResults[:3])
+	})
+}

+ 91 - 0
internal/topo/topotest/mocknode/mock_resend_sink.go

@@ -0,0 +1,91 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mocknode
+
+import (
+	"fmt"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
+)
+
+type MockResendSink struct {
+	results       [][]byte
+	resentResults [][]byte
+	count         int
+	onHit         chan int
+}
+
+func NewMockResendSink(onHit chan int) *MockResendSink {
+	m := &MockResendSink{onHit: onHit}
+	return m
+}
+
+func (m *MockResendSink) Open(ctx api.StreamContext) error {
+	log := ctx.GetLogger()
+	log.Debugln("Opening mock sink")
+	m.results = make([][]byte, 0)
+	m.resentResults = make([][]byte, 0)
+	return nil
+}
+
+func (m *MockResendSink) Collect(ctx api.StreamContext, item interface{}) error {
+	logger := ctx.GetLogger()
+	defer func() {
+		m.count++
+		m.onHit <- m.count
+	}()
+	if m.count%2 == 0 {
+		return fmt.Errorf("%s: mock io error", errorx.IOErr)
+	}
+	if v, _, err := ctx.TransformOutput(item); err == nil {
+		logger.Debugf("mock sink receive %s", item)
+		m.results = append(m.results, v)
+	} else {
+		logger.Infof("mock sink transform data error: %v", err)
+	}
+	return nil
+}
+
+func (m *MockResendSink) CollectResend(ctx api.StreamContext, item interface{}) error {
+	logger := ctx.GetLogger()
+	if m.count%3 != 1 {
+		return fmt.Errorf("%s: mock io error", errorx.IOErr)
+	}
+	if v, _, err := ctx.TransformOutput(item); err == nil {
+		logger.Debugf("mock sink resend %s", item)
+		m.resentResults = append(m.resentResults, v)
+	} else {
+		logger.Infof("mock sink transform data error: %v", err)
+	}
+	return nil
+}
+
+func (m *MockResendSink) Close(_ api.StreamContext) error {
+	// do nothing
+	return nil
+}
+
+func (m *MockResendSink) Configure(_ map[string]interface{}) error {
+	return nil
+}
+
+func (m *MockResendSink) GetResults() [][]byte {
+	return m.results
+}
+
+func (m *MockResendSink) GetResendResults() [][]byte {
+	return m.resentResults
+}

+ 6 - 0
pkg/api/stream.go

@@ -117,6 +117,12 @@ type Sink interface {
 	Closable
 }
 
+type ResendSink interface {
+	Sink
+	// CollectResend is called when the sink cache resend is triggered
+	CollectResend(ctx StreamContext, data interface{}) error
+}
+
 type Emitter interface {
 	AddOutput(chan<- interface{}, string) error
 }
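
The runtime falls back to Collect when a sink does not implement ResendSink (see resendDataToSink above), so existing sinks keep working unchanged; a sink only adds CollectResend when it wants a distinct resend path. A minimal sketch of such an implementation, with Configure/Open/Close omitted and the write helper hypothetical:

	type demoSink struct{ /* connection state, config, ... */ }

	// Collect handles the normal data path.
	func (d *demoSink) Collect(ctx api.StreamContext, data interface{}) error {
		return d.write(ctx, "normal", data) // hypothetical writer
	}

	// CollectResend routes retried cache data to a separate destination.
	func (d *demoSink) CollectResend(ctx api.StreamContext, data interface{}) error {
		return d.write(ctx, "resend", data)
	}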