Explorar el Código

“fix:The state of rule is not persistent #313

mayuedong hace 4 años
padre
commit
3ab62f26bf

+ 27 - 0
xsql/processors/xsql_processor.go

@@ -229,6 +229,33 @@ func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
 	return rule, nil
 }
 
+func (p *RuleProcessor) ExecReplaceRuleState(name string, triggered bool) (err error) {
+	rule, err := p.GetRuleByName(name)
+	if err != nil {
+		return err
+	}
+
+	rule.Triggered = triggered
+	ruleJson, err := json.Marshal(rule)
+	if err != nil {
+		return fmt.Errorf("Marshal rule %s error : %s.", name, err)
+	}
+
+	err = p.db.Open()
+	if err != nil {
+		return err
+	}
+	defer p.db.Close()
+
+	err = p.db.Replace(name, string(ruleJson))
+	if err != nil {
+		return err
+	} else {
+		log.Infof("Rule %s is replaced.", name)
+	}
+	return err
+}
+
 func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
 	err := p.db.Open()
 	if err != nil {

+ 5 - 4
xstream/api/stream.go

@@ -79,10 +79,11 @@ type TopNode interface {
 }
 
 type Rule struct {
-	Id      string                   `json:"id"`
-	Sql     string                   `json:"sql"`
-	Actions []map[string]interface{} `json:"actions"`
-	Options map[string]interface{}   `json:"options"`
+	Triggered bool                     `json:"triggered"`
+	Id        string                   `json:"id"`
+	Sql       string                   `json:"sql"`
+	Actions   []map[string]interface{} `json:"actions"`
+	Options   map[string]interface{}   `json:"options"`
 }
 
 type StreamContext interface {

+ 22 - 0
xstream/server/server/ruleManager.go

@@ -60,6 +60,7 @@ func createRuleState(rule *api.Rule) (*RuleState, error) {
 
 func doStartRule(rs *RuleState) error {
 	rs.Triggered = true
+	ruleProcessor.ExecReplaceRuleState(rs.Name, true)
 	go func() {
 		tp := rs.Topology
 		select {
@@ -181,6 +182,7 @@ func stopRule(name string) (result string) {
 	if rs, ok := registry.Load(name); ok && rs.Triggered {
 		(*rs.Topology).Cancel()
 		rs.Triggered = false
+		ruleProcessor.ExecReplaceRuleState(name, false)
 		result = fmt.Sprintf("Rule %s was stopped.", name)
 	} else {
 		result = fmt.Sprintf("Rule %s was not found.", name)
@@ -192,3 +194,23 @@ func restartRule(name string) error {
 	stopRule(name)
 	return startRule(name)
 }
+
+func recoverRule(name string) string {
+	rule, err := ruleProcessor.GetRuleByName(name)
+	if err != nil {
+		return fmt.Sprintf("%v", err)
+	}
+
+	rs, err := createRuleState(rule)
+	if err != nil {
+		return fmt.Sprintf("%v", err)
+	}
+
+	if rule.Triggered {
+		if err = doStartRule(rs); err != nil {
+			return fmt.Sprintf("%v", err)
+		}
+		return fmt.Sprintf("Rule %s was started.", name)
+	}
+	return stopRule(name)
+}

+ 3 - 4
xstream/server/server/server.go

@@ -48,10 +48,9 @@ func StartUp(Version string) {
 		logger.Info("Starting rules")
 		var reply string
 		for _, rule := range rules {
-			err = server.StartRule(rule, &reply)
-			if err != nil {
-				logger.Info(err)
-			} else {
+			//err = server.StartRule(rule, &reply)
+			reply = recoverRule(rule)
+			if 0 != len(reply) {
 				logger.Info(reply)
 			}
 		}