Procházet zdrojové kódy

feat(rule):support for update rules

feat():update->updated
EMqmyd před 4 roky
rodič
revize
799db6e79d
2 změnil soubory, kde provedl 51 přidání a 1 odebrání
  1. 21 0
      xsql/processors/xsql_processor.go
  2. 30 1
      xstream/server/server/rest.go

+ 21 - 0
xsql/processors/xsql_processor.go

@@ -228,6 +228,27 @@ func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
 
 	return rule, nil
 }
+func (p *RuleProcessor) ExecUpdate(name, ruleJson string) (*api.Rule, error) {
+	rule, err := p.getRuleByJson(name, ruleJson)
+	if err != nil {
+		return nil, err
+	}
+
+	err = p.db.Open()
+	if err != nil {
+		return nil, err
+	}
+	defer p.db.Close()
+
+	err = p.db.Replace(rule.Id, ruleJson)
+	if err != nil {
+		return nil, err
+	} else {
+		log.Infof("Rule %s is update.", rule.Id)
+	}
+
+	return rule, nil
+}
 
 func (p *RuleProcessor) ExecReplaceRuleState(name string, triggered bool) (err error) {
 	rule, err := p.GetRuleByName(name)

+ 30 - 1
xstream/server/server/rest.go

@@ -78,7 +78,7 @@ func createRestServer(port int) *http.Server {
 	r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
 	r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet)
+	r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
 	r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
 	r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
 	r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
@@ -272,6 +272,35 @@ func ruleHandler(w http.ResponseWriter, r *http.Request) {
 		}
 		w.WriteHeader(http.StatusOK)
 		w.Write([]byte(content))
+	case http.MethodPut:
+		_, err := ruleProcessor.GetRuleByName(name)
+		if err != nil {
+			handleError(w, err, "not found this rule", logger)
+			return
+		}
+
+		body, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			handleError(w, err, "Invalid body", logger)
+			return
+		}
+
+		r, err := ruleProcessor.ExecUpdate(name, string(body))
+		var result string
+		if err != nil {
+			handleError(w, err, "Update rule error", logger)
+			return
+		} else {
+			result = fmt.Sprintf("Rule %s was updated successfully.", r.Id)
+		}
+
+		err = restartRule(name)
+		if err != nil {
+			handleError(w, err, "restart rule error", logger)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte(result))
 	}
 }