function.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # Copyright 2021 EMQ Technologies Co., Ltd.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import json
  15. import logging
  16. import traceback
  17. from . import reg
  18. from .connection import PairChannel
  19. from .contextimpl import ContextImpl
  20. from .symbol import SymbolRuntime
  21. from ..function import Function
  22. class FunctionRuntime(SymbolRuntime):
  23. def __init__(self, ctrl: dict, s: Function):
  24. ch = PairChannel(ctrl['symbolName'], 1)
  25. self.s = s
  26. self.ch = ch
  27. self.running = False
  28. self.key = "func_{}".format(ctrl['symbolName'])
  29. self.funcs = {}
  30. def run(self):
  31. self.running = True
  32. reg.setr(self.key, self)
  33. # noinspection PyBroadException
  34. try:
  35. self.ch.run(self.do_run)
  36. except Exception:
  37. if self.running:
  38. logging.error(traceback.format_exc())
  39. finally:
  40. self.stop()
  41. def do_run(self, req: bytes):
  42. # noinspection PyBroadException
  43. try:
  44. c = json.loads(req)
  45. logging.debug("running func with ", c)
  46. name = c['func']
  47. if name == "Validate":
  48. err = self.s.validate(c['arg'])
  49. if err != "":
  50. return encode_reply(False, err)
  51. else:
  52. return encode_reply(True, "")
  53. elif name == "Exec":
  54. args = c['arg']
  55. if isinstance(args, list) is False or len(args) < 1:
  56. return encode_reply(False, 'invalid arg')
  57. fmeta = json.loads(args[-1])
  58. if 'ruleId' in fmeta and 'opId' in fmeta and 'instanceId' in fmeta \
  59. and 'funcId' in fmeta:
  60. key = f"{fmeta['ruleId']}_{fmeta['opId']}_{fmeta['instanceId']}" \
  61. f"_{fmeta['funcId']}"
  62. if key in self.funcs:
  63. fctx = self.funcs[key]
  64. else:
  65. fctx = ContextImpl(fmeta)
  66. self.funcs[key] = fctx
  67. else:
  68. return encode_reply(False,
  69. f'invalid arg: {fmeta} ruleId, opId, instanceId and funcId'
  70. f' are required')
  71. r = self.s.exec(args[:-1], fctx)
  72. return encode_reply(True, r)
  73. elif name == "IsAggregate":
  74. r = self.s.is_aggregate()
  75. return encode_reply(True, r)
  76. else:
  77. return encode_reply(False, "invalid func {}".format(name))
  78. except Exception:
  79. """two occasions: normal stop will close socket to raise an error
  80. OR stopped by unexpected error"""
  81. if self.running:
  82. logging.error(traceback.format_exc())
  83. return encode_reply(False, traceback.format_exc())
  84. def stop(self):
  85. self.running = False
  86. # noinspection PyBroadException
  87. try:
  88. self.ch.close()
  89. reg.delete(self.key)
  90. except Exception:
  91. logging.error(traceback.format_exc())
  92. def is_running(self) -> bool:
  93. return self.running
  94. def encode_reply(state: bool, arg: any):
  95. try:
  96. return str.encode(json.dumps({'state': state, 'result': arg}))
  97. except Exception:
  98. return str.encode(json.dumps({'state': False, 'result': traceback.format_exc()}))