plugin.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. # Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import json
  15. import logging
  16. import sys
  17. import threading
  18. import traceback
  19. from typing import Dict, Callable
  20. from . import reg, shared
  21. from .connection import PairChannel
  22. from .function import FunctionRuntime
  23. from .sink import SinkRuntime
  24. from .source import SourceRuntime
  25. from ..function import Function
  26. from ..sink import Sink
  27. from ..source import Source
  28. class PluginConfig:
  29. def __init__(self, name: str, sources: Dict[str, Callable[[], Source]],
  30. sinks: Dict[str, Callable[[], Sink]],
  31. functions: Dict[str, Callable[[], Function]]):
  32. self.name = name
  33. self.sources = sources
  34. self.sinks = sinks
  35. self.functions = functions
  36. def get(self, plugin_type: str, symbol_name: str):
  37. if plugin_type == shared.TYPE_SOURCE:
  38. return self.sources[symbol_name]
  39. elif plugin_type == shared.TYPE_SINK:
  40. return self.sinks[symbol_name]
  41. elif plugin_type == shared.TYPE_FUNC:
  42. return self.functions[symbol_name]
  43. else:
  44. return None
  45. conf: PluginConfig
  46. def start(c: PluginConfig):
  47. init_vars(c)
  48. global conf
  49. conf = c
  50. logging.info("starting plugin {}".format(c.name))
  51. ch = PairChannel(c.name, 0)
  52. ch.run(command_reply)
  53. logging.info("started plugin {}".format(c.name))
  54. def init_vars(c: PluginConfig):
  55. # if len(sys.argv) != 2:
  56. # msg = gettext('fail to init plugin, must pass exactly 2 args but got {}'.format(sys.argv))
  57. # raise ValueError(msg)
  58. # """TODO validation"""
  59. # arg = json.loads(sys.argv[1])
  60. # noinspection PyTypeChecker
  61. root = logging.getLogger()
  62. root.setLevel(logging.INFO)
  63. handler = logging.StreamHandler(sys.stdout)
  64. handler.setLevel(logging.DEBUG)
  65. formatter = logging.Formatter('%(asctime)s - %(pathname)s[line:%(lineno)d]'
  66. ' - %(levelname)s: %(message)s')
  67. handler.setFormatter(formatter)
  68. root.addHandler(handler)
  69. # noinspection PyTypeChecker
  70. def command_reply(req: bytes) -> bytes:
  71. # noinspection PyBroadException
  72. try:
  73. cmd = json.loads(req)
  74. logging.debug("receive command {}".format(cmd))
  75. ctrl = json.loads(cmd['arg'])
  76. logging.debug(ctrl)
  77. if cmd['cmd'] == shared.CMD_START:
  78. f = conf.get(ctrl['pluginType'], ctrl['symbolName'])
  79. if f is None:
  80. return b'symbol not found'
  81. s = f()
  82. if ctrl['pluginType'] == shared.TYPE_SOURCE:
  83. logging.info("running source {}".format(ctrl['symbolName']))
  84. runtime = SourceRuntime(ctrl, s)
  85. x = threading.Thread(target=runtime.run, daemon=True)
  86. x.start()
  87. elif ctrl['pluginType'] == shared.TYPE_SINK:
  88. logging.info("running sink {}".format(ctrl['symbolName']))
  89. # noinspection PyTypeChecker
  90. runtime = SinkRuntime(ctrl, s)
  91. x = threading.Thread(target=runtime.run, daemon=True)
  92. x.start()
  93. elif ctrl['pluginType'] == shared.TYPE_FUNC:
  94. logging.info("running function {}".format(ctrl['symbolName']))
  95. runtime = FunctionRuntime(ctrl, s)
  96. x = threading.Thread(target=runtime.run, daemon=True)
  97. x.start()
  98. else:
  99. return b'invalid plugin type'
  100. elif cmd['cmd'] == shared.CMD_STOP:
  101. regkey = f"{ctrl['meta']['ruleId']}_{ctrl['meta']['opId']}" \
  102. f"_{ctrl['meta']['instanceId']}_{ctrl['symbolName']}"
  103. logging.info("stopping {}".format(regkey))
  104. if reg.has(regkey):
  105. runtime = reg.get(regkey)
  106. if runtime.is_running():
  107. runtime.stop()
  108. else:
  109. logging.warning("symbol ", regkey, " not found")
  110. return b'ok'
  111. except Exception:
  112. var = traceback.format_exc()
  113. return str.encode(var)