Переглянути джерело

opt(portable): remove function handshake for seamingless reconnect

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 роки тому
батько
коміт
c190fbebaf

+ 4 - 11
internal/plugin/portable/runtime/connection.go

@@ -115,7 +115,6 @@ type DataOutChannel interface {
 }
 }
 
 
 type DataReqChannel interface {
 type DataReqChannel interface {
-	Handshake() error
 	Req([]byte) ([]byte, error)
 	Req([]byte) ([]byte, error)
 	Closable
 	Closable
 }
 }
@@ -134,12 +133,15 @@ func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error) {
 	defer r.Unlock()
 	defer r.Unlock()
 	if err := r.sock.Send(arg); err != nil {
 	if err := r.sock.Send(arg); err != nil {
 		if err == mangos.ErrProtoState { // resend if protocol state wrong, because of plugin restart or other problems
 		if err == mangos.ErrProtoState { // resend if protocol state wrong, because of plugin restart or other problems
-			err = r.Handshake()
+			prev, err := r.sock.Recv()
 			if err == nil {
 			if err == nil {
+				conf.Log.Warnf("discard previous response: %s", string(prev))
 				err = r.sock.Send(arg)
 				err = r.sock.Send(arg)
 				if err == nil {
 				if err == nil {
 					return r.sock.Recv()
 					return r.sock.Recv()
 				}
 				}
+			} else {
+				return nil, fmt.Errorf("can't send message on control rep socket: %s", err.Error())
 			}
 			}
 		}
 		}
 		return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error())
 		return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error())
@@ -147,15 +149,6 @@ func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error) {
 	return r.sock.Recv()
 	return r.sock.Recv()
 }
 }
 
 
-// Handshake should only be called once
-func (r *NanomsgReqRepChannel) Handshake() error {
-	_, err := r.sock.Recv()
-	if err != nil && err != mangos.ErrProtoState {
-		return err
-	}
-	return nil
-}
-
 func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error) {
 func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error) {
 	var (
 	var (
 		sock mangos.Socket
 		sock mangos.Socket

+ 10 - 7
internal/plugin/portable/runtime/function.go

@@ -59,11 +59,6 @@ func NewPortableFunc(symbolName string, reg *PluginMeta) (*PortableFunc, error)
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	err = dataCh.Handshake()
-	if err != nil {
-		return nil, fmt.Errorf("function %s handshake error: %v", reg.Name, err)
-	}
-
 	return &PortableFunc{
 	return &PortableFunc{
 		symbolName: reg.Name,
 		symbolName: reg.Name,
 		reg:        reg,
 		reg:        reg,
@@ -109,8 +104,16 @@ func (f *PortableFunc) Exec(args []interface{}, ctx api.FunctionContext) (interf
 	}
 	}
 	fr := &FuncReply{}
 	fr := &FuncReply{}
 	err = json.Unmarshal(res, fr)
 	err = json.Unmarshal(res, fr)
-	if err != nil {
-		return err, false
+	if err != nil { // retry if receive handshake after restart function process
+		ctx.GetLogger().Warnf("Failed to unmarshal function result %s", string(res))
+		res, err = f.dataCh.Req(jsonArg)
+		if err != nil {
+			return err, false
+		}
+		err = json.Unmarshal(res, fr)
+		if err != nil {
+			return err, false
+		}
 	}
 	}
 	if !fr.State {
 	if !fr.State {
 		if fr.Result != nil {
 		if fr.Result != nil {