浏览代码

feat(restapi): support refresh basic configs (#2117)

* feat(restapi): support refresh basic configs

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>

* docs: add reload api doc

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>

* add rest api unit tests

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>

* add doc details

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>

---------

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>
Jason Lyu 1 年之前
父节点
当前提交
0561c5c896
共有 5 个文件被更改,包括 195 次插入24 次删除
  1. 28 0
      docs/en_US/api/restapi/configs.md
  2. 27 0
      docs/zh_CN/api/restapi/configs.md
  3. 53 24
      internal/conf/conf.go
  4. 43 0
      internal/server/rest.go
  5. 44 0
      internal/server/rest_test.go

+ 28 - 0
docs/en_US/api/restapi/configs.md

@@ -0,0 +1,28 @@
+# Dynamic Reload Configs
+
+By dynamically reloading [configuration](../../configuration/global_configurations.md), parameters such as debug and timezone
+can be updated for running eKuiper without restarting the application.
+
+## Reload Basic Configs
+
+```shell
+PATCH http://localhost:9081/configs
+```
+
+Request demo:
+
+```json
+{
+  "debug": true,
+  "consoleLog": true,
+  "fileLog": true,
+  "timezone": "UTC"
+}
+```
+
+Current supported dynamic reloadable parameters:
+
+- `debug`
+- `consoleLog`
+- `fileLog`
+- `timezone`

+ 27 - 0
docs/zh_CN/api/restapi/configs.md

@@ -0,0 +1,27 @@
+# 动态重载配置
+
+通过动态重载[配置](../../configuration/global_configurations.md),可以给运行中的 eKuiper 更新如 debug,timezone 之类的参数,而不用重启应用。
+
+## 重载 Basic 配置
+
+```shell
+PATCH http://localhost:9081/configs
+```
+
+请求示例:
+
+```json
+{
+  "debug": true,
+  "consoleLog": true,
+  "fileLog": true,
+  "timezone": "UTC"
+}
+```
+
+目前支持的动态可重载的参数有:
+
+- `debug`
+- `consoleLog`
+- `fileLog`
+- `timezone`

+ 53 - 24
internal/conf/conf.go

@@ -18,6 +18,7 @@ import (
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
 	"io"
 	"io"
+	"log"
 	"os"
 	"os"
 	"path"
 	"path"
 	"strings"
 	"strings"
@@ -171,6 +172,54 @@ type KuiperConf struct {
 	}
 	}
 }
 }
 
 
+func SetDebugLevel(v bool) {
+	lvl := logrus.InfoLevel
+	if v {
+		lvl = logrus.DebugLevel
+	}
+	Log.SetLevel(lvl)
+}
+
+func SetConsoleLog(v bool) {
+	out := io.Discard
+	if v {
+		out = os.Stdout
+	}
+	Log.SetOutput(out)
+}
+
+func SetFileLog(v bool) error {
+	if !v {
+		SetConsoleLog(Config.Basic.ConsoleLog)
+		return nil
+	}
+
+	logDir, err := GetLoc(logDir)
+	if err != nil {
+		return err
+	}
+
+	file := path.Join(logDir, logFileName)
+	logWriter, err := rotatelogs.New(
+		file+".%Y-%m-%d_%H-%M-%S",
+		rotatelogs.WithLinkName(file),
+		rotatelogs.WithRotationTime(time.Hour*time.Duration(Config.Basic.RotateTime)),
+		rotatelogs.WithMaxAge(time.Hour*time.Duration(Config.Basic.MaxAge)),
+	)
+
+	if err != nil {
+		fmt.Printf("Failed to init log file settings: %v", err)
+		Log.Infof("Failed to log to file, using default stderr.")
+	} else if Config.Basic.ConsoleLog {
+		mw := io.MultiWriter(os.Stdout, logWriter)
+		Log.SetOutput(mw)
+	} else if !Config.Basic.ConsoleLog {
+		Log.SetOutput(logWriter)
+	}
+	gcOutdatedLog(logDir, time.Hour*time.Duration(Config.Basic.MaxAge))
+	return nil
+}
+
 func InitConf() {
 func InitConf() {
 	cpath, err := GetConfLoc()
 	cpath, err := GetConfLoc()
 	if err != nil {
 	if err != nil {
@@ -207,35 +256,15 @@ func InitConf() {
 	}
 	}
 
 
 	if Config.Basic.Debug {
 	if Config.Basic.Debug {
-		Log.SetLevel(logrus.DebugLevel)
+		SetDebugLevel(true)
 	}
 	}
 
 
 	if Config.Basic.FileLog {
 	if Config.Basic.FileLog {
-		logDir, err := GetLoc(logDir)
-		if err != nil {
-			Log.Fatal(err)
-		}
-
-		file := path.Join(logDir, logFileName)
-		logWriter, err := rotatelogs.New(
-			file+".%Y-%m-%d_%H-%M-%S",
-			rotatelogs.WithLinkName(file),
-			rotatelogs.WithRotationTime(time.Hour*time.Duration(Config.Basic.RotateTime)),
-			rotatelogs.WithMaxAge(time.Hour*time.Duration(Config.Basic.MaxAge)),
-		)
-
-		if err != nil {
-			fmt.Println("Failed to init log file settings..." + err.Error())
-			Log.Infof("Failed to log to file, using default stderr.")
-		} else if Config.Basic.ConsoleLog {
-			mw := io.MultiWriter(os.Stdout, logWriter)
-			Log.SetOutput(mw)
-		} else if !Config.Basic.ConsoleLog {
-			Log.SetOutput(logWriter)
+		if err := SetFileLog(true); err != nil {
+			log.Fatal(err)
 		}
 		}
-		gcOutdatedLog(logDir, time.Hour*time.Duration(Config.Basic.MaxAge))
 	} else if Config.Basic.ConsoleLog {
 	} else if Config.Basic.ConsoleLog {
-		Log.SetOutput(os.Stdout)
+		SetConsoleLog(true)
 	}
 	}
 
 
 	if Config.Basic.TimeZone != "" {
 	if Config.Basic.TimeZone != "" {

+ 43 - 0
internal/server/rest.go

@@ -156,6 +156,7 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
 	r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
 	r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
 	r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
 	r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
 	r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
 	r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
+	r.HandleFunc("/configs", configurationUpdateHandler).Methods(http.MethodPatch)
 	r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
 	r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
 	r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
 	r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
 	r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
@@ -1094,6 +1095,48 @@ func configurationStatusExport() Configuration {
 	return conf
 	return conf
 }
 }
 
 
+func configurationUpdateHandler(w http.ResponseWriter, r *http.Request) {
+	basic := struct {
+		Debug      *bool   `json:"debug"`
+		ConsoleLog *bool   `json:"consoleLog"`
+		FileLog    *bool   `json:"fileLog"`
+		TimeZone   *string `json:"timezone"`
+	}{}
+	if err := json.NewDecoder(r.Body).Decode(&basic); err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		handleError(w, err, "Invalid JSON", logger)
+		return
+	}
+
+	if basic.Debug != nil {
+		conf.SetDebugLevel(*basic.Debug)
+		conf.Config.Basic.Debug = *basic.Debug
+	}
+
+	if basic.TimeZone != nil {
+		if err := cast.SetTimeZone(*basic.TimeZone); err != nil {
+			w.WriteHeader(http.StatusBadRequest)
+			handleError(w, err, "Invalid TZ", logger)
+			return
+		}
+		conf.Config.Basic.TimeZone = *basic.TimeZone
+	}
+
+	if basic.FileLog != nil {
+		if err := conf.SetFileLog(*basic.FileLog); err != nil {
+			w.WriteHeader(http.StatusBadRequest)
+			handleError(w, err, "", logger)
+			return
+		}
+		conf.Config.Basic.FileLog = *basic.FileLog
+	} else if basic.ConsoleLog != nil {
+		conf.SetConsoleLog(*basic.ConsoleLog)
+		conf.Config.Basic.ConsoleLog = *basic.ConsoleLog
+	}
+
+	w.WriteHeader(http.StatusNoContent)
+}
+
 func configurationStatusHandler(w http.ResponseWriter, r *http.Request) {
 func configurationStatusHandler(w http.ResponseWriter, r *http.Request) {
 	defer r.Body.Close()
 	defer r.Body.Close()
 	content := configurationStatusExport()
 	content := configurationStatusExport()

+ 44 - 0
internal/server/rest_test.go

@@ -77,6 +77,7 @@ func (suite *RestTestSuite) SetupTest() {
 	r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
 	r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
 	r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
 	r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
 	r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
 	r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
+	r.HandleFunc("/configs", configurationUpdateHandler).Methods(http.MethodPatch)
 	r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
 	r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
 	r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
 	r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
 	r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
@@ -313,6 +314,49 @@ func (suite *RestTestSuite) Test_rulesManageHandler() {
 	suite.r.ServeHTTP(w, req)
 	suite.r.ServeHTTP(w, req)
 }
 }
 
 
+func (suite *RestTestSuite) Test_configUpdate() {
+	req, _ := http.NewRequest(http.MethodPatch, "http://localhost:8080/configs", bytes.NewBufferString(""))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusBadRequest, w.Code)
+
+	b, _ := json.Marshal(map[string]any{
+		"debug":    true,
+		"timezone": "",
+	})
+	req, _ = http.NewRequest(http.MethodPatch, "http://localhost:8080/configs", bytes.NewBuffer(b))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusNoContent, w.Code)
+
+	b, _ = json.Marshal(map[string]any{
+		"debug":    true,
+		"timezone": "unknown",
+	})
+	req, _ = http.NewRequest(http.MethodPatch, "http://localhost:8080/configs", bytes.NewBuffer(b))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusBadRequest, w.Code)
+
+	b, _ = json.Marshal(map[string]any{
+		"debug":   true,
+		"fileLog": true,
+	})
+	req, _ = http.NewRequest(http.MethodPatch, "http://localhost:8080/configs", bytes.NewBuffer(b))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusNoContent, w.Code)
+
+	b, _ = json.Marshal(map[string]any{
+		"debug":      true,
+		"consoleLog": true,
+	})
+	req, _ = http.NewRequest(http.MethodPatch, "http://localhost:8080/configs", bytes.NewBuffer(b))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusNoContent, w.Code)
+}
+
 func (suite *RestTestSuite) Test_ruleSetImport() {
 func (suite *RestTestSuite) Test_ruleSetImport() {
 	ruleJson := `{"streams":{"plugin":"\n              CREATE STREAM plugin\n              ()\n              WITH (FORMAT=\"json\", CONF_KEY=\"default\", TYPE=\"mqtt\", SHARED=\"false\", );\n          "},"tables":{},"rules":{"rule1":"{\"id\":\"rule1\",\"name\":\"\",\"sql\":\"select name from plugin\",\"actions\":[{\"log\":{\"runAsync\":false,\"omitIfEmpty\":false,\"sendSingle\":true,\"bufferLength\":1024,\"enableCache\":false,\"format\":\"json\"}}],\"options\":{\"restartStrategy\":{}}}"}}`
 	ruleJson := `{"streams":{"plugin":"\n              CREATE STREAM plugin\n              ()\n              WITH (FORMAT=\"json\", CONF_KEY=\"default\", TYPE=\"mqtt\", SHARED=\"false\", );\n          "},"tables":{},"rules":{"rule1":"{\"id\":\"rule1\",\"name\":\"\",\"sql\":\"select name from plugin\",\"actions\":[{\"log\":{\"runAsync\":false,\"omitIfEmpty\":false,\"sendSingle\":true,\"bufferLength\":1024,\"enableCache\":false,\"format\":\"json\"}}],\"options\":{\"restartStrategy\":{}}}"}}`
 	ruleSetJson := map[string]string{
 	ruleSetJson := map[string]string{