浏览代码

feat(portable): python sdk and example

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 年之前
父节点
当前提交
f259fa7b15

+ 1 - 1
internal/plugin/portable/model.go

@@ -28,7 +28,7 @@ type PluginInfo struct {
 
 var langMap = map[string]bool{
 	"go":     true,
-	"python": false,
+	"python": true,
 }
 
 // Validate TODO validate duplication of source, sink and functions

+ 24 - 0
sdk/python/ekuiper/__init__.py

@@ -0,0 +1,24 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from ekuiper.function import Function
+from ekuiper.runtime import plugin
+from ekuiper.runtime.context import Context
+from ekuiper.runtime.plugin import PluginConfig
+from ekuiper.sink import Sink
+from ekuiper.source import Source
+
+__all__ = [
+    'plugin', 'PluginConfig', 'Source', 'Sink', 'Function', 'Context'
+]

+ 37 - 0
sdk/python/ekuiper/function.py

@@ -0,0 +1,37 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from abc import abstractmethod
+from typing import List, Any
+
+from .runtime.context import Context
+
+
+class Function(object):
+    """abstract class for eKuiper function plugin"""
+
+    @abstractmethod
+    def validate(self, args: List[Any]):
+        """callback to validate against ast args, return a string error or empty string"""
+        pass
+
+    @abstractmethod
+    def exec(self, args: List[Any], ctx: Context) -> Any:
+        """callback to do execution, return result"""
+        pass
+
+    @abstractmethod
+    def is_aggregate(self):
+        """callback to check if function is for aggregation, return bool"""
+        pass

+ 90 - 0
sdk/python/ekuiper/runtime/connection.py

@@ -0,0 +1,90 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+import logging
+import time
+from typing import Callable
+
+from pynng import Req0, Push0, Pull0
+
+
+class PairChannel:
+
+    def __init__(self, name: str, typ: int):
+        s = Req0()
+        """TODO options"""
+        if typ == 0:
+            url = "ipc:///tmp/plugin_{}.ipc".format(name)
+        else:
+            url = "ipc:///tmp/func_{}.ipc".format(name)
+        s.dial(url)
+        self.sock = s
+
+    """ run this in a new thread"""
+
+    def run(self, reply_func: Callable[[bytes], bytes]):
+        self.sock.send(b'handshake')
+        while True:
+            msg = self.sock.recv()
+            reply = reply_func(msg)
+            self.sock.send(reply)
+
+    def close(self):
+        self.sock.close()
+
+
+class SourceChannel:
+
+    def __init__(self, meta: dict):
+        s = Push0()
+        url = "ipc:///tmp/{}_{}_{}.ipc".format(meta['ruleId'], meta['opId'], meta['instanceId'])
+        logging.info(url)
+        s.dial(url)
+        self.sock = s
+
+    def send(self, data: bytes):
+        self.sock.send(data)
+
+    def close(self):
+        self.sock.close()
+
+
+class SinkChannel:
+
+    def __init__(self, meta: dict):
+        s = Pull0()
+        url = "ipc:///tmp/{}_{}_{}.ipc".format(meta['ruleId'], meta['opId'], meta['instanceId'])
+        logging.info(url)
+        listen_with_retry(s, url)
+        self.sock = s
+
+    def recv(self) -> bytes:
+        return self.sock.recv()
+
+    def close(self):
+        self.sock.close()
+
+
+def listen_with_retry(sock, url: str):
+    retry_count = 10
+    retry_interval = 0.05
+    while True:
+        # noinspection PyBroadException
+        try:
+            sock.listen(url)
+            break
+        except Exception:
+            retry_count -= 1
+            if retry_count < 0:
+                raise
+        time.sleep(retry_interval)

+ 53 - 0
sdk/python/ekuiper/runtime/context.py

@@ -0,0 +1,53 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""context.py: Context defines context information available during
+# processing of a request.
+"""
+import logging
+from abc import abstractmethod
+
+
+class Context(object):
+    """Interface defining information available at process time"""
+
+    @abstractmethod
+    def get_rule_id(self) -> str:
+        """Return the ruleId of the current stream processing graph"""
+        pass
+
+    @abstractmethod
+    def get_op_id(self) -> str:
+        """Return the operation id"""
+        pass
+
+    @abstractmethod
+    def get_instance_id(self) -> int:
+        """Return the instance id"""
+        pass
+
+    @abstractmethod
+    def get_logger(self) -> logging:
+        """Returns the logger object that can be used to do logging"""
+        pass
+
+    @abstractmethod
+    def emit(self, message: dict, meta: dict):
+        """Emit the tuple to the stream"""
+        pass
+
+    @abstractmethod
+    def emit_error(self, error: str):
+        """Emit error to the stream"""
+        pass

+ 54 - 0
sdk/python/ekuiper/runtime/contextimpl.py

@@ -0,0 +1,54 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import json
+import logging
+import sys
+
+from .connection import SourceChannel
+from .context import Context
+
+
+class ContextImpl(Context):
+
+    def __init__(self, meta: dict):
+        self.ruleId = meta['ruleId']
+        self.opId = meta['opId']
+        self.instanceId = meta['instanceId']
+        self.emitter = None
+
+    def set_emitter(self, emitter: SourceChannel):
+        self.emitter = emitter
+
+    def get_rule_id(self) -> str:
+        return self.ruleId
+
+    def get_op_id(self) -> str:
+        return self.opId
+
+    def get_instance_id(self) -> int:
+        return self.instanceId
+
+    def get_logger(self) -> logging:
+        return sys.stdout
+
+    def emit(self, message: dict, meta: dict):
+        data = {'message': message, 'meta': meta}
+        json_str = json.dumps(data)
+        return self.emitter.send(str.encode(json_str))
+
+    def emit_error(self, error: str):
+        data = {'error': error}
+        json_str = json.dumps(data)
+        return self.emitter.send(str.encode(json_str))

+ 105 - 0
sdk/python/ekuiper/runtime/function.py

@@ -0,0 +1,105 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import json
+import logging
+import traceback
+
+from . import reg
+from .connection import PairChannel
+from .contextimpl import ContextImpl
+from .symbol import SymbolRuntime
+from ..function import Function
+
+
+class FunctionRuntime(SymbolRuntime):
+
+    def __init__(self, ctrl: dict, s: Function):
+        ch = PairChannel(ctrl['symbolName'], 1)
+        self.s = s
+        self.ch = ch
+        self.running = False
+        self.key = "func_{}".format(ctrl['symbolName'])
+        self.funcs = {}
+
+    def run(self):
+        reg.setr(self.key, self)
+        # noinspection PyBroadException
+        try:
+            self.ch.run(self.do_run)
+        except Exception:
+            if self.running:
+                logging.error(traceback.format_exc())
+        finally:
+            self.stop()
+
+    def do_run(self, req: bytes):
+        # noinspection PyBroadException
+        try:
+            c = json.loads(req)
+            logging.debug("running func with ", c)
+            name = c['func']
+            if name == "Validate":
+                err = self.s.validate(c['arg'])
+                if err != "":
+                    return encode_reply(False, err)
+                else:
+                    return encode_reply(True, "")
+            elif name == "Exec":
+                args = c['arg']
+                if isinstance(args, list) is False or len(args) < 1:
+                    return encode_reply(False, 'invalid arg')
+                fmeta = json.loads(args[-1])
+                if 'ruleId' in fmeta and 'opId' in fmeta and 'instanceId' in fmeta \
+                        and 'funcId' in fmeta:
+                    key = f"{fmeta['ruleId']}_{fmeta['opId']}_{fmeta['instanceId']}" \
+                          f"_{fmeta['funcId']}"
+                    if key in self.funcs:
+                        fctx = self.funcs[key]
+                    else:
+                        fctx = ContextImpl(fmeta)
+                        self.funcs[key] = fctx
+                else:
+                    return encode_reply(False,
+                                        f'invalid arg: {fmeta} ruleId, opId, instanceId and funcId'
+                                        f' are required')
+                r = self.s.exec(args[:-1], fctx)
+                return encode_reply(True, r)
+            elif name == "IsAggregate":
+                r = self.s.is_aggregate()
+                return encode_reply(True, r)
+            else:
+                return encode_reply(False, "invalid func {}".format(name))
+        except Exception:
+            """two occasions: normal stop will close socket to raise an error 
+            OR stopped by unexpected error"""
+            if self.running:
+                logging.error(traceback.format_exc())
+                return encode_reply(False, traceback.format_exc())
+
+    def stop(self):
+        self.running = False
+        # noinspection PyBroadException
+        try:
+            self.ch.close()
+            reg.delete(self.key)
+        except Exception:
+            logging.error(traceback.format_exc())
+
+    def is_running(self) -> bool:
+        return self.running
+
+
+def encode_reply(state: bool, arg: str):
+    return str.encode(json.dumps({'state': state, 'result': arg}))

+ 126 - 0
sdk/python/ekuiper/runtime/plugin.py

@@ -0,0 +1,126 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+import json
+import logging
+import sys
+import threading
+import traceback
+from typing import Dict, Callable
+
+from . import reg, shared
+from .connection import PairChannel
+from .function import FunctionRuntime
+from .sink import SinkRuntime
+from .source import SourceRuntime
+from ..function import Function
+from ..sink import Sink
+from ..source import Source
+
+
+class PluginConfig:
+
+    def __init__(self, name: str, sources: Dict[str, Callable[[], Source]],
+                 sinks: Dict[str, Callable[[], Sink]],
+                 functions: Dict[str, Callable[[], Function]]):
+        self.name = name
+        self.sources = sources
+        self.sinks = sinks
+        self.functions = functions
+
+    def get(self, plugin_type: str, symbol_name: str):
+        if plugin_type == shared.TYPE_SOURCE:
+            return self.sources[symbol_name]
+        elif plugin_type == shared.TYPE_SINK:
+            return self.sinks[symbol_name]
+        elif plugin_type == shared.TYPE_FUNC:
+            return self.functions[symbol_name]
+        else:
+            return None
+
+
+conf: PluginConfig
+
+
+def start(c: PluginConfig):
+    init_vars(c)
+    global conf
+    conf = c
+    logging.info("starting plugin {}".format(c.name))
+    ch = PairChannel(c.name, 0)
+    ch.run(command_reply)
+
+
+def init_vars(c: PluginConfig):
+    # if len(sys.argv) != 2:
+    #     msg = gettext('fail to init plugin, must pass exactly 2 args but got {}'.format(sys.argv))
+    #     raise ValueError(msg)
+    # """TODO validation"""
+    # arg = json.loads(sys.argv[1])
+    # noinspection PyTypeChecker
+    root = logging.getLogger()
+    root.setLevel(logging.INFO)
+
+    handler = logging.StreamHandler(sys.stdout)
+    handler.setLevel(logging.DEBUG)
+    formatter = logging.Formatter('%(asctime)s - %(pathname)s[line:%(lineno)d]'
+                                  ' - %(levelname)s: %(message)s')
+    handler.setFormatter(formatter)
+    root.addHandler(handler)
+
+
+# noinspection PyTypeChecker
+def command_reply(req: bytes) -> bytes:
+    # noinspection PyBroadException
+    try:
+        cmd = json.loads(req)
+        logging.debug("receive command {}".format(cmd))
+        ctrl = json.loads(cmd['arg'])
+        logging.debug(ctrl)
+        if cmd['cmd'] == shared.CMD_START:
+            f = conf.get(ctrl['pluginType'], ctrl['symbolName'])
+            if f is None:
+                return b'symbol not found'
+            s = f()
+            if ctrl['pluginType'] == shared.TYPE_SOURCE:
+                logging.info("running source {}".format(ctrl['symbolName']))
+                runtime = SourceRuntime(ctrl, s)
+                x = threading.Thread(target=runtime.run, daemon=True)
+                x.start()
+            elif ctrl['pluginType'] == shared.TYPE_SINK:
+                logging.info("running sink {}".format(ctrl['symbolName']))
+                # noinspection PyTypeChecker
+                runtime = SinkRuntime(ctrl, s)
+                x = threading.Thread(target=runtime.run, daemon=True)
+                x.start()
+            elif ctrl['pluginType'] == shared.TYPE_FUNC:
+                logging.info("running function {}".format(ctrl['symbolName']))
+                runtime = FunctionRuntime(ctrl, s)
+                x = threading.Thread(target=runtime.run, daemon=True)
+                x.start()
+            else:
+                return b'invalid plugin type'
+        elif cmd['cmd'] == shared.CMD_STOP:
+            regkey = f"{ctrl['meta']['ruleId']}_{ctrl['meta']['opId']}" \
+                     f"_{ctrl['meta']['instanceId']}_{ctrl['symbolName']}"
+            logging.info("stopping ", regkey)
+            if reg.has(regkey):
+                runtime = reg.get(regkey)
+                if runtime.is_running():
+                    runtime.stop()
+            else:
+                logging.warning("symbol ", regkey, " not found")
+        return b'ok'
+    except Exception:
+        var = traceback.format_exc()
+        return str.encode(var)

+ 41 - 0
sdk/python/ekuiper/runtime/reg.py

@@ -0,0 +1,41 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import logging
+import traceback
+from typing import Dict
+
+from .symbol import SymbolRuntime
+
+runtimes: Dict[str, SymbolRuntime] = {}
+
+
+def has(name: str) -> bool:
+    return name in runtimes
+
+
+def get(name: str) -> SymbolRuntime:
+    return runtimes[name]
+
+
+def setr(name: str, r: SymbolRuntime):
+    runtimes[name] = r
+
+
+def delete(name: str):
+    # noinspection PyBroadException
+    try:
+        del runtimes[name]
+    except Exception:
+        logging.error(traceback.format_exc())

+ 22 - 0
sdk/python/ekuiper/runtime/shared.py

@@ -0,0 +1,22 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+CMD_START = "start"
+CMD_STOP = "stop"
+
+TYPE_SOURCE = "source"
+TYPE_SINK = "sink"
+TYPE_FUNC = "func"
+
+REPLY_OK = "ok"

+ 69 - 0
sdk/python/ekuiper/runtime/sink.py

@@ -0,0 +1,69 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import logging
+import traceback
+
+from . import reg
+from .connection import SinkChannel
+from .symbol import SymbolRuntime, parse_context
+from ..sink import Sink
+
+
+class SinkRuntime(SymbolRuntime):
+
+    def __init__(self, ctrl: dict, s: Sink):
+        ctx = parse_context(ctrl)
+        config = {}
+        if 'config' in ctrl:
+            config = ctrl['config']
+        s.configure(config)
+        ch = SinkChannel(ctrl['meta'])
+        self.s = s
+        self.ctx = ctx
+        self.ch = ch
+        self.running = False
+        self.key = f"{ctrl['meta']['ruleId']}_{ctrl['meta']['opId']}" \
+                   f"_{ctrl['meta']['instanceId']}_{ctrl['symbolName']}"
+
+    def run(self):
+        logging.info('start running sink')
+        # noinspection PyBroadException
+        try:
+            self.s.open(self.ctx)
+            self.running = True
+            reg.setr(self.key, self)
+            while True:
+                msg = self.ch.recv()
+                self.s.collect(self.ctx, msg)
+        except Exception:
+            """two occasions: normal stop will close socket to raise an error 
+            OR stopped by unexpected error"""
+            if self.running:
+                logging.error(traceback.format_exc())
+        finally:
+            self.stop()
+
+    def stop(self):
+        self.running = False
+        # noinspection PyBroadException
+        try:
+            self.s.close(self.ctx)
+            self.ch.close()
+            reg.delete(self.key)
+        except Exception:
+            logging.error(traceback.format_exc())
+
+    def is_running(self) -> bool:
+        return self.running

+ 71 - 0
sdk/python/ekuiper/runtime/source.py

@@ -0,0 +1,71 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import logging
+import traceback
+
+from . import reg
+from .connection import SourceChannel
+from .symbol import parse_context, SymbolRuntime
+from ..source import Source
+
+
+class SourceRuntime(SymbolRuntime):
+
+    def __init__(self, ctrl: dict, s: Source):
+        ctx = parse_context(ctrl)
+        ds = ""
+        config = {}
+        if 'datasource' in ctrl:
+            ds = ctrl['datasource']
+        if 'config' in ctrl:
+            config = ctrl['config']
+        s.configure(ds, config)
+        ch = SourceChannel(ctrl['meta'])
+        ctx.set_emitter(ch)
+        key = f"{ctrl['meta']['ruleId']}_{ctrl['meta']['opId']}" \
+              f"_{ctrl['meta']['instanceId']}_{ctrl['symbolName']}"
+        self.s = s
+        self.ctx = ctx
+        self.ch = ch
+        self.running = False
+        self.key = key
+
+    def run(self):
+        logging.info('start running source')
+        self.running = True
+        reg.setr(self.key, self)
+        # noinspection PyBroadException
+        try:
+            self.s.open(self.ctx)
+        except Exception:
+            """two occasions: normal stop will close socket to raise an error OR\
+             stopped by unexpected error"""
+            if self.running:
+                logging.error(traceback.format_exc())
+        finally:
+            self.stop()
+
+    def stop(self):
+        self.running = False
+        # noinspection PyBroadException
+        try:
+            self.s.close(self.ctx)
+            self.ch.close()
+            reg.delete(self.key)
+        except Exception:
+            logging.error(traceback.format_exc())
+
+    def is_running(self) -> bool:
+        return self.running

+ 42 - 0
sdk/python/ekuiper/runtime/symbol.py

@@ -0,0 +1,42 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from abc import abstractmethod
+
+from . import contextimpl
+
+
+class SymbolRuntime:
+    """class to model the running symbol of source/sink/function"""
+
+    @abstractmethod
+    def run(self):
+        """start to run the symbol"""
+        pass
+
+    @abstractmethod
+    def stop(self):
+        """stop the symbol"""
+        pass
+
+    @abstractmethod
+    def is_running(self) -> bool:
+        """check if symbol is running"""
+        pass
+
+
+def parse_context(ctrl):
+    if ctrl['meta']['ruleId'] == "" or ctrl['meta']['opId'] == "":
+        raise ('invalid arg: ', ctrl, 'ruleId and opId are required')
+    return contextimpl.ContextImpl(ctrl['meta'])

+ 42 - 0
sdk/python/ekuiper/sink.py

@@ -0,0 +1,42 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from abc import abstractmethod
+from typing import Any
+
+from .runtime.context import Context
+
+
+class Sink(object):
+    """abstract class for eKuiper sink plugin"""
+
+    @abstractmethod
+    def configure(self, conf: dict):
+        """configure with conf map and raise error if any"""
+        pass
+
+    @abstractmethod
+    def open(self, ctx: Context):
+        """open connection and wait to receive data"""
+        pass
+
+    @abstractmethod
+    def collect(self, ctx: Context, data: Any):
+        """callback to deal with received data"""
+        pass
+
+    @abstractmethod
+    def close(self, ctx: Context):
+        """stop running and clean up"""
+        pass

+ 36 - 0
sdk/python/ekuiper/source.py

@@ -0,0 +1,36 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from abc import abstractmethod
+
+from .runtime.context import Context
+
+
+class Source(object):
+    """abstract class for eKuiper source plugin"""
+
+    @abstractmethod
+    def configure(self, datasource: str, conf: dict):
+        """configure with the string datasource and conf map and raise error if any"""
+        pass
+
+    @abstractmethod
+    def open(self, ctx: Context):
+        """run continuously and send out the data or error with ctx"""
+        pass
+
+    @abstractmethod
+    def close(self, ctx: Context):
+        """stop running and clean up"""
+        pass

+ 23 - 0
sdk/python/example/pysam/functions/revert.json

@@ -0,0 +1,23 @@
+{
+	"about": {
+		"trial": false,
+		"author": {
+			"name": "EMQ",
+			"email": "contact@emqx.io",
+			"company": "EMQ Technologies Co., Ltd",
+			"website": "https://www.emqx.io"
+		},
+		"description": {
+			"en_US": "Example python plugin to revert the input string",
+			"zh_CN": "示例python插件,用于反转输入的字符串"
+		}
+	},
+	"functions": [{
+		"name": "revert",
+		"example": "revert(col1)",
+		"hint": {
+			"en_US": "Revert the input string",
+			"zh_CN": "输出反转参数值。"
+		}
+	}]
+}

+ 35 - 0
sdk/python/example/pysam/print.py

@@ -0,0 +1,35 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from typing import Any
+
+from ekuiper import Sink, Context
+
+
+class PrintSink(Sink):
+
+    def __init__(self):
+        pass
+
+    def configure(self, conf: dict):
+        print('configure print sink')
+
+    def open(self, ctx: Context):
+        print('open print sink: ', ctx)
+
+    def collect(self, ctx: Context, data: Any):
+        print('receive: ', data)
+
+    def close(self, ctx: Context):
+        print("closing print sink")

+ 48 - 0
sdk/python/example/pysam/pyjson.py

@@ -0,0 +1,48 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+import logging
+import time
+
+from ekuiper import Source, Context
+
+
+class PyJson(Source):
+    def __init__(self):
+        self.data = {"name": "pyjson", "value": 2021}
+
+    def configure(self, datasource: str, conf: dict):
+        logging.info("configuring with datasource {} and conf {}".format(datasource, conf))
+
+    # noinspection PyTypeChecker
+    def open(self, ctx: Context):
+        print("opening")
+        for i in range(100):
+            ctx.emit(self.data, None)
+            print("emit")
+            time.sleep(0.2)
+
+    def close(self, ctx: Context):
+        print("closing")

+ 14 - 0
sdk/python/example/pysam/pysam.json

@@ -0,0 +1,14 @@
+{
+  "version": "v1.0.0",
+  "language": "python",
+  "executable": "pysam.py",
+  "sources": [
+    "pyjson"
+  ],
+  "sinks": [
+    "print"
+  ],
+  "functions": [
+    "revert"
+  ]
+}

+ 24 - 0
sdk/python/example/pysam/pysam.py

@@ -0,0 +1,24 @@
+
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from ekuiper import plugin, PluginConfig
+from print import PrintSink
+from pyjson import PyJson
+from revert import revertIns
+
+if __name__ == '__main__':
+    c = PluginConfig("pysam", {"pyjson": lambda: PyJson()}, {"print": lambda: PrintSink()},
+                     {"revert": lambda: revertIns})
+    plugin.start(c)

+ 35 - 0
sdk/python/example/pysam/revert.py

@@ -0,0 +1,35 @@
+#  Copyright 2021 EMQ Technologies Co., Ltd.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from typing import Any, List
+
+from ekuiper import Function, Context
+
+
+class RevertFunc(Function):
+
+    def __init__(self):
+        pass
+
+    def validate(self, args: List[Any]):
+        return ""
+
+    def exec(self, args: List[Any], ctx: Context):
+        return args[0][::-1]
+
+    def is_aggregate(self):
+        return False
+
+
+revertIns = RevertFunc()

+ 15 - 0
sdk/python/example/pysam/sinks/print.json

@@ -0,0 +1,15 @@
+{
+	"about": {
+		"trial": true,
+		"author": {
+			"name": "EMQ",
+			"email": "contact@emqx.io",
+			"company": "EMQ Technologies Co., Ltd",
+			"website": "https://www.emqx.io"
+		},
+		"description": {
+			"en_US": "Example sink plugin to print with python logger",
+			"zh_CN": "示例 python 插件,使用 python 打出日志"
+		}
+	}
+}

+ 19 - 0
sdk/python/example/pysam/sources/pyjson.json

@@ -0,0 +1,19 @@
+{
+  "about": {
+    "trial": true,
+    "author": {
+      "name": "EMQ",
+      "email": "contact@emqx.io",
+      "company": "EMQ Technologies Co., Ltd",
+      "website": "https://www.emqx.io"
+    },
+    "helpUrl": {
+      "en_US": "https://github.com/lf-edge/ekuiper/blob/master/docs/en_US/plugins/sources/random.md",
+      "zh_CN": "https://github.com/lf-edge/ekuiper/blob/master/docs/zh_CN/plugins/sources/random.md"
+    },
+    "description": {
+      "en_US": "The python source will send json data.",
+      "zh_CN": "Python 源发送数据"
+    }
+  }
+}

+ 1 - 0
sdk/python/example/pysam/sources/pyjson.yaml

@@ -0,0 +1 @@
+default: