sink.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. # Copyright 2021 EMQ Technologies Co., Ltd.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import traceback
  16. from . import reg
  17. from .connection import SinkChannel
  18. from .symbol import SymbolRuntime, parse_context
  19. from ..sink import Sink
  20. class SinkRuntime(SymbolRuntime):
  21. def __init__(self, ctrl: dict, s: Sink):
  22. ctx = parse_context(ctrl)
  23. config = {}
  24. if 'config' in ctrl:
  25. config = ctrl['config']
  26. s.configure(config)
  27. ch = SinkChannel(ctrl['meta'])
  28. self.s = s
  29. self.ctx = ctx
  30. self.ch = ch
  31. self.running = False
  32. self.key = f"{ctrl['meta']['ruleId']}_{ctrl['meta']['opId']}" \
  33. f"_{ctrl['meta']['instanceId']}_{ctrl['symbolName']}"
  34. def run(self):
  35. logging.info('start running sink')
  36. # noinspection PyBroadException
  37. try:
  38. self.s.open(self.ctx)
  39. self.running = True
  40. reg.setr(self.key, self)
  41. while True:
  42. msg = self.ch.recv()
  43. self.s.collect(self.ctx, msg)
  44. except Exception:
  45. """two occasions: normal stop will close socket to raise an error
  46. OR stopped by unexpected error"""
  47. if self.running:
  48. logging.error(traceback.format_exc())
  49. finally:
  50. if self.running:
  51. self.stop()
  52. def stop(self):
  53. self.running = False
  54. # noinspection PyBroadException
  55. try:
  56. self.s.close(self.ctx)
  57. self.ch.close()
  58. reg.delete(self.key)
  59. except Exception:
  60. logging.error(traceback.format_exc())
  61. def is_running(self) -> bool:
  62. return self.running