// Copyright 2021-2022 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package server import ( "encoding/json" "fmt" "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/lf-edge/ekuiper/internal/server/middleware" "github.com/lf-edge/ekuiper/pkg/api" "github.com/lf-edge/ekuiper/pkg/ast" "github.com/lf-edge/ekuiper/pkg/errorx" "github.com/lf-edge/ekuiper/pkg/infra" "io" "io/ioutil" "net/http" "runtime" "strings" "time" ) const ( ContentType = "Content-Type" ContentTypeJSON = "application/json" ) type statementDescriptor struct { Sql string `json:"sql,omitempty"` } func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) { sd := statementDescriptor{} err := json.NewDecoder(reader).Decode(&sd) // Problems decoding if err != nil { return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err) } return sd, nil } // Handle applies the specified error and error concept tot he HTTP response writer func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) { message := prefix if message != "" { message += ": " } message += err.Error() logger.Error(message) var ec int switch e := err.(type) { case *errorx.Error: switch e.Code() { case errorx.NOT_FOUND: ec = http.StatusNotFound default: ec = http.StatusBadRequest } default: ec = http.StatusBadRequest } http.Error(w, message, ec) } func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) { w.Header().Add(ContentType, ContentTypeJSON) enc := json.NewEncoder(w) err := enc.Encode(i) // Problems encoding if err != nil { handleError(w, err, "", logger) } } func createRestServer(ip string, port int, needToken bool) *http.Server { r := mux.NewRouter() r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost) r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet) r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost) r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut) r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost) r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut) r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost) r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut) r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet) r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost) r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost) r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost) r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet) // Register extended routes for k, v := range components { logger.Infof("register rest endpoint for component %s", k) v.rest(r) } if needToken { r.Use(middleware.Auth) } server := &http.Server{ Addr: fmt.Sprintf("%s:%d", ip, port), // Good practice to set timeouts to avoid Slowloris attacks. WriteTimeout: time.Second * 60 * 5, ReadTimeout: time.Second * 60 * 5, IdleTimeout: time.Second * 60, Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin", "Authorization"}), handlers.AllowedMethods([]string{"POST", "GET", "PUT", "DELETE", "HEAD"}))(r), } server.SetKeepAlivesEnabled(false) return server } type information struct { Version string `json:"version"` Os string `json:"os"` Arch string `json:"arch"` UpTimeSeconds int64 `json:"upTimeSeconds"` } //The handler for root func rootHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() switch r.Method { case http.MethodGet, http.MethodPost: w.WriteHeader(http.StatusOK) info := new(information) info.Version = version info.UpTimeSeconds = time.Now().Unix() - startTimeStamp info.Os = runtime.GOOS info.Arch = runtime.GOARCH byteInfo, _ := json.Marshal(info) w.Write(byteInfo) } } func pingHandler(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) } func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) { defer r.Body.Close() switch r.Method { case http.MethodGet: content, err := streamProcessor.ShowStream(st) if err != nil { handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger) return } jsonResponse(content, w, logger) case http.MethodPost: v, err := decodeStatementDescriptor(r.Body) if err != nil { handleError(w, err, "Invalid body", logger) return } content, err := streamProcessor.ExecStreamSql(v.Sql) if err != nil { handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger) return } w.WriteHeader(http.StatusCreated) w.Write([]byte(content)) } } func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) { defer r.Body.Close() vars := mux.Vars(r) name := vars["name"] switch r.Method { case http.MethodGet: content, err := streamProcessor.DescStream(name, st) if err != nil { handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger) return } jsonResponse(content, w, logger) case http.MethodDelete: content, err := streamProcessor.DropStream(name, st) if err != nil { handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger) return } w.WriteHeader(http.StatusOK) w.Write([]byte(content)) case http.MethodPut: v, err := decodeStatementDescriptor(r.Body) if err != nil { handleError(w, err, "Invalid body", logger) return } content, err := streamProcessor.ExecReplaceStream(name, v.Sql, st) if err != nil { handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger) return } w.WriteHeader(http.StatusOK) w.Write([]byte(content)) } } //list or create streams func streamsHandler(w http.ResponseWriter, r *http.Request) { sourcesManageHandler(w, r, ast.TypeStream) } //describe or delete a stream func streamHandler(w http.ResponseWriter, r *http.Request) { sourceManageHandler(w, r, ast.TypeStream) } //list or create tables func tablesHandler(w http.ResponseWriter, r *http.Request) { sourcesManageHandler(w, r, ast.TypeTable) } func tableHandler(w http.ResponseWriter, r *http.Request) { sourceManageHandler(w, r, ast.TypeTable) } //list or create rules func rulesHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() switch r.Method { case http.MethodPost: body, err := ioutil.ReadAll(r.Body) if err != nil { handleError(w, err, "Invalid body", logger) return } r, err := ruleProcessor.ExecCreate("", string(body)) var result string if err != nil { handleError(w, err, "Create rule error", logger) return } else { result = fmt.Sprintf("Rule %s was created successfully.", r.Id) } go func() { panicOrError := infra.SafeRun(func() error { //Start the rule rs, err := createRuleState(r) if err != nil { return err } else { err = doStartRule(rs) return err } }) if panicOrError != nil { logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError) } }() w.WriteHeader(http.StatusCreated) w.Write([]byte(result)) case http.MethodGet: content, err := getAllRulesWithStatus() if err != nil { handleError(w, err, "Show rules error", logger) return } jsonResponse(content, w, logger) } } //describe or delete a rule func ruleHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() vars := mux.Vars(r) name := vars["name"] switch r.Method { case http.MethodGet: rule, err := ruleProcessor.GetRuleByName(name) if err != nil { handleError(w, err, "describe rule error", logger) return } jsonResponse(rule, w, logger) case http.MethodDelete: deleteRule(name) content, err := ruleProcessor.ExecDrop(name) if err != nil { handleError(w, err, "delete rule error", logger) return } w.WriteHeader(http.StatusOK) w.Write([]byte(content)) case http.MethodPut: _, err := ruleProcessor.GetRuleByName(name) if err != nil { handleError(w, err, "not found this rule", logger) return } body, err := ioutil.ReadAll(r.Body) if err != nil { handleError(w, err, "Invalid body", logger) return } r, err := ruleProcessor.ExecUpdate(name, string(body)) var result string if err != nil { handleError(w, err, "Update rule error", logger) return } else { result = fmt.Sprintf("Rule %s was updated successfully.", r.Id) } err = restartRule(name) if err != nil { handleError(w, err, "restart rule error", logger) return } w.WriteHeader(http.StatusOK) w.Write([]byte(result)) } } //get status of a rule func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() vars := mux.Vars(r) name := vars["name"] content, err := getRuleStatus(name) if err != nil { handleError(w, err, "get rule status error", logger) return } w.WriteHeader(http.StatusOK) w.Write([]byte(content)) } //start a rule func startRuleHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() vars := mux.Vars(r) name := vars["name"] err := startRule(name) if err != nil { handleError(w, err, "start rule error", logger) return } w.WriteHeader(http.StatusOK) w.Write([]byte(fmt.Sprintf("Rule %s was started", name))) } //stop a rule func stopRuleHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() vars := mux.Vars(r) name := vars["name"] result := stopRule(name) w.WriteHeader(http.StatusOK) w.Write([]byte(result)) } //restart a rule func restartRuleHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() vars := mux.Vars(r) name := vars["name"] err := restartRule(name) if err != nil { handleError(w, err, "restart rule error", logger) return } w.WriteHeader(http.StatusOK) w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name))) } //get topo of a rule func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() vars := mux.Vars(r) name := vars["name"] content, err := getRuleTopo(name) if err != nil { handleError(w, err, "get rule topo error", logger) return } w.Header().Set(ContentType, ContentTypeJSON) w.Write([]byte(content)) }