function.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. # Copyright 2021 EMQ Technologies Co., Ltd.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import json
  15. import logging
  16. import traceback
  17. from . import reg
  18. from .connection import PairChannel
  19. from .contextimpl import ContextImpl
  20. from .symbol import SymbolRuntime
  21. from ..function import Function
  22. class FunctionRuntime(SymbolRuntime):
  23. def __init__(self, ctrl: dict, s: Function):
  24. ch = PairChannel(ctrl['symbolName'], 1)
  25. self.s = s
  26. self.ch = ch
  27. self.running = False
  28. self.key = "func_{}".format(ctrl['symbolName'])
  29. self.funcs = {}
  30. def run(self):
  31. reg.setr(self.key, self)
  32. # noinspection PyBroadException
  33. try:
  34. self.ch.run(self.do_run)
  35. except Exception:
  36. if self.running:
  37. logging.error(traceback.format_exc())
  38. finally:
  39. self.stop()
  40. def do_run(self, req: bytes):
  41. # noinspection PyBroadException
  42. try:
  43. c = json.loads(req)
  44. logging.debug("running func with ", c)
  45. name = c['func']
  46. if name == "Validate":
  47. err = self.s.validate(c['arg'])
  48. if err != "":
  49. return encode_reply(False, err)
  50. else:
  51. return encode_reply(True, "")
  52. elif name == "Exec":
  53. args = c['arg']
  54. if isinstance(args, list) is False or len(args) < 1:
  55. return encode_reply(False, 'invalid arg')
  56. fmeta = json.loads(args[-1])
  57. if 'ruleId' in fmeta and 'opId' in fmeta and 'instanceId' in fmeta \
  58. and 'funcId' in fmeta:
  59. key = f"{fmeta['ruleId']}_{fmeta['opId']}_{fmeta['instanceId']}" \
  60. f"_{fmeta['funcId']}"
  61. if key in self.funcs:
  62. fctx = self.funcs[key]
  63. else:
  64. fctx = ContextImpl(fmeta)
  65. self.funcs[key] = fctx
  66. else:
  67. return encode_reply(False,
  68. f'invalid arg: {fmeta} ruleId, opId, instanceId and funcId'
  69. f' are required')
  70. r = self.s.exec(args[:-1], fctx)
  71. return encode_reply(True, r)
  72. elif name == "IsAggregate":
  73. r = self.s.is_aggregate()
  74. return encode_reply(True, r)
  75. else:
  76. return encode_reply(False, "invalid func {}".format(name))
  77. except Exception:
  78. """two occasions: normal stop will close socket to raise an error
  79. OR stopped by unexpected error"""
  80. if self.running:
  81. logging.error(traceback.format_exc())
  82. return encode_reply(False, traceback.format_exc())
  83. def stop(self):
  84. self.running = False
  85. # noinspection PyBroadException
  86. try:
  87. self.ch.close()
  88. reg.delete(self.key)
  89. except Exception:
  90. logging.error(traceback.format_exc())
  91. def is_running(self) -> bool:
  92. return self.running
  93. def encode_reply(state: bool, arg: str):
  94. return str.encode(json.dumps({'state': state, 'result': arg}))