123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- # Copyright 2021 EMQ Technologies Co., Ltd.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import json
- import logging
- import traceback
- from . import reg
- from .connection import PairChannel
- from .contextimpl import ContextImpl
- from .symbol import SymbolRuntime
- from ..function import Function
- class FunctionRuntime(SymbolRuntime):
- def __init__(self, ctrl: dict, s: Function):
- ch = PairChannel(ctrl['symbolName'], 1)
- self.s = s
- self.ch = ch
- self.running = False
- self.key = "func_{}".format(ctrl['symbolName'])
- self.funcs = {}
- def run(self):
- self.running = True
- reg.setr(self.key, self)
- # noinspection PyBroadException
- try:
- self.ch.run(self.do_run)
- except Exception:
- if self.running:
- logging.error(traceback.format_exc())
- finally:
- self.stop()
- def do_run(self, req: bytes):
- # noinspection PyBroadException
- try:
- c = json.loads(req)
- logging.debug("running func with ", c)
- name = c['func']
- if name == "Validate":
- err = self.s.validate(c['arg'])
- if err != "":
- return encode_reply(False, err)
- else:
- return encode_reply(True, "")
- elif name == "Exec":
- args = c['arg']
- if isinstance(args, list) is False or len(args) < 1:
- return encode_reply(False, 'invalid arg')
- fmeta = json.loads(args[-1])
- if 'ruleId' in fmeta and 'opId' in fmeta and 'instanceId' in fmeta \
- and 'funcId' in fmeta:
- key = f"{fmeta['ruleId']}_{fmeta['opId']}_{fmeta['instanceId']}" \
- f"_{fmeta['funcId']}"
- if key in self.funcs:
- fctx = self.funcs[key]
- else:
- fctx = ContextImpl(fmeta)
- self.funcs[key] = fctx
- else:
- return encode_reply(False,
- f'invalid arg: {fmeta} ruleId, opId, instanceId and funcId'
- f' are required')
- r = self.s.exec(args[:-1], fctx)
- return encode_reply(True, r)
- elif name == "IsAggregate":
- r = self.s.is_aggregate()
- return encode_reply(True, r)
- else:
- return encode_reply(False, "invalid func {}".format(name))
- except Exception:
- """two occasions: normal stop will close socket to raise an error
- OR stopped by unexpected error"""
- if self.running:
- logging.error(traceback.format_exc())
- return encode_reply(False, traceback.format_exc())
- def stop(self):
- self.running = False
- # noinspection PyBroadException
- try:
- self.ch.close()
- reg.delete(self.key)
- except Exception:
- logging.error(traceback.format_exc())
- def is_running(self) -> bool:
- return self.running
- def encode_reply(state: bool, arg: any):
- try:
- return str.encode(json.dumps({'state': state, 'result': arg}))
- except Exception:
- return str.encode(json.dumps({'state': False, 'result': traceback.format_exc()}))
|