connection.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. # Copyright 2021 EMQ Technologies Co., Ltd.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import time
  16. from typing import Callable
  17. from pynng import Req0, Push0, Pull0, Timeout
  18. class PairChannel:
  19. def __init__(self, name: str, typ: int):
  20. s = Req0(resend_time=0)
  21. """TODO options"""
  22. if typ == 0:
  23. url = "ipc:///tmp/plugin_{}.ipc".format(name)
  24. else:
  25. url = "ipc:///tmp/func_{}.ipc".format(name)
  26. s.dial(url)
  27. self.sock = s
  28. """ run this in a new thread"""
  29. def run(self, reply_func: Callable[[bytes], bytes]):
  30. self.sock.send(b'handshake')
  31. while True:
  32. try:
  33. msg = self.sock.recv()
  34. reply = reply_func(msg)
  35. self.sock.send(reply)
  36. except Timeout:
  37. print('pair timeout')
  38. pass
  39. def close(self):
  40. self.sock.close()
  41. class SourceChannel:
  42. def __init__(self, meta: dict):
  43. s = Push0()
  44. url = "ipc:///tmp/{}_{}_{}.ipc".format(meta['ruleId'], meta['opId'], meta['instanceId'])
  45. logging.info(url)
  46. s.dial(url)
  47. self.sock = s
  48. def send(self, data: bytes):
  49. self.sock.send(data)
  50. def close(self):
  51. self.sock.close()
  52. class SinkChannel:
  53. def __init__(self, meta: dict):
  54. s = Pull0()
  55. url = "ipc:///tmp/{}_{}_{}.ipc".format(meta['ruleId'], meta['opId'], meta['instanceId'])
  56. logging.info(url)
  57. listen_with_retry(s, url)
  58. self.sock = s
  59. def recv(self) -> bytes:
  60. return self.sock.recv()
  61. def close(self):
  62. self.sock.close()
  63. def listen_with_retry(sock, url: str):
  64. retry_count = 10
  65. retry_interval = 0.05
  66. while True:
  67. # noinspection PyBroadException
  68. try:
  69. sock.listen(url)
  70. break
  71. except Exception:
  72. retry_count -= 1
  73. if retry_count < 0:
  74. raise
  75. time.sleep(retry_interval)