
offline cache (#1304)

* feat(cache): sync cache for sink

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* feat(mqtt): publish immediately return error if connection lost

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* doc(sink): sink caching

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying committed 2 years ago
commit c26776ba5d

+ 18 - 7
docs/en_US/operation/config/configuration_file.md

@@ -114,16 +114,27 @@ After get the plugin info, users can try these plugins, [more info](../restapi/p
 
 ## Sink configurations
 
+Configure the default properties of the sink, currently mainly used to configure the [cache policy](../../rules/sinks/overview.md#Caching). The same configuration options are available at the rule level to override these defaults.
+
 ```yaml
-  #The cache persistence threshold size. If the message in sink cache is larger than 10, then it triggers persistence. If you find the remote system is slow to response, or sink throughput is small, then it's recommend to increase below 2 configurations.More memory is required with the increase of below 2 configurations.
+  sink:
+  # Whether to enable the sink cache. If set to true, the cache will be enabled; otherwise it will be disabled.
+  enableCache: false
 
-  # If the message count reaches below value, then it triggers persistence.
-  cacheThreshold: 10
-  # The message persistence is triggered by a ticker, and cacheTriggerCount is for using configure the count to trigger the persistence procedure regardless if the message number reaches cacheThreshold or not. This is to prevent the data won't be saved as the cache never pass the threshold.
-  cacheTriggerCount: 15
+  # The maximum number of messages to be cached in memory.
+  memoryCacheThreshold: 1024
 
-  # Control to disable cache or not. If it's set to true, then the cache will be disabled, otherwise, it will be enabled.
-  disableCache: false
+  # The maximum number of messages to be cached in the disk.
+  maxDiskCache: 1024000
+
+  # The number of messages in a buffer page, which is the unit used to read/write the disk in batches to prevent frequent IO.
+  bufferPageSize: 256
+
+  # The interval in milliseconds to resend the cached messages
+  resendInterval: 0
+
+  # Whether to clean the cache when the rule stops
+  cleanCacheAtStop: false
 ```
 
 ## Store configurations

File diff suppressed because it is too large
+ 48 - 1
docs/en_US/rules/sinks/overview.md


+ 18 - 7
docs/zh_CN/operation/config/configuration_file.md

@@ -23,6 +23,7 @@ basic:
 
 ## 系统日志
 用户将名为 KuiperSyslogKey 的环境变量的值设置为 true 时,日志将打印到系统日志中。
+
 ## Cli 地址
 ```yaml
 basic:
@@ -96,16 +97,26 @@ GET http://localhost:9081/plugins/functions/prebuild
 
 ## Sink 配置
 
+配置 sink 的默认属性,目前主要用于配置[缓存策略](../../rules/sinks/overview.md#缓存)。在规则层有同样的配置选项,可以覆盖这些默认配置。
+
 ```yaml
-  #缓存持久化阈值。 如果接收器高速缓存中的消息大于10,则它将触发持久化。 如果发现远程系统响应速度慢或接收器吞吐量很小,则建议增加2种以下配置,此时需要更多内存。
+  # 是否开启缓存
+  enableCache: false
+  
+  # 内存缓存的最大存储条数
+  memoryCacheThreshold: 1024
+
+  # 磁盘缓存的最大存储条数
+  maxDiskCache: 1024000
+
+  # 读写磁盘的缓存页条数,作为磁盘读写的基本单位
+  bufferPageSize: 256
 
-  # 如果消息计数达到以下值,则会触发持久化。
-  cacheThreshold: 10
-  # 消息持久化由代码触发,cacheTriggerCount 用于使用配置计数来触发持久化过程,而不管消息号是否达到cacheThreshold。 这是为了防止由于缓存永远不会超过阈值而无法保存数据。
-  cacheTriggerCount: 15
+  # 重发的间隔时间,单位为毫秒
+  resendInterval: 0
 
-  # 控制是否禁用缓存。 如果将其设置为true,则将禁用缓存,否则将启用缓存。
-  disableCache: false
+  # 规则停止后是否清除缓存
+  cleanCacheAtStop: false
 ```
 
 ## 存储配置

File diff suppressed because it is too large
+ 48 - 1
docs/zh_CN/rules/sinks/overview.md


+ 16 - 10
etc/kuiper.yaml

@@ -43,17 +43,23 @@ rule:
   sendError: true
 
 sink:
-  # The cache persistence threshold size. If the message in sink cache is larger than 10, then it triggers persistence. If you find
-  # the remote system is slow to response, or sink throughput is small, then it's recommend to increase below 2 configurations.
-  # More memory is required with the increase of below 2 configurations.
-  # If the message count reaches below value, then it triggers persistence.
-  cacheThreshold: 10
-  # The message persistence is triggered by a ticker, and cacheTriggerCount is for using configure the count to trigger the persistence procedure
-  # regardless if the message number reaches cacheThreshold or not. This is to prevent the data won't be saved as the cache never pass the threshold.
-  cacheTriggerCount: 15
-
   # Control to disable cache or not. If it's set to true, then the cache will be disabled, otherwise, it will be enabled.
-  disableCache: true
+  enableCache: false
+
+  # The maximum number of messages to be cached in memory.
+  memoryCacheThreshold: 1024
+
+  # The maximum number of messages to be cached in the disk.
+  maxDiskCache: 1024000
+
+  # The number of messages in a buffer page, which is the unit used to read/write the disk in batches to prevent frequent IO.
+  bufferPageSize: 256
+
+  # The interval in milliseconds to resend the cached messages
+  resendInterval: 0
+
+  # Whether to clean the cache when the rule stops
+  cleanCacheAtStop: false
 
 store:
   #Type of store that will be used for keeping state of the application

+ 60 - 7
internal/conf/conf.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"fmt"
 	"github.com/lestrrat-go/file-rotatelogs"
 	"github.com/lestrrat-go/file-rotatelogs"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/sirupsen/logrus"
 	"github.com/sirupsen/logrus"
 	"io"
 	"io"
 	"os"
 	"os"
@@ -37,6 +38,61 @@ type tlsConf struct {
 	Keyfile  string `yaml:"keyfile"`
 }
 
+type SinkConf struct {
+	MemoryCacheThreshold int  `json:"memoryCacheThreshold" yaml:"memoryCacheThreshold"`
+	MaxDiskCache         int  `json:"maxDiskCache" yaml:"maxDiskCache"`
+	BufferPageSize       int  `json:"bufferPageSize" yaml:"bufferPageSize"`
+	EnableCache          bool `json:"enableCache" yaml:"enableCache"`
+	ResendInterval       int  `json:"resendInterval" yaml:"resendInterval"`
+	CleanCacheAtStop     bool `json:"cleanCacheAtStop" yaml:"cleanCacheAtStop"`
+}
+
+// Validate the configuration and reset to the default value for invalid values.
+func (sc *SinkConf) Validate() error {
+	e := make(errorx.MultiError)
+	if sc.MemoryCacheThreshold < 0 {
+		sc.MemoryCacheThreshold = 1024
+		Log.Warnf("memoryCacheThreshold is less than 0, set to 1024")
+		e["memoryCacheThreshold"] = fmt.Errorf("memoryCacheThreshold must be positive")
+	}
+	if sc.MaxDiskCache < 0 {
+		sc.MaxDiskCache = 1024000
+		Log.Warnf("maxDiskCache is less than 0, set to 1024000")
+		e["maxDiskCache"] = fmt.Errorf("maxDiskCache must be positive")
+	}
+	if sc.BufferPageSize < 0 {
+		sc.BufferPageSize = 256
+		Log.Warnf("bufferPageSize is less than 0, set to 256")
+		e["bufferPageSize"] = fmt.Errorf("bufferPageSize must be positive")
+	}
+	if sc.ResendInterval < 0 {
+		sc.ResendInterval = 0
+		Log.Warnf("resendInterval is less than 0, set to 0")
+		e["resendInterval"] = fmt.Errorf("resendInterval must be positive")
+	}
+	if sc.BufferPageSize > sc.MemoryCacheThreshold {
+		sc.MemoryCacheThreshold = sc.BufferPageSize
+		Log.Warnf("memoryCacheThreshold is less than bufferPageSize, set to %d", sc.BufferPageSize)
+		e["memoryCacheThresholdTooSmall"] = fmt.Errorf("memoryCacheThreshold must be greater than or equal to bufferPageSize")
+	}
+	if sc.MemoryCacheThreshold%sc.BufferPageSize != 0 {
+		sc.MemoryCacheThreshold = sc.BufferPageSize * (sc.MemoryCacheThreshold/sc.BufferPageSize + 1)
+		Log.Warnf("memoryCacheThreshold is not a multiple of bufferPageSize, set to %d", sc.MemoryCacheThreshold)
+		e["memoryCacheThresholdNotMultiple"] = fmt.Errorf("memoryCacheThreshold must be a multiple of bufferPageSize")
+	}
+	if sc.BufferPageSize > sc.MaxDiskCache {
+		sc.MaxDiskCache = sc.BufferPageSize
+		Log.Warnf("maxDiskCache is less than bufferPageSize, set to %d", sc.BufferPageSize)
+		e["maxDiskCacheTooSmall"] = fmt.Errorf("maxDiskCache must be greater than bufferPageSize")
+	}
+	if sc.MaxDiskCache%sc.BufferPageSize != 0 {
+		sc.MaxDiskCache = sc.BufferPageSize * (sc.MaxDiskCache/sc.BufferPageSize + 1)
+		Log.Warnf("maxDiskCache is not a multiple of bufferPageSize, set to %d", sc.MaxDiskCache)
+		e["maxDiskCacheNotMultiple"] = fmt.Errorf("maxDiskCache must be a multiple of bufferPageSize")
+	}
+	return e.GetError()
+}
+
 type KuiperConf struct {
 	Basic struct {
 		Debug          bool     `yaml:"debug"`
@@ -55,12 +111,8 @@ type KuiperConf struct {
 		Authentication bool     `yaml:"authentication"`
 		IgnoreCase     bool     `yaml:"ignoreCase"`
 	}
-	Rule api.RuleOption
-	Sink struct {
-		CacheThreshold    int  `yaml:"cacheThreshold"`
-		CacheTriggerCount int  `yaml:"cacheTriggerCount"`
-		DisableCache      bool `yaml:"disableCache"`
-	}
+	Rule  api.RuleOption
+	Sink  *SinkConf
 	Store struct {
 		Type  string `yaml:"type"`
 		Redis struct {
@@ -147,6 +199,7 @@ func InitConf() {
 	if Config.Portable.PythonBin == "" {
 		Config.Portable.PythonBin = "python"
 	}
+	_ = Config.Sink.Validate()
 }
 
 func init() {
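As a quick illustration of the validation rules added above (not part of the commit), the following sketch shows how an out-of-range sink configuration is normalized. It assumes the pointer-receiver form of `Validate` so the resets are visible to the caller, and it would have to live inside the `internal/conf` package, since that package cannot be imported from outside the module:

```go
package conf

import "fmt"

// demoValidate is a hypothetical helper showing the normalization rules:
// memoryCacheThreshold is rounded up to a multiple of bufferPageSize, and
// out-of-range values fall back to their defaults.
func demoValidate() {
	sc := &SinkConf{
		MemoryCacheThreshold: 1000,    // not a multiple of 256
		MaxDiskCache:         1024000, // 4000 pages, already a multiple
		BufferPageSize:       256,
	}
	err := sc.Validate()
	// sc.MemoryCacheThreshold is now 1024 (the next multiple of 256);
	// err collects a description of every adjustment that was made.
	fmt.Println(sc.MemoryCacheThreshold, err)
}
```

With these inputs, `memoryCacheThreshold` is rounded up from 1000 to 1024 while `maxDiskCache` is left untouched, matching the per-field checks in `Validate`.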

+ 1 - 1
internal/pkg/store/redis/stores_builder.go

@@ -22,7 +22,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/pkg/store/definition"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/definition"
 )
 )
 
 
-func BuildStores(c definition.Config) (definition.StoreBuilder, definition.TsBuilder, error) {
+func BuildStores(c definition.Config, _ string) (definition.StoreBuilder, definition.TsBuilder, error) {
 	db, err := NewRedisFromConf(c)
 	if err != nil {
 		return nil, nil, err

+ 6 - 1
internal/pkg/store/setup.go

@@ -59,10 +59,15 @@ func SetupWithKuiperConfig(kconf *conf.KuiperConf) error {
 }
 
 func Setup(config definition.Config) error {
-	s, err := newStores(config)
+	s, err := newStores(config, "sqliteKV.db")
 	if err != nil {
 		return err
 	}
 	globalStores = s
+	s, err = newStores(config, "cache.db")
+	if err != nil {
+		return err
+	}
+	cacheStores = s
 	return nil
 }

+ 1 - 1
internal/pkg/store/sql/sqlKv_test.go

@@ -80,7 +80,7 @@ func setupSqlKv() (kv.KeyValue, definition.Database, string) {
 		},
 	}
 
-	db, _ := sqlite.NewSqliteDatabase(config)
+	db, _ := sqlite.NewSqliteDatabase(config, "sqliteKV.db")
 	err = db.Connect()
 	if err != nil {
 		panic(err)

+ 1 - 1
internal/pkg/store/sql/sqlTs_test.go

@@ -89,7 +89,7 @@ func setupTSqlKv() (ts2.Tskv, definition.Database, string) {
 			Name: TDbName,
 		},
 	}
-	db, _ := sqlite.NewSqliteDatabase(config)
+	db, _ := sqlite.NewSqliteDatabase(config, "sqliteKV.db")
 	err = db.Connect()
 	if err != nil {
 		panic(err)

+ 2 - 3
internal/pkg/store/sql/sqlite/database.go

@@ -30,10 +30,9 @@ type Database struct {
 	mu   sync.Mutex
 }
 
-func NewSqliteDatabase(c definition.Config) (definition.Database, error) {
+func NewSqliteDatabase(c definition.Config, name string) (definition.Database, error) {
 	conf := c.Sqlite
 	dir := conf.Path
-	name := "sqliteKV.db"
 	if conf.Name != "" {
 	if conf.Name != "" {
 		name = conf.Name
 		name = conf.Name
 	}
 	}
@@ -61,7 +60,7 @@ func (d *Database) Connect() error {
 }
 
 func connectionString(dpath string) string {
-	return fmt.Sprintf("file:%s?cache=shared", dpath)
+	return fmt.Sprintf("file:%s?cache=shared&_journal=WAL&sync=2", dpath)
 }
 
 func (d *Database) Disconnect() error {

+ 2 - 2
internal/pkg/store/sql/stores_builder.go

@@ -20,8 +20,8 @@ import (
 	"github.com/lf-edge/ekuiper/internal/pkg/store/sql/sqlite"
 	"github.com/lf-edge/ekuiper/internal/pkg/store/sql/sqlite"
 )
 )
 
 
-func BuildStores(c definition.Config) (definition.StoreBuilder, definition.TsBuilder, error) {
-	db, err := sqlite.NewSqliteDatabase(c)
+func BuildStores(c definition.Config, name string) (definition.StoreBuilder, definition.TsBuilder, error) {
+	db, err := sqlite.NewSqliteDatabase(c, name)
 	if err != nil {
 		return nil, nil, err
 	}

+ 19 - 3
internal/pkg/store/stores.go

@@ -22,13 +22,14 @@ import (
 	"sync"
 	"sync"
 )
 )
 
 
-type StoreCreator func(conf definition.Config) (definition.StoreBuilder, definition.TsBuilder, error)
+type StoreCreator func(conf definition.Config, name string) (definition.StoreBuilder, definition.TsBuilder, error)
 
 var (
 	storeBuilders = map[string]StoreCreator{
 		"sqlite": sql.BuildStores,
 		"sqlite": sql.BuildStores,
 	}
 	}
 	globalStores *stores = nil
 	globalStores *stores = nil
+	cacheStores  *stores = nil
 )
 
 type stores struct {
@@ -39,10 +40,10 @@ type stores struct {
 	tsBuilder definition.TsBuilder
 }
 
-func newStores(c definition.Config) (*stores, error) {
+func newStores(c definition.Config, name string) (*stores, error) {
 	databaseType := c.Type
 	if builder, ok := storeBuilders[databaseType]; ok {
-		kvBuilder, tsBuilder, err := builder(c)
+		kvBuilder, tsBuilder, err := builder(c, name)
 		if err != nil {
 			return nil, err
 		} else {
@@ -136,3 +137,18 @@ func DropKV(table string) error {
 	globalStores.DropKV(table)
 	return nil
 }
+
+func GetCacheKV(table string) (error, kv.KeyValue) {
+	if cacheStores == nil {
+		return fmt.Errorf("cache stores are not initialized"), nil
+	}
+	return cacheStores.GetKV(table)
+}
+
+func DropCacheKV(table string) error {
+	if cacheStores == nil {
+		return fmt.Errorf("cache stores are not initialized")
+	}
+	cacheStores.DropKV(table)
+	return nil
+}
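For reference, a hypothetical sketch of using the new cache store directly; the table name format mirrors what the sink cache uses, and the code assumes it is placed inside the eKuiper module, since `internal` packages are not importable from outside:

```go
package demo

import "github.com/lf-edge/ekuiper/internal/pkg/store"

// openSinkCacheTable is a hypothetical helper showing the (error, value)
// return order of GetCacheKV and the matching DropCacheKV cleanup call.
func openSinkCacheTable(table string) error {
	err, kvStore := store.GetCacheKV(table) // backed by the separate cache.db
	if err != nil {
		return err
	}
	if err := kvStore.Set("storeSig", 0); err != nil {
		return err
	}
	// Dropping the table mirrors what cleanCacheAtStop does when a rule stops.
	return store.DropCacheKV(table)
}
```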

+ 0 - 4
internal/topo/checkpoint/coordinator.go

@@ -262,10 +262,6 @@ func (c *Coordinator) complete(checkpointId int64) {
 			//TODO handle checkpoint error
 			return
 		}
-		//sink save cache
-		for _, sink := range c.sinkTasks {
-			sink.SaveCache()
-		}
 		c.completedCheckpoints.add(ccp.(*pendingCheckpoint).finalize())
 		c.pendingCheckpoints.Delete(checkpointId)
 		//Drop the previous pendingCheckpoints

+ 1 - 3
internal/topo/checkpoint/defs.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -35,8 +35,6 @@ type NonSourceTask interface {
 
 type SinkTask interface {
 	NonSourceTask
-
-	SaveCache()
 }
 
 type BufferOrEvent struct {

+ 3 - 7
internal/topo/connection/clients/mqtt/mqtt.go

@@ -97,11 +97,7 @@ func (ms *MQTTClient) CfgValidate(props map[string]interface{}) error {
 	return nil
 }
 
-func (ms *MQTTClient) onConnectLost(_ MQTT.Client, err error) {
-	conf.Log.Warnf("The connection to mqtt broker %s client id %s disconnected with error: %s ", ms.srv, ms.clientid, err.Error())
-}
-
-func (ms *MQTTClient) Connect(handler MQTT.OnConnectHandler) error {
+func (ms *MQTTClient) Connect(connHandler MQTT.OnConnectHandler, lostHandler MQTT.ConnectionLostHandler) error {
 
 	opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
 
@@ -115,8 +111,8 @@ func (ms *MQTTClient) Connect(handler MQTT.OnConnectHandler) error {
 	}
 	opts = opts.SetClientID(ms.clientid)
 	opts = opts.SetAutoReconnect(true)
-	opts.OnConnect = handler
-	opts.OnConnectionLost = ms.onConnectLost
+	opts.OnConnect = connHandler
+	opts.OnConnectionLost = lostHandler
 
 	c := MQTT.NewClient(opts)
 	if token := c.Connect(); token.Wait() && token.Error() != nil {

+ 28 - 4
internal/topo/connection/clients/mqtt/mqtt_wrapper.go

@@ -20,6 +20,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/clients"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/clients"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"strings"
 	"strings"
 	"sync"
 	"sync"
 )
 )
@@ -45,6 +46,8 @@ type mqttClientWrapper struct {
 
 	conSelector string
 
+	connected bool
+
 	refLock sync.RWMutex
 	refCnt  uint64
 }
@@ -66,7 +69,7 @@ func NewMqttClientWrapper(props map[string]interface{}) (clients.ClientWrapper,
 		refCnt:             1,
 	}
 
-	err = client.Connect(cliWpr.onConnectHandler)
+	err = client.Connect(cliWpr.onConnectHandler, cliWpr.onConnectLost)
 	if err != nil {
 		return nil, err
 	}
@@ -78,8 +81,9 @@ func (mc *mqttClientWrapper) onConnectHandler(_ pahoMqtt.Client) {
 	// activeSubscriptions will be empty on the first connection.
 	// On a re-connect is when the subscriptions must be re-created.
 	conf.Log.Infof("The connection to mqtt broker %s client id %s established", mc.cli.srv, mc.cli.clientid)
-	mc.subLock.RLock()
-	defer mc.subLock.RUnlock()
+	mc.subLock.Lock()
+	defer mc.subLock.Unlock()
+	mc.connected = true
 	for topic, subscription := range mc.topicSubscriptions {
 		token := mc.cli.conn.Subscribe(topic, subscription.qos, subscription.topicHandler)
 		if token.Error() != nil {
@@ -90,6 +94,13 @@ func (mc *mqttClientWrapper) onConnectHandler(_ pahoMqtt.Client) {
 	}
 }
 
+func (mc *mqttClientWrapper) onConnectLost(_ pahoMqtt.Client, err error) {
+	mc.subLock.Lock()
+	defer mc.subLock.Unlock()
+	mc.connected = false
+	conf.Log.Warnf("The connection to mqtt broker %s client id %s disconnected with error: %s ", mc.cli.srv, mc.cli.clientid, err.Error())
+}
+
 func (mc *mqttClientWrapper) newMessageHandler(sub *mqttSubscriptionInfo) pahoMqtt.MessageHandler {
 	return func(client pahoMqtt.Client, message pahoMqtt.Message) {
 		if sub != nil {
@@ -107,6 +118,10 @@ func (mc *mqttClientWrapper) newMessageHandler(sub *mqttSubscriptionInfo) pahoMq
 }
 
 func (mc *mqttClientWrapper) Publish(_ api.StreamContext, topic string, message []byte, params map[string]interface{}) error {
+	err := mc.checkConn()
+	if err != nil {
+		return err
+	}
 	var Qos byte = 0
 	if pq, ok := params["qos"]; ok {
 		if v, ok := pq.(byte); ok {
@@ -120,7 +135,7 @@ func (mc *mqttClientWrapper) Publish(_ api.StreamContext, topic string, message
 		}
 	}
 
-	err := mc.cli.Publish(topic, Qos, retained, message)
+	err = mc.cli.Publish(topic, Qos, retained, message)
 	if err != nil {
 		return err
 	}
@@ -128,6 +143,15 @@ func (mc *mqttClientWrapper) Publish(_ api.StreamContext, topic string, message
 	return nil
 }
 
+func (mc *mqttClientWrapper) checkConn() error {
+	mc.subLock.RLock()
+	defer mc.subLock.RUnlock()
+	if !mc.connected {
+		return fmt.Errorf("%s: %s", errorx.IOErr, "mqtt client is not connected")
+	}
+	return nil
+}
+
 func (mc *mqttClientWrapper) Subscribe(c api.StreamContext, subChan []api.TopicChannel, messageErrors chan error, params map[string]interface{}) error {
 	log := c.GetLogger()
 

+ 431 - 0
internal/topo/node/cache/sync_cache.go

@@ -0,0 +1,431 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cache
+
+import (
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/store"
+	"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
+	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/infra"
+	"github.com/lf-edge/ekuiper/pkg/kv"
+	"path"
+	"sort"
+	"strconv"
+	"time"
+)
+
+type AckResult bool
+
+// page Rotate storage for in memory cache
+// Not thread safe!
+type page struct {
+	Data []interface{}
+	H    int
+	T    int
+	L    int
+	Size int
+}
+
+// newPage create a new cache page
+// TODO the page is created even not used, need dynamic?
+func newPage(size int) *page {
+	return &page{
+		Data: make([]interface{}, size),
+		H:    0, // When deleting, head++, if tail == head, it is empty
+		T:    0, // When append, tail++, if tail== head, it is full
+		Size: size,
+	}
+}
+
+// append item if list is not full and return true; otherwise return false
+func (p *page) append(item interface{}) bool {
+	if p.L == p.Size { // full
+		return false
+	}
+	p.Data[p.T] = item
+	p.T++
+	if p.T == p.Size {
+		p.T = 0
+	}
+	p.L++
+	return true
+}
+
+// peak get the first item in the cache
+func (p *page) peak() (interface{}, bool) {
+	if p.L == 0 {
+		return nil, false
+	}
+	return p.Data[p.H], true
+}
+
+func (p *page) delete() bool {
+	if p.L == 0 {
+		return false
+	}
+	p.H++
+	if p.H == p.Size {
+		p.H = 0
+	}
+	p.L--
+	return true
+}
+
+func (p *page) isEmpty() bool {
+	return p.L == 0
+}
+
+func (p *page) reset() {
+	p.H = 0
+	p.T = 0
+	p.L = 0
+}
+
+type SyncCache struct {
+	// The input data to the cache
+	in        <-chan interface{}
+	Out       chan interface{}
+	Ack       chan bool
+	cacheCtrl chan interface{} // CacheCtrl is the only place to control the cache; sync in and ack result
+	errorCh   chan<- error
+	stats     metric.StatManager
+	// cache config
+	cacheConf   *conf.SinkConf
+	maxDiskPage int
+	// cache storage
+	memCache       []*page
+	diskBufferPage *page
+	// status
+	memSize      int // how many pages in memory has been saved
+	diskSize     int // how many pages has been saved
+	cacheLength  int //readonly, for metrics only to save calculation
+	diskPageTail int // init from the database
+	diskPageHead int
+	pending      bool
+	//serialize
+	store kv.KeyValue
+}
+
+func NewSyncCache(ctx api.StreamContext, in <-chan interface{}, errCh chan<- error, stats metric.StatManager, cacheConf *conf.SinkConf, bufferLength int) *SyncCache {
+	c := &SyncCache{
+		cacheConf: cacheConf,
+		in:        in,
+		Out:       make(chan interface{}, bufferLength),
+		Ack:       make(chan bool, 10),
+		cacheCtrl: make(chan interface{}, 10),
+		errorCh:   errCh,
+		memCache:  make([]*page, 0, cacheConf.MemoryCacheThreshold/cacheConf.BufferPageSize),
+		// add one more slot so that there will be at least one slot between head and tail to find out the head/tail id
+		maxDiskPage: (cacheConf.MaxDiskCache / cacheConf.BufferPageSize) + 1,
+		stats:       stats,
+	}
+	go func() {
+		err := infra.SafeRun(func() error {
+			c.run(ctx)
+			return nil
+		})
+		if err != nil {
+			infra.DrainError(ctx, err, errCh)
+		}
+	}()
+	return c
+}
+
+func (c *SyncCache) run(ctx api.StreamContext) {
+	c.initStore(ctx)
+	defer c.onClose(ctx)
+	for {
+		select {
+		case item := <-c.in:
+			// possibility of barrier, ignore if found
+			if boe, ok := item.(*checkpoint.BufferOrEvent); ok {
+				if _, ok := boe.Data.(*checkpoint.Barrier); ok {
+					c.Out <- item
+					ctx.GetLogger().Debugf("sink cache send out barrier %v", boe.Data)
+					break
+				}
+			}
+			c.stats.IncTotalRecordsIn()
+			ctx.GetLogger().Debugf("send to cache")
+			c.cacheCtrl <- item
+			ctx.GetLogger().Debugf("cache done")
+		case isSuccess := <-c.Ack:
+			// only send the next sink after receiving an ack
+			ctx.GetLogger().Debugf("cache ack")
+			c.cacheCtrl <- AckResult(isSuccess)
+			ctx.GetLogger().Debugf("cache ack done")
+		case data := <-c.cacheCtrl: // The only place to manipulate cache
+			switch r := data.(type) {
+			case AckResult:
+				if r {
+					ctx.GetLogger().Debugf("deleting cache")
+					c.deleteCache(ctx)
+				}
+				c.pending = false
+			default:
+				ctx.GetLogger().Debugf("adding cache %v", data)
+				c.addCache(ctx, data)
+			}
+			c.stats.SetBufferLength(int64(len(c.in) + c.cacheLength))
+			if !c.pending {
+				if c.pending {
+					time.Sleep(time.Duration(c.cacheConf.ResendInterval) * time.Millisecond)
+				}
+				d, ok := c.peakMemCache(ctx)
+				if ok {
+					ctx.GetLogger().Debugf("sending cache item %v", d)
+					c.pending = true
+					select {
+					case c.Out <- d:
+						ctx.GetLogger().Debugf("sink cache send out %v", d)
+					case <-ctx.Done():
+						ctx.GetLogger().Debugf("stop sink cache send")
+					}
+				} else {
+					c.pending = false
+				}
+			}
+		case <-ctx.Done():
+			ctx.GetLogger().Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
+			return
+		}
+	}
+}
+
+// addCache not thread safe!
+func (c *SyncCache) addCache(ctx api.StreamContext, item interface{}) {
+	isNotFull := c.appendMemCache(item)
+	if !isNotFull {
+		if c.diskBufferPage == nil {
+			c.diskBufferPage = newPage(c.cacheConf.BufferPageSize)
+		}
+		isBufferNotFull := c.diskBufferPage.append(item)
+		if !isBufferNotFull { // cool page full, save to disk
+			if c.diskSize == c.maxDiskPage {
+				// disk full, read the oldest page to the hot page
+				c.loadFromDisk(ctx)
+				c.cacheLength -= c.cacheConf.BufferPageSize
+			}
+			err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
+			if err != nil {
+				ctx.GetLogger().Errorf("fail to store disk cache %v", err)
+				return
+			} else {
+				c.diskPageTail++
+				c.diskSize++
+				// rotate
+				if c.diskPageTail == c.maxDiskPage {
+					c.diskPageTail = 0
+				}
+			}
+			c.diskBufferPage.reset()
+			c.diskBufferPage.append(item)
+		}
+	}
+	c.cacheLength++
+	ctx.GetLogger().Debugf("added cache")
+}
+
+// deleteCache not thread safe!
+func (c *SyncCache) deleteCache(ctx api.StreamContext) {
+	if len(c.memCache) == 0 {
+		return
+	}
+	isNotEmpty := c.memCache[0].delete()
+	if isNotEmpty {
+		c.cacheLength--
+		ctx.GetLogger().Debugf("deleted cache: %d", c.cacheLength)
+	}
+	if c.memCache[0].isEmpty() { // read from disk or cool list
+		c.memCache = c.memCache[1:]
+		if c.diskSize > 0 {
+			c.loadFromDisk(ctx)
+		} else if c.diskBufferPage != nil { // use cool page as the new page
+			c.memCache = append(c.memCache, c.diskBufferPage)
+			c.diskBufferPage = nil
+		}
+	}
+}
+
+func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
+	// load page from the disk
+	hotPage := newPage(c.cacheConf.BufferPageSize)
+	ok, err := c.store.Get(strconv.Itoa(c.diskPageHead), hotPage)
+	if err != nil {
+		ctx.GetLogger().Errorf("fail to load disk cache %v", err)
+	} else if !ok {
+		ctx.GetLogger().Errorf("nothing in the disk, should not happen")
+	} else {
+		if len(c.memCache) > 0 {
+			ctx.GetLogger().Debugf("drop a page in memory")
+			c.memCache = c.memCache[1:]
+		}
+		c.memCache = append(c.memCache, hotPage)
+		c.diskPageHead++
+		c.diskSize--
+		if c.diskPageHead == c.maxDiskPage {
+			c.diskPageHead = 0
+		}
+	}
+}
+
+func (c *SyncCache) appendMemCache(item interface{}) bool {
+	if c.memSize == cap(c.memCache) {
+		return false
+	}
+	if len(c.memCache) <= c.memSize {
+		c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
+	}
+	isNotFull := c.memCache[c.memSize].append(item)
+	if !isNotFull {
+		c.memSize++
+		if c.memSize == cap(c.memCache) {
+			return false
+		}
+		c.memCache = append(c.memCache, newPage(c.cacheConf.BufferPageSize))
+		return c.memCache[c.memSize].append(item)
+	}
+	return true
+}
+
+func (c *SyncCache) peakMemCache(ctx api.StreamContext) (interface{}, bool) {
+	if len(c.memCache) == 0 {
+		return nil, false
+	}
+	return c.memCache[0].peak()
+}
+
+func (c *SyncCache) initStore(ctx api.StreamContext) {
+	kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
+	if c.cacheConf.CleanCacheAtStop {
+		ctx.GetLogger().Infof("creating cache store %s", kvTable)
+		store.DropCacheKV(kvTable)
+	}
+	var err error
+	err, c.store = store.GetCacheKV(kvTable)
+	if err != nil {
+		infra.DrainError(ctx, err, c.errorCh)
+	}
+	// restore the sink cache from disk
+	if !c.cacheConf.CleanCacheAtStop {
+		// Save 0 when init and save 1 when close. Wait for close for newly started sink node
+		var set int
+		ok, err := c.store.Get("storeSig", &set)
+		if ok && set == 0 { // may be saving
+			var i = 0
+			for ; i < 100; i++ {
+				time.Sleep(time.Millisecond * 10)
+				_, err = c.store.Get("storeSig", &set)
+				if set == 1 {
+					ctx.GetLogger().Infof("waiting for previous cache for %d times", i)
+					break
+				}
+			}
+			if i == 100 {
+				ctx.GetLogger().Errorf("waiting for previous cache for %d times, exit and drop", i)
+			}
+		}
+		c.store.Set("storeSig", 0)
+		ctx.GetLogger().Infof("start to restore cache from disk")
+		// restore the memCache
+		_, err = c.store.Get("memcache", &c.memCache)
+		if err != nil {
+			ctx.GetLogger().Errorf("fail to restore mem cache %v", err)
+		}
+		c.memSize = len(c.memCache)
+		for _, p := range c.memCache {
+			c.cacheLength += p.L
+		}
+		err = c.store.Delete("memcache")
+		if err != nil {
+			ctx.GetLogger().Errorf("fail to delete mem cache %v", err)
+		}
+		ctx.GetLogger().Infof("restored mem cache %d", c.cacheLength)
+		// restore the disk cache
+		dks, err := c.store.Keys()
+		if err != nil {
+			ctx.GetLogger().Errorf("fail to restore disk cache %v", err)
+			return
+		}
+		if len(dks) == 0 {
+			return
+		}
+		dk := make([]int, 0, len(dks))
+		for _, d := range dks {
+			aint, err := strconv.Atoi(d)
+			if err == nil { // filter mem cache
+				dk = append(dk, aint)
+			}
+		}
+		if len(dk) == 0 {
+			return
+		}
+		c.diskSize = len(dk) - 1
+		c.cacheLength += c.diskSize * c.cacheConf.BufferPageSize
+		sort.Ints(dk)
+		// default
+		c.diskPageHead = dk[0]
+		c.diskPageTail = dk[len(dk)-1]
+		for i, k := range dk {
+			if i-1 >= 0 {
+				if k-dk[i-1] > 1 {
+					c.diskPageTail = i - 1
+					c.diskPageHead = k
+				}
+			}
+		}
+		// load buffer page
+		hotPage := newPage(c.cacheConf.BufferPageSize)
+		ok, err = c.store.Get(strconv.Itoa(c.diskPageTail), hotPage)
+		if err != nil {
+			ctx.GetLogger().Errorf("fail to load disk cache to buffer %v", err)
+		} else if !ok {
+			ctx.GetLogger().Errorf("nothing in the disk, should not happen")
+		} else {
+			c.diskBufferPage = hotPage
+			c.cacheLength += c.diskBufferPage.L
+		}
+		ctx.GetLogger().Infof("restored all cache %d", c.cacheLength)
+	}
+}
+
+// save memory states to disk
+func (c *SyncCache) onClose(ctx api.StreamContext) {
+	ctx.GetLogger().Infof("sink node %s instance cache %d closing", ctx.GetOpId(), ctx.GetInstanceId())
+	if c.cacheConf.CleanCacheAtStop {
+		kvTable := path.Join("sink", ctx.GetRuleId()+ctx.GetOpId()+strconv.Itoa(ctx.GetInstanceId()))
+		ctx.GetLogger().Infof("cleaning cache store %s", kvTable)
+		store.DropCacheKV(kvTable)
+	} else {
+		if c.diskBufferPage != nil {
+			err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
+			if err != nil {
+				ctx.GetLogger().Errorf("fail to store disk cache %v", err)
+			}
+			ctx.GetLogger().Debug("store disk cache")
+		}
+		// store the memory states
+		if len(c.memCache) > 0 {
+			err := c.store.Set("memcache", c.memCache)
+			if err != nil {
+				ctx.GetLogger().Errorf("fail to store memory cache to disk %v", err)
+			}
+			ctx.GetLogger().Debugf("store memory cache %d", c.memSize)
+		}
+		c.store.Set("storeSig", 1)
+	}
+}
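To summarize the channel protocol of the new SyncCache, here is a condensed sketch distilled from the test file that follows, with the context, metrics and error handling simplified; it assumes placement inside the eKuiper module, since these are internal packages:

```go
package demo

import (
	"github.com/lf-edge/ekuiper/internal/conf"
	"github.com/lf-edge/ekuiper/internal/topo/node/cache"
	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
	"github.com/lf-edge/ekuiper/pkg/api"
)

// runThroughCache sketches the Out/Ack protocol: every tuple is routed through
// the cache, and the next tuple is only released after the previous one is acked.
func runThroughCache(ctx api.StreamContext, stats metric.StatManager, sconf *conf.SinkConf,
	send func(interface{}) error, data []interface{}) {
	in := make(chan interface{})
	errCh := make(chan error, 1)
	sc := cache.NewSyncCache(ctx, in, errCh, stats, sconf, len(data))
	go func() {
		for _, d := range data {
			in <- d // tuples from the sink node enter the cache
		}
	}()
	for range data {
		d := <-sc.Out            // the cache emits the next tuple to send
		sc.Ack <- send(d) == nil // true deletes it; false keeps it for resend
	}
}
```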

+ 236 - 0
internal/topo/node/cache/sync_cache_test.go

@@ -0,0 +1,236 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cache
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/testx"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
+	"github.com/lf-edge/ekuiper/internal/topo/state"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"os"
+	"path/filepath"
+	"reflect"
+	"testing"
+	"time"
+)
+
+func TestPage(t *testing.T) {
+	p := newPage(2)
+	if !p.isEmpty() {
+		t.Errorf("page is not empty")
+	}
+	if !p.append(1) {
+		t.Fatal("append failed")
+	}
+	if !p.append(2) {
+		t.Fatal("append failed")
+	}
+	if p.append(3) {
+		t.Fatal("should append fail")
+	}
+	v, ok := p.peak()
+	if !ok {
+		t.Fatal("peak failed")
+	}
+	if v != 1 {
+		t.Fatalf("peak value mismatch, expect 1 but got %v", v)
+	}
+	if p.append(4) {
+		t.Fatal("should append failed")
+	}
+	if !p.delete() {
+		t.Fatal("delete failed")
+	}
+	v, ok = p.peak()
+	if !ok {
+		t.Fatal("peak failed")
+	}
+	if v != 2 {
+		t.Fatalf("peak value mismatch, expect 2 but got %v", v)
+	}
+	p.reset()
+	if !p.append(5) {
+		t.Fatal("append failed")
+	}
+	if p.isEmpty() {
+		t.Fatal("page should not empty")
+	}
+	if !p.delete() {
+		t.Fatal("delete failed")
+	}
+	if !p.append(5) {
+		t.Fatal("append failed")
+	}
+	if !p.append(6) {
+		t.Fatal("append failed")
+	}
+	if !p.delete() {
+		t.Fatal("delete failed")
+	}
+	if !p.delete() {
+		t.Fatal("delete failed")
+	}
+	if p.delete() {
+		t.Fatal("should delete failed")
+	}
+	if !p.isEmpty() {
+		t.Fatal("page should be empty")
+	}
+}
+
+// TestRun test for
+// 1. cache in memory only
+// 2. cache in memory and disk buffer only
+// 3. cache in memory and disk
+// 4. cache in memory and disk buffer and overflow
+// Each flow test rule restart
+// Each flow use slightly different config like bufferPageSize
+func TestRun(t *testing.T) {
+	var tests = []struct {
+		sconf   *conf.SinkConf
+		dataIn  []interface{}
+		dataOut []interface{}
+		stopPt  int // restart the rule in this point
+	}{
+		{ // 0
+			sconf: &conf.SinkConf{
+				MemoryCacheThreshold: 6,
+				MaxDiskCache:         12,
+				BufferPageSize:       3,
+				EnableCache:          true,
+				ResendInterval:       0,
+				CleanCacheAtStop:     false,
+			},
+			dataIn: []interface{}{
+				1, 2, 3, 4, 5,
+			},
+			stopPt: 4,
+		},
+		{ // 1
+			sconf: &conf.SinkConf{
+				MemoryCacheThreshold: 4,
+				MaxDiskCache:         8,
+				BufferPageSize:       2,
+				EnableCache:          true,
+				ResendInterval:       0,
+				CleanCacheAtStop:     false,
+			},
+			dataIn: []interface{}{
+				1, 2, 3, 4, 5, 6,
+			},
+			stopPt: 5,
+		},
+		{ // 2
+			sconf: &conf.SinkConf{
+				MemoryCacheThreshold: 1,
+				MaxDiskCache:         8,
+				BufferPageSize:       1,
+				EnableCache:          true,
+				ResendInterval:       0,
+				CleanCacheAtStop:     false,
+			},
+			dataIn: []interface{}{
+				1, 2, 3, 4, 5, 6,
+			},
+			stopPt: 4,
+		},
+		{ // 3
+			sconf: &conf.SinkConf{
+				MemoryCacheThreshold: 2,
+				MaxDiskCache:         4,
+				BufferPageSize:       2,
+				EnableCache:          true,
+				ResendInterval:       0,
+				CleanCacheAtStop:     false,
+			},
+			dataIn: []interface{}{
+				1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
+			},
+			dataOut: []interface{}{
+				1, 6, 7, 8, 9, 10, 11, 12, 13,
+			},
+			stopPt: 4,
+		},
+	}
+	testx.InitEnv()
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	tempStore, _ := state.CreateStore("mock", api.AtMostOnce)
+	deleteCachedb()
+	for i, tt := range tests {
+		contextLogger := conf.Log.WithField("rule", fmt.Sprintf("TestRun-%d", i))
+		ctx, cancel := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta(fmt.Sprintf("rule%d", i), fmt.Sprintf("op%d", i), tempStore).WithCancel()
+		stats, err := metric.NewStatManager(ctx, "sink")
+		if err != nil {
+			t.Fatal(err)
+			return
+		}
+		in := make(chan interface{})
+		errCh := make(chan error)
+		var result []interface{}
+		go func() {
+			err := <-errCh
+			t.Fatal(err)
+			return
+		}()
+		// send data
+		sc := NewSyncCache(ctx, in, errCh, stats, tt.sconf, 100)
+		for i := 0; i < tt.stopPt; i++ {
+			in <- tt.dataIn[i]
+			time.Sleep(1 * time.Millisecond)
+		}
+		cancel()
+
+		// send the second half data
+		ctx, cancel = context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithMeta(fmt.Sprintf("rule%d", i), fmt.Sprintf("op%d", i), tempStore).WithCancel()
+		sc = NewSyncCache(ctx, in, errCh, stats, tt.sconf, 100)
+		for i := tt.stopPt; i < len(tt.dataIn); i++ {
+			in <- tt.dataIn[i]
+			time.Sleep(1 * time.Millisecond)
+		}
+	loop:
+		for range tt.dataIn {
+			sc.Ack <- true
+			select {
+			case r := <-sc.Out:
+				result = append(result, r)
+			case <-time.After(1 * time.Second):
+				t.Log(fmt.Sprintf("test %d no data", i))
+				break loop
+			}
+		}
+
+		cancel()
+		if tt.dataOut == nil {
+			tt.dataOut = tt.dataIn
+		}
+		if !reflect.DeepEqual(tt.dataOut, result) {
+			t.Errorf("test %d data mismatch\nexpect\t%v\nbut got\t%v", i, tt.dataOut, result)
+		}
+	}
+}
+
+func deleteCachedb() {
+	loc, err := conf.GetDataLoc()
+	if err != nil {
+		fmt.Println(err)
+	}
+	err = os.RemoveAll(filepath.Join(loc, "cache.db"))
+	if err != nil {
+		fmt.Println(err)
+	}
+}

+ 3 - 2
internal/topo/node/join_align_node.go

@@ -16,6 +16,7 @@ package node
 
 import (
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/infra"
@@ -25,7 +26,7 @@ import (
 // The input for batch table MUST be *WindowTuples
 type JoinAlignNode struct {
 	*defaultSinkNode
-	statManager StatManager
+	statManager metric.StatManager
 	emitters    map[string]int
 	// states
 	batch *xsql.WindowTuplesSet
@@ -61,7 +62,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 		infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
 		return
 	}
-	stats, err := NewStatManager(ctx, "op")
+	stats, err := metric.NewStatManager(ctx, "op")
 	if err != nil {
 		infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
 		return

+ 1 - 1
internal/topo/node/prometheus.go

@@ -15,7 +15,7 @@
 //go:build prometheus || !core
 // +build prometheus !core
 
-package node
+package metric
 
 import (
 	"github.com/prometheus/client_golang/prometheus"

+ 1 - 1
internal/topo/node/stats_manager.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package node
+package metric
 
 import (
 	"fmt"

+ 1 - 1
internal/topo/node/stats_mem.go

@@ -15,7 +15,7 @@
 //go:build !prometheus && core
 //go:build !prometheus && core
 // +build !prometheus,core
 // +build !prometheus,core
 
 
-package node
+package metric
 
 
 import (
 import (
 	"fmt"
 	"fmt"

+ 1 - 1
internal/topo/node/stats_prom.go

@@ -15,7 +15,7 @@
 //go:build prometheus || !core
 // +build prometheus !core
 
-package node
+package metric
 
 import (
 	"github.com/lf-edge/ekuiper/internal/conf"

+ 2 - 1
internal/topo/node/node.go

@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
 	"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
+	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"strings"
 	"strings"
@@ -48,7 +49,7 @@ type defaultNode struct {
 	outputs      map[string]chan<- interface{}
 	outputs      map[string]chan<- interface{}
 	concurrency  int
 	concurrency  int
 	sendError    bool
 	sendError    bool
-	statManagers []StatManager
+	statManagers []metric.StatManager
 	ctx          api.StreamContext
 	ctx          api.StreamContext
 	qos          api.Qos
 	qos          api.Qos
 }
 }

+ 2 - 1
internal/topo/node/operations.go

@@ -16,6 +16,7 @@ package node
 
 import (
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/infra"
@@ -107,7 +108,7 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 		cancel()
 	}()
 
-	stats, err := NewStatManager(ctx, "op")
+	stats, err := metric.NewStatManager(ctx, "op")
 	if err != nil {
 		infra.DrainError(ctx, err, errCh)
 		return

+ 0 - 292
internal/topo/node/sink_cache.go

@@ -1,292 +0,0 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package node
-
-import (
-	"encoding/gob"
-	"fmt"
-	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/pkg/store"
-	"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
-	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/lf-edge/ekuiper/pkg/infra"
-	"github.com/lf-edge/ekuiper/pkg/kv"
-	"path"
-	"sort"
-	"strconv"
-)
-
-type CacheTuple struct {
-	index int
-	data  interface{}
-}
-
-type LinkedQueue struct {
-	Data map[int]interface{}
-	Tail int
-}
-
-func (l *LinkedQueue) append(item interface{}) {
-	l.Data[l.Tail] = item
-	l.Tail++
-}
-
-func (l *LinkedQueue) delete(index int) {
-	delete(l.Data, index)
-}
-
-func (l *LinkedQueue) reset() {
-	l.Tail = 0
-}
-
-func (l *LinkedQueue) length() int {
-	return len(l.Data)
-}
-
-func (l *LinkedQueue) clone() *LinkedQueue {
-	result := &LinkedQueue{
-		Data: make(map[int]interface{}),
-		Tail: l.Tail,
-	}
-	for k, v := range l.Data {
-		result.Data[k] = v
-	}
-	return result
-}
-
-func (l *LinkedQueue) String() string {
-	return fmt.Sprintf("tail: %d, data: %v", l.Tail, l.Data)
-}
-
-type Cache struct {
-	//Data and control channels
-	in       <-chan interface{}
-	Out      chan *CacheTuple
-	Complete chan int
-	errorCh  chan<- error
-	//states
-	pending *LinkedQueue
-	changed bool
-	//serialize
-	key   string //the key for current cache
-	store kv.KeyValue
-}
-
-func NewTimebasedCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache {
-	c := &Cache{
-		in:       in,
-		Out:      make(chan *CacheTuple, limit),
-		Complete: make(chan int),
-		errorCh:  errCh,
-	}
-	go func() {
-		err := infra.SafeRun(func() error {
-			c.timebasedRun(ctx, saveInterval)
-			return nil
-		})
-		if err != nil {
-			infra.DrainError(ctx, err, errCh)
-		}
-	}()
-	return c
-}
-
-func (c *Cache) initStore(ctx api.StreamContext) {
-	logger := ctx.GetLogger()
-	c.pending = &LinkedQueue{
-		Data: make(map[int]interface{}),
-		Tail: 0,
-	}
-	var err error
-	err, c.store = store.GetKV(path.Join("sink", ctx.GetRuleId()))
-	if err != nil {
-		infra.DrainError(ctx, err, c.errorCh)
-		return
-	}
-	c.key = ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
-	logger.Debugf("cache saved to key %s", c.key)
-	//load cache
-	if err := c.loadCache(); err != nil {
-		infra.DrainError(ctx, err, c.errorCh)
-		return
-	}
-}
-
-func (c *Cache) timebasedRun(ctx api.StreamContext, saveInterval int) {
-	logger := ctx.GetLogger()
-	c.initStore(ctx)
-	ticker := conf.GetTicker(saveInterval)
-	defer ticker.Stop()
-	var tcount = 0
-	for {
-		select {
-		case item := <-c.in:
-			index := c.pending.Tail
-			c.pending.append(item)
-			//non blocking until limit exceeded
-			c.Out <- &CacheTuple{
-				index: index,
-				data:  item,
-			}
-			c.changed = true
-		case index := <-c.Complete:
-			c.pending.delete(index)
-			c.changed = true
-		case <-ticker.C:
-			tcount++
-			l := c.pending.length()
-			if l == 0 {
-				c.pending.reset()
-			}
-			//If the data is still changing, only do a save when the cache has more than threshold to prevent too much file IO
-			//If the data is not changing in the time slot and have not saved before, save it. This is to prevent the
-			//data won't be saved as the cache never pass the threshold
-			//logger.Infof("ticker %t, l=%d\n", c.changed, l)
-			if (c.changed && l > conf.Config.Sink.CacheThreshold) || (tcount == conf.Config.Sink.CacheTriggerCount && c.changed) {
-				logger.Infof("save cache for rule %s, %s", ctx.GetRuleId(), c.pending.String())
-				clone := c.pending.clone()
-				c.changed = false
-				go func() {
-					err := infra.SafeRun(func() error {
-						return c.saveCache(logger, clone)
-					})
-					if err != nil {
-						logger.Debugf("%v", err)
-						infra.DrainError(ctx, err, c.errorCh)
-						return
-					}
-				}()
-			}
-			if tcount >= conf.Config.Sink.CacheThreshold {
-				tcount = 0
-			}
-		case <-ctx.Done():
-			err := c.saveCache(logger, c.pending)
-			if err != nil {
-				logger.Warnf("Error found during saving cache: %s \n ", err)
-			}
-			logger.Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
-			return
-		}
-	}
-}
-
-func (c *Cache) loadCache() error {
-	gob.Register(c.pending)
-	mt := new(LinkedQueue)
-	if f, err := c.store.Get(c.key, &mt); f {
-		if nil != err {
-			return fmt.Errorf("load malform cache, found %v(%v)", c.key, mt)
-		}
-		c.pending = mt
-		c.changed = true
-		// To store the keys in slice in sorted order
-		var keys []int
-		for k := range mt.Data {
-			keys = append(keys, k)
-		}
-		sort.Ints(keys)
-		for _, k := range keys {
-			t := &CacheTuple{
-				index: k,
-				data:  mt.Data[k],
-			}
-			c.Out <- t
-		}
-		return nil
-	}
-	return nil
-}
-
-func (c *Cache) saveCache(logger api.Logger, p *LinkedQueue) error {
-	logger.Infof("clean the cache and reopen")
-	_ = c.store.Delete(c.key)
-
-	return c.store.Set(c.key, p)
-}
-
-func (c *Cache) Length() int {
-	return c.pending.length()
-}
-
-func NewCheckpointbasedCache(in <-chan interface{}, limit int, tch <-chan struct{}, errCh chan<- error, ctx api.StreamContext) *Cache {
-	c := &Cache{
-		in:       in,
-		Out:      make(chan *CacheTuple, limit),
-		Complete: make(chan int),
-		errorCh:  errCh,
-	}
-	go func() {
-		err := infra.SafeRun(func() error {
-			c.checkpointbasedRun(ctx, tch)
-			return nil
-		})
-		if err != nil {
-			infra.DrainError(ctx, err, c.errorCh)
-		}
-	}()
-	return c
-}
-
-func (c *Cache) checkpointbasedRun(ctx api.StreamContext, tch <-chan struct{}) {
-	logger := ctx.GetLogger()
-	c.initStore(ctx)
-
-	for {
-		select {
-		case item := <-c.in:
-			// possibility of barrier, ignore if found
-			if boe, ok := item.(*checkpoint.BufferOrEvent); ok {
-				if _, ok := boe.Data.(*checkpoint.Barrier); ok {
-					c.Out <- &CacheTuple{
-						data: item,
-					}
-					logger.Debugf("sink cache send out barrier %v", boe.Data)
-					break
-				}
-			}
-			index := c.pending.Tail
-			c.pending.append(item)
-			//non blocking until limit exceeded
-			c.Out <- &CacheTuple{
-				index: index,
-				data:  item,
-			}
-			logger.Debugf("sink cache send out tuple %v", item)
-			c.changed = true
-		case index := <-c.Complete:
-			c.pending.delete(index)
-			c.changed = true
-		case <-tch:
-			logger.Infof("save cache for rule %s, %s", ctx.GetRuleId(), c.pending.String())
-			clone := c.pending.clone()
-			if c.changed {
-				go func() {
-					err := infra.SafeRun(func() error {
-						return c.saveCache(logger, clone)
-					})
-					if err != nil {
-						logger.Debugf("%v", err)
-						infra.DrainError(ctx, err, c.errorCh)
-					}
-				}()
-			}
-			c.changed = false
-		case <-ctx.Done():
-			logger.Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
-			return
-		}
-	}
-}

+ 115 - 136
internal/topo/node/sink_node.go

@@ -19,6 +19,8 @@ import (
 	"github.com/lf-edge/ekuiper/internal/binder/io"
 	"github.com/lf-edge/ekuiper/internal/binder/io"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/node/cache"
+	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
@@ -27,21 +29,18 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/message"
 	"github.com/lf-edge/ekuiper/pkg/message"
 	"strings"
 	"strings"
 	"sync"
 	"sync"
-	"time"
 )
 )
 
 
 type SinkConf struct {
 type SinkConf struct {
-	Concurrency       int    `json:"concurrency"`
-	RunAsync          bool   `json:"runAsync"`
-	RetryInterval     int    `json:"retryInterval"`
-	RetryCount        int    `json:"retryCount"`
-	CacheLength       int    `json:"cacheLength"`
-	CacheSaveInterval int    `json:"cacheSaveInterval"`
-	Omitempty         bool   `json:"omitIfEmpty"`
-	SendSingle        bool   `json:"sendSingle"`
-	DataTemplate      string `json:"dataTemplate"`
-	Format            string `json:"format"`
-	SchemaId          string `json:"schemaId"`
+	Concurrency  int    `json:"concurrency"`
+	RunAsync     bool   `json:"runAsync"`
+	Omitempty    bool   `json:"omitIfEmpty"`
+	SendSingle   bool   `json:"sendSingle"`
+	DataTemplate string `json:"dataTemplate"`
+	Format       string `json:"format"`
+	SchemaId     string `json:"schemaId"`
+	BufferLength int    `json:"bufferLength"`
+	conf.SinkConf
 }
 
 type SinkNode struct {
@@ -54,7 +53,6 @@ type SinkNode struct {
 	isMock  bool
 	//states varies after restart
 	sinks []api.Sink
-	tch   chan struct{} //channel to trigger cache saved, will be trigger by checkpoint only
 }
 
 func NewSinkNode(name string, sinkType string, props map[string]interface{}) *SinkNode {
@@ -101,52 +99,11 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 	m.ctx = ctx
 	logger := ctx.GetLogger()
 	logger.Debugf("open sink node %s", m.name)
-	if m.qos >= api.AtLeastOnce {
-		m.tch = make(chan struct{})
-	}
 	go func() {
 		err := infra.SafeRun(func() error {
-			sconf := &SinkConf{
-				Concurrency:       1,
-				RunAsync:          false,
-				RetryInterval:     1000,
-				RetryCount:        0,
-				CacheLength:       1024,
-				CacheSaveInterval: 1000,
-				Omitempty:         false,
-				SendSingle:        false,
-				DataTemplate:      "",
-			}
-			err := cast.MapToStruct(m.options, sconf)
+			sconf, err := m.parseConf(logger)
 			if err != nil {
-				return fmt.Errorf("read properties %v fail with error: %v", m.options, err)
-			}
-			if sconf.Concurrency <= 0 {
-				logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", sconf.Concurrency)
-				sconf.Concurrency = 1
-			}
-			m.concurrency = sconf.Concurrency
-			if sconf.RetryInterval <= 0 {
-				logger.Warnf("invalid type for retryInterval property, should be positive integer but found %t", sconf.RetryInterval)
-				sconf.RetryInterval = 1000
-			}
-			if sconf.RetryCount < 0 {
-				logger.Warnf("invalid type for retryCount property, should be positive integer but found %t", sconf.RetryCount)
-				sconf.RetryCount = 3
-			}
-			if sconf.CacheLength < 0 {
-				logger.Warnf("invalid type for cacheLength property, should be positive integer but found %t", sconf.CacheLength)
-				sconf.CacheLength = 1024
-			}
-			if sconf.CacheSaveInterval < 0 {
-				logger.Warnf("invalid type for cacheSaveInterval property, should be positive integer but found %t", sconf.CacheSaveInterval)
-				sconf.CacheSaveInterval = 1000
-			}
-			if sconf.Format == "" {
-				sconf.Format = "json"
-			} else if sconf.Format != message.FormatJson && sconf.Format != message.FormatProtobuf {
-				logger.Warnf("invalid type for format property, should be json or protobuf but found %s", sconf.Format)
-				sconf.Format = "json"
+				return err
 			}
 
 			tf, err := transform.GenTransform(sconf.DataTemplate, sconf.Format, sconf.SchemaId)
@@ -185,7 +142,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 							sink = m.sinks[instance]
 						}
 
-						stats, err := NewStatManager(ctx, "sink")
+						stats, err := metric.NewStatManager(ctx, "sink")
 						if err != nil {
 							return err
 						}
@@ -193,7 +150,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 						m.statManagers = append(m.statManagers, stats)
 						m.mutex.Unlock()
 
-						if conf.Config.Sink.DisableCache {
+						if !sconf.EnableCache {
 							for {
 								select {
 								case data := <-m.input:
@@ -203,10 +160,11 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 										data = temp
 									}
 									stats.SetBufferLength(int64(len(m.input)))
+									stats.IncTotalRecordsIn()
 									if sconf.RunAsync {
 										go func() {
 											p := infra.SafeRun(func() error {
-												doCollect(ctx, sink, data, stats, sconf)
+												_ = doCollect(ctx, sink, data, stats, sconf)
 												return nil
 											})
 											if p != nil {
@@ -214,7 +172,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 											}
 										}()
 									} else {
-										doCollect(ctx, sink, data, stats, sconf)
+										_ = doCollect(ctx, sink, data, stats, sconf)
 									}
 								case <-ctx.Done():
 									logger.Infof("sink node %s instance %d done", m.name, instance)
@@ -222,56 +180,51 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 										logger.Warnf("close sink node %s instance %d fails: %v", m.name, instance, err)
 									}
 									return nil
-								case <-m.tch:
-									logger.Debugf("rule %s sink receive checkpoint, do nothing", ctx.GetRuleId())
 								}
 							}
 						} else {
 							logger.Infof("Creating sink cache")
-							var cache *Cache
-							if m.qos >= api.AtLeastOnce {
-								cache = NewCheckpointbasedCache(m.input, sconf.CacheLength, m.tch, result, ctx.WithInstance(instance))
-							} else {
-								cache = NewTimebasedCache(m.input, sconf.CacheLength, sconf.CacheSaveInterval, result, ctx.WithInstance(instance))
-							}
-							for {
-								select {
-								case data := <-cache.Out:
-									if temp, processed := m.preprocess(data.data); processed {
-										break
-									} else {
-										data.data = temp
-									}
-									stats.SetBufferLength(int64(len(m.input)))
-									if sconf.RunAsync {
-										go func() {
-											p := infra.SafeRun(func() error {
-												doCollect(ctx, sink, data.data, stats, sconf)
-												return nil
-											})
-											if p != nil {
-												infra.DrainError(ctx, p, result)
-											}
-										}()
-									} else {
-										doCollect(ctx, sink, data.data, stats, sconf)
-										if cache.Complete != nil {
-											select {
-											case cache.Complete <- data.index:
-											default:
-												ctx.GetLogger().Warnf("sink cache missing response for %d", data.index)
+							if sconf.RunAsync { // async mode, the ack must have an id
+								// this combination is rejected during configuration validation, so it should never be reached
+								return fmt.Errorf("async mode is not supported for cache sink")
+							} else { // sync mode, the ack is already in order
+								c := cache.NewSyncCache(ctx, m.input, result, stats, &sconf.SinkConf, sconf.BufferLength)
+								for {
+									select {
+									case data := <-c.Out:
+										if temp, processed := m.preprocess(data); processed {
+											break
+										} else {
+											data = temp
+										}
+										ack := true
+										err := doCollect(ctx, sink, data, stats, sconf)
+										// Only recoverable error should be cached
+										if err != nil {
+											if strings.HasPrefix(err.Error(), errorx.IOErr) { // do not log IO errors here to avoid flooding the log
+												ack = false
+											} else {
+												ctx.GetLogger().Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), data, err)
 											}
+										} else {
+											ctx.GetLogger().Debugf("sent data to MQTT: %v", data)
 										}
+										select {
+										case c.Ack <- ack:
+										case <-ctx.Done():
+										}
+									case <-ctx.Done():
+										logger.Infof("sink node %s instance %d done", m.name, instance)
+										if err := sink.Close(ctx); err != nil {
+											logger.Warnf("close sink node %s instance %d fails: %v", m.name, instance, err)
+										}
+										return nil
 									}
-								case <-ctx.Done():
-									logger.Infof("sink node %s instance %d done", m.name, instance)
-									if err := sink.Close(ctx); err != nil {
-										logger.Warnf("close sink node %s instance %d fails: %v", m.name, instance, err)
-									}
-									return nil
 								}
 							}
+
 						}
+						return nil
 					})
 					if panicOrError != nil {
 						infra.DrainError(ctx, panicOrError, result)
@@ -286,6 +239,45 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 	}()
 }
 
+func (m *SinkNode) parseConf(logger api.Logger) (*SinkConf, error) {
+	sconf := &SinkConf{
+		Concurrency:  1,
+		RunAsync:     false,
+		Omitempty:    false,
+		SendSingle:   false,
+		DataTemplate: "",
+		SinkConf:     *conf.Config.Sink,
+		BufferLength: 1024,
+	}
+	err := cast.MapToStruct(m.options, sconf)
+	if err != nil {
+		return nil, fmt.Errorf("read properties %v fail with error: %v", m.options, err)
+	}
+	if sconf.Concurrency <= 0 {
+		logger.Warnf("invalid type for concurrency property, should be positive integer but found %d", sconf.Concurrency)
+		sconf.Concurrency = 1
+	}
+	m.concurrency = sconf.Concurrency
+	if sconf.Format == "" {
+		sconf.Format = "json"
+	} else if sconf.Format != message.FormatJson && sconf.Format != message.FormatProtobuf {
+		logger.Warnf("invalid type for format property, should be json or protobuf but found %s", sconf.Format)
+		sconf.Format = "json"
+	}
+	err = cast.MapToStruct(m.options, &sconf.SinkConf)
+	if err != nil {
+		return nil, fmt.Errorf("read properties %v to cache conf fail with error: %v", m.options, err)
+	}
+	if sconf.SinkConf.EnableCache && sconf.RunAsync {
+		return nil, fmt.Errorf("cache is not supported for async sink, do not use enableCache and runAsync properties together")
+	}
+	err = sconf.SinkConf.Validate()
+	if err != nil {
+		return nil, fmt.Errorf("invalid cache properties: %v", err)
+	}
+	return sconf, err
+}
+
 func (m *SinkNode) reset() {
 	if !m.isMock {
 		m.sinks = nil
@@ -293,8 +285,7 @@ func (m *SinkNode) reset() {
 	m.statManagers = nil
 }
 
-func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats StatManager, sconf *SinkConf) {
-	stats.IncTotalRecordsIn()
+func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats metric.StatManager, sconf *SinkConf) error {
 	stats.ProcessTimeStart()
 	defer stats.ProcessTimeEnd()
 	var outs []map[string]interface{}
@@ -312,47 +303,43 @@ func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats Sta
 	}
 	if sconf.Omitempty && (item == nil || len(outs) == 0) {
 		ctx.GetLogger().Debugf("receive empty in sink")
-		return
+		return nil
 	}
 	if !sconf.SendSingle {
-		doCollectData(ctx, sink, outs, stats, sconf)
+		return doCollectData(ctx, sink, outs, stats)
 	} else {
+		var err error
 		for _, d := range outs {
 			if sconf.Omitempty && (d == nil || len(d) == 0) {
 				ctx.GetLogger().Debugf("receive empty in sink")
 				continue
 			}
-			doCollectData(ctx, sink, d, stats, sconf)
+			newErr := doCollectData(ctx, sink, d, stats)
+			if newErr != nil {
+				err = newErr
+			}
 		}
+		return err
 	}
 }
 
 // doCollectData outData must be map or []map
-func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, stats StatManager, sconf *SinkConf) {
-	retries := sconf.RetryCount
-	for {
-		select {
-		case <-ctx.Done():
-			ctx.GetLogger().Infof("sink node %s instance %d stops data resending", ctx.GetOpId(), ctx.GetInstanceId())
-			return
-		default:
-			if err := sink.Collect(ctx, outData); err != nil {
-				stats.IncTotalExceptions()
-				ctx.GetLogger().Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outData, err)
-				if sconf.RetryInterval > 0 && retries > 0 && strings.HasPrefix(err.Error(), errorx.IOErr) {
-					retries--
-					time.Sleep(time.Duration(sconf.RetryInterval) * time.Millisecond)
-					ctx.GetLogger().Debugf("try again")
-				} else {
-					return
-				}
-			} else {
-				ctx.GetLogger().Debugf("success")
-				stats.IncTotalRecordsOut()
-				return
-			}
+func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, stats metric.StatManager) error {
+	select {
+	case <-ctx.Done():
+		ctx.GetLogger().Infof("sink node %s instance %d stops data resending", ctx.GetOpId(), ctx.GetInstanceId())
+		return nil
+	default:
+		if err := sink.Collect(ctx, outData); err != nil {
+			stats.IncTotalExceptions()
+			return err
+		} else {
+			ctx.GetLogger().Debugf("success")
+			stats.IncTotalRecordsOut()
+			return nil
 		}
 	}
+
 }
 
 func getSink(name string, action map[string]interface{}) (api.Sink, error) {
@@ -385,11 +372,3 @@ func (m *SinkNode) AddOutput(_ chan<- interface{}, name string) error {
 func (m *SinkNode) Broadcast(_ interface{}) error {
 	return fmt.Errorf("sink %s cannot add broadcast", m.name)
 }
-
-// SaveCache Only called when checkpoint enabled
-func (m *SinkNode) SaveCache() {
-	select {
-	case m.tch <- struct{}{}:
-	case <-m.ctx.Done():
-	}
-}

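The sync-cache branch above replaces the old retry loop: when `enableCache` is on, the sink pulls every message from the cache's `Out` channel, tries to collect it, and answers on the `Ack` channel; only errors prefixed with `errorx.IOErr` are treated as recoverable and nacked, so the message stays cached for resend. Below is a minimal sketch of that handshake. It assumes only the `Out`/`Ack` channel shape visible in the diff; the stand-in type and error prefix are illustrative, not the real `internal/topo/node/cache` implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// syncCache is a stand-in with the same Out/Ack channel shape as the cache used above.
type syncCache struct {
	Out chan interface{} // cached or fresh messages handed to the sink
	Ack chan bool        // true: drop from cache; false: keep for resend
}

// ioErrPrefix stands in for errorx.IOErr, the prefix that marks recoverable errors.
const ioErrPrefix = "io error"

// consume mirrors the sync branch of the sink loop: collect, then ack in order.
func consume(c *syncCache, collect func(interface{}) error) {
	for data := range c.Out {
		ack := true
		if err := collect(data); err != nil && strings.HasPrefix(err.Error(), ioErrPrefix) {
			ack = false // leave the message in the cache so it is resent later
		}
		c.Ack <- ack
	}
}

func main() {
	c := &syncCache{Out: make(chan interface{}, 1), Ack: make(chan bool, 1)}
	c.Out <- map[string]interface{}{"temperature": 27}
	close(c.Out)
	consume(c, func(d interface{}) error {
		fmt.Println("collected:", d)
		return nil
	})
	fmt.Println("ack:", <-c.Ack)
}
```

Because acks are returned in the same order the messages are read, the cache never needs per-message ids, which is why the async path is rejected during validation rather than supported here.
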
+ 103 - 0
internal/topo/node/sink_node_test.go

@@ -18,6 +18,7 @@
 package node
 
 import (
+	"errors"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/schema"
@@ -299,3 +300,105 @@ func TestFormat_Apply(t *testing.T) {
 		}
 	}
 }
+
+func TestConfig(t *testing.T) {
+	var tests = []struct {
+		config map[string]interface{}
+		sconf  *SinkConf
+		err    error
+	}{
+		{
+			config: map[string]interface{}{
+				"sendSingle": true,
+			},
+			sconf: &SinkConf{
+				Concurrency:  1,
+				SendSingle:   true,
+				Format:       "json",
+				BufferLength: 1024,
+				SinkConf: conf.SinkConf{
+					MemoryCacheThreshold: 1024,
+					MaxDiskCache:         1024000,
+					BufferPageSize:       256,
+					EnableCache:          false,
+					ResendInterval:       0,
+					CleanCacheAtStop:     false,
+				},
+			},
+		}, {
+			config: map[string]interface{}{
+				"enableCache":          true,
+				"memoryCacheThreshold": 2,
+				"bufferPageSize":       2,
+				"sendSingle":           true,
+				"maxDiskCache":         6,
+				"resendInterval":       10,
+			},
+			sconf: &SinkConf{
+				Concurrency:  1,
+				SendSingle:   true,
+				Format:       "json",
+				BufferLength: 1024,
+				SinkConf: conf.SinkConf{
+					MemoryCacheThreshold: 2,
+					MaxDiskCache:         6,
+					BufferPageSize:       2,
+					EnableCache:          true,
+					ResendInterval:       10,
+					CleanCacheAtStop:     false,
+				},
+			},
+		}, {
+			config: map[string]interface{}{
+				"enableCache":          true,
+				"memoryCacheThreshold": 2,
+				"bufferPageSize":       2,
+				"runAsync":             true,
+				"maxDiskCache":         6,
+				"resendInterval":       10,
+			},
+			err: errors.New("cache is not supported for async sink, do not use enableCache and runAsync properties together"),
+		}, {
+			config: map[string]interface{}{
+				"enableCache":          true,
+				"memoryCacheThreshold": 256,
+				"bufferLength":         10,
+				"maxDiskCache":         6,
+				"resendInterval":       10,
+			},
+			err: errors.New("invalid cache properties: \nmaxDiskCacheTooSmall:maxDiskCache must be greater than bufferPageSize"),
+		}, {
+			config: map[string]interface{}{
+				"enableCache":          true,
+				"memoryCacheThreshold": 7,
+				"bufferPageSize":       3,
+				"sendSingle":           true,
+				"maxDiskCache":         21,
+				"resendInterval":       10,
+			},
+			err: errors.New("invalid cache properties: \nmemoryCacheThresholdNotMultiple:memoryCacheThreshold must be a multiple of bufferPageSize"),
+		}, {
+			config: map[string]interface{}{
+				"enableCache":          true,
+				"memoryCacheThreshold": 9,
+				"bufferPageSize":       3,
+				"sendSingle":           true,
+				"maxDiskCache":         22,
+				"resendInterval":       10,
+			},
+			err: errors.New("invalid cache properties: \nmaxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize"),
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := conf.Log.WithField("rule", "TestConfig")
+	conf.InitConf()
+	for i, tt := range tests {
+		mockSink := NewSinkNode(fmt.Sprintf("test_%d", i), "mockSink", tt.config)
+		sconf, err := mockSink.parseConf(contextLogger)
+		if !reflect.DeepEqual(tt.err, err) {
+			t.Errorf("%d \terror mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.err, err)
+		} else if !reflect.DeepEqual(tt.sconf, sconf) {
+			t.Errorf("%d \tresult mismatch:\n\nexp=%v\n\ngot=%v\n\n", i, tt.sconf, sconf)
+		}
+	}
+}

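The new `TestConfig` cases double as a specification of the cache properties: defaults come from `conf.Config.Sink`, `enableCache` cannot be combined with `runAsync`, `maxDiskCache` must be larger than and a multiple of `bufferPageSize`, and `memoryCacheThreshold` must also be a multiple of `bufferPageSize`. The sketch below restates those constraints; the field names follow the tests, while the real checks live in `conf.SinkConf.Validate` and produce the prefixed error names asserted above.

```go
package main

import (
	"errors"
	"fmt"
)

// sinkCacheConf is an illustrative subset of the cache properties under test.
type sinkCacheConf struct {
	MemoryCacheThreshold int
	MaxDiskCache         int
	BufferPageSize       int
}

// validate mirrors the constraints the test cases assert, in simplified form.
func (c sinkCacheConf) validate() error {
	if c.MaxDiskCache < c.BufferPageSize {
		return errors.New("maxDiskCache must be greater than bufferPageSize")
	}
	if c.MemoryCacheThreshold%c.BufferPageSize != 0 {
		return errors.New("memoryCacheThreshold must be a multiple of bufferPageSize")
	}
	if c.MaxDiskCache%c.BufferPageSize != 0 {
		return errors.New("maxDiskCache must be a multiple of bufferPageSize")
	}
	return nil
}

func main() {
	// Matches the defaults in the first test case: valid.
	fmt.Println(sinkCacheConf{MemoryCacheThreshold: 1024, MaxDiskCache: 1024000, BufferPageSize: 256}.validate())
	// Matches the fifth test case: 7 is not a multiple of 3.
	fmt.Println(sinkCacheConf{MemoryCacheThreshold: 7, MaxDiskCache: 21, BufferPageSize: 3}.validate())
}
```
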
+ 2 - 1
internal/topo/node/source_node.go

@@ -19,6 +19,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/converter"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
@@ -137,7 +138,7 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 							buffer.Close()
 						}()
 
-						stats, err := NewStatManager(ctx, "source")
+						stats, err := metric.NewStatManager(ctx, "source")
 						if err != nil {
 							return err
 						}

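The only functional change in this file is that `NewStatManager` now lives in the `metric` package; the per-record accounting calls around each message stay the same. A reduced sketch of that pattern follows, using a stand-in interface with just the calls visible in this diff (`IncTotalRecordsIn`, `IncTotalRecordsOut`, `IncTotalExceptions`, `ProcessTimeStart`/`ProcessTimeEnd`); the real `metric.StatManager` exposes more than this.

```go
package main

import "fmt"

// statManager is a stand-in for the subset of metric.StatManager used in the diff.
type statManager interface {
	IncTotalRecordsIn()
	IncTotalRecordsOut()
	IncTotalExceptions()
	ProcessTimeStart()
	ProcessTimeEnd()
}

type counter struct{ in, out, exceptions int }

func (c *counter) IncTotalRecordsIn()  { c.in++ }
func (c *counter) IncTotalRecordsOut() { c.out++ }
func (c *counter) IncTotalExceptions() { c.exceptions++ }
func (c *counter) ProcessTimeStart()   {}
func (c *counter) ProcessTimeEnd()     {}

// process shows the accounting order used around each record: count it in,
// time the processing, then count it out or record an exception.
func process(stats statManager, data interface{}, collect func(interface{}) error) {
	stats.IncTotalRecordsIn()
	stats.ProcessTimeStart()
	defer stats.ProcessTimeEnd()
	if err := collect(data); err != nil {
		stats.IncTotalExceptions()
		return
	}
	stats.IncTotalRecordsOut()
}

func main() {
	c := &counter{}
	process(c, "hello", func(interface{}) error { return nil })
	fmt.Printf("in=%d out=%d exceptions=%d\n", c.in, c.out, c.exceptions)
}
```
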
+ 4 - 2
internal/topo/node/window_op.go

@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"github.com/benbjohnson/clock"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
@@ -41,7 +42,7 @@ type WindowOperator struct {
 	isEventTime        bool
 	watermarkGenerator *WatermarkGenerator //For event time only
 
-	statManager StatManager
+	statManager metric.StatManager
 	ticker      *clock.Ticker //For processing time only
 	// states
 	triggerTime int64
@@ -54,6 +55,7 @@ const MSG_COUNT_KEY = "$$msgCount"
 
 func init() {
 	gob.Register([]*xsql.Tuple{})
+	gob.Register([]map[string]interface{}{})
 }
 
 func NewWindowOp(name string, w WindowConfig, streams []string, options *api.RuleOption) (*WindowOperator, error) {
@@ -96,7 +98,7 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
 		infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
 		return
 	}
-	stats, err := NewStatManager(ctx, "op")
+	stats, err := metric.NewStatManager(ctx, "op")
 	if err != nil {
 		infra.DrainError(ctx, err, errCh)
 		return

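The extra `gob.Register([]map[string]interface{}{})` matters because `encoding/gob` refuses to encode a value held in an interface unless its concrete type has been registered, and window state can now carry slices of decoded maps behind `interface{}` values. A small sketch of the failure mode, assuming the window state is gob-encoded for checkpointing as the `init()` above suggests:

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

func init() {
	// Without this registration, encoding state that holds such a slice behind
	// an interface fails with "type not registered for interface".
	gob.Register([]map[string]interface{}{})
	// Registered defensively in case a bare map also ends up behind an interface.
	gob.Register(map[string]interface{}{})
}

func main() {
	state := map[string]interface{}{
		"window": []map[string]interface{}{{"temperature": 27.5}},
	}
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(state); err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	decoded := map[string]interface{}{}
	if err := gob.NewDecoder(&buf).Decode(&decoded); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(decoded["window"])
}
```
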
+ 4 - 3
internal/topo/topo.go

@@ -21,6 +21,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/node"
+	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/infra"
@@ -204,7 +205,7 @@ func (s *Topo) GetMetrics() (keys []string, values []interface{}) {
 	for _, sn := range s.sources {
 		for ins, metrics := range sn.GetMetrics() {
 			for i, v := range metrics {
-				keys = append(keys, "source_"+sn.GetName()+"_"+strconv.Itoa(ins)+"_"+node.MetricNames[i])
+				keys = append(keys, "source_"+sn.GetName()+"_"+strconv.Itoa(ins)+"_"+metric.MetricNames[i])
 				values = append(values, v)
 			}
 		}
@@ -212,7 +213,7 @@ func (s *Topo) GetMetrics() (keys []string, values []interface{}) {
 	for _, so := range s.ops {
 		for ins, metrics := range so.GetMetrics() {
 			for i, v := range metrics {
-				keys = append(keys, "op_"+so.GetName()+"_"+strconv.Itoa(ins)+"_"+node.MetricNames[i])
+				keys = append(keys, "op_"+so.GetName()+"_"+strconv.Itoa(ins)+"_"+metric.MetricNames[i])
 				values = append(values, v)
 			}
 		}
@@ -220,7 +221,7 @@ func (s *Topo) GetMetrics() (keys []string, values []interface{}) {
 	for _, sn := range s.sinks {
 		for ins, metrics := range sn.GetMetrics() {
 			for i, v := range metrics {
-				keys = append(keys, "sink_"+sn.GetName()+"_"+strconv.Itoa(ins)+"_"+node.MetricNames[i])
+				keys = append(keys, "sink_"+sn.GetName()+"_"+strconv.Itoa(ins)+"_"+metric.MetricNames[i])
 				values = append(values, v)
 			}
 		}