source.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. # Copyright 2021 EMQ Technologies Co., Ltd.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import traceback
  16. from . import reg
  17. from .connection import SourceChannel
  18. from .symbol import parse_context, SymbolRuntime
  19. from ..source import Source
  20. class SourceRuntime(SymbolRuntime):
  21. def __init__(self, ctrl: dict, s: Source):
  22. ctx = parse_context(ctrl)
  23. ds = ""
  24. config = {}
  25. if 'datasource' in ctrl:
  26. ds = ctrl['datasource']
  27. if 'config' in ctrl:
  28. config = ctrl['config']
  29. s.configure(ds, config)
  30. ch = SourceChannel(ctrl['meta'])
  31. ctx.set_emitter(ch)
  32. key = f"{ctrl['meta']['ruleId']}_{ctrl['meta']['opId']}" \
  33. f"_{ctrl['meta']['instanceId']}_{ctrl['symbolName']}"
  34. self.s = s
  35. self.ctx = ctx
  36. self.ch = ch
  37. self.running = False
  38. self.key = key
  39. def run(self):
  40. logging.info('start running source')
  41. self.running = True
  42. reg.setr(self.key, self)
  43. # noinspection PyBroadException
  44. try:
  45. self.s.open(self.ctx)
  46. except Exception:
  47. """two occasions: normal stop will close socket to raise an error OR\
  48. stopped by unexpected error"""
  49. if self.running:
  50. logging.error(traceback.format_exc())
  51. finally:
  52. if self.running:
  53. self.stop()
  54. def stop(self):
  55. self.running = False
  56. # noinspection PyBroadException
  57. try:
  58. self.s.close(self.ctx)
  59. self.ch.close()
  60. reg.delete(self.key)
  61. except Exception:
  62. logging.error(traceback.format_exc())
  63. def is_running(self) -> bool:
  64. return self.running