Selaa lähdekoodia

fix(native plugin): load native plugin when set up (#1306)

* fix(native plugin): load native plugin when set up

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(plugin): avoid load the plugin in UT

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(plugin): try another tdengine client in function test

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>

* fix(plugin): use defer to do clean up job

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
superxan 2 vuotta sitten
vanhempi
commit
f052f49262
2 muutettua tiedostoa jossa 63 lisäystä ja 38 poistoa
  1. 1 1
      .github/workflows/build_packages.yaml
  2. 62 37
      internal/plugin/native/manager.go

+ 1 - 1
.github/workflows/build_packages.yaml

@@ -228,7 +228,7 @@ jobs:
             curl \
             ${ip_address}:9081/plugins/${plugin_type} \
             -X POST \
-            -d "{\"name\":\"${plugin_name}\", \"file\":\"file:///var/plugins/${os}/${plugin_type}/${plugin_name}_amd64.zip\", \"shellParas\": [\"2.0.3.1\"]}"
+            -d "{\"name\":\"${plugin_name}\", \"file\":\"file:///var/plugins/${os}/${plugin_type}/${plugin_name}_amd64.zip\", \"shellParas\": [\"2.4.0.18\"]}"
         elif [ "${plugin_name}" = "image" ]; then
              curl \
              ${ip_address}:9081/plugins/${plugin_type} \

+ 62 - 37
internal/plugin/native/manager.go

@@ -77,6 +77,8 @@ func InitManager() (*Manager, error) {
 	if err != nil {
 		return nil, fmt.Errorf("error when opening db: %v", err)
 	}
+	registry := &Manager{symbols: make(map[string]string), db: db, pluginDir: pluginDir, etcDir: etcDir, runtime: make(map[string]plugin.Symbol)}
+	manager = registry
 	plugins := make([]map[string]string, 3)
 	for i := range plugin2.PluginTypes {
 		names, err := findAll(plugin2.PluginType(i), pluginDir)
@@ -85,7 +87,7 @@ func InitManager() (*Manager, error) {
 		}
 		plugins[i] = names
 	}
-	registry := &Manager{plugins: plugins, symbols: make(map[string]string), db: db, pluginDir: pluginDir, etcDir: etcDir, runtime: make(map[string]plugin.Symbol)}
+	registry.plugins = plugins
 
 	for pf := range plugins[plugin2.FUNCTION] {
 		l := make([]string, 0)
@@ -97,7 +99,6 @@ func InitManager() (*Manager, error) {
 			registry.storeSymbols(pf, []string{pf})
 		}
 	}
-	manager = registry
 	return registry, nil
 }
 
@@ -112,6 +113,12 @@ func findAll(t plugin2.PluginType, pluginDir string) (result map[string]string,
 	for _, file := range files {
 		baseName := filepath.Base(file.Name())
 		if strings.HasSuffix(baseName, ".so") {
+			//load the plugins when ekuiper set up
+			if !conf.IsTesting {
+				if _, err := manager.loadRuntime(t, "", path.Join(dir, baseName)); err != nil {
+					continue
+				}
+			}
 			n, v := parseName(baseName)
 			result[n] = v
 		}
@@ -234,7 +241,7 @@ func (rr *Manager) Register(t plugin2.PluginType, j plugin2.Plugin) error {
 
 	var err error
 	zipPath := path.Join(rr.pluginDir, name+".zip")
-	var unzipFiles []string
+
 	//clean up: delete zip file and unzip files in error
 	defer os.Remove(zipPath)
 	//download
@@ -259,14 +266,11 @@ func (rr *Manager) Register(t plugin2.PluginType, j plugin2.Plugin) error {
 	}
 
 	//unzip and copy to destination
-	unzipFiles, version, err := rr.install(t, name, zipPath, shellParas)
+	version, err := rr.install(t, name, zipPath, shellParas)
 	if err == nil && len(j.GetSymbols()) > 0 {
 		err = rr.db.Set(name, j.GetSymbols())
 	}
 	if err != nil { //Revert for any errors
-		if t == plugin2.SOURCE && len(unzipFiles) == 1 { //source that only copy so file
-			os.RemoveAll(unzipFiles[0])
-		}
 		if len(j.GetSymbols()) > 0 {
 			rr.removeSymbols(j.GetSymbols())
 		} else {
@@ -402,13 +406,13 @@ func (rr *Manager) GetPluginInfo(t plugin2.PluginType, name string) (map[string]
 	return nil, false
 }
 
-func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []string) ([]string, string, error) {
+func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []string) (string, error) {
 	var filenames []string
 	var tempPath = path.Join(rr.pluginDir, "temp", plugin2.PluginTypes[t], name)
 	defer os.RemoveAll(tempPath)
 	r, err := zip.OpenReader(src)
 	if err != nil {
-		return filenames, "", err
+		return "", err
 	}
 	defer r.Close()
 
@@ -420,10 +424,11 @@ func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []
 		}
 	}
 	if len(shellParas) != 0 && !haveInstallFile {
-		return filenames, "", fmt.Errorf("have shell parameters : %s but no install.sh file", shellParas)
+		return "", fmt.Errorf("have shell parameters : %s but no install.sh file", shellParas)
 	}
 
 	soPrefix := regexp.MustCompile(fmt.Sprintf(`^((%s)|(%s))(@.*)?\.so$`, name, ucFirst(name)))
+	var soPath string
 	var yamlFile, yamlPath, version string
 	expFiles := 1
 	if t == plugin2.SOURCE {
@@ -432,12 +437,19 @@ func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []
 		expFiles = 2
 	}
 	var revokeFiles []string
+	defer func() {
+		if err != nil {
+			for _, f := range revokeFiles {
+				os.RemoveAll(f)
+			}
+		}
+	}()
 	for _, file := range r.File {
 		fileName := file.Name
 		if yamlFile == fileName {
 			err = filex.UnzipTo(file, yamlPath)
 			if err != nil {
-				return filenames, "", err
+				return version, err
 			}
 			revokeFiles = append(revokeFiles, yamlPath)
 			filenames = append(filenames, yamlPath)
@@ -449,10 +461,10 @@ func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []
 				revokeFiles = append(revokeFiles, jsonPath)
 			}
 		} else if soPrefix.Match([]byte(fileName)) {
-			soPath := path.Join(rr.pluginDir, plugin2.PluginTypes[t], fileName)
+			soPath = path.Join(rr.pluginDir, plugin2.PluginTypes[t], fileName)
 			err = filex.UnzipTo(file, soPath)
 			if err != nil {
-				return filenames, "", err
+				return version, err
 			}
 			filenames = append(filenames, soPath)
 			revokeFiles = append(revokeFiles, soPath)
@@ -460,17 +472,18 @@ func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []
 		} else if strings.HasPrefix(fileName, "etc/") {
 			err = filex.UnzipTo(file, path.Join(rr.etcDir, plugin2.PluginTypes[t], strings.Replace(fileName, "etc", name, 1)))
 			if err != nil {
-				return filenames, "", err
+				return version, err
 			}
 		} else { //unzip other files
 			err = filex.UnzipTo(file, path.Join(tempPath, fileName))
 			if err != nil {
-				return filenames, "", err
+				return version, err
 			}
 		}
 	}
 	if len(filenames) != expFiles {
-		return filenames, version, fmt.Errorf("invalid zip file: so file or conf file is missing")
+		err = fmt.Errorf("invalid zip file: so file or conf file is missing")
+		return version, err
 	} else if haveInstallFile {
 		//run install script if there is
 		spath := path.Join(tempPath, "install.sh")
@@ -486,23 +499,28 @@ func (rr *Manager) install(t plugin2.PluginType, name, src string, shellParas []
 		err := cmd.Run()
 
 		if err != nil {
-			for _, f := range revokeFiles {
-				os.RemoveAll(f)
-			}
 			conf.Log.Infof(`err:%v stdout:%s stderr:%s`, err, outb.String(), errb.String())
-			return filenames, "", err
-		} else {
-			conf.Log.Infof(`run install script:%s`, outb.String())
-			conf.Log.Infof("install %s plugin %s", plugin2.PluginTypes[t], name)
+			return version, err
+		}
+		conf.Log.Infof(`run install script:%s`, outb.String())
+	}
+
+	if !conf.IsTesting {
+		// load the runtime first
+		_, err = manager.loadRuntime(t, "", soPath)
+		if err != nil {
+			return version, err
 		}
 	}
-	return filenames, version, nil
+
+	conf.Log.Infof("install %s plugin %s", plugin2.PluginTypes[t], name)
+	return version, nil
 }
 
 // binder factory implementations
 
 func (rr *Manager) Source(name string) (api.Source, error) {
-	nf, err := rr.loadRuntime(plugin2.SOURCE, name)
+	nf, err := rr.loadRuntime(plugin2.SOURCE, name, "")
 	if err != nil {
 		return nil, err
 	}
@@ -520,7 +538,7 @@ func (rr *Manager) Source(name string) (api.Source, error) {
 }
 
 func (rr *Manager) Sink(name string) (api.Sink, error) {
-	nf, err := rr.loadRuntime(plugin2.SINK, name)
+	nf, err := rr.loadRuntime(plugin2.SINK, name, "")
 	if err != nil {
 		return nil, err
 	}
@@ -540,7 +558,7 @@ func (rr *Manager) Sink(name string) (api.Sink, error) {
 }
 
 func (rr *Manager) Function(name string) (api.Function, error) {
-	nf, err := rr.loadRuntime(plugin2.FUNCTION, name)
+	nf, err := rr.loadRuntime(plugin2.FUNCTION, name, "")
 	if err != nil {
 		return nil, err
 	}
@@ -573,7 +591,7 @@ func (rr *Manager) ConvName(name string) (string, bool) {
 }
 
 // If not found, return nil,nil; Other errors return nil, err
-func (rr *Manager) loadRuntime(t plugin2.PluginType, name string) (plugin.Symbol, error) {
+func (rr *Manager) loadRuntime(t plugin2.PluginType, name, soFilepath string) (plugin.Symbol, error) {
 	ut := ucFirst(name)
 	ptype := plugin2.PluginTypes[t]
 	key := ptype + "/" + name
@@ -582,23 +600,30 @@ func (rr *Manager) loadRuntime(t plugin2.PluginType, name string) (plugin.Symbol
 	nf, ok := rr.runtime[key]
 	rr.RUnlock()
 	if !ok {
-		mod, err := rr.getSoFilePath(t, name, false)
-		if err != nil {
-			conf.Log.Debugf(fmt.Sprintf("cannot find the native plugin in path: %v", err))
-			return nil, nil
+		var soPath string
+		if soFilepath != "" {
+			soPath = soFilepath
+		} else {
+			mod, err := rr.getSoFilePath(t, name, false)
+			if err != nil {
+				conf.Log.Warnf(fmt.Sprintf("cannot find the native plugin %s in path: %v", name, err))
+				return nil, nil
+			}
+			soPath = mod
 		}
-		conf.Log.Debugf("Opening plugin %s", mod)
-		plug, err := plugin.Open(mod)
+		conf.Log.Debugf("Opening plugin %s", soPath)
+		plug, err := plugin.Open(soPath)
 		if err != nil {
-			return nil, fmt.Errorf("cannot open %s: %v", mod, err)
+			conf.Log.Errorf(fmt.Sprintf("plugin %s open error: %v", name, err))
+			return nil, fmt.Errorf("cannot open %s: %v", soPath, err)
 		}
-		conf.Log.Debugf("Successfully open plugin %s", mod)
+		conf.Log.Debugf("Successfully open plugin %s", soPath)
 		nf, err = plug.Lookup(ut)
 		if err != nil {
 			conf.Log.Debugf(fmt.Sprintf("cannot find symbol %s, please check if it is exported", name))
 			return nil, nil
 		}
-		conf.Log.Debugf("Successfully look-up plugin %s", mod)
+		conf.Log.Debugf("Successfully look-up plugin %s", soPath)
 		rr.Lock()
 		rr.runtime[key] = nf
 		rr.Unlock()