Selaa lähdekoodia

fix(portable): allow to start from wrong portable plugin

1. If plugin is wrong, function instance may fail at the last step "send symbol to plugin". Clean up the function channel once failed.
2. Plugin Ins wrap the function of sendCmd so that we can wrap the error message with process information to make diagnose easier
3. Plugin Ins initialization is depending on the process status.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 vuotta sitten
vanhempi
commit
55fee348f9

+ 1 - 1
internal/plugin/portable/factory.go

@@ -60,7 +60,7 @@ func (m *Manager) Function(name string) (api.Function, error) {
 	}
 	}
 	f, err := runtime.NewPortableFunc(name, meta)
 	f, err := runtime.NewPortableFunc(name, meta)
 	if err != nil {
 	if err != nil {
-		conf.Log.Errorf("Error creating function %v", err)
+		conf.Log.Errorf("Error creating portable function %v", err)
 		return nil, err
 		return nil, err
 	}
 	}
 	funcInsMap.Store(name, f)
 	funcInsMap.Store(name, f)

+ 3 - 3
internal/plugin/portable/runtime/connection.go

@@ -154,7 +154,7 @@ func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error) {
 		} else {
 		} else {
 			conf.Log.Debugf("receive response: %s", string(result))
 			conf.Log.Debugf("receive response: %s", string(result))
 		}
 		}
-		if result[0] == 'h' {
+		if len(result) > 0 && result[0] == 'h' {
 			conf.Log.Debugf("receive handshake response: %s", string(result))
 			conf.Log.Debugf("receive handshake response: %s", string(result))
 			continue
 			continue
 		}
 		}
@@ -255,13 +255,13 @@ func setSockOptions(sock mangos.Socket, sockOptions map[string]interface{}) {
 
 
 func listenWithRetry(sock mangos.Socket, url string) error {
 func listenWithRetry(sock mangos.Socket, url string) error {
 	var (
 	var (
-		retryCount    = 300
+		retryCount    = 5
 		retryInterval = 100
 		retryInterval = 100
 	)
 	)
 	for {
 	for {
 		err := sock.Listen(url)
 		err := sock.Listen(url)
 		if err == nil {
 		if err == nil {
-			conf.Log.Infof("start to listen at %s after %d tries", url, 301-retryCount)
+			conf.Log.Infof("start to listen at %s after %d tries", url, 5-retryCount)
 			return err
 			return err
 		}
 		}
 		retryCount--
 		retryCount--

+ 7 - 2
internal/plugin/portable/runtime/function.go

@@ -36,11 +36,11 @@ type PortableFunc struct {
 	isAgg      int // 0 - not calculate yet, 1 - no, 2 - yes
 	isAgg      int // 0 - not calculate yet, 1 - no, 2 - yes
 }
 }
 
 
-func NewPortableFunc(symbolName string, reg *PluginMeta) (*PortableFunc, error) {
+func NewPortableFunc(symbolName string, reg *PluginMeta) (_ *PortableFunc, e error) {
 	// Setup channel and route the data
 	// Setup channel and route the data
 	conf.Log.Infof("Start running portable function meta %+v", reg)
 	conf.Log.Infof("Start running portable function meta %+v", reg)
 	pm := GetPluginInsManager()
 	pm := GetPluginInsManager()
-	ins, err := pm.getOrStartProcess(reg, PortbleConf, false)
+	ins, err := pm.getOrStartProcess(reg, PortbleConf)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -51,6 +51,11 @@ func NewPortableFunc(symbolName string, reg *PluginMeta) (*PortableFunc, error)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+	defer func() {
+		if e != nil {
+			dataCh.Close()
+		}
+	}()
 
 
 	// Start symbol
 	// Start symbol
 	c := &Control{
 	c := &Control{

+ 16 - 13
internal/plugin/portable/runtime/plugin_ins_manager.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -72,6 +72,14 @@ func NewPluginInsForTest(name string, ctrlChan ControlChannel) *PluginIns {
 	}
 	}
 }
 }
 
 
+func (i *PluginIns) sendCmd(jsonArg []byte) error {
+	err := i.ctrlChan.SendCmd(jsonArg)
+	if err != nil && i.process == nil {
+		return fmt.Errorf("plugin %s is not running sucessfully, please make sure it is valid", i.name)
+	}
+	return err
+}
+
 func (i *PluginIns) StartSymbol(ctx api.StreamContext, ctrl *Control) error {
 func (i *PluginIns) StartSymbol(ctx api.StreamContext, ctrl *Control) error {
 	arg, err := json.Marshal(ctrl)
 	arg, err := json.Marshal(ctrl)
 	if err != nil {
 	if err != nil {
@@ -85,7 +93,7 @@ func (i *PluginIns) StartSymbol(ctx api.StreamContext, ctrl *Control) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	err = i.ctrlChan.SendCmd(jsonArg)
+	err = i.sendCmd(jsonArg)
 	if err == nil {
 	if err == nil {
 		i.Lock()
 		i.Lock()
 		i.commands[ctrl.Meta] = jsonArg
 		i.commands[ctrl.Meta] = jsonArg
@@ -108,7 +116,7 @@ func (i *PluginIns) StopSymbol(ctx api.StreamContext, ctrl *Control) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	err = i.ctrlChan.SendCmd(jsonArg)
+	err = i.sendCmd(jsonArg)
 	if err == nil {
 	if err == nil {
 		referred := false
 		referred := false
 		i.Lock()
 		i.Lock()
@@ -181,7 +189,7 @@ func (p *pluginInsManager) CreateIns(pluginMeta *PluginMeta) {
 	defer p.Unlock()
 	defer p.Unlock()
 	if ins, ok := p.instances[pluginMeta.Name]; ok {
 	if ins, ok := p.instances[pluginMeta.Name]; ok {
 		if len(ins.commands) != 0 {
 		if len(ins.commands) != 0 {
-			go p.getOrStartProcess(pluginMeta, PortbleConf, true)
+			go p.getOrStartProcess(pluginMeta, PortbleConf)
 		}
 		}
 	}
 	}
 }
 }
@@ -193,7 +201,7 @@ func (p *pluginInsManager) CreateIns(pluginMeta *PluginMeta) {
 // During plugin delete/update, if the commands is not empty, keep the ins for next creation and restore
 // During plugin delete/update, if the commands is not empty, keep the ins for next creation and restore
 // 1. During creation, clean up those resources for any errors in defer immediately after the resource is created.
 // 1. During creation, clean up those resources for any errors in defer immediately after the resource is created.
 // 2. During plugin running, when detecting plugin process exit, clean up those resources for the current ins.
 // 2. During plugin running, when detecting plugin process exit, clean up those resources for the current ins.
-func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *PortableConfig, pluginCreation bool) (*PluginIns, error) {
+func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *PortableConfig) (_ *PluginIns, e error) {
 	p.Lock()
 	p.Lock()
 	defer p.Unlock()
 	defer p.Unlock()
 	var (
 	var (
@@ -207,7 +215,7 @@ func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *Port
 		p.instances[pluginMeta.Name] = ins
 		p.instances[pluginMeta.Name] = ins
 	}
 	}
 	// ins process has not run yet
 	// ins process has not run yet
-	if !pluginCreation && ins.ctrlChan != nil {
+	if ins.process != nil && ins.ctrlChan != nil {
 		return ins, nil
 		return ins, nil
 	}
 	}
 	// should only happen for first start, then the ctrl channel will keep running
 	// should only happen for first start, then the ctrl channel will keep running
@@ -217,11 +225,6 @@ func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *Port
 		if err != nil {
 		if err != nil {
 			return nil, fmt.Errorf("can't create new control channel: %s", err.Error())
 			return nil, fmt.Errorf("can't create new control channel: %s", err.Error())
 		}
 		}
-		defer func() {
-			if err != nil {
-				_ = ctrlChan.Close()
-			}
-		}()
 		ins.ctrlChan = ctrlChan
 		ins.ctrlChan = ctrlChan
 	}
 	}
 	// init or restart all need to run the process
 	// init or restart all need to run the process
@@ -259,7 +262,7 @@ func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *Port
 	process := cmd.Process
 	process := cmd.Process
 	conf.Log.Printf("plugin started pid: %d\n", process.Pid)
 	conf.Log.Printf("plugin started pid: %d\n", process.Pid)
 	defer func() {
 	defer func() {
-		if err != nil {
+		if e != nil {
 			_ = process.Kill()
 			_ = process.Kill()
 		}
 		}
 	}()
 	}()
@@ -295,7 +298,7 @@ func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *Port
 	conf.Log.Info("restore plugin symbols")
 	conf.Log.Info("restore plugin symbols")
 	for m, c := range ins.commands {
 	for m, c := range ins.commands {
 		go func(key Meta, jsonArg []byte) {
 		go func(key Meta, jsonArg []byte) {
-			e := ins.ctrlChan.SendCmd(jsonArg)
+			e := ins.sendCmd(jsonArg)
 			if e != nil {
 			if e != nil {
 				conf.Log.Errorf("send command to %v error: %v", key, e)
 				conf.Log.Errorf("send command to %v error: %v", key, e)
 			}
 			}

+ 2 - 2
internal/plugin/portable/runtime/sink.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -43,7 +43,7 @@ func (ps *PortableSink) Configure(props map[string]interface{}) error {
 func (ps *PortableSink) Open(ctx api.StreamContext) error {
 func (ps *PortableSink) Open(ctx api.StreamContext) error {
 	ctx.GetLogger().Infof("Start running portable sink %s with conf %+v", ps.symbolName, ps.props)
 	ctx.GetLogger().Infof("Start running portable sink %s with conf %+v", ps.symbolName, ps.props)
 	pm := GetPluginInsManager()
 	pm := GetPluginInsManager()
-	ins, err := pm.getOrStartProcess(ps.reg, PortbleConf, false)
+	ins, err := pm.getOrStartProcess(ps.reg, PortbleConf)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}

+ 2 - 2
internal/plugin/portable/runtime/source.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -44,7 +44,7 @@ func NewPortableSource(symbolName string, reg *PluginMeta) *PortableSource {
 func (ps *PortableSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
 func (ps *PortableSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
 	ctx.GetLogger().Infof("Start running portable source %s with datasource %s and conf %+v", ps.symbolName, ps.topic, ps.props)
 	ctx.GetLogger().Infof("Start running portable source %s with datasource %s and conf %+v", ps.symbolName, ps.topic, ps.props)
 	pm := GetPluginInsManager()
 	pm := GetPluginInsManager()
-	ins, err := pm.getOrStartProcess(ps.reg, PortbleConf, false)
+	ins, err := pm.getOrStartProcess(ps.reg, PortbleConf)
 	if err != nil {
 	if err != nil {
 		infra.DrainError(ctx, err, errCh)
 		infra.DrainError(ctx, err, errCh)
 		return
 		return