connection.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import time
  16. from typing import Callable
  17. from pynng import Req0, Push0, Pull0, Timeout
  18. class PairChannel:
  19. def __init__(self, name: str, typ: int):
  20. s = Req0(resend_time=0)
  21. """TODO options"""
  22. if typ == 0:
  23. url = "ipc:///tmp/plugin_{}.ipc".format(name)
  24. else:
  25. url = "ipc:///tmp/func_{}.ipc".format(name)
  26. logging.info("dialing {}".format(url))
  27. try:
  28. dial_with_retry(s, url)
  29. except Exception as e:
  30. logging.info("control/function channel {} cannot created {}".format(url, e))
  31. exit(0)
  32. self.sock = s
  33. """ run this in a new thread"""
  34. def run(self, reply_func: Callable[[bytes], bytes]):
  35. self.sock.send(b'handshake')
  36. while True:
  37. try:
  38. msg = self.sock.recv()
  39. reply = reply_func(msg)
  40. self.sock.send(reply)
  41. except Timeout:
  42. print('pair timeout')
  43. pass
  44. def close(self):
  45. self.sock.close()
  46. class SourceChannel:
  47. def __init__(self, meta: dict):
  48. s = Push0(send_timeout=1000)
  49. url = "ipc:///tmp/{}_{}_{}.ipc".format(meta['ruleId'], meta['opId'], meta['instanceId'])
  50. logging.info(url)
  51. dial_with_retry(s, url)
  52. self.sock = s
  53. def send(self, data: bytes):
  54. self.sock.send(data)
  55. def close(self):
  56. self.sock.close()
  57. class SinkChannel:
  58. def __init__(self, meta: dict):
  59. s = Pull0()
  60. url = "ipc:///tmp/{}_{}_{}.ipc".format(meta['ruleId'], meta['opId'], meta['instanceId'])
  61. logging.info(url)
  62. listen_with_retry(s, url)
  63. self.sock = s
  64. def recv(self) -> bytes:
  65. return self.sock.recv()
  66. def close(self):
  67. self.sock.close()
  68. def listen_with_retry(sock, url: str):
  69. retry_count = 10
  70. retry_interval = 0.05
  71. while True:
  72. # noinspection PyBroadException
  73. try:
  74. sock.listen(url)
  75. break
  76. except Exception:
  77. retry_count -= 1
  78. if retry_count < 0:
  79. raise
  80. time.sleep(retry_interval)
  81. def dial_with_retry(sock, url: str):
  82. retry_count = 50
  83. retry_interval = 0.1
  84. while True:
  85. try:
  86. sock.dial(url, block=True)
  87. break
  88. except Exception as e:
  89. logging.debug("dial error {}".format(e))
  90. retry_count -= 1
  91. if retry_count < 0:
  92. raise
  93. time.sleep(retry_interval)