Przeglądaj źródła

fix(portable): remedy hot reload and full import

- revert the previous fix to adapt to import portable data
- clarify function instance management so that the instance is reusable
- add more log

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 lat temu
rodzic
commit
47748a7f64

+ 4 - 18
internal/plugin/portable/factory.go

@@ -43,6 +43,10 @@ func (m *Manager) Sink(name string) (api.Sink, error) {
 	return runtime.NewPortableSink(name, meta), nil
 }
 
+// The function instance are kept forever even after deletion
+// The instance is actually a wrapper of the nng channel which is dependant from the plugin instance
+// Even updated plugin instance can reuse the channel if the function name is not changed
+// It is not used to check if the function is bound, use ConvName which checks the meta
 var funcInsMap = &sync.Map{}
 
 func (m *Manager) Function(name string) (api.Function, error) {
@@ -72,21 +76,3 @@ func (m *Manager) ConvName(funcName string) (string, bool) {
 	_, ok := m.GetPluginMeta(plugin.FUNCTION, funcName)
 	return funcName, ok
 }
-
-// Clean up function map
-func (m *Manager) Clean() {
-	funcInsMap.Range(func(_, ins interface{}) bool {
-		f := ins.(*runtime.PortableFunc)
-		_ = f.Close()
-		return true
-	})
-	funcInsMap = &sync.Map{}
-}
-
-func (m *Manager) DeleteFunc(funcName string) {
-	f, ok := funcInsMap.Load(funcName)
-	if ok {
-		_ = f.(*runtime.PortableFunc).Close()
-		funcInsMap.Delete(funcName)
-	}
-}

+ 0 - 1
internal/plugin/portable/manager.go

@@ -388,7 +388,6 @@ func (m *Manager) Delete(name string) error {
 	for _, s := range pinfo.Functions {
 		p := path.Join(m.pluginConfDir, plugin.PluginTypes[plugin.FUNCTION], s+".json")
 		os.Remove(p)
-		m.DeleteFunc(s)
 	}
 	_ = os.RemoveAll(path.Join(m.pluginDir, name))
 	m.removePluginInstallScript(name)

+ 26 - 20
internal/plugin/portable/runtime/connection.go

@@ -132,28 +132,34 @@ func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error) {
 	r.Lock()
 	defer r.Unlock()
 	conf.Log.Debugf("send request: %s", string(arg))
-	err := r.sock.Send(arg)
-	// resend if protocol state wrong, because of plugin restart or other problems
-	if err == mangos.ErrProtoState {
-		conf.Log.Debugf("send request protestate error %s", err.Error())
-		var prev []byte
-		prev, err = r.sock.Recv()
-		if err == nil {
-			conf.Log.Warnf("discard previous response: %s", string(prev))
-			conf.Log.Debugf("resend request: %s", string(arg))
-			err = r.sock.Send(arg)
+	for {
+		err := r.sock.Send(arg)
+		// resend if protocol state wrong, because of plugin restart or other problems
+		if err == mangos.ErrProtoState {
+			conf.Log.Debugf("send request protestate error %s", err.Error())
+			var prev []byte
+			prev, err = r.sock.Recv()
+			if err == nil {
+				conf.Log.Warnf("discard previous response: %s", string(prev))
+				conf.Log.Debugf("resend request: %s", string(arg))
+				err = r.sock.Send(arg)
+			}
 		}
+		if err != nil {
+			return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error())
+		}
+		result, e := r.sock.Recv()
+		if e != nil {
+			conf.Log.Errorf("can't receive: %s", e.Error())
+		} else {
+			conf.Log.Debugf("receive response: %s", string(result))
+		}
+		if result[0] == 'h' {
+			conf.Log.Debugf("receive handshake response: %s", string(result))
+			continue
+		}
+		return result, e
 	}
-	if err != nil {
-		return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error())
-	}
-	result, e := r.sock.Recv()
-	if e != nil {
-		conf.Log.Errorf("can't receive: %s", e.Error())
-	} else {
-		conf.Log.Debugf("receive response: %s", string(result))
-	}
-	return result, e
 }
 
 func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error) {

+ 11 - 14
internal/plugin/portable/runtime/function.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -23,11 +23,15 @@ import (
 )
 
 // PortableFunc each function symbol only has a singleton
-// Each singleton are long running go routine.
+// Each singleton are long-running go routine
+// Currently, it is cached and never ended once created
+// It is actually a wrapper of the data channel and can be fit to any plugin instance
+// Thus, it is possible to hot reload, which is simply attach a new nng client to the same channel
+// without changing the server(plugin runtime) side
 // TODO think about ending a portable func when needed.
 type PortableFunc struct {
 	symbolName string
-	reg        *PluginMeta
+	reg        *PluginMeta // initial plugin meta, only used for initialize the function instance
 	dataCh     DataReqChannel
 	isAgg      int // 0 - not calculate yet, 1 - no, 2 - yes
 }
@@ -40,9 +44,9 @@ func NewPortableFunc(symbolName string, reg *PluginMeta) (*PortableFunc, error)
 	if err != nil {
 		return nil, err
 	}
-	conf.Log.Infof("Plugin started successfully")
 
 	// Create function channel
+	conf.Log.Infof("creating function channel for symbol %s", symbolName)
 	dataCh, err := CreateFunctionChannel(symbolName)
 	if err != nil {
 		return nil, err
@@ -54,6 +58,7 @@ func NewPortableFunc(symbolName string, reg *PluginMeta) (*PortableFunc, error)
 		PluginType: TYPE_FUNC,
 	}
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, conf.Log)
+	conf.Log.Infof("starting symbol %s", symbolName)
 	err = ins.StartSymbol(ctx, c)
 	if err != nil {
 		return nil, err
@@ -104,16 +109,8 @@ func (f *PortableFunc) Exec(args []interface{}, ctx api.FunctionContext) (interf
 	}
 	fr := &FuncReply{}
 	err = json.Unmarshal(res, fr)
-	if err != nil { // retry if receive handshake after restart function process
-		ctx.GetLogger().Warnf("Failed to unmarshal function result %s", string(res))
-		res, err = f.dataCh.Req(jsonArg)
-		if err != nil {
-			return err, false
-		}
-		err = json.Unmarshal(res, fr)
-		if err != nil {
-			return err, false
-		}
+	if err != nil {
+		return fmt.Errorf("Failed to unmarshal function result %s", string(res)), false
 	}
 	if !fr.State {
 		if fr.Result != nil {

+ 2 - 3
internal/plugin/portable/runtime/plugin_ins_manager.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2023 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -208,7 +208,6 @@ func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *Port
 	}
 	// ins process has not run yet
 	if !pluginCreation && ins.ctrlChan != nil {
-		conf.Log.Warnf("plugin %s is not run yet", pluginMeta.Name)
 		return ins, nil
 	}
 	// should only happen for first start, then the ctrl channel will keep running
@@ -255,7 +254,7 @@ func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *Port
 	conf.Log.Println("plugin starting")
 	err = cmd.Start()
 	if err != nil {
-		return nil, fmt.Errorf("plugin executable %s starts with error %v", pluginMeta.Executable, err)
+		return nil, fmt.Errorf("plugin executable %s stops with error %v", pluginMeta.Executable, err)
 	}
 	process := cmd.Process
 	conf.Log.Printf("plugin started pid: %d\n", process.Pid)

+ 7 - 1
internal/processor/ruleset.go

@@ -1,4 +1,4 @@
-// Copyright 2022-2023 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -146,6 +146,7 @@ func (rs *RulesetProcessor) ImportRuleSet(all Ruleset) {
 	_ = rs.s.tableStatusDb.Clean()
 	_ = rs.r.ruleStatusDb.Clean()
 
+	counts := make([]int, 3)
 	// restore streams
 	for k, v := range all.Streams {
 		_, e := rs.s.ExecStreamSql(v)
@@ -153,6 +154,8 @@ func (rs *RulesetProcessor) ImportRuleSet(all Ruleset) {
 			conf.Log.Errorf("Fail to import stream %s(%s) with error: %v", k, v, e)
 			_ = rs.s.streamStatusDb.Set(k, e.Error())
 			continue
+		} else {
+			counts[0]++
 		}
 	}
 	// restore tables
@@ -162,6 +165,8 @@ func (rs *RulesetProcessor) ImportRuleSet(all Ruleset) {
 			conf.Log.Errorf("Fail to import table %s(%s) with error: %v", k, v, e)
 			_ = rs.s.tableStatusDb.Set(k, e.Error())
 			continue
+		} else {
+			counts[1]++
 		}
 	}
 	var rules []string
@@ -174,6 +179,7 @@ func (rs *RulesetProcessor) ImportRuleSet(all Ruleset) {
 			continue
 		} else {
 			rules = append(rules, k)
+			counts[2]++
 		}
 	}
 }

+ 6 - 3
sdk/python/ekuiper/runtime/connection.py

@@ -1,4 +1,4 @@
-#  Copyright 2021 EMQ Technologies Co., Ltd.
+#  Copyright 2021-2023 EMQ Technologies Co., Ltd.
 #
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -11,6 +11,7 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
+
 import logging
 import time
 from typing import Callable
@@ -27,10 +28,11 @@ class PairChannel:
             url = "ipc:///tmp/plugin_{}.ipc".format(name)
         else:
             url = "ipc:///tmp/func_{}.ipc".format(name)
+        logging.info("dialing {}".format(url))
         try:
             dial_with_retry(s, url)
         except Exception as e:
-            print(e)
+            logging.info("control/function channel {} cannot created {}".format(url, e))
             exit(0)
         self.sock = s
 
@@ -105,7 +107,8 @@ def dial_with_retry(sock, url: str):
         try:
             sock.dial(url, block=True)
             break
-        except Exception:
+        except Exception as e:
+            logging.debug("dial error {}".format(e))
             retry_count -= 1
             if retry_count < 0:
                 raise

+ 3 - 1
sdk/python/ekuiper/runtime/plugin.py

@@ -1,4 +1,4 @@
-#  Copyright 2021 EMQ Technologies Co., Ltd.
+#  Copyright 2021-2023 EMQ Technologies Co., Ltd.
 #
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -11,6 +11,7 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
+
 import json
 import logging
 import sys
@@ -59,6 +60,7 @@ def start(c: PluginConfig):
     logging.info("starting plugin {}".format(c.name))
     ch = PairChannel(c.name, 0)
     ch.run(command_reply)
+    logging.info("started plugin {}".format(c.name))
 
 
 def init_vars(c: PluginConfig):