Prechádzať zdrojové kódy

fix: The fix for #409, user can disable the cache

RockyJin 4 rokov pred
rodič
commit
57e2f93f76

+ 3 - 2
common/util.go

@@ -70,8 +70,9 @@ type KuiperConf struct {
 	}
 	Rule api.RuleOption
 	Sink struct {
-		CacheThreshold    int `yaml:"cacheThreshold"`
-		CacheTriggerCount int `yaml:"cacheTriggerCount"`
+		CacheThreshold    int  `yaml:"cacheThreshold"`
+		CacheTriggerCount int  `yaml:"cacheTriggerCount"`
+		DisableCache      bool `yaml:"disableCache""`
 	}
 }
 

+ 14 - 0
docs/en_US/operation/configuration_file.md

@@ -49,3 +49,17 @@ basic:
 ```
 For such a default configuration, Kuiper will export metrics and serve prometheus at ``http://localhost:20499/metrics``
 
+## Sink configurations
+
+```yaml
+  #The cache persistence threshold size. If the message in sink cache is larger than 10, then it triggers persistence. If you find the remote system is slow to response, or sink throughput is small, then it's recommend to increase below 2 configurations.More memory is required with the increase of below 2 configurations.
+
+  # If the message count reaches below value, then it triggers persistence.
+  cacheThreshold: 10
+  # The message persistence is triggered by a ticker, and cacheTriggerCount is for using configure the count to trigger the persistence procedure regardless if the message number reaches cacheThreshold or not. This is to prevent the data won't be saved as the cache never pass the threshold.
+  cacheTriggerCount: 15
+
+  # Control to disable cache or not. If it's set to true, then the cache will be disabled, otherwise, it will be enabled.
+  disableCache: false
+```
+

+ 1 - 4
docs/en_US/operation/operations.md

@@ -5,8 +5,5 @@
 
 # Restful APIs
 
-Kuiper provides some RESTful management APIs.
-
-
-
+Kuiper provides some RESTful management APIs. Please refer to [Rest-API doc](../restapi/overview.md) for more detailed information.
 

+ 64 - 50
docs/zh_CN/operation/configuration_file.md

@@ -1,50 +1,64 @@
-# 基本配置
-Kuiper 的配置文件位于 `$ kuiper / etc / kuiper.yaml` 中。 配置文件为 yaml 格式。
-
-## 日志级别
-
-```yaml
-basic:
-  # true|false, with debug level, it prints more debug info
-  debug: false
-  # true|false, if it's set to true, then the log will be print to console
-  consoleLog: false
-  # true|false, if it's set to true, then the log will be print to log file
-  fileLog: true
-```
-## Cli 端口
-```yaml
-basic:
-  # CLI port
-  port: 20498
-```
-CLI 服务器监听端口
-
-## REST 服务配置
-
-```yaml
-basic:
-  # REST service port
-  restPort: 9081
-  restTls:
-    certfile: /var/https-server.crt
-    keyfile: /var/https-server.key
-```
-
-#### restPort
-REST http 服务器监听端口
-
-#### restTls
-TLS 证书 cert 文件和 key 文件位置。如果 restTls 选项未配置,则 REST 服务器将启动为 http 服务器,否则启动为 https 服务器。
-
-## Prometheus 配置
-
-如果 `prometheus` 参数设置为 true,Kuiper 将把运行指标暴露到 prometheus。Prometheus 将运行在 `prometheusPort` 参数指定的端口上。
-
-```yaml
-basic:
-  prometheus: true
-  prometheusPort: 20499
-```
-在如上默认配置中,Kuiper 暴露于 Prometheusd 运行指标可通过 `http://localhost:20499/metrics` 访问。
-
+# 基本配置
+Kuiper 的配置文件位于 `$ kuiper / etc / kuiper.yaml` 中。 配置文件为 yaml 格式。
+
+## 日志级别
+
+```yaml
+basic:
+  # true|false, with debug level, it prints more debug info
+  debug: false
+  # true|false, if it's set to true, then the log will be print to console
+  consoleLog: false
+  # true|false, if it's set to true, then the log will be print to log file
+  fileLog: true
+```
+## Cli 端口
+```yaml
+basic:
+  # CLI port
+  port: 20498
+```
+CLI 服务器监听端口
+
+## REST 服务配置
+
+```yaml
+basic:
+  # REST service port
+  restPort: 9081
+  restTls:
+    certfile: /var/https-server.crt
+    keyfile: /var/https-server.key
+```
+
+#### restPort
+REST http 服务器监听端口
+
+#### restTls
+TLS 证书 cert 文件和 key 文件位置。如果 restTls 选项未配置,则 REST 服务器将启动为 http 服务器,否则启动为 https 服务器。
+
+## Prometheus 配置
+
+如果 `prometheus` 参数设置为 true,Kuiper 将把运行指标暴露到 prometheus。Prometheus 将运行在 `prometheusPort` 参数指定的端口上。
+
+```yaml
+basic:
+  prometheus: true
+  prometheusPort: 20499
+```
+在如上默认配置中,Kuiper 暴露于 Prometheusd 运行指标可通过 `http://localhost:20499/metrics` 访问。
+
+## Sink configurations
+
+```yaml
+  #The cache persistence threshold size. If the message in sink cache is larger than 10, then it triggers persistence. If you find the remote system is slow to response, or sink throughput is small, then it's recommend to increase below 2 configurations.More memory is required with the increase of below 2 configurations.
+
+  # If the message count reaches below value, then it triggers persistence.
+  cacheThreshold: 10
+  # The message persistence is triggered by a ticker, and cacheTriggerCount is for using configure the count to trigger the persistence procedure regardless if the message number reaches cacheThreshold or not. This is to prevent the data won't be saved as the cache never pass the threshold.
+  cacheTriggerCount: 15
+
+  # Control to disable cache or not. If it's set to true, then the cache will be disabled, otherwise, it will be enabled.
+  disableCache: false
+```
+

+ 1 - 5
docs/zh_CN/operation/operations.md

@@ -5,8 +5,4 @@
 
 # Restful APIs
 
-Kuiper 提供了一些 RESTful 管理 API。
-
-
-
-
+Kuiper 提供了一些 RESTful 管理 API。Kuiper 提供了一些 RESTful 管理 APIs。请参考 [Rest-API 文档](../restapi/overview.md)以获取更详细信息。

+ 4 - 1
etc/kuiper.yaml

@@ -34,4 +34,7 @@ sink:
   cacheThreshold: 10
   # The message persistence is triggered by a ticker, and cacheTriggerCount is for using configure the count to trigger the persistence procedure
   # regardless if the message number reaches cacheThreshold or not. This is to prevent the data won't be saved as the cache never pass the threshold.
-  cacheTriggerCount: 15
+  cacheTriggerCount: 15
+
+  # Control to disable cache or not. If it's set to true, then the cache will be disabled, otherwise, it will be enabled.
+  disableCache: false

+ 11 - 4
xstream/nodes/sink_cache.go

@@ -81,6 +81,14 @@ func NewTimebasedCache(in <-chan interface{}, limit int, saveInterval int, errCh
 
 func (c *Cache) initStore(ctx api.StreamContext) {
 	logger := ctx.GetLogger()
+	c.pending = &LinkedQueue{
+		Data: make(map[int]interface{}),
+		Tail: 0,
+	}
+	if common.Config.Sink.DisableCache {
+		logger.Infof("The cache is disabled, and skip the initialization of cache.")
+		return
+	}
 	dbDir, err := common.GetDataLoc()
 	logger.Debugf("cache saved to %s", dbDir)
 	if err != nil {
@@ -151,10 +159,6 @@ func (c *Cache) timebasedRun(ctx api.StreamContext, saveInterval int) {
 }
 
 func (c *Cache) loadCache() error {
-	c.pending = &LinkedQueue{
-		Data: make(map[int]interface{}),
-		Tail: 0,
-	}
 	gob.Register(c.pending)
 	err := c.store.Open()
 	if err != nil && err != io.EOF {
@@ -189,6 +193,9 @@ func (c *Cache) loadCache() error {
 }
 
 func (c *Cache) saveCache(logger api.Logger, p *LinkedQueue) error {
+	if common.Config.Sink.DisableCache {
+		return nil
+	}
 	err := c.store.Open()
 	if err != nil {
 		logger.Errorf("save cache error while opening cache store: %s", err)