|
@@ -17,16 +17,7 @@ package server
|
|
import (
|
|
import (
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"fmt"
|
|
"fmt"
|
|
- "github.com/gorilla/handlers"
|
|
|
|
- "github.com/gorilla/mux"
|
|
|
|
- "github.com/lf-edge/ekuiper/internal/conf"
|
|
|
|
- "github.com/lf-edge/ekuiper/internal/server/middleware"
|
|
|
|
- "github.com/lf-edge/ekuiper/pkg/api"
|
|
|
|
- "github.com/lf-edge/ekuiper/pkg/ast"
|
|
|
|
- "github.com/lf-edge/ekuiper/pkg/errorx"
|
|
|
|
- "github.com/lf-edge/ekuiper/pkg/infra"
|
|
|
|
"io"
|
|
"io"
|
|
- "io/ioutil"
|
|
|
|
"net/http"
|
|
"net/http"
|
|
"os"
|
|
"os"
|
|
"path/filepath"
|
|
"path/filepath"
|
|
@@ -34,6 +25,15 @@ import (
|
|
"strconv"
|
|
"strconv"
|
|
"strings"
|
|
"strings"
|
|
"time"
|
|
"time"
|
|
|
|
+
|
|
|
|
+ "github.com/gorilla/handlers"
|
|
|
|
+ "github.com/gorilla/mux"
|
|
|
|
+ "github.com/lf-edge/ekuiper/internal/conf"
|
|
|
|
+ "github.com/lf-edge/ekuiper/internal/server/middleware"
|
|
|
|
+ "github.com/lf-edge/ekuiper/pkg/api"
|
|
|
|
+ "github.com/lf-edge/ekuiper/pkg/ast"
|
|
|
|
+ "github.com/lf-edge/ekuiper/pkg/errorx"
|
|
|
|
+ "github.com/lf-edge/ekuiper/pkg/infra"
|
|
)
|
|
)
|
|
|
|
|
|
const (
|
|
const (
|
|
@@ -221,7 +221,7 @@ func fileUploadHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
|
case http.MethodGet:
|
|
case http.MethodGet:
|
|
// Get the list of files in the upload directory
|
|
// Get the list of files in the upload directory
|
|
- files, err := ioutil.ReadDir(uploadDir)
|
|
|
|
|
|
+ files, err := os.ReadDir(uploadDir)
|
|
if err != nil {
|
|
if err != nil {
|
|
handleError(w, err, "Error reading the file upload dir", logger)
|
|
handleError(w, err, "Error reading the file upload dir", logger)
|
|
return
|
|
return
|
|
@@ -254,7 +254,7 @@ type information struct {
|
|
UpTimeSeconds int64 `json:"upTimeSeconds"`
|
|
UpTimeSeconds int64 `json:"upTimeSeconds"`
|
|
}
|
|
}
|
|
|
|
|
|
-//The handler for root
|
|
|
|
|
|
+// The handler for root
|
|
func rootHandler(w http.ResponseWriter, r *http.Request) {
|
|
func rootHandler(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
switch r.Method {
|
|
switch r.Method {
|
|
@@ -337,17 +337,17 @@ func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamTy
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-//list or create streams
|
|
|
|
|
|
+// list or create streams
|
|
func streamsHandler(w http.ResponseWriter, r *http.Request) {
|
|
func streamsHandler(w http.ResponseWriter, r *http.Request) {
|
|
sourcesManageHandler(w, r, ast.TypeStream)
|
|
sourcesManageHandler(w, r, ast.TypeStream)
|
|
}
|
|
}
|
|
|
|
|
|
-//describe or delete a stream
|
|
|
|
|
|
+// describe or delete a stream
|
|
func streamHandler(w http.ResponseWriter, r *http.Request) {
|
|
func streamHandler(w http.ResponseWriter, r *http.Request) {
|
|
sourceManageHandler(w, r, ast.TypeStream)
|
|
sourceManageHandler(w, r, ast.TypeStream)
|
|
}
|
|
}
|
|
|
|
|
|
-//list or create tables
|
|
|
|
|
|
+// list or create tables
|
|
func tablesHandler(w http.ResponseWriter, r *http.Request) {
|
|
func tablesHandler(w http.ResponseWriter, r *http.Request) {
|
|
sourcesManageHandler(w, r, ast.TypeTable)
|
|
sourcesManageHandler(w, r, ast.TypeTable)
|
|
}
|
|
}
|
|
@@ -356,12 +356,12 @@ func tableHandler(w http.ResponseWriter, r *http.Request) {
|
|
sourceManageHandler(w, r, ast.TypeTable)
|
|
sourceManageHandler(w, r, ast.TypeTable)
|
|
}
|
|
}
|
|
|
|
|
|
-//list or create rules
|
|
|
|
|
|
+// list or create rules
|
|
func rulesHandler(w http.ResponseWriter, r *http.Request) {
|
|
func rulesHandler(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
switch r.Method {
|
|
switch r.Method {
|
|
case http.MethodPost:
|
|
case http.MethodPost:
|
|
- body, err := ioutil.ReadAll(r.Body)
|
|
|
|
|
|
+ body, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
if err != nil {
|
|
handleError(w, err, "Invalid body", logger)
|
|
handleError(w, err, "Invalid body", logger)
|
|
return
|
|
return
|
|
@@ -401,7 +401,7 @@ func rulesHandler(w http.ResponseWriter, r *http.Request) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-//describe or delete a rule
|
|
|
|
|
|
+// describe or delete a rule
|
|
func ruleHandler(w http.ResponseWriter, r *http.Request) {
|
|
func ruleHandler(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
vars := mux.Vars(r)
|
|
vars := mux.Vars(r)
|
|
@@ -432,7 +432,7 @@ func ruleHandler(w http.ResponseWriter, r *http.Request) {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
- body, err := ioutil.ReadAll(r.Body)
|
|
|
|
|
|
+ body, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
if err != nil {
|
|
handleError(w, err, "Invalid body", logger)
|
|
handleError(w, err, "Invalid body", logger)
|
|
return
|
|
return
|
|
@@ -457,7 +457,7 @@ func ruleHandler(w http.ResponseWriter, r *http.Request) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-//get status of a rule
|
|
|
|
|
|
+// get status of a rule
|
|
func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
vars := mux.Vars(r)
|
|
vars := mux.Vars(r)
|
|
@@ -473,7 +473,7 @@ func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
w.Write([]byte(content))
|
|
w.Write([]byte(content))
|
|
}
|
|
}
|
|
|
|
|
|
-//start a rule
|
|
|
|
|
|
+// start a rule
|
|
func startRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
func startRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
vars := mux.Vars(r)
|
|
vars := mux.Vars(r)
|
|
@@ -488,7 +488,7 @@ func startRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
|
|
w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
|
|
}
|
|
}
|
|
|
|
|
|
-//stop a rule
|
|
|
|
|
|
+// stop a rule
|
|
func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
vars := mux.Vars(r)
|
|
vars := mux.Vars(r)
|
|
@@ -499,7 +499,7 @@ func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
w.Write([]byte(result))
|
|
w.Write([]byte(result))
|
|
}
|
|
}
|
|
|
|
|
|
-//restart a rule
|
|
|
|
|
|
+// restart a rule
|
|
func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
vars := mux.Vars(r)
|
|
vars := mux.Vars(r)
|
|
@@ -514,7 +514,7 @@ func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
|
|
w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
|
|
}
|
|
}
|
|
|
|
|
|
-//get topo of a rule
|
|
|
|
|
|
+// get topo of a rule
|
|
func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
vars := mux.Vars(r)
|
|
vars := mux.Vars(r)
|