瀏覽代碼

feat(rest): add config file upload API (#1302)

* feat(rest): add config file upload API

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* fix(schema): merge 1.5.1

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* feat(rest): 通过文本上传文件和删除文件 API

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 2 年之前
父節點
當前提交
87bfde5b04

+ 1 - 1
docs/en_US/operation/restapi/schemas.md

@@ -1,4 +1,4 @@
-The eKuiper REST api for schemas allows you to manage schemas, such as create, show, drop and describe rules
+The eKuiper REST api for schemas allows you to manage schemas, such as create, show, drop and describe schemas.
 
 ## Create a schema
 

+ 69 - 0
docs/en_US/operation/restapi/uploads.md

@@ -0,0 +1,69 @@
+The eKuiper REST api for configuration file uploads allows you to upload configuration files and list all uploaded files.
+
+## Upload a configuration file
+
+The API supports to upload a local file or provide the text content of file. The upload request will save the file into your `${configPath}/uploads`. It will override the existed file of the same name. The response is the absolute path of the uploaded file which you can refer in other configurations.
+
+### Upload by a file
+
+The API accepts a multipart file upload requests. Below is an example html file to upload file to `http://127.0.0.1:9081/config/uploads`. In the form data, the file input name must be `uploadFile`.
+
+```html
+<!DOCTYPE html>
+<html lang="en">
+  <head>
+    <meta charset="UTF-8" />
+    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
+    <meta http-equiv="X-UA-Compatible" content="ie=edge" />
+    <title>Upload File</title>
+  </head>
+  <body>
+    <form
+      enctype="multipart/form-data"
+      action="http://127.0.0.1:9081/config/uploads"
+      method="post"
+    >
+      <input type="file" name="uploadFile" />
+      <input type="submit" value="upload" />
+    </form>
+  </body>
+</html>
+```
+
+### Upload by content
+
+Provide the text content and file name to create a configuration file.
+
+```shell
+POST http://localhost:9081/config/uploads
+
+{
+  "name": "my.json",
+  "content": "{\"hello\":\"world\"}"
+}
+```
+
+## Show uploaded file list
+
+The API is used for displaying all files in the `${configPath}/uploads` path.
+
+```shell
+GET http://localhost:9081/config/uploads
+```
+
+Response Sample:
+
+```json
+[
+   "/ekuiper/etc/uploads/zk.gif",
+   "/ekuiper/etc/uploads/abc.gif"
+]
+```
+
+## Delete an uploaded file
+
+The API is used for deleting a file in the `${configPath}/uploads` path.
+
+```shell
+DELETE http://localhost:9081/config/uploads/{fileName}
+```

+ 70 - 0
docs/zh_CN/operation/restapi/uploads.md

@@ -0,0 +1,70 @@
+eKuiper REST api允许您上传配置文件并列出所有上传的文件。
+
+## 上传配置文件
+
+支持两种方式上传配置文件:上传文件或者提供文件名和文本内容。上传请求将把文件保存到你的 `${configPath}/uploads` 。它将覆盖现有的同名文件。返回的响应是上传文件的绝对路径,从而可以在其他配置中使用。
+
+### 上传文件
+
+该API接受多部分的文件上传请求。下面是一个上传文件到 `http://127.0.0.1:9081/config/uploads` 的 html 文件例子。在表格数据中,文件输入名称必须是 `uploadFile` 。
+
+```html
+<!DOCTYPE html>
+<html lang="en">
+  <head>
+    <meta charset="UTF-8" />
+    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
+    <meta http-equiv="X-UA-Compatible" content="ie=edge" />
+    <title>Upload File</title>
+  </head>
+  <body>
+    <form
+      enctype="multipart/form-data"
+      action="http://127.0.0.1:9081/config/uploads"
+      method="post"
+    >
+      <input type="file" name="uploadFile" />
+      <input type="submit" value="upload" />
+    </form>
+  </body>
+</html>
+```
+
+### 通过文本内容创建文件
+
+若需要上传的为文本文件,可通过提供文件名和其文本内容来创建。
+
+```shell
+POST http://localhost:9081/config/uploads
+
+{
+  "name": "my.json",
+  "content": "{\"hello\":\"world\"}"
+}
+```
+
+## 获取上传文件的列表
+
+该API用于显示 `${configPath}/uploads` 路径中的所有文件。
+
+```shell
+GET http://localhost:9081/config/uploads
+```
+
+响应示例:
+
+```json
+[
+   "/ekuiper/etc/uploads/zk.gif",
+   "/ekuiper/etc/uploads/abc.gif"
+]
+```
+
+
+## 删除已上传文件
+
+该 API 用于删除 `${configPath}/uploads` 路径下的文件。
+
+```shell
+DELETE http://localhost:9081/config/uploads/{fileName}
+```

+ 118 - 0
internal/server/rest.go

@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"github.com/gorilla/handlers"
 	"github.com/gorilla/mux"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/server/middleware"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
@@ -27,6 +28,8 @@ import (
 	"io"
 	"io/ioutil"
 	"net/http"
+	"os"
+	"path/filepath"
 	"runtime"
 	"strings"
 	"time"
@@ -37,6 +40,8 @@ const (
 	ContentTypeJSON = "application/json"
 )
 
+var uploadDir string
+
 type statementDescriptor struct {
 	Sql string `json:"sql,omitempty"`
 }
@@ -85,6 +90,17 @@ func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
 }
 
 func createRestServer(ip string, port int, needToken bool) *http.Server {
+	// Create upload path for upload api
+	etcDir, err := conf.GetConfLoc()
+	if err != nil {
+		panic(err)
+	}
+	uploadDir = filepath.Join(etcDir, "uploads")
+	err = os.MkdirAll(uploadDir, os.ModePerm)
+	if err != nil {
+		panic(err)
+	}
+
 	r := mux.NewRouter()
 	r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
@@ -99,6 +115,8 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
 	r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
 	r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
 	r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
+	r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
+	r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
 
 	// Register extended routes
 	for k, v := range components {
@@ -122,6 +140,106 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
 	return server
 }
 
+type fileContent struct {
+	Name    string `json:"name"`
+	Content string `json:"content"`
+}
+
+func fileUploadHandler(w http.ResponseWriter, r *http.Request) {
+	switch r.Method {
+	// Upload or overwrite a file
+	case http.MethodPost:
+		switch r.Header.Get("Content-Type") {
+		case "application/json":
+			fc := &fileContent{}
+			defer r.Body.Close()
+			err := json.NewDecoder(r.Body).Decode(fc)
+			if err != nil {
+				handleError(w, err, "Invalid body: Error decoding file json", logger)
+				return
+			}
+			if fc.Content == "" || fc.Name == "" {
+				handleError(w, nil, "Invalid body: name and content are required", logger)
+				return
+			}
+			filePath := filepath.Join(uploadDir, fc.Name)
+			dst, err := os.Create(filePath)
+			defer dst.Close()
+			if err != nil {
+				handleError(w, err, "Error creating the file", logger)
+				return
+			}
+			_, err = dst.Write([]byte(fc.Content))
+			if err != nil {
+				handleError(w, err, "Error writing the file", logger)
+				return
+			}
+			w.WriteHeader(http.StatusCreated)
+			w.Write([]byte(filePath))
+		default:
+			// Maximum upload of 1 GB files
+			err := r.ParseMultipartForm(1024 << 20)
+			if err != nil {
+				handleError(w, err, "Error parse the multi part form", logger)
+				return
+			}
+
+			// Get handler for filename, size and headers
+			file, handler, err := r.FormFile("uploadFile")
+			if err != nil {
+				handleError(w, err, "Error Retrieving the File", logger)
+				return
+			}
+
+			defer file.Close()
+
+			// Create file
+			filePath := filepath.Join(uploadDir, handler.Filename)
+			dst, err := os.Create(filePath)
+			defer dst.Close()
+			if err != nil {
+				handleError(w, err, "Error creating the file", logger)
+				return
+			}
+
+			// Copy the uploaded file to the created file on the filesystem
+			if _, err := io.Copy(dst, file); err != nil {
+				handleError(w, err, "Error writing the file", logger)
+				return
+			}
+
+			w.WriteHeader(http.StatusCreated)
+			w.Write([]byte(filePath))
+		}
+
+	case http.MethodGet:
+		// Get the list of files in the upload directory
+		files, err := ioutil.ReadDir(uploadDir)
+		if err != nil {
+			handleError(w, err, "Error reading the file upload dir", logger)
+			return
+		}
+		fileNames := make([]string, len(files))
+		for i, f := range files {
+			fileNames[i] = filepath.Join(uploadDir, f.Name())
+		}
+		jsonResponse(fileNames, w, logger)
+	}
+}
+
+func fileDeleteHandler(w http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	name := vars["name"]
+	filePath := filepath.Join(uploadDir, name)
+	e := os.Remove(filePath)
+	if e != nil {
+		handleError(w, e, "Error deleting the file", logger)
+		return
+	}
+	w.WriteHeader(http.StatusOK)
+	w.Write([]byte("ok"))
+}
+
 type information struct {
 	Version       string `json:"version"`
 	Os            string `json:"os"`

+ 1 - 0
internal/topo/node/source_pool.go

@@ -141,6 +141,7 @@ func (p *sourcePool) addInstance(k string, node *SourceNode, source api.Source)
 			return nil, err
 		}
 		sctx, cancel := ctx.WithMeta(ruleId, opId, store).WithCancel()
+		sctx = kctx.WithValue(sctx.(*kctx.DefaultContext), kctx.DecodeKey, node.ctx.Value(kctx.DecodeKey))
 		si, err := start(sctx, node, source)
 		if err != nil {
 			return nil, err

+ 1 - 1
internal/topo/sink/edgex_sink_test.go

@@ -598,7 +598,7 @@ func TestEdgeXTemplate_Apply(t1 *testing.T) {
 		var payload []map[string]interface{}
 		json.Unmarshal([]byte(t.input), &payload)
 		dt := t.conf["dataTemplate"]
-		tf, _ := transform.GenTransform(cast.ToStringAlways(dt))
+		tf, _ := transform.GenTransform(cast.ToStringAlways(dt), "json", "")
 		vCtx := context.WithValue(ctx, context.TransKey, tf)
 		result, err := ems.produceEvents(vCtx, payload[0])
 		if !reflect.DeepEqual(t.error, testx.Errstring(err)) {