|
@@ -71,6 +71,9 @@ func (rr *RuleRegistry) Delete(key string) (*rule.RuleState, bool) {
|
|
|
}
|
|
|
|
|
|
func createRule(name, ruleJson string) (string, error) {
|
|
|
+ var rs *rule.RuleState = nil
|
|
|
+ var err error = nil
|
|
|
+
|
|
|
// Validate the rule json
|
|
|
r, err := ruleProcessor.GetRuleByJson(name, ruleJson)
|
|
|
if err != nil {
|
|
@@ -83,14 +86,16 @@ func createRule(name, ruleJson string) (string, error) {
|
|
|
}
|
|
|
|
|
|
// Validate the topo
|
|
|
- rs, err := createRuleState(r)
|
|
|
- if err != nil {
|
|
|
+ panicOrError := infra.SafeRun(func() error {
|
|
|
+ rs, err = createRuleState(r)
|
|
|
+ return err
|
|
|
+ })
|
|
|
+ if panicOrError != nil {
|
|
|
// Do not store to registry so also delete the KV
|
|
|
deleteRule(r.Id)
|
|
|
_, _ = ruleProcessor.ExecDrop(r.Id)
|
|
|
- return r.Id, fmt.Errorf("create rule topo error: %v", err)
|
|
|
+ return r.Id, fmt.Errorf("create rule topo error: %v", panicOrError)
|
|
|
}
|
|
|
-
|
|
|
// Start the rule asyncly
|
|
|
if r.Triggered {
|
|
|
go func() {
|
|
@@ -119,9 +124,15 @@ func createRuleState(r *api.Rule) (*rule.RuleState, error) {
|
|
|
}
|
|
|
|
|
|
func recoverRule(r *api.Rule) string {
|
|
|
+ var rs *rule.RuleState = nil
|
|
|
+ var err error = nil
|
|
|
// Validate the topo
|
|
|
- rs, err := createRuleState(r)
|
|
|
- if err != nil { // when recovering rules, assume the rules are valid, so always add it to the registry
|
|
|
+ panicOrError := infra.SafeRun(func() error {
|
|
|
+ rs, err = createRuleState(r)
|
|
|
+ return err
|
|
|
+ })
|
|
|
+
|
|
|
+ if panicOrError != nil { // when recovering rules, assume the rules are valid, so always add it to the registry
|
|
|
conf.Log.Errorf("Create rule topo error: %v", err)
|
|
|
r.Triggered = false
|
|
|
registry.Store(r.Id, rs)
|