ソースを参照

opt(portable): hot reload support

When deleting portable plugin, the rules are kept running. When recreating the plugin, the rule will recover automatically

This is done by record the symbol commands and replay when new plugin instance starts.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 年 前
コミット
e33b0adca9

+ 7 - 6
internal/plugin/portable/manager.go

@@ -143,8 +143,8 @@ func (m *Manager) doRegister(name string, pi *PluginInfo, isInit bool) error {
 			}
 			}
 		}
 		}
 	}
 	}
-
 	conf.Log.Infof("Installed portable plugin %s successfully", name)
 	conf.Log.Infof("Installed portable plugin %s successfully", name)
+	runtime.GetPluginInsManager().CreateIns(&pi.PluginMeta)
 	return nil
 	return nil
 }
 }
 
 
@@ -340,11 +340,6 @@ func (m *Manager) Delete(name string) error {
 	if !ok {
 	if !ok {
 		return fmt.Errorf("portable plugin %s is not found", name)
 		return fmt.Errorf("portable plugin %s is not found", name)
 	}
 	}
-	pm := runtime.GetPluginInsManager()
-	err := pm.Kill(name)
-	if err != nil {
-		conf.Log.Errorf("fail to kill portable plugin %s process, please try to kill it manually", name)
-	}
 	// unregister the plugin
 	// unregister the plugin
 	m.reg.Delete(name)
 	m.reg.Delete(name)
 	// delete files and uninstall metas
 	// delete files and uninstall metas
@@ -367,5 +362,11 @@ func (m *Manager) Delete(name string) error {
 		os.Remove(p)
 		os.Remove(p)
 	}
 	}
 	_ = os.RemoveAll(path.Join(m.pluginDir, name))
 	_ = os.RemoveAll(path.Join(m.pluginDir, name))
+	// Kill the process in the end, and return error if it cannot be deleted
+	pm := runtime.GetPluginInsManager()
+	err := pm.Kill(name)
+	if err != nil {
+		return fmt.Errorf("fail to kill portable plugin %s process, please try to kill it manually", name)
+	}
 	return nil
 	return nil
 }
 }

+ 23 - 2
internal/plugin/portable/runtime/connection.go

@@ -62,6 +62,15 @@ func (r *NanomsgReqChannel) SendCmd(arg []byte) error {
 	r.Lock()
 	r.Lock()
 	defer r.Unlock()
 	defer r.Unlock()
 	if err := r.sock.Send(arg); err != nil {
 	if err := r.sock.Send(arg); err != nil {
+		if err == mangos.ErrProtoState {
+			_, err = r.sock.Recv()
+			if err == nil {
+				err = r.sock.Send(arg)
+				if err == nil {
+					return nil
+				}
+			}
+		}
 		return fmt.Errorf("can't send message on control rep socket: %s", err.Error())
 		return fmt.Errorf("can't send message on control rep socket: %s", err.Error())
 	}
 	}
 	if msg, err := r.sock.Recv(); err != nil {
 	if msg, err := r.sock.Recv(); err != nil {
@@ -85,7 +94,7 @@ func (r *NanomsgReqChannel) Handshake() error {
 		return err
 		return err
 	}
 	}
 	_, err = r.sock.Recv()
 	_, err = r.sock.Recv()
-	if err != nil {
+	if err != nil && err != mangos.ErrProtoState {
 		return err
 		return err
 	}
 	}
 	err = r.sock.SetOption(mangos.OptionRecvDeadline, t)
 	err = r.sock.SetOption(mangos.OptionRecvDeadline, t)
@@ -124,6 +133,15 @@ func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error) {
 	r.Lock()
 	r.Lock()
 	defer r.Unlock()
 	defer r.Unlock()
 	if err := r.sock.Send(arg); err != nil {
 	if err := r.sock.Send(arg); err != nil {
+		if err == mangos.ErrProtoState { // resend if protocol state wrong, because of plugin restart or other problems
+			err = r.Handshake()
+			if err == nil {
+				err = r.sock.Send(arg)
+				if err == nil {
+					return r.sock.Recv()
+				}
+			}
+		}
 		return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error())
 		return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error())
 	}
 	}
 	return r.sock.Recv()
 	return r.sock.Recv()
@@ -132,7 +150,10 @@ func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error) {
 // Handshake should only be called once
 // Handshake should only be called once
 func (r *NanomsgReqRepChannel) Handshake() error {
 func (r *NanomsgReqRepChannel) Handshake() error {
 	_, err := r.sock.Recv()
 	_, err := r.sock.Recv()
-	return err
+	if err != nil && err != mangos.ErrProtoState {
+		return err
+	}
+	return nil
 }
 }
 
 
 func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error) {
 func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error) {

+ 2 - 2
internal/plugin/portable/runtime/connection_test.go

@@ -112,8 +112,8 @@ func TestControlCh(t *testing.T) {
 
 
 	// 5. no handshake
 	// 5. no handshake
 	err = ch.SendCmd(okMsg)
 	err = ch.SendCmd(okMsg)
-	if err == nil || err.Error() != "can't send message on control rep socket: incorrect protocol state" {
-		t.Errorf("5th process: send command should have error but got %v", err)
+	if err != nil {
+		t.Errorf("5th process: send command should auto handshake but got %v", err)
 	}
 	}
 	err = ch.Handshake()
 	err = ch.Handshake()
 	if err != nil {
 	if err != nil {

+ 1 - 1
internal/plugin/portable/runtime/function.go

@@ -36,7 +36,7 @@ func NewPortableFunc(symbolName string, reg *PluginMeta) (*PortableFunc, error)
 	// Setup channel and route the data
 	// Setup channel and route the data
 	conf.Log.Infof("Start running portable function meta %+v", reg)
 	conf.Log.Infof("Start running portable function meta %+v", reg)
 	pm := GetPluginInsManager()
 	pm := GetPluginInsManager()
-	ins, err := pm.getOrStartProcess(reg, PortbleConf)
+	ins, err := pm.getOrStartProcess(reg, PortbleConf, false)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}

+ 113 - 45
internal/plugin/portable/runtime/plugin_ins_manager.go

@@ -35,25 +35,40 @@ var PortbleConf = &PortableConfig{
 	SendTimeout: 1000,
 	SendTimeout: 1000,
 }
 }
 
 
+// PluginIns created at two scenarios
+// 1. At runtime, plugin is created/updated: in order to be able to reload rules that already uses previous ins
+// 2. At system start/restart, when plugin is used by a rule
+// Once created, never deleted until delete plugin command or system shutdown
 type PluginIns struct {
 type PluginIns struct {
-	process      *os.Process
-	ctrlChan     ControlChannel
-	runningCount int
-	name         string
+	sync.RWMutex
+	name     string
+	ctrlChan ControlChannel // the same lifecycle as pluginIns, once created keep listening
+	// audit the commands, so that when restarting the plugin, we can replay the commands
+	commands map[Meta][]byte
+	process  *os.Process // created when used by rule and deleted when no rule uses it
 }
 }
 
 
 func NewPluginIns(name string, ctrlChan ControlChannel, process *os.Process) *PluginIns {
 func NewPluginIns(name string, ctrlChan ControlChannel, process *os.Process) *PluginIns {
-	// if process is not passed, it is run in simulator mode. Then do not count running.
-	// so that it won't be automatically close.
-	rc := 0
-	if process == nil {
-		rc = 1
+	return &PluginIns{
+		process:  process,
+		ctrlChan: ctrlChan,
+		name:     name,
+		commands: make(map[Meta][]byte),
 	}
 	}
+}
+
+func NewPluginInsForTest(name string, ctrlChan ControlChannel) *PluginIns {
+	commands := make(map[Meta][]byte)
+	commands[Meta{
+		RuleId:     "test",
+		OpId:       "test",
+		InstanceId: 0,
+	}] = []byte{}
 	return &PluginIns{
 	return &PluginIns{
-		process:      process,
-		ctrlChan:     ctrlChan,
-		runningCount: rc,
-		name:         name,
+		process:  nil,
+		ctrlChan: ctrlChan,
+		name:     name,
+		commands: commands,
 	}
 	}
 }
 }
 
 
@@ -72,7 +87,9 @@ func (i *PluginIns) StartSymbol(ctx api.StreamContext, ctrl *Control) error {
 	}
 	}
 	err = i.ctrlChan.SendCmd(jsonArg)
 	err = i.ctrlChan.SendCmd(jsonArg)
 	if err == nil {
 	if err == nil {
-		i.runningCount++
+		i.Lock()
+		i.commands[ctrl.Meta] = jsonArg
+		i.Unlock()
 		ctx.GetLogger().Infof("started symbol %s", ctrl.SymbolName)
 		ctx.GetLogger().Infof("started symbol %s", ctrl.SymbolName)
 	}
 	}
 	return err
 	return err
@@ -92,24 +109,30 @@ func (i *PluginIns) StopSymbol(ctx api.StreamContext, ctrl *Control) error {
 		return err
 		return err
 	}
 	}
 	err = i.ctrlChan.SendCmd(jsonArg)
 	err = i.ctrlChan.SendCmd(jsonArg)
-	i.runningCount--
-	ctx.GetLogger().Infof("stopped symbol %s", ctrl.SymbolName)
-	if i.runningCount == 0 {
-		err := GetPluginInsManager().Kill(i.name)
-		if err != nil {
-			ctx.GetLogger().Infof("fail to stop plugin %s: %v", i.name, err)
-			return err
+	if err == nil {
+		referred := false
+		i.Lock()
+		delete(i.commands, ctrl.Meta)
+		referred = len(i.commands) > 0
+		i.Unlock()
+		ctx.GetLogger().Infof("stopped symbol %s", ctrl.SymbolName)
+		if !referred {
+			err := GetPluginInsManager().Kill(i.name)
+			if err != nil {
+				ctx.GetLogger().Infof("fail to stop plugin %s: %v", i.name, err)
+				return err
+			}
+			ctx.GetLogger().Infof("stop plugin %s", i.name)
 		}
 		}
-		ctx.GetLogger().Infof("stop plugin %s", i.name)
 	}
 	}
 	return err
 	return err
 }
 }
 
 
+// Stop intentionally
 func (i *PluginIns) Stop() error {
 func (i *PluginIns) Stop() error {
 	var err error
 	var err error
-	if i.ctrlChan != nil {
-		err = i.ctrlChan.Close()
-	}
+	i.RLock()
+	defer i.RUnlock()
 	if i.process != nil { // will also trigger process exit clean up
 	if i.process != nil { // will also trigger process exit clean up
 		err = i.process.Kill()
 		err = i.process.Kill()
 	}
 	}
@@ -138,6 +161,7 @@ func (p *pluginInsManager) getPluginIns(name string) (*PluginIns, bool) {
 	return ins, ok
 	return ins, ok
 }
 }
 
 
+// deletePluginIns should only run when there is no state aka. commands
 func (p *pluginInsManager) deletePluginIns(name string) {
 func (p *pluginInsManager) deletePluginIns(name string) {
 	p.Lock()
 	p.Lock()
 	defer p.Unlock()
 	defer p.Unlock()
@@ -151,27 +175,56 @@ func (p *pluginInsManager) AddPluginIns(name string, ins *PluginIns) {
 	p.instances[name] = ins
 	p.instances[name] = ins
 }
 }
 
 
+// CreateIns Run when plugin is created/updated
+func (p *pluginInsManager) CreateIns(pluginMeta *PluginMeta) {
+	p.Lock()
+	defer p.Unlock()
+	if ins, ok := p.instances[pluginMeta.Name]; ok {
+		if len(ins.commands) != 0 {
+			go p.getOrStartProcess(pluginMeta, PortbleConf, true)
+		}
+	}
+}
+
 // getOrStartProcess Control the plugin process lifecycle.
 // getOrStartProcess Control the plugin process lifecycle.
 // Need to manage the resources: instances map, control socket, plugin process
 // Need to manage the resources: instances map, control socket, plugin process
+// May be called at plugin creation or restart with previous state(ctrlCh, commands)
+// PluginIns is created by plugin manager but started by rule/funcop.
+// During plugin delete/update, if the commands is not empty, keep the ins for next creation and restore
 // 1. During creation, clean up those resources for any errors in defer immediately after the resource is created.
 // 1. During creation, clean up those resources for any errors in defer immediately after the resource is created.
 // 2. During plugin running, when detecting plugin process exit, clean up those resources for the current ins.
 // 2. During plugin running, when detecting plugin process exit, clean up those resources for the current ins.
-func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *PortableConfig) (*PluginIns, error) {
+func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *PortableConfig, pluginCreation bool) (*PluginIns, error) {
 	p.Lock()
 	p.Lock()
 	defer p.Unlock()
 	defer p.Unlock()
-	if ins, ok := p.instances[pluginMeta.Name]; ok {
-		return ins, nil
+	var (
+		ins *PluginIns
+		ok  bool
+	)
+	// run initialization for firstly creating plugin instance
+	ins, ok = p.instances[pluginMeta.Name]
+	if !ok {
+		ins = NewPluginIns(pluginMeta.Name, nil, nil)
+		p.instances[pluginMeta.Name] = ins
 	}
 	}
-
-	conf.Log.Infof("create control channel")
-	ctrlChan, err := CreateControlChannel(pluginMeta.Name)
-	if err != nil {
-		return nil, fmt.Errorf("can't create new control channel: %s", err.Error())
+	// ins process has not run yet
+	if !pluginCreation && len(ins.commands) != 0 {
+		return ins, nil
 	}
 	}
-	defer func() {
+	// should only happen for first start, then the ctrl channel will keep running
+	if ins.ctrlChan == nil {
+		conf.Log.Infof("create control channel")
+		ctrlChan, err := CreateControlChannel(pluginMeta.Name)
 		if err != nil {
 		if err != nil {
-			_ = ctrlChan.Close()
+			return nil, fmt.Errorf("can't create new control channel: %s", err.Error())
 		}
 		}
-	}()
+		defer func() {
+			if err != nil {
+				_ = ctrlChan.Close()
+			}
+		}()
+		ins.ctrlChan = ctrlChan
+	}
+	// init or restart all need to run the process
 	conf.Log.Infof("executing plugin")
 	conf.Log.Infof("executing plugin")
 	jsonArg, err := json.Marshal(pconf)
 	jsonArg, err := json.Marshal(pconf)
 	if err != nil {
 	if err != nil {
@@ -216,23 +269,39 @@ func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *Port
 			conf.Log.Printf("plugin executable %s stops with error %v", pluginMeta.Executable, err)
 			conf.Log.Printf("plugin executable %s stops with error %v", pluginMeta.Executable, err)
 		}
 		}
 		// must make sure the plugin ins is not cleaned up yet by checking the process identity
 		// must make sure the plugin ins is not cleaned up yet by checking the process identity
+		// clean up for stop unintentionally
 		if ins, ok := p.getPluginIns(pluginMeta.Name); ok && ins.process == cmd.Process {
 		if ins, ok := p.getPluginIns(pluginMeta.Name); ok && ins.process == cmd.Process {
-			if ins.ctrlChan != nil {
-				_ = ins.ctrlChan.Close()
+			ins.Lock()
+			if len(ins.commands) == 0 {
+				if ins.ctrlChan != nil {
+					_ = ins.ctrlChan.Close()
+				}
+				p.deletePluginIns(pluginMeta.Name)
 			}
 			}
-			p.deletePluginIns(pluginMeta.Name)
+			ins.process = nil
+			ins.Unlock()
 		}
 		}
 		return nil
 		return nil
 	})
 	})
-
 	conf.Log.Println("waiting handshake")
 	conf.Log.Println("waiting handshake")
-	err = ctrlChan.Handshake()
+	err = ins.ctrlChan.Handshake()
 	if err != nil {
 	if err != nil {
 		return nil, fmt.Errorf("plugin %s control handshake error: %v", pluginMeta.Executable, err)
 		return nil, fmt.Errorf("plugin %s control handshake error: %v", pluginMeta.Executable, err)
 	}
 	}
-	ins := NewPluginIns(pluginMeta.Name, ctrlChan, process)
+	ins.process = process
 	p.instances[pluginMeta.Name] = ins
 	p.instances[pluginMeta.Name] = ins
 	conf.Log.Println("plugin start running")
 	conf.Log.Println("plugin start running")
+	// restore symbols by sending commands when restarting plugin
+	conf.Log.Info("restore plugin symbols")
+	for m, c := range ins.commands {
+		go func(key Meta, jsonArg []byte) {
+			e := ins.ctrlChan.SendCmd(jsonArg)
+			if e != nil {
+				conf.Log.Errorf("send command to %v error: %v", key, e)
+			}
+		}(m, c)
+	}
+
 	return ins, nil
 	return ins, nil
 }
 }
 
 
@@ -242,9 +311,9 @@ func (p *pluginInsManager) Kill(name string) error {
 	var err error
 	var err error
 	if ins, ok := p.instances[name]; ok {
 	if ins, ok := p.instances[name]; ok {
 		err = ins.Stop()
 		err = ins.Stop()
-		delete(p.instances, name)
 	} else {
 	} else {
-		return fmt.Errorf("instance %s not found", name)
+		conf.Log.Warnf("instance %s not found when deleting", name)
+		return nil
 	}
 	}
 	return err
 	return err
 }
 }
@@ -255,7 +324,6 @@ func (p *pluginInsManager) KillAll() error {
 	for _, ins := range p.instances {
 	for _, ins := range p.instances {
 		_ = ins.Stop()
 		_ = ins.Stop()
 	}
 	}
-	p.instances = make(map[string]*PluginIns)
 	return nil
 	return nil
 }
 }
 
 

+ 4 - 8
internal/plugin/portable/runtime/plugin_ins_manager_test.go

@@ -50,11 +50,7 @@ func TestPluginInstance(t *testing.T) {
 		t.Errorf("can't ack handshake: %s", err.Error())
 		t.Errorf("can't ack handshake: %s", err.Error())
 		return
 		return
 	}
 	}
-	ins := &PluginIns{
-		name:     "test",
-		process:  nil,
-		ctrlChan: ch,
-	}
+	ins := NewPluginIns("test", ch, nil)
 	var tests = []struct {
 	var tests = []struct {
 		c  *Control
 		c  *Control
 		sj string
 		sj string
@@ -63,7 +59,7 @@ func TestPluginInstance(t *testing.T) {
 		{
 		{
 			c: &Control{
 			c: &Control{
 				SymbolName: "symbol1",
 				SymbolName: "symbol1",
-				Meta: &Meta{
+				Meta: Meta{
 					RuleId:     "rule1",
 					RuleId:     "rule1",
 					OpId:       "op1",
 					OpId:       "op1",
 					InstanceId: 0,
 					InstanceId: 0,
@@ -77,7 +73,7 @@ func TestPluginInstance(t *testing.T) {
 		}, {
 		}, {
 			c: &Control{
 			c: &Control{
 				SymbolName: "symbol2",
 				SymbolName: "symbol2",
-				Meta: &Meta{
+				Meta: Meta{
 					RuleId:     "rule1",
 					RuleId:     "rule1",
 					OpId:       "op2",
 					OpId:       "op2",
 					InstanceId: 0,
 					InstanceId: 0,
@@ -89,7 +85,7 @@ func TestPluginInstance(t *testing.T) {
 		}, {
 		}, {
 			c: &Control{
 			c: &Control{
 				SymbolName: "symbol3",
 				SymbolName: "symbol3",
-				Meta: &Meta{
+				Meta: Meta{
 					RuleId:     "rule1",
 					RuleId:     "rule1",
 					OpId:       "op3",
 					OpId:       "op3",
 					InstanceId: 0,
 					InstanceId: 0,

+ 2 - 2
internal/plugin/portable/runtime/shared.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -33,7 +33,7 @@ type FuncMeta struct {
 
 
 type Control struct {
 type Control struct {
 	SymbolName string                 `json:"symbolName"`
 	SymbolName string                 `json:"symbolName"`
-	Meta       *Meta                  `json:"meta,omitempty"`
+	Meta       Meta                   `json:"meta"`
 	PluginType string                 `json:"pluginType"`
 	PluginType string                 `json:"pluginType"`
 	DataSource string                 `json:"dataSource,omitempty"`
 	DataSource string                 `json:"dataSource,omitempty"`
 	Config     map[string]interface{} `json:"config,omitempty"`
 	Config     map[string]interface{} `json:"config,omitempty"`

+ 3 - 3
internal/plugin/portable/runtime/sink.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -43,7 +43,7 @@ func (ps *PortableSink) Configure(props map[string]interface{}) error {
 func (ps *PortableSink) Open(ctx api.StreamContext) error {
 func (ps *PortableSink) Open(ctx api.StreamContext) error {
 	ctx.GetLogger().Infof("Start running portable sink %s with conf %+v", ps.symbolName, ps.props)
 	ctx.GetLogger().Infof("Start running portable sink %s with conf %+v", ps.symbolName, ps.props)
 	pm := GetPluginInsManager()
 	pm := GetPluginInsManager()
-	ins, err := pm.getOrStartProcess(ps.reg, PortbleConf)
+	ins, err := pm.getOrStartProcess(ps.reg, PortbleConf, false)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -51,7 +51,7 @@ func (ps *PortableSink) Open(ctx api.StreamContext) error {
 
 
 	// Control: send message to plugin to ask starting symbol
 	// Control: send message to plugin to ask starting symbol
 	c := &Control{
 	c := &Control{
-		Meta: &Meta{
+		Meta: Meta{
 			RuleId:     ctx.GetRuleId(),
 			RuleId:     ctx.GetRuleId(),
 			OpId:       ctx.GetOpId(),
 			OpId:       ctx.GetOpId(),
 			InstanceId: ctx.GetInstanceId(),
 			InstanceId: ctx.GetInstanceId(),

+ 3 - 2
internal/plugin/portable/runtime/source.go

@@ -44,7 +44,7 @@ func NewPortableSource(symbolName string, reg *PluginMeta) *PortableSource {
 func (ps *PortableSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
 func (ps *PortableSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
 	ctx.GetLogger().Infof("Start running portable source %s with datasource %s and conf %+v", ps.symbolName, ps.topic, ps.props)
 	ctx.GetLogger().Infof("Start running portable source %s with datasource %s and conf %+v", ps.symbolName, ps.topic, ps.props)
 	pm := GetPluginInsManager()
 	pm := GetPluginInsManager()
-	ins, err := pm.getOrStartProcess(ps.reg, PortbleConf)
+	ins, err := pm.getOrStartProcess(ps.reg, PortbleConf, false)
 	if err != nil {
 	if err != nil {
 		infra.DrainError(ctx, err, errCh)
 		infra.DrainError(ctx, err, errCh)
 		return
 		return
@@ -60,7 +60,7 @@ func (ps *PortableSource) Open(ctx api.StreamContext, consumer chan<- api.Source
 
 
 	// Control: send message to plugin to ask starting symbol
 	// Control: send message to plugin to ask starting symbol
 	c := &Control{
 	c := &Control{
-		Meta: &Meta{
+		Meta: Meta{
 			RuleId:     ctx.GetRuleId(),
 			RuleId:     ctx.GetRuleId(),
 			OpId:       ctx.GetOpId(),
 			OpId:       ctx.GetOpId(),
 			InstanceId: ctx.GetInstanceId(),
 			InstanceId: ctx.GetInstanceId(),
@@ -72,6 +72,7 @@ func (ps *PortableSource) Open(ctx api.StreamContext, consumer chan<- api.Source
 	}
 	}
 	err = ins.StartSymbol(ctx, c)
 	err = ins.StartSymbol(ctx, c)
 	if err != nil {
 	if err != nil {
+		ctx.GetLogger().Error(err)
 		infra.DrainError(ctx, err, errCh)
 		infra.DrainError(ctx, err, errCh)
 		_ = dataCh.Close()
 		_ = dataCh.Close()
 		return
 		return

+ 1 - 1
tools/plugin_server/plugin_test_server.go

@@ -102,7 +102,7 @@ func startPluginIns(info *portable.PluginInfo) (*runtime.PluginIns, error) {
 		return nil, fmt.Errorf("plugin %s control handshake error: %v", info.Name, err)
 		return nil, fmt.Errorf("plugin %s control handshake error: %v", info.Name, err)
 	}
 	}
 	conf.Log.Println("plugin start running")
 	conf.Log.Println("plugin start running")
-	return runtime.NewPluginIns(info.Name, ctrlChan, nil), nil
+	return runtime.NewPluginInsForTest(info.Name, ctrlChan), nil
 }
 }
 
 
 func createRestServer(ip string, port int) *http.Server {
 func createRestServer(ip string, port int) *http.Server {