Просмотр исходного кода

fix(portable plugin): support portable plugin load and run when ekuiper set up

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran 2 лет назад
Родитель
Сommit
f3df926c4f

+ 1 - 0
internal/plugin/native/manager.go

@@ -841,6 +841,7 @@ func (rr *Manager) pluginInstallWhenReboot() {
 		}
 		}
 		err = rr.Register(plgType, sd)
 		err = rr.Register(plgType, sd)
 		if err != nil {
 		if err != nil {
+			conf.Log.Errorf(`install native plugin %s error: %v`, k, err)
 			_ = rr.plgStatusDb.Set(k, err.Error())
 			_ = rr.plgStatusDb.Set(k, err.Error())
 			continue
 			continue
 		}
 		}

+ 1 - 0
internal/plugin/portable/manager.go

@@ -438,6 +438,7 @@ func (m *Manager) PluginImport(plugins map[string]string) {
 		}
 		}
 		err = m.Register(sd)
 		err = m.Register(sd)
 		if err != nil {
 		if err != nil {
+			conf.Log.Errorf(`install portable plugin %s error: %v`, k, err)
 			_ = m.plgStatusDb.Set(k, err.Error())
 			_ = m.plgStatusDb.Set(k, err.Error())
 			continue
 			continue
 		}
 		}

+ 7 - 6
internal/server/rest.go

@@ -17,6 +17,7 @@ package server
 import (
 import (
 	"bytes"
 	"bytes"
 	"encoding/json"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/meta"
 	"github.com/lf-edge/ekuiper/internal/meta"
@@ -579,17 +580,17 @@ func importHandler(w http.ResponseWriter, r *http.Request) {
 		return
 		return
 	}
 	}
 	if rsi.Content != "" && rsi.FilePath != "" {
 	if rsi.Content != "" && rsi.FilePath != "" {
-		handleError(w, nil, "Invalid body: Cannot specify both content and file", logger)
+		handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
 		return
 		return
 	} else if rsi.Content == "" && rsi.FilePath == "" {
 	} else if rsi.Content == "" && rsi.FilePath == "" {
-		handleError(w, nil, "Invalid body: must specify content or file", logger)
+		handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
 		return
 		return
 	}
 	}
 	content := []byte(rsi.Content)
 	content := []byte(rsi.Content)
 	if rsi.FilePath != "" {
 	if rsi.FilePath != "" {
 		reader, err := httpx.ReadFile(rsi.FilePath)
 		reader, err := httpx.ReadFile(rsi.FilePath)
 		if err != nil {
 		if err != nil {
-			handleError(w, nil, "Fail to read file", logger)
+			handleError(w, err, "Fail to read file", logger)
 			return
 			return
 		}
 		}
 		defer reader.Close()
 		defer reader.Close()
@@ -783,17 +784,17 @@ func configurationImportHandler(w http.ResponseWriter, r *http.Request) {
 		return
 		return
 	}
 	}
 	if rsi.Content != "" && rsi.FilePath != "" {
 	if rsi.Content != "" && rsi.FilePath != "" {
-		handleError(w, nil, "Invalid body: Cannot specify both content and file", logger)
+		handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
 		return
 		return
 	} else if rsi.Content == "" && rsi.FilePath == "" {
 	} else if rsi.Content == "" && rsi.FilePath == "" {
-		handleError(w, nil, "Invalid body: must specify content or file", logger)
+		handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
 		return
 		return
 	}
 	}
 	content := []byte(rsi.Content)
 	content := []byte(rsi.Content)
 	if rsi.FilePath != "" {
 	if rsi.FilePath != "" {
 		reader, err := httpx.ReadFile(rsi.FilePath)
 		reader, err := httpx.ReadFile(rsi.FilePath)
 		if err != nil {
 		if err != nil {
-			handleError(w, nil, "Fail to read file", logger)
+			handleError(w, err, "Fail to read file", logger)
 			return
 			return
 		}
 		}
 		defer reader.Close()
 		defer reader.Close()

+ 1 - 1
internal/server/server.go

@@ -94,7 +94,6 @@ func StartUp(Version, LoadFileType string) {
 	ruleProcessor = processor.NewRuleProcessor()
 	ruleProcessor = processor.NewRuleProcessor()
 	streamProcessor = processor.NewStreamProcessor()
 	streamProcessor = processor.NewStreamProcessor()
 	rulesetProcessor = processor.NewRulesetProcessor(ruleProcessor, streamProcessor)
 	rulesetProcessor = processor.NewRulesetProcessor(ruleProcessor, streamProcessor)
-	initRuleset()
 
 
 	// register all extensions
 	// register all extensions
 	for k, v := range components {
 	for k, v := range components {
@@ -113,6 +112,7 @@ func StartUp(Version, LoadFileType string) {
 		panic(err)
 		panic(err)
 	}
 	}
 	meta.Bind()
 	meta.Bind()
+	initRuleset()
 
 
 	registry = &RuleRegistry{internal: make(map[string]*rule.RuleState)}
 	registry = &RuleRegistry{internal: make(map[string]*rule.RuleState)}
 	//Start lookup tables
 	//Start lookup tables