|
@@ -16,6 +16,7 @@ package server
|
|
|
|
|
|
import (
|
|
import (
|
|
"context"
|
|
"context"
|
|
|
|
+ "errors"
|
|
"fmt"
|
|
"fmt"
|
|
"github.com/lf-edge/ekuiper/internal/binder/function"
|
|
"github.com/lf-edge/ekuiper/internal/binder/function"
|
|
"github.com/lf-edge/ekuiper/internal/binder/io"
|
|
"github.com/lf-edge/ekuiper/internal/binder/io"
|
|
@@ -88,6 +89,7 @@ func StartUp(Version, LoadFileType string) {
|
|
ruleProcessor = processor.NewRuleProcessor()
|
|
ruleProcessor = processor.NewRuleProcessor()
|
|
streamProcessor = processor.NewStreamProcessor()
|
|
streamProcessor = processor.NewStreamProcessor()
|
|
rulesetProcessor = processor.NewRulesetProcessor(ruleProcessor, streamProcessor)
|
|
rulesetProcessor = processor.NewRulesetProcessor(ruleProcessor, streamProcessor)
|
|
|
|
+ initRuleset()
|
|
|
|
|
|
// register all extensions
|
|
// register all extensions
|
|
for k, v := range components {
|
|
for k, v := range components {
|
|
@@ -176,3 +178,23 @@ func StartUp(Version, LoadFileType string) {
|
|
|
|
|
|
os.Exit(0)
|
|
os.Exit(0)
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+func initRuleset() error {
|
|
|
|
+ loc, err := conf.GetDataLoc()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ signalFile := filepath.Join(loc, "initialized")
|
|
|
|
+ if _, err := os.Stat(signalFile); errors.Is(err, os.ErrNotExist) {
|
|
|
|
+ defer os.Create(signalFile)
|
|
|
|
+ content, err := os.ReadFile(filepath.Join(loc, "init.json"))
|
|
|
|
+ if err != nil {
|
|
|
|
+ conf.Log.Infof("fail to read init file: %v", err)
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+ conf.Log.Infof("start to initialize ruleset")
|
|
|
|
+ _, counts, err := rulesetProcessor.Import(content)
|
|
|
|
+ conf.Log.Infof("initialzie %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+}
|