瀏覽代碼

feat(plugin): allow file url scheme for local file (#464)

* feat(plugin): allow file url scheme for local file

* feat(plugin): allow file url scheme for local file
ngjaying 4 年之前
父節點
當前提交
a755cb38fc
共有 4 個文件被更改,包括 72 次插入17 次删除
  1. 11 2
      docs/en_US/restapi/plugins.md
  2. 10 2
      docs/zh_CN/restapi/plugins.md
  3. 50 12
      plugins/manager.go
  4. 1 1
      plugins/plugins.http

File diff suppressed because it is too large
+ 11 - 2
docs/en_US/restapi/plugins.md


File diff suppressed because it is too large
+ 10 - 2
docs/zh_CN/restapi/plugins.md


+ 50 - 12
plugins/manager.go

@@ -474,30 +474,68 @@ func unzipTo(f *zip.File, fpath string) error {
 }
 
 func isValidUrl(uri string) bool {
-	_, err := url.ParseRequestURI(uri)
+	pu, err := url.ParseRequestURI(uri)
 	if err != nil {
 		return false
 	}
 
-	u, err := url.Parse(uri)
-	if err != nil || u.Scheme == "" || u.Host == "" {
+	switch pu.Scheme {
+	case "http", "https":
+		u, err := url.Parse(uri)
+		if err != nil || u.Scheme == "" || u.Host == "" {
+			return false
+		}
+	case "file":
+		if pu.Host != "" || pu.Path == "" {
+			return false
+		}
+	default:
 		return false
 	}
-
 	return true
 }
 
-func downloadFile(filepath string, url string) error {
-	// Get the data
-	resp, err := http.Get(url)
+func downloadFile(filepath string, uri string) error {
+	u, err := url.ParseRequestURI(uri)
 	if err != nil {
 		return err
 	}
-	if resp.StatusCode != http.StatusOK {
-		return fmt.Errorf("cannot download the file with status: %s", resp.Status)
-	}
-	defer resp.Body.Close()
+	var src io.Reader
+	switch u.Scheme {
+	case "file":
+		// deal with windows path
+		if strings.Index(u.Path, ":") == 2 {
+			u.Path = u.Path[1:]
+		}
+		fmt.Printf(u.Path)
+		sourceFileStat, err := os.Stat(u.Path)
+		if err != nil {
+			return err
+		}
 
+		if !sourceFileStat.Mode().IsRegular() {
+			return fmt.Errorf("%s is not a regular file", u.Path)
+		}
+		srcFile, err := os.Open(u.Path)
+		if err != nil {
+			return err
+		}
+		defer srcFile.Close()
+		src = srcFile
+	case "http", "https":
+		// Get the data
+		resp, err := http.Get(uri)
+		if err != nil {
+			return err
+		}
+		if resp.StatusCode != http.StatusOK {
+			return fmt.Errorf("cannot download the file with status: %s", resp.Status)
+		}
+		defer resp.Body.Close()
+		src = resp.Body
+	default:
+		return fmt.Errorf("unsupported url scheme %s", u.Scheme)
+	}
 	// Create the file
 	out, err := os.Create(filepath)
 	if err != nil {
@@ -506,7 +544,7 @@ func downloadFile(filepath string, url string) error {
 	defer out.Close()
 
 	// Write the body to file
-	_, err = io.Copy(out, resp.Body)
+	_, err = io.Copy(out, src)
 	return err
 }
 

+ 1 - 1
plugins/plugins.http

@@ -18,7 +18,7 @@ DELETE http://127.0.0.1:9081/plugins/sources/random3
 POST http://127.0.0.1:9081/plugins/sinks
 Content-Type: application/json
 
-{"name":"file2","file":"http://127.0.0.1/testzips/sinks/file2.zip"}
+{"name":"file2","file":"file:///C:/repos/go/src/github.com/emqx/kuiper/plugins/testzips/sinks/file2.zip"}
 
 ###
 GET http://127.0.0.1:9081/plugins/sinks