|
@@ -17,15 +17,16 @@ package rule
|
|
import (
|
|
import (
|
|
"context"
|
|
"context"
|
|
"fmt"
|
|
"fmt"
|
|
|
|
+ "math"
|
|
|
|
+ "math/rand"
|
|
|
|
+ "sync"
|
|
|
|
+ "time"
|
|
|
|
+
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
"github.com/lf-edge/ekuiper/internal/topo"
|
|
"github.com/lf-edge/ekuiper/internal/topo"
|
|
"github.com/lf-edge/ekuiper/internal/topo/planner"
|
|
"github.com/lf-edge/ekuiper/internal/topo/planner"
|
|
"github.com/lf-edge/ekuiper/pkg/api"
|
|
"github.com/lf-edge/ekuiper/pkg/api"
|
|
"github.com/lf-edge/ekuiper/pkg/infra"
|
|
"github.com/lf-edge/ekuiper/pkg/infra"
|
|
- "math"
|
|
|
|
- "math/rand"
|
|
|
|
- "sync"
|
|
|
|
- "time"
|
|
|
|
)
|
|
)
|
|
|
|
|
|
type ActionSignal int
|
|
type ActionSignal int
|
|
@@ -157,6 +158,8 @@ func (rs *RuleState) runTopo(ctx context.Context) {
|
|
var (
|
|
var (
|
|
er error
|
|
er error
|
|
)
|
|
)
|
|
|
|
+ ticker := time.NewTicker(time.Duration(d) * time.Millisecond)
|
|
|
|
+ defer ticker.Stop()
|
|
for {
|
|
for {
|
|
select {
|
|
select {
|
|
case e := <-tp.Open():
|
|
case e := <-tp.Open():
|
|
@@ -181,7 +184,7 @@ func (rs *RuleState) runTopo(ctx context.Context) {
|
|
}
|
|
}
|
|
// retry after delay
|
|
// retry after delay
|
|
select {
|
|
select {
|
|
- case <-time.Tick(time.Duration(d) * time.Millisecond):
|
|
|
|
|
|
+ case <-ticker.C:
|
|
break
|
|
break
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
conf.Log.Errorf("stop rule %s retry as cancelled", rs.RuleId)
|
|
conf.Log.Errorf("stop rule %s retry as cancelled", rs.RuleId)
|