|
@@ -17,8 +17,15 @@ package topo
|
|
import (
|
|
import (
|
|
"context"
|
|
"context"
|
|
"fmt"
|
|
"fmt"
|
|
|
|
+ "io"
|
|
|
|
+ "os"
|
|
|
|
+ "path"
|
|
"strconv"
|
|
"strconv"
|
|
"sync"
|
|
"sync"
|
|
|
|
+ "time"
|
|
|
|
+
|
|
|
|
+ rotatelogs "github.com/lestrrat-go/file-rotatelogs"
|
|
|
|
+ "github.com/sirupsen/logrus"
|
|
|
|
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
|
|
"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
|
|
@@ -31,26 +38,24 @@ import (
|
|
)
|
|
)
|
|
|
|
|
|
type Topo struct {
|
|
type Topo struct {
|
|
- sources []node.DataSourceNode
|
|
|
|
- sinks []*node.SinkNode
|
|
|
|
- ctx api.StreamContext
|
|
|
|
- cancel context.CancelFunc
|
|
|
|
- drain chan error
|
|
|
|
- ops []node.OperatorNode
|
|
|
|
- name string
|
|
|
|
- qos api.Qos
|
|
|
|
- checkpointInterval int
|
|
|
|
- store api.Store
|
|
|
|
- coordinator *checkpoint.Coordinator
|
|
|
|
- topo *api.PrintableTopo
|
|
|
|
- mu sync.Mutex
|
|
|
|
|
|
+ sources []node.DataSourceNode
|
|
|
|
+ sinks []*node.SinkNode
|
|
|
|
+ ctx api.StreamContext
|
|
|
|
+ cancel context.CancelFunc
|
|
|
|
+ drain chan error
|
|
|
|
+ ops []node.OperatorNode
|
|
|
|
+ name string
|
|
|
|
+ options *api.RuleOption
|
|
|
|
+ store api.Store
|
|
|
|
+ coordinator *checkpoint.Coordinator
|
|
|
|
+ topo *api.PrintableTopo
|
|
|
|
+ mu sync.Mutex
|
|
}
|
|
}
|
|
|
|
|
|
-func NewWithNameAndQos(name string, qos api.Qos, checkpointInterval int) (*Topo, error) {
|
|
|
|
|
|
+func NewWithNameAndOptions(name string, options *api.RuleOption) (*Topo, error) {
|
|
tp := &Topo{
|
|
tp := &Topo{
|
|
- name: name,
|
|
|
|
- qos: qos,
|
|
|
|
- checkpointInterval: checkpointInterval,
|
|
|
|
|
|
+ name: name,
|
|
|
|
+ options: options,
|
|
topo: &api.PrintableTopo{
|
|
topo: &api.PrintableTopo{
|
|
Sources: make([]string, 0),
|
|
Sources: make([]string, 0),
|
|
Edges: make(map[string][]interface{}),
|
|
Edges: make(map[string][]interface{}),
|
|
@@ -121,6 +126,38 @@ func (s *Topo) addEdge(from api.TopNode, to api.TopNode, toType string) {
|
|
func (s *Topo) prepareContext() {
|
|
func (s *Topo) prepareContext() {
|
|
if s.ctx == nil || s.ctx.Err() != nil {
|
|
if s.ctx == nil || s.ctx.Err() != nil {
|
|
contextLogger := conf.Log.WithField("rule", s.name)
|
|
contextLogger := conf.Log.WithField("rule", s.name)
|
|
|
|
+ if s.options.Debug || s.options.LogFilename != "" {
|
|
|
|
+ contextLogger.Logger = &logrus.Logger{
|
|
|
|
+ Out: conf.Log.Out,
|
|
|
|
+ Hooks: conf.Log.Hooks,
|
|
|
|
+ Level: conf.Log.Level,
|
|
|
|
+ Formatter: conf.Log.Formatter,
|
|
|
|
+ ReportCaller: conf.Log.ReportCaller,
|
|
|
|
+ ExitFunc: conf.Log.ExitFunc,
|
|
|
|
+ BufferPool: conf.Log.BufferPool,
|
|
|
|
+ }
|
|
|
|
+ if conf.Config.Basic.Debug || s.options.Debug {
|
|
|
|
+ contextLogger.Logger.SetLevel(logrus.DebugLevel)
|
|
|
|
+ }
|
|
|
|
+ if s.options.LogFilename != "" {
|
|
|
|
+ logDir, _ := conf.GetLogLoc()
|
|
|
|
+
|
|
|
|
+ file := path.Join(logDir, path.Base(s.options.LogFilename))
|
|
|
|
+ output, err := rotatelogs.New(
|
|
|
|
+ file+".%Y-%m-%d_%H-%M-%S",
|
|
|
|
+ rotatelogs.WithLinkName(file),
|
|
|
|
+ rotatelogs.WithRotationTime(time.Hour*time.Duration(conf.Config.Basic.RotateTime)),
|
|
|
|
+ rotatelogs.WithMaxAge(time.Hour*time.Duration(conf.Config.Basic.MaxAge)),
|
|
|
|
+ )
|
|
|
|
+ if err != nil {
|
|
|
|
+ conf.Log.Warnf("Create rule log file failed: %s", file)
|
|
|
|
+ } else if conf.Config.Basic.ConsoleLog {
|
|
|
|
+ contextLogger.Logger.SetOutput(io.MultiWriter(output, os.Stdout))
|
|
|
|
+ } else if !conf.Config.Basic.ConsoleLog {
|
|
|
|
+ contextLogger.Logger.SetOutput(output)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
|
|
ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
|
|
s.ctx, s.cancel = ctx.WithCancel()
|
|
s.ctx, s.cancel = ctx.WithCancel()
|
|
}
|
|
}
|
|
@@ -141,7 +178,7 @@ func (s *Topo) Open() <-chan error {
|
|
s.mu.Lock()
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
defer s.mu.Unlock()
|
|
var err error
|
|
var err error
|
|
- if s.store, err = state.CreateStore(s.name, s.qos); err != nil {
|
|
|
|
|
|
+ if s.store, err = state.CreateStore(s.name, s.options.Qos); err != nil {
|
|
return fmt.Errorf("topo %s create store error %v", s.name, err)
|
|
return fmt.Errorf("topo %s create store error %v", s.name, err)
|
|
}
|
|
}
|
|
s.enableCheckpoint()
|
|
s.enableCheckpoint()
|
|
@@ -175,7 +212,7 @@ func (s *Topo) Open() <-chan error {
|
|
}
|
|
}
|
|
|
|
|
|
func (s *Topo) enableCheckpoint() error {
|
|
func (s *Topo) enableCheckpoint() error {
|
|
- if s.qos >= api.AtLeastOnce {
|
|
|
|
|
|
+ if s.options.Qos >= api.AtLeastOnce {
|
|
var sources []checkpoint.StreamTask
|
|
var sources []checkpoint.StreamTask
|
|
for _, r := range s.sources {
|
|
for _, r := range s.sources {
|
|
sources = append(sources, r)
|
|
sources = append(sources, r)
|
|
@@ -188,7 +225,7 @@ func (s *Topo) enableCheckpoint() error {
|
|
for _, r := range s.sinks {
|
|
for _, r := range s.sinks {
|
|
sinks = append(sinks, r)
|
|
sinks = append(sinks, r)
|
|
}
|
|
}
|
|
- c := checkpoint.NewCoordinator(s.name, sources, ops, sinks, s.qos, s.store, s.checkpointInterval, s.ctx)
|
|
|
|
|
|
+ c := checkpoint.NewCoordinator(s.name, sources, ops, sinks, s.options.Qos, s.store, s.options.CheckpointInterval, s.ctx)
|
|
s.coordinator = c
|
|
s.coordinator = c
|
|
}
|
|
}
|
|
return nil
|
|
return nil
|