|
@@ -19,6 +19,7 @@ import (
|
|
|
"context"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
+ "github.com/lf-edge/ekuiper/internal/conf"
|
|
|
"github.com/lf-edge/ekuiper/internal/topo"
|
|
|
"github.com/lf-edge/ekuiper/internal/topo/planner"
|
|
|
"github.com/lf-edge/ekuiper/pkg/api"
|
|
@@ -47,7 +48,7 @@ func (rs *RuleState) GetTopoGraph() *topo.PrintableTopo {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// Assume rule has started and the topo has instantiated
|
|
|
+// Stop Assume rule has started and the topo has instantiated
|
|
|
func (rs *RuleState) Stop() {
|
|
|
rs.Triggered = false
|
|
|
rs.Topology.Cancel()
|
|
@@ -73,7 +74,7 @@ func (rr *RuleRegistry) Load(key string) (value *RuleState, ok bool) {
|
|
|
return result, ok
|
|
|
}
|
|
|
|
|
|
-// Atomic get and delete
|
|
|
+// Delete Atomic get and delete
|
|
|
func (rr *RuleRegistry) Delete(key string) (*RuleState, bool) {
|
|
|
rr.Lock()
|
|
|
result, ok := rr.internal[key]
|
|
@@ -100,7 +101,10 @@ func createRuleState(rule *api.Rule) (*RuleState, error) {
|
|
|
|
|
|
// Assume rs is started with topo instantiated
|
|
|
func doStartRule(rs *RuleState) error {
|
|
|
- ruleProcessor.ExecReplaceRuleState(rs.Name, true)
|
|
|
+ err := ruleProcessor.ExecReplaceRuleState(rs.Name, true)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
go func() {
|
|
|
tp := rs.Topology
|
|
|
select {
|
|
@@ -149,25 +153,25 @@ func getRuleState(name string) (string, error) {
|
|
|
|
|
|
func doGetRuleState(rs *RuleState) (string, error) {
|
|
|
result := ""
|
|
|
- if !rs.Triggered {
|
|
|
+ if rs.Topology == nil {
|
|
|
result = "Stopped: canceled manually or by error."
|
|
|
- return result, nil
|
|
|
- }
|
|
|
- c := (*rs.Topology).GetContext()
|
|
|
- if c != nil {
|
|
|
- err := c.Err()
|
|
|
- switch err {
|
|
|
- case nil:
|
|
|
- result = "Running"
|
|
|
- case context.Canceled:
|
|
|
- result = "Stopped: canceled by error."
|
|
|
- case context.DeadlineExceeded:
|
|
|
- result = "Stopped: deadline exceed."
|
|
|
- default:
|
|
|
- result = fmt.Sprintf("Stopped: %v.", err)
|
|
|
- }
|
|
|
} else {
|
|
|
- result = "Stopped: no context found."
|
|
|
+ c := (*rs.Topology).GetContext()
|
|
|
+ if c != nil {
|
|
|
+ err := c.Err()
|
|
|
+ switch err {
|
|
|
+ case nil:
|
|
|
+ result = "Running"
|
|
|
+ case context.Canceled:
|
|
|
+ result = "Stopped: canceled by error."
|
|
|
+ case context.DeadlineExceeded:
|
|
|
+ result = "Stopped: deadline exceed."
|
|
|
+ default:
|
|
|
+ result = fmt.Sprintf("Stopped: %v.", err)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ result = "Stopped: no context found."
|
|
|
+ }
|
|
|
}
|
|
|
return result, nil
|
|
|
}
|
|
@@ -206,15 +210,15 @@ func getRuleStatus(name string) (string, error) {
|
|
|
|
|
|
func getRuleTopo(name string) (string, error) {
|
|
|
if rs, ok := registry.Load(name); ok {
|
|
|
- topo := rs.GetTopoGraph()
|
|
|
- if topo == nil {
|
|
|
+ graph := rs.GetTopoGraph()
|
|
|
+ if graph == nil {
|
|
|
return "", errorx.New(fmt.Sprintf("Fail to get rule %s's topo, make sure the rule has been started before", name))
|
|
|
}
|
|
|
- bytes, err := json.Marshal(topo)
|
|
|
+ bs, err := json.Marshal(graph)
|
|
|
if err != nil {
|
|
|
return "", errorx.New(fmt.Sprintf("Fail to encode rule %s's topo", name))
|
|
|
} else {
|
|
|
- return string(bytes), nil
|
|
|
+ return string(bs), nil
|
|
|
}
|
|
|
} else {
|
|
|
return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("Rule %s is not found", name))
|
|
@@ -244,7 +248,10 @@ func startRule(name string) error {
|
|
|
func stopRule(name string) (result string) {
|
|
|
if rs, ok := registry.Load(name); ok && rs.Triggered {
|
|
|
rs.Stop()
|
|
|
- ruleProcessor.ExecReplaceRuleState(name, false)
|
|
|
+ err := ruleProcessor.ExecReplaceRuleState(name, false)
|
|
|
+ if err != nil {
|
|
|
+ conf.Log.Warnf("stop rule found error: %s", err.Error())
|
|
|
+ }
|
|
|
result = fmt.Sprintf("Rule %s was stopped.", name)
|
|
|
} else {
|
|
|
result = fmt.Sprintf("Rule %s was not found.", name)
|