瀏覽代碼

fix(sdk): disable req resend (#1144)

By default, the request will be resent after 1 minute. This makes the timeline in chaos when not receiving data in more than 1 minute.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 3 年之前
父節點
當前提交
7e917a106d
共有 2 個文件被更改,包括 12 次插入6 次删除
  1. 3 1
      sdk/go/connection/connection.go
  2. 9 5
      sdk/python/ekuiper/runtime/connection.py

+ 3 - 1
sdk/go/connection/connection.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -93,6 +93,7 @@ func CreateControlChannel(pluginName string) (ControlChannel, error) {
 		return nil, fmt.Errorf("can't get new req socket: %s", err)
 		return nil, fmt.Errorf("can't get new req socket: %s", err)
 	}
 	}
 	setSockOptions(sock)
 	setSockOptions(sock)
+	sock.SetOption(mangos.OptionRetryTime, 0)
 	url := fmt.Sprintf("ipc:///tmp/plugin_%s.ipc", pluginName)
 	url := fmt.Sprintf("ipc:///tmp/plugin_%s.ipc", pluginName)
 	if err = sock.Dial(url); err != nil {
 	if err = sock.Dial(url); err != nil {
 		return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
 		return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
@@ -125,6 +126,7 @@ func CreateFuncChannel(symbolName string) (DataInOutChannel, error) {
 		return nil, fmt.Errorf("can't get new req socket: %s", err)
 		return nil, fmt.Errorf("can't get new req socket: %s", err)
 	}
 	}
 	setSockOptions(sock)
 	setSockOptions(sock)
+	sock.SetOption(mangos.OptionRetryTime, 0)
 	url := fmt.Sprintf("ipc:///tmp/func_%s.ipc", symbolName)
 	url := fmt.Sprintf("ipc:///tmp/func_%s.ipc", symbolName)
 	if err = sock.Dial(url); err != nil {
 	if err = sock.Dial(url); err != nil {
 		return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
 		return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())

+ 9 - 5
sdk/python/ekuiper/runtime/connection.py

@@ -15,13 +15,13 @@ import logging
 import time
 import time
 from typing import Callable
 from typing import Callable
 
 
-from pynng import Req0, Push0, Pull0
+from pynng import Req0, Push0, Pull0, Timeout
 
 
 
 
 class PairChannel:
 class PairChannel:
 
 
     def __init__(self, name: str, typ: int):
     def __init__(self, name: str, typ: int):
-        s = Req0()
+        s = Req0(resend_time=0)
         """TODO options"""
         """TODO options"""
         if typ == 0:
         if typ == 0:
             url = "ipc:///tmp/plugin_{}.ipc".format(name)
             url = "ipc:///tmp/plugin_{}.ipc".format(name)
@@ -35,9 +35,13 @@ class PairChannel:
     def run(self, reply_func: Callable[[bytes], bytes]):
     def run(self, reply_func: Callable[[bytes], bytes]):
         self.sock.send(b'handshake')
         self.sock.send(b'handshake')
         while True:
         while True:
-            msg = self.sock.recv()
-            reply = reply_func(msg)
-            self.sock.send(reply)
+            try:
+                msg = self.sock.recv()
+                reply = reply_func(msg)
+                self.sock.send(reply)
+            except Timeout:
+                print('pair timeout')
+                pass
 
 
     def close(self):
     def close(self):
         self.sock.close()
         self.sock.close()