123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356 |
- package server
- import (
- "encoding/json"
- "fmt"
- "github.com/emqx/kuiper/plugins"
- "github.com/emqx/kuiper/xstream/api"
- "github.com/gorilla/mux"
- "io"
- "io/ioutil"
- "log"
- "net/http"
- "time"
- )
- const (
- ContentType = "Content-Type"
- ContentTypeJSON = "application/json"
- )
- type statementDescriptor struct {
- Sql string `json:"sql,omitempty"`
- }
- func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
- sd := statementDescriptor{}
- err := json.NewDecoder(reader).Decode(&sd)
- // Problems decoding
- if err != nil {
- return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
- }
- return sd, nil
- }
- // Handle applies the specified error and error concept tot he HTTP response writer
- func handleError(w http.ResponseWriter, err error, ec int, logger api.Logger) {
- message := err.Error()
- logger.Error(message)
- http.Error(w, message, ec)
- }
- func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
- w.Header().Add(ContentType, ContentTypeJSON)
- enc := json.NewEncoder(w)
- err := enc.Encode(i)
- // Problems encoding
- if err != nil {
- handleError(w, err, http.StatusBadRequest, logger)
- return
- }
- }
- func createRestServer(port int) *http.Server {
- r := mux.NewRouter()
- r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
- r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
- r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete)
- r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
- r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet)
- r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
- r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
- r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
- r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
- r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
- r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
- r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
- r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
- r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
- r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
- server := &http.Server{
- Addr: fmt.Sprintf("0.0.0.0:%d", port),
- // Good practice to set timeouts to avoid Slowloris attacks.
- WriteTimeout: time.Second * 15,
- ReadTimeout: time.Second * 15,
- IdleTimeout: time.Second * 60,
- Handler: r, // Pass our instance of gorilla/mux in.
- }
- server.SetKeepAlivesEnabled(false)
- return server
- }
- //The handler for root
- func rootHandler(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- switch r.Method {
- case http.MethodGet, http.MethodPost:
- w.WriteHeader(http.StatusOK)
- w.Write([]byte("OK\n"))
- }
- }
- //list or create streams
- func streamsHandler(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- switch r.Method {
- case http.MethodGet:
- content, err := streamProcessor.ShowStream()
- if err != nil {
- handleError(w, fmt.Errorf("Stream command error: %s", err), http.StatusBadRequest, logger)
- return
- }
- jsonResponse(content, w, logger)
- case http.MethodPost:
- v, err := decodeStatementDescriptor(r.Body)
- if err != nil {
- handleError(w, fmt.Errorf("Invalid body: %s", err), http.StatusBadRequest, logger)
- return
- }
- content, err := streamProcessor.ExecStreamSql(v.Sql)
- if err != nil {
- handleError(w, fmt.Errorf("Stream command error: %s", err), http.StatusBadRequest, logger)
- return
- }
- w.WriteHeader(http.StatusCreated)
- w.Write([]byte(content))
- }
- }
- //describe or delete a stream
- func streamHandler(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- vars := mux.Vars(r)
- name := vars["name"]
- switch r.Method {
- case http.MethodGet:
- content, err := streamProcessor.DescStream(name)
- if err != nil {
- handleError(w, fmt.Errorf("describe stream error: %s", err), http.StatusBadRequest, logger)
- return
- }
- jsonResponse(content, w, logger)
- case http.MethodDelete:
- content, err := streamProcessor.DropStream(name)
- if err != nil {
- handleError(w, fmt.Errorf("describe stream error: %s", err), http.StatusBadRequest, logger)
- return
- }
- w.WriteHeader(http.StatusOK)
- w.Write([]byte(content))
- }
- }
- //list or create rules
- func rulesHandler(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- switch r.Method {
- case http.MethodPost:
- body, err := ioutil.ReadAll(r.Body)
- if err != nil {
- log.Printf("Error reading body: %v", err)
- handleError(w, fmt.Errorf("Invalid body: %s", err), http.StatusBadRequest, logger)
- return
- }
- r, err := ruleProcessor.ExecCreate("", string(body))
- var result string
- if err != nil {
- handleError(w, fmt.Errorf("Create rule error : %s.", err), http.StatusBadRequest, logger)
- return
- } else {
- result = fmt.Sprintf("Rule %s was created, please use 'cli getstatus rule $rule_name' command to get rule status.", r.Id)
- }
- //Start the rule
- rs, err := createRuleState(r)
- if err != nil {
- result = err.Error()
- } else {
- err = doStartRule(rs)
- if err != nil {
- result = err.Error()
- }
- }
- w.WriteHeader(http.StatusCreated)
- w.Write([]byte(result))
- case http.MethodGet:
- content, err := ruleProcessor.GetAllRules()
- if err != nil {
- handleError(w, fmt.Errorf("Show rules error: %s", err), http.StatusBadRequest, logger)
- return
- }
- jsonResponse(content, w, logger)
- }
- }
- //describe or delete a rule
- func ruleHandler(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- vars := mux.Vars(r)
- name := vars["name"]
- switch r.Method {
- case http.MethodGet:
- rule, err := ruleProcessor.GetRuleByName(name)
- if err != nil {
- handleError(w, fmt.Errorf("describe stream error: %s", err), http.StatusBadRequest, logger)
- return
- }
- jsonResponse(rule, w, logger)
- case http.MethodDelete:
- stopRule(name)
- content, err := ruleProcessor.ExecDrop(name)
- if err != nil {
- handleError(w, fmt.Errorf("drop rule error: %s", err), http.StatusBadRequest, logger)
- return
- }
- w.WriteHeader(http.StatusOK)
- w.Write([]byte(content))
- }
- }
- //get status of a rule
- func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- vars := mux.Vars(r)
- name := vars["name"]
- content, err := getRuleStatus(name)
- if err != nil {
- handleError(w, fmt.Errorf("get rule status error: %s", err), http.StatusBadRequest, logger)
- return
- }
- w.WriteHeader(http.StatusOK)
- w.Write([]byte(content))
- }
- //start a rule
- func startRuleHandler(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- vars := mux.Vars(r)
- name := vars["name"]
- err := startRule(name)
- if err != nil {
- handleError(w, fmt.Errorf("start rule error: %s", err), http.StatusBadRequest, logger)
- return
- }
- w.WriteHeader(http.StatusOK)
- w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
- }
- //stop a rule
- func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- vars := mux.Vars(r)
- name := vars["name"]
- result := stopRule(name)
- w.WriteHeader(http.StatusOK)
- w.Write([]byte(result))
- }
- //restart a rule
- func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- vars := mux.Vars(r)
- name := vars["name"]
- err := restartRule(name)
- if err != nil {
- handleError(w, fmt.Errorf("restart rule error: %s", err), http.StatusBadRequest, logger)
- return
- }
- w.WriteHeader(http.StatusOK)
- w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
- }
- func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
- defer r.Body.Close()
- switch r.Method {
- case http.MethodGet:
- content, err := pluginManager.List(t)
- if err != nil {
- handleError(w, fmt.Errorf("%s plugins list command error: %s", plugins.PluginTypes[t], err), http.StatusBadRequest, logger)
- return
- }
- jsonResponse(content, w, logger)
- case http.MethodPost:
- sd := plugins.Plugin{}
- err := json.NewDecoder(r.Body).Decode(&sd)
- // Problems decoding
- if err != nil {
- handleError(w, fmt.Errorf("Invalid body: Error decoding the %s plugin json: %v", plugins.PluginTypes[t], err), http.StatusBadRequest, logger)
- return
- }
- err = pluginManager.Register(t, &sd)
- if err != nil {
- handleError(w, fmt.Errorf("%s plugins create command error: %s", plugins.PluginTypes[t], err), http.StatusBadRequest, logger)
- return
- }
- w.WriteHeader(http.StatusCreated)
- w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugins.PluginTypes[t], sd.Name)))
- }
- }
- func pluginHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
- defer r.Body.Close()
- vars := mux.Vars(r)
- name := vars["name"]
- cb := r.URL.Query().Get("stop")
- switch r.Method {
- case http.MethodDelete:
- r := cb == "1"
- err := pluginManager.Delete(t, name, r)
- if err != nil {
- handleError(w, fmt.Errorf("delete %s plugin %s error: %s", plugins.PluginTypes[t], name, err), http.StatusBadRequest, logger)
- return
- }
- w.WriteHeader(http.StatusOK)
- result := fmt.Sprintf("%s plugin %s is deleted", plugins.PluginTypes[t], name)
- if r {
- result = fmt.Sprintf("%s and Kuiper will be stopped", result)
- }
- w.Write([]byte(result))
- case http.MethodGet:
- j, ok := pluginManager.Get(t, name)
- if !ok {
- handleError(w, fmt.Errorf("describe %s plugin %s error: not found", plugins.PluginTypes[t], name), http.StatusBadRequest, logger)
- return
- }
- jsonResponse(j, w, logger)
- }
- }
- //list or create source plugin
- func sourcesHandler(w http.ResponseWriter, r *http.Request) {
- pluginsHandler(w, r, plugins.SOURCE)
- }
- //delete a source plugin
- func sourceHandler(w http.ResponseWriter, r *http.Request) {
- pluginHandler(w, r, plugins.SOURCE)
- }
- //list or create sink plugin
- func sinksHandler(w http.ResponseWriter, r *http.Request) {
- pluginsHandler(w, r, plugins.SINK)
- }
- //delete a sink plugin
- func sinkHandler(w http.ResponseWriter, r *http.Request) {
- pluginHandler(w, r, plugins.SINK)
- }
- //list or create function plugin
- func functionsHandler(w http.ResponseWriter, r *http.Request) {
- pluginsHandler(w, r, plugins.FUNCTION)
- }
- //delete a function plugin
- func functionHandler(w http.ResponseWriter, r *http.Request) {
- pluginHandler(w, r, plugins.FUNCTION)
- }
|