Jelajahi Sumber

feat(portable): plugin test tool

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 tahun lalu
induk
melakukan
06f619fe43

+ 18 - 0
internal/plugin/portable/manager.go

@@ -78,6 +78,24 @@ func GetManager() *Manager {
 	return manager
 }
 
+func MockManager(plugins map[string]*PluginInfo) (*Manager, error) {
+	registry := &registry{
+		RWMutex:   sync.RWMutex{},
+		plugins:   make(map[string]*PluginInfo),
+		sources:   make(map[string]string),
+		sinks:     make(map[string]string),
+		functions: make(map[string]string),
+	}
+	for name, pi := range plugins {
+		err := pi.Validate(name)
+		if err != nil {
+			return nil, err
+		}
+		registry.Set(name, pi)
+	}
+	return &Manager{reg: registry}, nil
+}
+
 func (m *Manager) syncRegistry() error {
 	files, err := ioutil.ReadDir(m.pluginDir)
 	if err != nil {

+ 4 - 0
internal/plugin/portable/runtime/connection.go

@@ -127,6 +127,7 @@ func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error) {
 	if err = listenWithRetry(sock, url); err != nil {
 		return nil, fmt.Errorf("can't listen on pull socket for %s: %s", url, err.Error())
 	}
+	conf.Log.Infof("source channel created: %s", url)
 	return sock, nil
 }
 
@@ -144,6 +145,7 @@ func CreateFunctionChannel(symbolName string) (DataReqChannel, error) {
 	if err = listenWithRetry(sock, url); err != nil {
 		return nil, fmt.Errorf("can't listen on rep socket for %s: %s", url, err.Error())
 	}
+	conf.Log.Infof("function channel created: %s", url)
 	return &NanomsgReqRepChannel{sock: sock}, nil
 }
 
@@ -160,6 +162,7 @@ func CreateSinkChannel(ctx api.StreamContext) (DataOutChannel, error) {
 	if err = sock.Dial(url); err != nil {
 		return nil, fmt.Errorf("can't dial on push socket: %s", err.Error())
 	}
+	conf.Log.Infof("sink channel created: %s", url)
 	return sock, nil
 }
 
@@ -177,6 +180,7 @@ func CreateControlChannel(pluginName string) (ControlChannel, error) {
 	if err = listenWithRetry(sock, url); err != nil {
 		return nil, fmt.Errorf("can't listen on rep socket: %s", err.Error())
 	}
+	conf.Log.Infof("sink channel created: %s", url)
 	return &NanomsgReqChannel{sock: sock}, nil
 }
 

+ 14 - 1
internal/plugin/portable/runtime/plugin_ins_manager.go

@@ -42,10 +42,16 @@ type PluginIns struct {
 }
 
 func NewPluginIns(name string, ctrlChan ControlChannel, process *os.Process) *PluginIns {
+	// if process is not passed, it is run in simulator mode. Then do not count running.
+	// so that it won't be automatically close.
+	rc := 0
+	if process == nil {
+		rc = 1
+	}
 	return &PluginIns{
 		process:      process,
 		ctrlChan:     ctrlChan,
-		runningCount: 0,
+		runningCount: rc,
 		name:         name,
 	}
 }
@@ -137,6 +143,13 @@ func (p *pluginInsManager) deletePluginIns(name string) {
 	delete(p.instances, name)
 }
 
+// AddPluginIns For mock only
+func (p *pluginInsManager) AddPluginIns(name string, ins *PluginIns) {
+	p.Lock()
+	defer p.Unlock()
+	p.instances[name] = ins
+}
+
 func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *PortableConfig) (*PluginIns, error) {
 	p.Lock()
 	defer p.Unlock()

+ 169 - 50
tools/plugin_server/plugin_test_server.go

@@ -15,54 +15,94 @@
 package main
 
 import (
+	context2 "context"
 	"encoding/json"
 	"fmt"
 	"github.com/gorilla/handlers"
 	"github.com/gorilla/mux"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/plugin/portable"
 	"github.com/lf-edge/ekuiper/internal/plugin/portable/runtime"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"net/http"
+	"sync"
 	"time"
 )
 
+// Only support to test a single plugin Testing process.
+// 0. Edit the testingPlugin variable to match your plugin meta.
+// 1. Start this server, and wait for handshake.
+// 2. Start or debug your plugin. Make sure the handshake completed.
+// 3. Issue startSymbol/stopSymbol REST API to debug your plugin symbol.
+
+// EDIT HERE: Define the plugins that you want to test.
+var testingPlugin = &portable.PluginInfo{
+	PluginMeta: runtime.PluginMeta{
+		Name:       "mirror",
+		Version:    "v1",
+		Language:   "go",
+		Executable: "mirror.py",
+	},
+	Sources:   []string{"pyjson"},
+	Sinks:     []string{"print"},
+	Functions: []string{"revert"},
+}
+
+var mockSinkData = []map[string]interface{}{
+	{
+		"name":  "hello",
+		"count": 5,
+	}, {
+		"name":  "world",
+		"count": 10,
+	},
+}
+
+var mockFuncData = [][]interface{}{
+	{"twelve"},
+	{"eleven"},
+}
+
+var (
+	ins     *runtime.PluginIns
+	m       *portable.Manager
+	ctx     api.StreamContext
+	cancels sync.Map
+)
+
 func main() {
 	var err error
-	ins, err = startPluginIns()
+	m, err = portable.MockManager(map[string]*portable.PluginInfo{testingPlugin.Name: testingPlugin})
+	if err != nil {
+		panic(err)
+	}
+	ins, err := startPluginIns(testingPlugin)
 	if err != nil {
 		panic(err)
 	}
 	defer ins.Stop()
+	runtime.GetPluginInsManager().AddPluginIns(testingPlugin.Name, ins)
 	c := context.WithValue(context.Background(), context.LoggerKey, conf.Log)
 	ctx = c.WithMeta("rule1", "op1", &state.MemoryStore{}).WithInstance(1)
 	server := createRestServer("127.0.0.1", 33333)
 	server.ListenAndServe()
 }
 
-const (
-	pluginName = "$$test"
-)
-
-var (
-	ctx api.StreamContext
-	ins *runtime.PluginIns
-)
-
-func startPluginIns() (*runtime.PluginIns, error) {
+func startPluginIns(info *portable.PluginInfo) (*runtime.PluginIns, error) {
 	conf.Log.Infof("create control channel")
-	ctrlChan, err := runtime.CreateControlChannel(pluginName)
+	ctrlChan, err := runtime.CreateControlChannel(info.Name)
 	if err != nil {
 		return nil, fmt.Errorf("can't create new control channel: %s", err.Error())
 	}
 	conf.Log.Println("waiting handshake")
 	err = ctrlChan.Handshake()
 	if err != nil {
-		return nil, fmt.Errorf("plugin %s control handshake error: %v", pluginName, err)
+		return nil, fmt.Errorf("plugin %s control handshake error: %v", info.Name, err)
 	}
 	conf.Log.Println("plugin start running")
-	return runtime.NewPluginIns(pluginName, ctrlChan, nil), nil
+	return runtime.NewPluginIns(info.Name, ctrlChan, nil), nil
 }
 
 func createRestServer(ip string, port int) *http.Server {
@@ -87,12 +127,117 @@ func startSymbolHandler(w http.ResponseWriter, r *http.Request) {
 		http.Error(w, fmt.Sprintf("Invalid body: decode error %v", err), http.StatusBadRequest)
 		return
 	}
-	err = ins.StartSymbol(ctx, ctrl)
-	if err != nil {
-		http.Error(w, fmt.Sprintf("start symbol error: %v", err), http.StatusBadRequest)
-		return
+	switch ctrl.PluginType {
+	case runtime.TYPE_SOURCE:
+		source, err := m.Source(ctrl.SymbolName)
+		if err != nil {
+			http.Error(w, fmt.Sprintf("running source %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
+			return
+		}
+		newctx, cancel := ctx.WithCancel()
+		if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
+			http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
+			return
+		}
+		consumer := make(chan api.SourceTuple)
+		errCh := make(chan error)
+		go func() {
+			defer func() {
+				source.Close(newctx)
+				cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
+			}()
+			for {
+				select {
+				case tuple := <-consumer:
+					fmt.Println(tuple)
+				case err := <-errCh:
+					conf.Log.Error(err)
+					return
+				case <-newctx.Done():
+					return
+				}
+			}
+		}()
+		go source.Open(newctx, consumer, errCh)
+	case runtime.TYPE_SINK:
+		sink, err := m.Sink(ctrl.SymbolName)
+		if err != nil {
+			http.Error(w, fmt.Sprintf("running sink %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
+			return
+		}
+		newctx, cancel := ctx.WithCancel()
+		if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
+			http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
+			return
+		}
+		err = sink.Open(newctx)
+		if err != nil {
+			http.Error(w, fmt.Sprintf("open sink %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
+			return
+		}
+		go func() {
+			defer func() {
+				sink.Close(newctx)
+				cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
+			}()
+			for {
+				for _, m := range mockSinkData {
+					b, err := json.Marshal(m)
+					if err != nil {
+						fmt.Printf("cannot marshall data: %v\n", err)
+						continue
+					}
+					err = sink.Collect(newctx, b)
+					if err != nil {
+						fmt.Printf("cannot collect data: %v\n", err)
+						continue
+					}
+					select {
+					case <-ctx.Done():
+						ctx.GetLogger().Info("stop sink")
+						return
+					default:
+					}
+					time.Sleep(1 * time.Second)
+				}
+			}
+		}()
+	case runtime.TYPE_FUNC:
+		f, err := m.Function(ctrl.SymbolName)
+		if err != nil {
+			http.Error(w, fmt.Sprintf("running function %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
+			return
+		}
+		newctx, cancel := ctx.WithCancel()
+		fc := context.NewDefaultFuncContext(newctx, 1)
+		if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
+			http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
+			return
+		}
+		go func() {
+			defer func() {
+				cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
+			}()
+			for {
+				for _, m := range mockFuncData {
+					r, ok := f.Exec(m, fc)
+					if !ok {
+						fmt.Print("cannot exec func\n")
+						continue
+					}
+					fmt.Println(r)
+					select {
+					case <-ctx.Done():
+						ctx.GetLogger().Info("stop sink")
+						return
+					default:
+					}
+					time.Sleep(1 * time.Second)
+				}
+			}
+		}()
 	}
-	go receive(ctx)
+
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte("ok"))
 }
@@ -103,9 +248,11 @@ func stopSymbolHandler(w http.ResponseWriter, r *http.Request) {
 		http.Error(w, fmt.Sprintf("Invalid body: decode error %v", err), http.StatusBadRequest)
 		return
 	}
-	err = ins.StopSymbol(ctx, ctrl)
-	if err != nil {
-		http.Error(w, fmt.Sprintf("start symbol error: %v", err), http.StatusBadRequest)
+	if cancel, ok := cancels.Load(ctrl.PluginType + ctrl.SymbolName); ok {
+		cancel.(context2.CancelFunc)()
+		cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
+	} else {
+		http.Error(w, fmt.Sprintf("Symbol %s already close", ctrl.SymbolName), http.StatusBadRequest)
 		return
 	}
 	w.WriteHeader(http.StatusOK)
@@ -118,31 +265,3 @@ func decode(r *http.Request) (*runtime.Control, error) {
 	err := json.NewDecoder(r.Body).Decode(ctrl)
 	return ctrl, err
 }
-
-func receive(ctx api.StreamContext) {
-	dataCh, err := runtime.CreateSourceChannel(ctx)
-	if err != nil {
-		fmt.Printf("cannot create source channel: %s\n", err.Error())
-	}
-	for {
-		var msg []byte
-		msg, err := dataCh.Recv()
-		if err != nil {
-			fmt.Printf("cannot receive from mangos Socket: %s\n", err.Error())
-			return
-		}
-		result := &api.DefaultSourceTuple{}
-		e := json.Unmarshal(msg, result)
-		if e != nil {
-			ctx.GetLogger().Errorf("Invalid data format, cannot decode %s to json format with error %s", string(msg), e)
-			continue
-		}
-		fmt.Println(result)
-		select {
-		case <-ctx.Done():
-			ctx.GetLogger().Info("stop source")
-			return
-		default:
-		}
-	}
-}