Browse Source

feat(rule): refactor rule state

1. Make every rule state a shadow of the corresponding rule stored in the KV store. A rule state is never replaced; it is only deleted when needed (i.e., when its rule is deleted).
2. Let rule state changes happen in the event loop.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 years ago
parent
commit
529b10f102

+ 1 - 1
internal/plugin/portable/test/portable_rule_test.go

@@ -185,5 +185,5 @@ func compareMetrics(tp *topo.Topo, m map[string]interface{}) bool {
 func CreateRule(name, sql string) (*api.Rule, error) {
 func CreateRule(name, sql string) (*api.Rule, error) {
 	p := processor.NewRuleProcessor()
 	p := processor.NewRuleProcessor()
 	p.ExecDrop(name)
 	p.ExecDrop(name)
-	return p.ExecCreate(name, sql)
+	return p.ExecCreateWithValidation(name, sql)
 }
 }

+ 19 - 6
internal/processor/rule.go

@@ -41,8 +41,8 @@ func NewRuleProcessor() *RuleProcessor {
 	return processor
 	return processor
 }
 }
 
 
-func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
-	rule, err := p.getRuleByJson(name, ruleJson)
+func (p *RuleProcessor) ExecCreateWithValidation(name, ruleJson string) (*api.Rule, error) {
+	rule, err := p.GetRuleByJson(name, ruleJson)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -56,8 +56,20 @@ func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
 
 
 	return rule, nil
 	return rule, nil
 }
 }
+
+func (p *RuleProcessor) ExecCreate(name, ruleJson string) error {
+	err := p.db.Setnx(name, ruleJson)
+	if err != nil {
+		return err
+	} else {
+		log.Infof("Rule %s is created.", name)
+	}
+
+	return nil
+}
+
 func (p *RuleProcessor) ExecUpdate(name, ruleJson string) (*api.Rule, error) {
 func (p *RuleProcessor) ExecUpdate(name, ruleJson string) (*api.Rule, error) {
-	rule, err := p.getRuleByJson(name, ruleJson)
+	rule, err := p.GetRuleByJson(name, ruleJson)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -108,7 +120,7 @@ func (p *RuleProcessor) GetRuleById(id string) (*api.Rule, error) {
 	if !f {
 	if !f {
 		return nil, errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found.", id))
 		return nil, errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found.", id))
 	}
 	}
-	return p.getRuleByJson(id, s1)
+	return p.GetRuleByJson(id, s1)
 }
 }
 
 
 func (p *RuleProcessor) getDefaultRule(name, sql string) *api.Rule {
 func (p *RuleProcessor) getDefaultRule(name, sql string) *api.Rule {
@@ -135,11 +147,12 @@ func (p *RuleProcessor) getDefaultRule(name, sql string) *api.Rule {
 	}
 	}
 }
 }
 
 
-func (p *RuleProcessor) getRuleByJson(id, ruleJson string) (*api.Rule, error) {
+func (p *RuleProcessor) GetRuleByJson(id, ruleJson string) (*api.Rule, error) {
 	opt := conf.Config.Rule
 	opt := conf.Config.Rule
 	//set default rule options
 	//set default rule options
 	rule := &api.Rule{
 	rule := &api.Rule{
-		Options: clone(opt),
+		Triggered: true,
+		Options:   clone(opt),
 	}
 	}
 	if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
 	if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
 		return nil, fmt.Errorf("Parse rule %s error : %s.", ruleJson, err)
 		return nil, fmt.Errorf("Parse rule %s error : %s.", ruleJson, err)

+ 24 - 5
internal/processor/rule_test.go

@@ -47,10 +47,15 @@ func TestRuleActionParse_Apply(t *testing.T) {
 						]
 						]
 					}
 					}
 				}
 				}
-			  ]
+			  ],
+              "options": {
+				"restartStrategy": {
+				  "attempts": 20
+				}
+			  }
 			}`,
 			}`,
 			result: &api.Rule{
 			result: &api.Rule{
-				Triggered: false,
+				Triggered: true,
 				Id:        "ruleTest",
 				Id:        "ruleTest",
 				Sql:       "SELECT * from demo",
 				Sql:       "SELECT * from demo",
 				Actions: []map[string]interface{}{
 				Actions: []map[string]interface{}{
@@ -81,6 +86,13 @@ func TestRuleActionParse_Apply(t *testing.T) {
 					Qos:                api.AtMostOnce,
 					Qos:                api.AtMostOnce,
 					CheckpointInterval: 300000,
 					CheckpointInterval: 300000,
 					SendError:          true,
 					SendError:          true,
+					Restart: &api.RestartStrategy{
+						Attempts:     20,
+						Delay:        1000,
+						Multiplier:   2,
+						MaxDelay:     30000,
+						JitterFactor: 0.1,
+					},
 				},
 				},
 			},
 			},
 		}, {
 		}, {
@@ -111,7 +123,7 @@ func TestRuleActionParse_Apply(t *testing.T) {
 				}
 				}
 			}`,
 			}`,
 			result: &api.Rule{
 			result: &api.Rule{
-				Triggered: false,
+				Triggered: true,
 				Id:        "ruleTest2",
 				Id:        "ruleTest2",
 				Sql:       "SELECT * from demo",
 				Sql:       "SELECT * from demo",
 				Actions: []map[string]interface{}{
 				Actions: []map[string]interface{}{
@@ -137,6 +149,13 @@ func TestRuleActionParse_Apply(t *testing.T) {
 					Qos:                api.ExactlyOnce,
 					Qos:                api.ExactlyOnce,
 					CheckpointInterval: 60000,
 					CheckpointInterval: 60000,
 					SendError:          true,
 					SendError:          true,
+					Restart: &api.RestartStrategy{
+						Attempts:     0,
+						Delay:        1000,
+						Multiplier:   2,
+						MaxDelay:     30000,
+						JitterFactor: 0.1,
+					},
 				},
 				},
 			},
 			},
 		},
 		},
@@ -144,7 +163,7 @@ func TestRuleActionParse_Apply(t *testing.T) {
 
 
 	p := NewRuleProcessor()
 	p := NewRuleProcessor()
 	for i, tt := range tests {
 	for i, tt := range tests {
-		r, err := p.getRuleByJson(tt.result.Id, tt.ruleStr)
+		r, err := p.GetRuleByJson(tt.result.Id, tt.ruleStr)
 		if err != nil {
 		if err != nil {
 			t.Errorf("get rule error: %s", err)
 			t.Errorf("get rule error: %s", err)
 		}
 		}
@@ -169,7 +188,7 @@ func TestAllRules(t *testing.T) {
 	defer p.db.Clean()
 	defer p.db.Clean()
 
 
 	for k, v := range expected {
 	for k, v := range expected {
-		_, err := p.ExecCreate(k, v)
+		_, err := p.ExecCreateWithValidation(k, v)
 		if err != nil {
 		if err != nil {
 			t.Error(err)
 			t.Error(err)
 			return
 			return

+ 1 - 1
internal/processor/ruleset.go

@@ -89,7 +89,7 @@ func (rs *RulesetProcessor) Import(content []byte) ([]string, []int, error) {
 	var rules []string
 	var rules []string
 	// restore rules
 	// restore rules
 	for k, v := range all.Rules {
 	for k, v := range all.Rules {
-		_, e := rs.r.ExecCreate(k, v)
+		_, e := rs.r.ExecCreateWithValidation(k, v)
 		if e != nil {
 		if e != nil {
 			conf.Log.Errorf("Fail to import rule %s(%s) with error: %v", k, v, e)
 			conf.Log.Errorf("Fail to import rule %s(%s) with error: %v", k, v, e)
 		} else {
 		} else {

+ 19 - 32
internal/server/rest.go

@@ -396,29 +396,12 @@ func rulesHandler(w http.ResponseWriter, r *http.Request) {
 			handleError(w, err, "Invalid body", logger)
 			handleError(w, err, "Invalid body", logger)
 			return
 			return
 		}
 		}
-		r, err := ruleProcessor.ExecCreate("", string(body))
-		var result string
+		id, err := createRule("", string(body), true)
 		if err != nil {
 		if err != nil {
-			handleError(w, err, "Create rule error", logger)
+			handleError(w, err, "", logger)
 			return
 			return
-		} else {
-			result = fmt.Sprintf("Rule %s was created successfully.", r.Id)
 		}
 		}
-		go func() {
-			panicOrError := infra.SafeRun(func() error {
-				//Start the rule
-				rs, err := createRuleState(r)
-				if err != nil {
-					return err
-				} else {
-					err = doStartRule(rs, r.Options.Restart)
-					return err
-				}
-			})
-			if panicOrError != nil {
-				logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError)
-			}
-		}()
+		result := fmt.Sprintf("Rule %s was created successfully.", id)
 		w.WriteHeader(http.StatusCreated)
 		w.WriteHeader(http.StatusCreated)
 		w.Write([]byte(result))
 		w.Write([]byte(result))
 	case http.MethodGet:
 	case http.MethodGet:
@@ -467,20 +450,19 @@ func ruleHandler(w http.ResponseWriter, r *http.Request) {
 			handleError(w, err, "Invalid body", logger)
 			handleError(w, err, "Invalid body", logger)
 			return
 			return
 		}
 		}
-
-		r, err := ruleProcessor.ExecUpdate(name, string(body))
-		var result string
+		err = updateRule(name, string(body))
 		if err != nil {
 		if err != nil {
-			handleError(w, err, "Update rule error", logger)
+			handleError(w, err, "restart rule error", logger)
 			return
 			return
-		} else {
-			result = fmt.Sprintf("Rule %s was updated successfully.", r.Id)
 		}
 		}
-
-		err = restartRule(name)
+		// Update to db after validation
+		_, err = ruleProcessor.ExecUpdate(name, string(body))
+		var result string
 		if err != nil {
 		if err != nil {
-			handleError(w, err, "restart rule error", logger)
+			handleError(w, err, "Update rule error, suggest to delete it and recreate", logger)
 			return
 			return
+		} else {
+			result = fmt.Sprintf("Rule %s was updated successfully.", name)
 		}
 		}
 		w.WriteHeader(http.StatusOK)
 		w.WriteHeader(http.StatusOK)
 		w.Write([]byte(result))
 		w.Write([]byte(result))
@@ -601,9 +583,14 @@ func importHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	}
 	infra.SafeRun(func() error {
 	infra.SafeRun(func() error {
 		for _, name := range rules {
 		for _, name := range rules {
-			err := startRule(name)
-			if err != nil {
-				logger.Error(err)
+			rul, ee := ruleProcessor.GetRuleById(name)
+			if ee != nil {
+				logger.Error(ee)
+				continue
+			}
+			reply := recoverRule(rul)
+			if reply != "" {
+				logger.Error(reply)
 			}
 			}
 		}
 		}
 		return nil
 		return nil

+ 12 - 15
internal/server/rpc.go

@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/model"
 	"github.com/lf-edge/ekuiper/internal/pkg/model"
+	"github.com/lf-edge/ekuiper/internal/topo/rule"
 	"github.com/lf-edge/ekuiper/internal/topo/sink"
 	"github.com/lf-edge/ekuiper/internal/topo/sink"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 	"io"
 	"io"
@@ -90,7 +91,7 @@ func (t *Server) CreateQuery(sql string, reply *string) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	} else {
 	} else {
-		rs := &RuleState{RuleId: QueryRuleId, Topology: tp, Triggered: true}
+		rs := &rule.RuleState{RuleId: QueryRuleId, Topology: tp}
 		registry.Store(QueryRuleId, rs)
 		registry.Store(QueryRuleId, rs)
 		msg := fmt.Sprintf("Query was submit successfully.")
 		msg := fmt.Sprintf("Query was submit successfully.")
 		logger.Println(msg)
 		logger.Println(msg)
@@ -143,21 +144,12 @@ func (t *Server) Stream(stream string, reply *string) error {
 }
 }
 
 
 func (t *Server) CreateRule(rule *model.RPCArgDesc, reply *string) error {
 func (t *Server) CreateRule(rule *model.RPCArgDesc, reply *string) error {
-	r, err := ruleProcessor.ExecCreate(rule.Name, rule.Json)
+	id, err := createRule(rule.Name, rule.Json, true)
 	if err != nil {
 	if err != nil {
-		return fmt.Errorf("Create rule error : %s.", err)
+		return fmt.Errorf("Create rule %s error : %s.", id, err)
 	} else {
 	} else {
 		*reply = fmt.Sprintf("Rule %s was created successfully, please use 'bin/kuiper getstatus rule %s' command to get rule status.", rule.Name, rule.Name)
 		*reply = fmt.Sprintf("Rule %s was created successfully, please use 'bin/kuiper getstatus rule %s' command to get rule status.", rule.Name, rule.Name)
 	}
 	}
-	//Start the rule
-	rs, err := createRuleState(r)
-	if err != nil {
-		return err
-	}
-	err = doStartRule(rs, r.Options.Restart)
-	if err != nil {
-		return err
-	}
 	return nil
 	return nil
 }
 }
 
 
@@ -271,9 +263,14 @@ func (t *Server) Import(file string, reply *string) error {
 	}
 	}
 	infra.SafeRun(func() error {
 	infra.SafeRun(func() error {
 		for _, name := range rules {
 		for _, name := range rules {
-			err := startRule(name)
-			if err != nil {
-				logger.Error(err)
+			rul, ee := ruleProcessor.GetRuleById(name)
+			if ee != nil {
+				logger.Error(ee)
+				continue
+			}
+			reply := recoverRule(rul)
+			if reply != "" {
+				logger.Error(reply)
 			}
 			}
 		}
 		}
 		return nil
 		return nil

+ 149 - 211
internal/server/rule_manager.go

@@ -16,70 +16,50 @@ package server
 
 
 import (
 import (
 	"bytes"
 	"bytes"
-	"context"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/topo"
-	"github.com/lf-edge/ekuiper/internal/topo/planner"
+	"github.com/lf-edge/ekuiper/internal/topo/rule"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 	"github.com/lf-edge/ekuiper/pkg/infra"
-	"math"
-	"math/rand"
 	"sort"
 	"sort"
 	"sync"
 	"sync"
 	"time"
 	"time"
 )
 )
 
 
+// Rule storage includes kv and in memory registry
+// Kv stores the rule text with *expected* status so that the rule can be restored after restart
+// Registry stores the current rule state in runtime
+// Here registry is the in memory registry
 var registry *RuleRegistry
 var registry *RuleRegistry
 
 
-type RuleState struct {
-	RuleId    string
-	Topology  *topo.Topo
-	Triggered bool
-	// temporary storage for topo graph to make sure even rule close, the graph is still available
-	topoGraph *api.PrintableTopo
-}
-
-func (rs *RuleState) GetTopoGraph() *api.PrintableTopo {
-	if rs.topoGraph != nil {
-		return rs.topoGraph
-	} else if rs.Topology != nil {
-		return rs.Topology.GetTopo()
-	} else {
-		return nil
-	}
-}
-
-// Stop Assume rule has started and the topo has instantiated
-func (rs *RuleState) Stop() {
-	rs.Triggered = false
-	rs.Topology.Cancel()
-	rs.topoGraph = rs.Topology.GetTopo()
-	rs.Topology = nil
-}
-
 type RuleRegistry struct {
 type RuleRegistry struct {
 	sync.RWMutex
 	sync.RWMutex
-	internal map[string]*RuleState
+	internal map[string]*rule.RuleState
 }
 }
 
 
-func (rr *RuleRegistry) Store(key string, value *RuleState) {
+// Store create the in memory entry for a rule. Run in:
+// 1. Restore the rules from KV at startup
+// 2. Restore the rules when importing
+// 3. Create a rule
+func (rr *RuleRegistry) Store(key string, value *rule.RuleState) {
 	rr.Lock()
 	rr.Lock()
 	rr.internal[key] = value
 	rr.internal[key] = value
 	rr.Unlock()
 	rr.Unlock()
 }
 }
 
 
-func (rr *RuleRegistry) Load(key string) (value *RuleState, ok bool) {
+// Load the entry of a rule by id. It is used to get the current rule state
+// or send command to a running rule
+func (rr *RuleRegistry) Load(key string) (value *rule.RuleState, ok bool) {
 	rr.RLock()
 	rr.RLock()
 	result, ok := rr.internal[key]
 	result, ok := rr.internal[key]
 	rr.RUnlock()
 	rr.RUnlock()
 	return result, ok
 	return result, ok
 }
 }
 
 
-// Delete Atomic get and delete
-func (rr *RuleRegistry) Delete(key string) (*RuleState, bool) {
+// Delete Atomic get and delete. Only run when deleting a rule in runtime.
+func (rr *RuleRegistry) Delete(key string) (*rule.RuleState, bool) {
 	rr.Lock()
 	rr.Lock()
 	result, ok := rr.internal[key]
 	result, ok := rr.internal[key]
 	if ok {
 	if ok {
@@ -89,140 +69,133 @@ func (rr *RuleRegistry) Delete(key string) (*RuleState, bool) {
 	return result, ok
 	return result, ok
 }
 }
 
 
-func createRuleState(rule *api.Rule) (*RuleState, error) {
-	rs := &RuleState{
-		RuleId: rule.Id,
+func createRule(name, ruleJson string, shouldStart bool) (string, error) {
+	// Validate the rule json
+	r, err := ruleProcessor.GetRuleByJson(name, ruleJson)
+	if err != nil {
+		return "", fmt.Errorf("Invalid rule json: %v", err)
 	}
 	}
-	registry.Store(rule.Id, rs)
-	if tp, err := planner.Plan(rule); err != nil {
-		return rs, err
-	} else {
-		rs.Topology = tp
-		rs.Triggered = true
-		return rs, nil
+	if shouldStart {
+		r.Triggered = true
 	}
 	}
-}
-
-// Assume rs is started with topo instantiated
-func doStartRule(rs *RuleState, option *api.RestartStrategy) error {
-	err := ruleProcessor.ExecReplaceRuleState(rs.RuleId, true)
+	// Validate the topo
+	rs, err := createRuleState(r)
 	if err != nil {
 	if err != nil {
-		return err
+		return r.Id, fmt.Errorf("Create rule topo error: %v", err)
 	}
 	}
+	// Store to KV
+	err = ruleProcessor.ExecCreate(r.Id, ruleJson)
+	if err != nil {
+		// Do not store to KV so also delete the in memory shadow
+		deleteRule(r.Id)
+		return r.Id, fmt.Errorf("Store the rule error: %v", err)
+	}
+	// Start the rule asyncly
 	go func() {
 	go func() {
-		tp := rs.Topology
-		err := infra.SafeRun(func() error {
-			count := 0
-			d := option.Delay
-			var (
-				er error
-			)
-			for {
-				select {
-				case e := <-tp.Open():
-					er = e
-					if er != nil { // Only restart rule for errors
-						tp.GetContext().SetError(err)
-						logger.Errorf("closing rule %s for error: %v", rs.RuleId, err)
-						tp.Cancel()
-						rs.Triggered = false
-					} else {
-						return nil
-					}
-				}
-				if count <= option.Attempts {
-					if d > option.MaxDelay {
-						d = option.MaxDelay
-					}
-					if option.JitterFactor > 0 {
-						d = int(math.Round(float64(d) * ((rand.Float64()*2-1)*0.1 + 1)))
-						conf.Log.Infof("Rule %s will restart with jitterred delay %d", rs.RuleId, d)
-					} else {
-						conf.Log.Infof("Rule %s will restart with delay %d", rs.RuleId, d)
-					}
-					time.Sleep(time.Duration(d) * time.Millisecond)
-					count++
-					if option.Multiplier > 0 {
-						d = option.Delay * int(math.Pow(float64(option.Multiplier), float64(count)))
-					}
-				} else {
-					return er
-				}
-			}
+		panicOrError := infra.SafeRun(func() error {
+			//Start the rule which runs async
+			rs.Start()
+			return nil
 		})
 		})
-		// If the rule is stopped manually
-		if err == nil {
-			rs.Triggered = false
-			logger.Infof("closing rule %s", rs.RuleId)
-		} else {
-			logger.Infof("closing rule %s after %d retries with error: %v", rs.RuleId, option.Attempts, err)
+		if panicOrError != nil {
+			logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError)
 		}
 		}
 	}()
 	}()
-	return nil
+	return r.Id, nil
 }
 }
 
 
-func getAllRulesWithStatus() ([]map[string]interface{}, error) {
-	ruleIds, err := ruleProcessor.GetAllRules()
+// Create and initialize a rule state.
+// Errors are possible during plan the topo.
+// If error happens return immediately without add it to the registry
+func createRuleState(r *api.Rule) (*rule.RuleState, error) {
+	rs, err := rule.NewRuleState(r)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	sort.Strings(ruleIds)
-	result := make([]map[string]interface{}, len(ruleIds))
-	for i, id := range ruleIds {
-		ruleName := id
-		rule, _ := ruleProcessor.GetRuleById(id)
-		if rule != nil && rule.Name != "" {
-			ruleName = rule.Name
-		}
-		s, err := getRuleState(id)
-		if err != nil {
-			s = fmt.Sprintf("error: %s", err)
-		}
-		result[i] = map[string]interface{}{
-			"id":     id,
-			"name":   ruleName,
-			"status": s,
+	registry.Store(r.Id, rs)
+	return rs, nil
+}
+
+func recoverRule(r *api.Rule) string {
+	// Validate the topo
+	rs, err := createRuleState(r)
+	if err != nil {
+		return fmt.Sprintf("Create rule topo error: %v", err)
+	}
+	if !r.Triggered {
+		return fmt.Sprintf("Rule %s was stopped.", r.Id)
+	} else {
+		panicOrError := infra.SafeRun(func() error {
+			//Start the rule which runs async
+			return rs.Start()
+		})
+		if panicOrError != nil {
+			return fmt.Sprintf("Rule %s start failed: %s", r.Id, panicOrError)
 		}
 		}
 	}
 	}
-	return result, nil
+	return fmt.Sprintf("Rule %s was started.", r.Id)
 }
 }
 
 
-func getRuleState(name string) (string, error) {
-	if rs, ok := registry.Load(name); ok {
-		return doGetRuleState(rs)
+func updateRule(ruleId, ruleJson string) error {
+	// Validate the rule json
+	r, err := ruleProcessor.GetRuleByJson(ruleId, ruleJson)
+	if err != nil {
+		return fmt.Errorf("Invalid rule json: %v", err)
+	}
+	if rs, ok := registry.Load(r.Id); ok {
+		rs.UpdateTopo(r)
+		return nil
 	} else {
 	} else {
-		return "", fmt.Errorf("Rule %s is not found in registry", name)
+		return fmt.Errorf("Rule %s registry not found, try to delete it and recreate", r.Id)
 	}
 	}
 }
 }
 
 
-func doGetRuleState(rs *RuleState) (string, error) {
-	result := ""
-	if rs.Topology == nil {
-		result = "Stopped: canceled manually or by error."
+func deleteRule(name string) (result string) {
+	if rs, ok := registry.Delete(name); ok {
+		rs.Close()
+		result = fmt.Sprintf("Rule %s was deleted.", name)
 	} else {
 	} else {
-		c := (*rs.Topology).GetContext()
-		if c != nil {
-			err := c.Err()
-			switch err {
-			case nil:
-				result = "Running"
-			case context.Canceled:
-				result = "Stopped: canceled by error."
-			case context.DeadlineExceeded:
-				result = "Stopped: deadline exceed."
-			default:
-				result = fmt.Sprintf("Stopped: %v.", err)
-			}
-		} else {
-			result = "Stopped: no context found."
+		result = fmt.Sprintf("Rule %s was not found.", name)
+	}
+	return
+}
+
+func startRule(name string) error {
+	rs, ok := registry.Load(name)
+	if !ok {
+		return fmt.Errorf("Rule %s is not found in registry, please check if it is created", name)
+	} else {
+		rs.Start()
+	}
+	return nil
+}
+
+func stopRule(name string) (result string) {
+	if rs, ok := registry.Load(name); ok {
+		err := rs.Stop()
+		if err != nil {
+			conf.Log.Warn(err)
+		}
+		err = ruleProcessor.ExecReplaceRuleState(name, false)
+		if err != nil {
+			conf.Log.Warnf("stop rule found error: %s", err.Error())
 		}
 		}
+		result = fmt.Sprintf("Rule %s was stopped.", name)
+	} else {
+		result = fmt.Sprintf("Rule %s was not found.", name)
 	}
 	}
-	return result, nil
+	return
+}
+
+func restartRule(name string) error {
+	stopRule(name)
+	time.Sleep(1 * time.Millisecond)
+	return startRule(name)
 }
 }
 
 
 func getRuleStatus(name string) (string, error) {
 func getRuleStatus(name string) (string, error) {
 	if rs, ok := registry.Load(name); ok {
 	if rs, ok := registry.Load(name); ok {
-		result, err := doGetRuleState(rs)
+		result, err := rs.GetState()
 		if err != nil {
 		if err != nil {
 			return "", err
 			return "", err
 		}
 		}
@@ -255,88 +228,53 @@ func getRuleStatus(name string) (string, error) {
 	}
 	}
 }
 }
 
 
-func getRuleTopo(name string) (string, error) {
-	if rs, ok := registry.Load(name); ok {
-		graph := rs.GetTopoGraph()
-		if graph == nil {
-			return "", errorx.New(fmt.Sprintf("Fail to get rule %s's topo, make sure the rule has been started before", name))
-		}
-		bs, err := json.Marshal(graph)
-		if err != nil {
-			return "", errorx.New(fmt.Sprintf("Fail to encode rule %s's topo", name))
-		} else {
-			return string(bs), nil
-		}
-	} else {
-		return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found", name))
+func getAllRulesWithStatus() ([]map[string]interface{}, error) {
+	ruleIds, err := ruleProcessor.GetAllRules()
+	if err != nil {
+		return nil, err
 	}
 	}
-}
-
-func startRule(name string) error {
-	var rs *RuleState
-	rs, ok := registry.Load(name)
-	if !ok || (!rs.Triggered) {
-		r, err := ruleProcessor.GetRuleById(name)
-		if err != nil {
-			return err
+	sort.Strings(ruleIds)
+	result := make([]map[string]interface{}, len(ruleIds))
+	for i, id := range ruleIds {
+		ruleName := id
+		rule, _ := ruleProcessor.GetRuleById(id)
+		if rule != nil && rule.Name != "" {
+			ruleName = rule.Name
 		}
 		}
-		rs, err = createRuleState(r)
+		s, err := getRuleState(id)
 		if err != nil {
 		if err != nil {
-			return err
+			s = fmt.Sprintf("error: %s", err)
 		}
 		}
-		err = doStartRule(rs, r.Options.Restart)
-		if err != nil {
-			return err
+		result[i] = map[string]interface{}{
+			"id":     id,
+			"name":   ruleName,
+			"status": s,
 		}
 		}
-	} else {
-		conf.Log.Warnf("Rule %s is already started", name)
 	}
 	}
-	return nil
+	return result, nil
 }
 }
 
 
-func stopRule(name string) (result string) {
-	if rs, ok := registry.Load(name); ok && rs.Triggered {
-		rs.Stop()
-		err := ruleProcessor.ExecReplaceRuleState(name, false)
-		if err != nil {
-			conf.Log.Warnf("stop rule found error: %s", err.Error())
-		}
-		result = fmt.Sprintf("Rule %s was stopped.", name)
+func getRuleState(name string) (string, error) {
+	if rs, ok := registry.Load(name); ok {
+		return rs.GetState()
 	} else {
 	} else {
-		result = fmt.Sprintf("Rule %s was not found.", name)
+		return "", fmt.Errorf("Rule %s is not found in registry", name)
 	}
 	}
-	return
 }
 }
 
 
-func deleteRule(name string) (result string) {
-	if rs, ok := registry.Delete(name); ok {
-		if rs.Triggered {
-			(*rs.Topology).Cancel()
+func getRuleTopo(name string) (string, error) {
+	if rs, ok := registry.Load(name); ok {
+		graph := rs.GetTopoGraph()
+		if graph == nil {
+			return "", errorx.New(fmt.Sprintf("Fail to get rule %s's topo, make sure the rule has been started before", name))
 		}
 		}
-		result = fmt.Sprintf("Rule %s was deleted.", name)
-	} else {
-		result = fmt.Sprintf("Rule %s was not found.", name)
-	}
-	return
-}
-
-func restartRule(name string) error {
-	stopRule(name)
-	time.Sleep(1 * time.Millisecond)
-	return startRule(name)
-}
-
-func recoverRule(rule *api.Rule) string {
-	if !rule.Triggered {
-		rs := &RuleState{
-			RuleId: rule.Id,
+		bs, err := json.Marshal(graph)
+		if err != nil {
+			return "", errorx.New(fmt.Sprintf("Fail to encode rule %s's topo", name))
+		} else {
+			return string(bs), nil
 		}
 		}
-		registry.Store(rule.Id, rs)
-		return fmt.Sprintf("Rule %s was stopped.", rule.Id)
-	}
-	err := startRule(rule.Id)
-	if err != nil {
-		return fmt.Sprintf("%v", err)
+	} else {
+		return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found", name))
 	}
 	}
-	return fmt.Sprintf("Rule %s was started.", rule.Id)
 }
 }

+ 2 - 1
internal/server/server.go

@@ -25,6 +25,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/factory"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/factory"
+	"github.com/lf-edge/ekuiper/internal/topo/rule"
 	"net/http"
 	"net/http"
 	"os"
 	"os"
 	"os/signal"
 	"os/signal"
@@ -110,7 +111,7 @@ func StartUp(Version, LoadFileType string) {
 	}
 	}
 	meta.Bind()
 	meta.Bind()
 
 
-	registry = &RuleRegistry{internal: make(map[string]*RuleState)}
+	registry = &RuleRegistry{internal: make(map[string]*rule.RuleState)}
 	//Start lookup tables
 	//Start lookup tables
 	streamProcessor.RecoverLookupTable()
 	streamProcessor.RecoverLookupTable()
 	//Start rules
 	//Start rules

+ 300 - 0
internal/topo/rule/ruleState.go

@@ -0,0 +1,300 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rule
+
+import (
+	"context"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo"
+	"github.com/lf-edge/ekuiper/internal/topo/planner"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/infra"
+	"math"
+	"math/rand"
+	"sync"
+	"time"
+)
+
+type ActionSignal int
+
+const (
+	ActionSignalStart ActionSignal = iota
+	ActionSignalStop
+)
+
+/*********
+ *  RuleState is created for each rule. Each ruleState runs two loops:
+ *  1. action event loop to accept commands, such as start, stop, getStatus, delete
+ *  2. topo running loop
+ *  Both loops need to access the status, so lock is needed
+ */
+
+type RuleState struct {
+	// Constant, never change. Channel to send signals to manage connection retry. When deleting the rule, close it.
+	RuleId   string
+	ActionCh chan ActionSignal
+	// Nearly constant, only change when update the rule
+	Rule *api.Rule
+	// States, create through rule in each rule start
+	Topology *topo.Topo
+	// 0 stop, 1 start, -1 delete, changed in actions
+	triggered int
+	// temporary storage for topo graph to make sure even rule close, the graph is still available
+	topoGraph *api.PrintableTopo
+	sync.RWMutex
+}
+
+// NewRuleState Create and initialize a rule state.
+// Errors are possible during plan the topo.
+// If error happens return immediately without add it to the registry
+func NewRuleState(rule *api.Rule) (*RuleState, error) {
+	if tp, err := planner.Plan(rule); err != nil {
+		return nil, err
+	} else {
+		rs := &RuleState{
+			RuleId:   rule.Id,
+			Rule:     rule,
+			ActionCh: make(chan ActionSignal),
+		}
+		rs.Topology = tp
+		rs.Run()
+		return rs, nil
+	}
+}
+
+// UpdateTopo update the rule and the topology AND restart the topology
+// Do not need to call restart after update
+func (rs *RuleState) UpdateTopo(rule *api.Rule) error {
+	if tp, err := planner.Plan(rule); err != nil {
+		return err
+	} else {
+		rs.Lock()
+		defer rs.Unlock()
+		// Update rule
+		rs.Rule = rule
+		rs.topoGraph = nil
+		// Stop the old topo
+		if rs.triggered == 1 {
+			rs.Topology.Cancel()
+			rs.ActionCh <- ActionSignalStop
+			// wait a little to make sure the old topo is stopped
+			time.Sleep(1 * time.Millisecond)
+		}
+		// Update the topo and start
+		rs.Topology = tp
+		rs.triggered = 1
+		rs.ActionCh <- ActionSignalStart
+		return nil
+	}
+}
+
+// Run start to run the two loops, do not access any changeable states
+func (rs *RuleState) Run() {
+	var (
+		ctx    context.Context
+		cancel context.CancelFunc
+	)
+	// action loop, once start never end until the rule is deleted
+	go func() {
+		conf.Log.Infof("Start rulestate %s", rs.RuleId)
+		for {
+			s, opened := <-rs.ActionCh
+			if !opened {
+				conf.Log.Infof("Stop rulestate %s", rs.RuleId)
+				if cancel != nil {
+					cancel()
+				}
+				return
+			}
+			switch s {
+			case ActionSignalStart:
+				if ctx != nil {
+					conf.Log.Warnf("rule %s is already started", rs.RuleId)
+				} else {
+					ctx, cancel = context.WithCancel(context.Background())
+					go rs.runTopo(ctx)
+				}
+			case ActionSignalStop:
+				// Stop the running loop
+				if cancel != nil {
+					cancel()
+					ctx = nil
+					cancel = nil
+				} else {
+					conf.Log.Warnf("rule %s is already stopped", rs.RuleId)
+				}
+			}
+		}
+	}()
+}
+
+func (rs *RuleState) runTopo(ctx context.Context) {
+	// Load the changeable states once
+	rs.Lock()
+	tp := rs.Topology
+	option := rs.Rule.Options.Restart
+	rs.Unlock()
+	if tp == nil {
+		conf.Log.Warnf("rule %s is not initialized or just stopped", rs.RuleId)
+		return
+	}
+	err := infra.SafeRun(func() error {
+		count := 0
+		d := option.Delay
+		var (
+			er error
+		)
+		for {
+			select {
+			case e := <-tp.Open():
+				er = e
+				if er != nil { // Only restart rule for errors
+					tp.GetContext().SetError(er)
+					conf.Log.Errorf("closing rule %s for error: %v", rs.RuleId, er)
+					tp.Cancel()
+				} else { // exit normally
+					return nil
+				}
+			}
+			if count <= option.Attempts {
+				if d > option.MaxDelay {
+					d = option.MaxDelay
+				}
+				if option.JitterFactor > 0 {
+					d = int(math.Round(float64(d) * ((rand.Float64()*2-1)*0.1 + 1)))
+					conf.Log.Infof("Rule %s will restart with jitterred delay %d", rs.RuleId, d)
+				} else {
+					conf.Log.Infof("Rule %s will restart with delay %d", rs.RuleId, d)
+				}
+				// retry after delay
+				select {
+				case <-time.Tick(time.Duration(d) * time.Millisecond):
+					break
+				case <-ctx.Done():
+					conf.Log.Errorf("stop rule %s retry as cancelled", rs.RuleId)
+					return nil
+				}
+				count++
+				if option.Multiplier > 0 {
+					d = option.Delay * int(math.Pow(option.Multiplier, float64(count)))
+				}
+			} else {
+				return er
+			}
+		}
+	})
+	if err != nil { // Exit after retries
+		rs.Lock()
+		// The only change the state by error
+		rs.triggered = 0
+		rs.topoGraph = rs.Topology.GetTopo()
+		rs.Topology = nil
+		rs.Unlock()
+	}
+}
+
+// The action functions are state machine.
+
+func (rs *RuleState) Start() error {
+	rs.Lock()
+	defer rs.Unlock()
+	switch rs.triggered {
+	case 1:
+		return fmt.Errorf("Rule %s is already starting", rs.RuleId)
+	case -1:
+		return fmt.Errorf("Rule %s is already deleted", rs.RuleId)
+	default:
+		// Start from stop
+		if rs.Topology == nil {
+			if tp, err := planner.Plan(rs.Rule); err != nil {
+				return err
+			} else {
+				rs.Topology = tp
+			}
+		} // else start after create
+		rs.triggered = 1
+		rs.ActionCh <- ActionSignalStart
+		return nil
+	}
+}
+
+// Stop remove the Topology
+func (rs *RuleState) Stop() error {
+	rs.Lock()
+	defer rs.Unlock()
+	switch rs.triggered {
+	case 0:
+		return fmt.Errorf("Rule %s is already stopping", rs.RuleId)
+	case -1:
+		return fmt.Errorf("Rule %s is already deleted", rs.RuleId)
+	default:
+		rs.triggered = 0
+		rs.Topology.Cancel()
+		rs.Topology = nil
+		rs.ActionCh <- ActionSignalStop
+		return nil
+	}
+}
+
+func (rs *RuleState) Close() error {
+	rs.Lock()
+	defer rs.Unlock()
+	if rs.triggered == 1 && rs.Topology != nil {
+		rs.Topology.Cancel()
+	}
+	rs.triggered = -1
+	close(rs.ActionCh)
+	return nil
+}
+
+func (rs *RuleState) GetState() (string, error) {
+	rs.RLock()
+	defer rs.RUnlock()
+	result := ""
+	if rs.Topology == nil {
+		result = "Stopped: canceled manually or by error."
+	} else {
+		c := (*rs.Topology).GetContext()
+		if c != nil {
+			err := c.Err()
+			switch err {
+			case nil:
+				result = "Running"
+			case context.Canceled:
+				result = "Stopped: canceled by error."
+			case context.DeadlineExceeded:
+				result = "Stopped: deadline exceed."
+			default:
+				result = fmt.Sprintf("Stopped: %v.", err)
+			}
+		} else {
+			result = "Stopped: no context found."
+		}
+	}
+	return result, nil
+}
+
+func (rs *RuleState) GetTopoGraph() *api.PrintableTopo {
+	rs.RLock()
+	defer rs.RUnlock()
+	if rs.topoGraph != nil {
+		return rs.topoGraph
+	} else if rs.Topology != nil {
+		return rs.Topology.GetTopo()
+	} else {
+		return nil
+	}
+}

+ 232 - 0
internal/topo/rule/ruleState_test.go

@@ -0,0 +1,232 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rule
+
+import (
+	"errors"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/processor"
+	"github.com/lf-edge/ekuiper/internal/testx"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"reflect"
+	"testing"
+	"time"
+)
+
+// defaultOption is the rule option set shared by every rule built in these tests.
+var defaultOption = &api.RuleOption{
+	IsEventTime:        false,
+	LateTol:            1000,
+	Concurrency:        1,
+	BufferLength:       1024,
+	SendMetaToSink:     false,
+	SendError:          true,
+	Qos:                api.AtMostOnce,
+	CheckpointInterval: 300000,
+	Restart: &api.RestartStrategy{
+		Attempts:     0,
+		Delay:        1000,
+		Multiplier:   2,
+		MaxDelay:     30000,
+		JitterFactor: 0.1,
+	},
+}
+
+// init prepares the test environment via testx.InitEnv before any test runs.
+func init() {
+	testx.InitEnv()
+}
+
+func TestCreate(t *testing.T) {
+	sp := processor.NewStreamProcessor()
+	sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
+	defer sp.ExecStmt(`DROP STREAM demo`)
+	var tests = []struct {
+		r *api.Rule
+		e error
+	}{
+		{
+			r: &api.Rule{
+				Triggered: false,
+				Id:        "test",
+				Sql:       "SELECT ts FROM demo",
+				Actions: []map[string]interface{}{
+					{
+						"log": map[string]interface{}{},
+					},
+				},
+				Options: defaultOption,
+			},
+			e: nil,
+		}, {
+			r: &api.Rule{
+				Triggered: false,
+				Id:        "test",
+				Sql:       "SELECT timestamp FROM demo",
+				Actions: []map[string]interface{}{
+					{
+						"log": map[string]interface{}{},
+					},
+				},
+				Options: defaultOption,
+			},
+			e: errors.New("Parse SQL SELECT timestamp FROM demo error: found \"TIMESTAMP\", expected expression.."),
+		},
+		{
+			r: &api.Rule{
+				Triggered: false,
+				Id:        "test",
+				Sql:       "SELECT * FROM demo1",
+				Actions: []map[string]interface{}{
+					{
+						"log": map[string]interface{}{},
+					},
+				},
+				Options: defaultOption,
+			},
+			e: errors.New("fail to get stream demo1, please check if stream is created"),
+		},
+	}
+	for i, tt := range tests {
+		_, err := NewRuleState(tt.r)
+		if !reflect.DeepEqual(err, tt.e) {
+			t.Errorf("%d.\n\nerror mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.e, err)
+		}
+	}
+}
+
+func TestUpdate(t *testing.T) {
+	sp := processor.NewStreamProcessor()
+	sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
+	defer sp.ExecStmt(`DROP STREAM demo`)
+	rs, err := NewRuleState(&api.Rule{
+		Triggered: false,
+		Id:        "test",
+		Sql:       "SELECT ts FROM demo",
+		Actions: []map[string]interface{}{
+			{
+				"log": map[string]interface{}{},
+			},
+		},
+		Options: defaultOption,
+	})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer rs.Close()
+	err = rs.Start()
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	var tests = []struct {
+		r *api.Rule
+		e error
+	}{
+		{
+			r: &api.Rule{
+				Triggered: false,
+				Id:        "test",
+				Sql:       "SELECT timestamp FROM demo",
+				Actions: []map[string]interface{}{
+					{
+						"log": map[string]interface{}{},
+					},
+				},
+				Options: defaultOption,
+			},
+			e: errors.New("Parse SQL SELECT timestamp FROM demo error: found \"TIMESTAMP\", expected expression.."),
+		},
+		{
+			r: &api.Rule{
+				Triggered: false,
+				Id:        "test",
+				Sql:       "SELECT * FROM demo1",
+				Actions: []map[string]interface{}{
+					{
+						"log": map[string]interface{}{},
+					},
+				},
+				Options: defaultOption,
+			},
+			e: errors.New("fail to get stream demo1, please check if stream is created"),
+		},
+		{
+			r: &api.Rule{
+				Triggered: false,
+				Id:        "test",
+				Sql:       "SELECT * FROM demo",
+				Actions: []map[string]interface{}{
+					{
+						"log": map[string]interface{}{},
+					},
+				},
+				Options: defaultOption,
+			},
+			e: nil,
+		},
+	}
+	for i, tt := range tests {
+		err = rs.UpdateTopo(tt.r)
+		if !reflect.DeepEqual(err, tt.e) {
+			t.Errorf("%d.\n\nerror mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.e, err)
+		}
+	}
+}
+
+func TestMultipleAccess(t *testing.T) {
+	sp := processor.NewStreamProcessor()
+	sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
+	defer sp.ExecStmt(`DROP STREAM demo`)
+	rs, err := NewRuleState(&api.Rule{
+		Triggered: false,
+		Id:        "test",
+		Sql:       "SELECT ts FROM demo",
+		Actions: []map[string]interface{}{
+			{
+				"log": map[string]interface{}{},
+			},
+		},
+		Options: defaultOption,
+	})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer rs.Close()
+	err = rs.Start()
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	for i := 0; i < 10; i++ {
+		if i%3 == 0 {
+			go func(i int) {
+				rs.Stop()
+				fmt.Printf("%d:%d\n", i, rs.triggered)
+			}(i)
+		} else {
+			go func(i int) {
+				rs.Start()
+				fmt.Printf("%d:%d\n", i, rs.triggered)
+			}(i)
+		}
+	}
+	time.Sleep(1 * time.Millisecond)
+	rs.Start()
+	time.Sleep(10 * time.Millisecond)
+	if rs.triggered != 1 {
+		t.Errorf("triggered mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", 1, rs.triggered)
+	}
+}

+ 3 - 1
internal/topo/topo.go

@@ -68,7 +68,9 @@ func (s *Topo) Cancel() {
 	defer s.mu.Unlock()
 	defer s.mu.Unlock()
 	// completion signal
 	// completion signal
 	infra.DrainError(s.ctx, nil, s.drain)
 	infra.DrainError(s.ctx, nil, s.drain)
-	s.cancel()
+	if s.cancel != nil {
+		s.cancel()
+	}
 	s.store = nil
 	s.store = nil
 	s.coordinator = nil
 	s.coordinator = nil
 }
 }

+ 1 - 1
internal/topo/topotest/mock_topo.go

@@ -448,5 +448,5 @@ func DoCheckpointRuleTest(t *testing.T, tests []RuleCheckpointTest, j int, opt *
 func CreateRule(name, sql string) (*api.Rule, error) {
 func CreateRule(name, sql string) (*api.Rule, error) {
 	p := processor.NewRuleProcessor()
 	p := processor.NewRuleProcessor()
 	p.ExecDrop(name)
 	p.ExecDrop(name)
-	return p.ExecCreate(name, sql)
+	return p.ExecCreateWithValidation(name, sql)
 }
 }

+ 1 - 1
pkg/api/stream.go

@@ -156,7 +156,7 @@ type RuleGraph struct {
 // Rule the definition of the business logic
 // Rule the definition of the business logic
 // Sql and Graph are mutually exclusive, at least one of them should be set
 // Sql and Graph are mutually exclusive, at least one of them should be set
 type Rule struct {
 type Rule struct {
-	Triggered bool                     `json:"triggered,omitempty"`
+	Triggered bool                     `json:"triggered"`
 	Id        string                   `json:"id,omitempty"`
 	Id        string                   `json:"id,omitempty"`
 	Name      string                   `json:"name,omitempty"` // The display name of a rule
 	Name      string                   `json:"name,omitempty"` // The display name of a rule
 	Sql       string                   `json:"sql,omitempty"`
 	Sql       string                   `json:"sql,omitempty"`

+ 6 - 1
pkg/infra/saferun.go

@@ -17,6 +17,7 @@ package infra
 import (
 import (
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"runtime/debug"
 	"runtime/debug"
 )
 )
@@ -49,7 +50,11 @@ func SafeRun(fn func() error) (err error) {
 // Thus the latter error will just skip
 // Thus the latter error will just skip
 // It is usually the error outlet of a op/rule.
 // It is usually the error outlet of a op/rule.
 func DrainError(ctx api.StreamContext, err error, errCh chan<- error) {
 func DrainError(ctx api.StreamContext, err error, errCh chan<- error) {
-	ctx.GetLogger().Errorf("runtime error: %v", err)
+	if ctx != nil {
+		ctx.GetLogger().Errorf("runtime error: %v", err)
+	} else {
+		conf.Log.Errorf("runtime error: %v", err)
+	}
 	select {
 	select {
 	case errCh <- err:
 	case errCh <- err:
 	default:
 	default: