|
@@ -1,4 +1,4 @@
|
|
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
|
|
|
|
|
|
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
|
|
//
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// you may not use this file except in compliance with the License.
|
|
@@ -131,22 +131,29 @@ func (r *NanomsgReqRepChannel) Close() error {
|
|
func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error) {
|
|
func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error) {
|
|
r.Lock()
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
defer r.Unlock()
|
|
- if err := r.sock.Send(arg); err != nil {
|
|
|
|
- if err == mangos.ErrProtoState { // resend if protocol state wrong, because of plugin restart or other problems
|
|
|
|
- prev, err := r.sock.Recv()
|
|
|
|
- if err == nil {
|
|
|
|
- conf.Log.Warnf("discard previous response: %s", string(prev))
|
|
|
|
- err = r.sock.Send(arg)
|
|
|
|
- if err == nil {
|
|
|
|
- return r.sock.Recv()
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- return nil, fmt.Errorf("can't send message on control rep socket: %s", err.Error())
|
|
|
|
- }
|
|
|
|
|
|
+ conf.Log.Debugf("send request: %s", string(arg))
|
|
|
|
+ err := r.sock.Send(arg)
|
|
|
|
+ // resend if protocol state wrong, because of plugin restart or other problems
|
|
|
|
+ if err == mangos.ErrProtoState {
|
|
|
|
+ conf.Log.Debugf("send request protestate error %s", err.Error())
|
|
|
|
+ var prev []byte
|
|
|
|
+ prev, err = r.sock.Recv()
|
|
|
|
+ if err == nil {
|
|
|
|
+ conf.Log.Warnf("discard previous response: %s", string(prev))
|
|
|
|
+ conf.Log.Debugf("resend request: %s", string(arg))
|
|
|
|
+ err = r.sock.Send(arg)
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
+ if err != nil {
|
|
return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error())
|
|
return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error())
|
|
}
|
|
}
|
|
- return r.sock.Recv()
|
|
|
|
|
|
+ result, e := r.sock.Recv()
|
|
|
|
+ if e != nil {
|
|
|
|
+ conf.Log.Errorf("can't receive: %s", e.Error())
|
|
|
|
+ } else {
|
|
|
|
+ conf.Log.Debugf("receive response: %s", string(result))
|
|
|
|
+ }
|
|
|
|
+ return result, e
|
|
}
|
|
}
|
|
|
|
|
|
func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error) {
|
|
func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error) {
|