Przeglądaj źródła

feat(rule):support for update streams

EMqmyd 4 lat temu
rodzic
commit
628d5db100
1 zmienionych plików z 15 dodań i 1 usunięć
  1. 15 1
      xstream/server/server/rest.go

+ 15 - 1
xstream/server/server/rest.go

@@ -76,7 +76,7 @@ func createRestServer(port int) *http.Server {
 	r := mux.NewRouter()
 	r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete)
+	r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
 	r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet)
 	r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
@@ -191,6 +191,20 @@ func streamHandler(w http.ResponseWriter, r *http.Request) {
 		}
 		w.WriteHeader(http.StatusOK)
 		w.Write([]byte(content))
+	case http.MethodPut:
+		v, err := decodeStatementDescriptor(r.Body)
+		if err != nil {
+			handleError(w, err, "Invalid body", logger)
+			return
+		}
+		streamProcessor.DropStream(name)
+		content, err := streamProcessor.ExecStreamSql(v.Sql)
+		if err != nil {
+			handleError(w, err, "Stream command error", logger)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte(content))
 	}
 }