123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- # Copyright 2021-2023 EMQ Technologies Co., Ltd.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import json
- import logging
- import sys
- import threading
- import traceback
- from typing import Dict, Callable
- from . import reg, shared
- from .connection import PairChannel
- from .function import FunctionRuntime
- from .sink import SinkRuntime
- from .source import SourceRuntime
- from ..function import Function
- from ..sink import Sink
- from ..source import Source
class PluginConfig:
    """Name of a plugin plus the symbol factories it exposes.

    Each of ``sources``, ``sinks`` and ``functions`` maps a symbol name to a
    zero-argument callable that builds a new instance of that symbol.
    """

    def __init__(self, name: str, sources: Dict[str, Callable[[], Source]],
                 sinks: Dict[str, Callable[[], Sink]],
                 functions: Dict[str, Callable[[], Function]]):
        self.name = name
        self.sources = sources
        self.sinks = sinks
        self.functions = functions

    def get(self, plugin_type: str, symbol_name: str):
        """Return the factory registered for a symbol, or ``None``.

        Args:
            plugin_type: one of ``shared.TYPE_SOURCE``, ``shared.TYPE_SINK``
                or ``shared.TYPE_FUNC``.
            symbol_name: key to look up in the mapping for that type.

        Returns:
            The registered callable, or ``None`` when either the plugin type
            or the symbol name is unknown.
        """
        if plugin_type == shared.TYPE_SOURCE:
            registry = self.sources
        elif plugin_type == shared.TYPE_SINK:
            registry = self.sinks
        elif plugin_type == shared.TYPE_FUNC:
            registry = self.functions
        else:
            return None
        # dict.get (not indexing) so that an unknown symbol yields None, the
        # same "not found" signal already used for an unknown plugin type,
        # instead of raising KeyError.
        return registry.get(symbol_name)
# Active plugin configuration: assigned by start() and read by
# command_reply() when it resolves a symbol.
conf: PluginConfig


def start(c: PluginConfig):
    """Initialise logging, publish the config and run the command channel.

    Args:
        c: configuration of the plugin to start; stored in the module-level
            ``conf`` so that ``command_reply`` can look symbols up in it.
    """
    global conf

    init_vars(c)
    conf = c

    logging.info("starting plugin {}".format(c.name))
    # The channel is created under the plugin's name and is given
    # command_reply as the handler for incoming requests.
    channel = PairChannel(c.name, 0)
    channel.run(command_reply)
    logging.info("started plugin {}".format(c.name))
def init_vars(c: "PluginConfig"):
    """Configure root logging for the plugin process.

    Sets the root logger level to INFO and attaches a stream handler that
    writes to ``sys.stdout`` using a timestamp / path / line / level format.
    Calling this more than once is safe: the handler is only attached the
    first time, so log lines are never emitted twice.

    Args:
        c: plugin configuration. Currently unused; accepted so the call
            signature stays stable.
    """
    # TODO: parse and validate the plugin arguments passed via sys.argv
    # (not implemented yet).
    marker = "_plugin_stdout_handler"

    # noinspection PyTypeChecker
    root = logging.getLogger()
    root.setLevel(logging.INFO)

    # A previous call already installed our handler; adding another one
    # would duplicate every log record.
    if any(getattr(existing, marker, False) for existing in root.handlers):
        return

    handler = logging.StreamHandler(sys.stdout)
    # The handler itself lets DEBUG through; the effective threshold is the
    # root logger level set above.
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(pathname)s[line:%(lineno)d]'
                                  ' - %(levelname)s: %(message)s')
    handler.setFormatter(formatter)
    setattr(handler, marker, True)
    root.addHandler(handler)
# noinspection PyTypeChecker
def command_reply(req: bytes) -> bytes:
    """Handle one control request and return the reply payload.

    The request is a JSON object with a ``cmd`` field and an ``arg`` field;
    ``arg`` is itself a JSON-encoded string holding the control block.

    Args:
        req: raw JSON request bytes.

    Returns:
        ``b'ok'`` when the command was handled (or was neither start nor
        stop), ``b'symbol not found'`` / ``b'invalid plugin type'`` when a
        start request cannot be satisfied, or the encoded traceback text if
        anything raised while handling the request.
    """
    # noinspection PyBroadException
    try:
        cmd = json.loads(req)
        logging.debug("receive command %s", cmd)
        ctrl = json.loads(cmd['arg'])
        logging.debug(ctrl)
        if cmd['cmd'] == shared.CMD_START:
            return _start_symbol(ctrl)
        if cmd['cmd'] == shared.CMD_STOP:
            _stop_symbol(ctrl)
        return b'ok'
    except Exception:
        # Boundary handler: the failure is reported back to the requester
        # as the reply body rather than propagated.
        return str.encode(traceback.format_exc())


def _start_symbol(ctrl) -> bytes:
    """Instantiate the requested symbol and run its runtime on a daemon thread."""
    plugin_type = ctrl['pluginType']
    symbol_name = ctrl['symbolName']

    factory = conf.get(plugin_type, symbol_name)
    if factory is None:
        return b'symbol not found'
    symbol = factory()

    if plugin_type == shared.TYPE_SOURCE:
        label, runtime_cls = "source", SourceRuntime
    elif plugin_type == shared.TYPE_SINK:
        label, runtime_cls = "sink", SinkRuntime
    elif plugin_type == shared.TYPE_FUNC:
        label, runtime_cls = "function", FunctionRuntime
    else:
        return b'invalid plugin type'

    logging.info("running %s %s", label, symbol_name)
    # noinspection PyTypeChecker
    runtime = runtime_cls(ctrl, symbol)
    threading.Thread(target=runtime.run, daemon=True).start()
    return b'ok'


def _stop_symbol(ctrl) -> None:
    """Stop the runtime stored in ``reg`` for this control block, if any."""
    meta = ctrl['meta']
    regkey = f"{meta['ruleId']}_{meta['opId']}" \
             f"_{meta['instanceId']}_{ctrl['symbolName']}"
    logging.info("stopping %s", regkey)
    if not reg.has(regkey):
        # Use a %s placeholder: passing regkey as extra positional
        # arguments without one makes logging fail to format the record.
        logging.warning("symbol %s not found", regkey)
        return
    runtime = reg.get(regkey)
    if runtime.is_running():
        runtime.stop()
|