Pārlūkot izejas kodu

feat(portable): test server for plugin

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 gadi atpakaļ
vecāks
revīzija
2712c4144a

+ 1 - 1
internal/plugin/portable/runtime/connection.go

@@ -194,7 +194,7 @@ func listenWithRetry(sock mangos.Socket, url string) error {
 	for {
 	for {
 		err := sock.Listen(url)
 		err := sock.Listen(url)
 		if err == nil {
 		if err == nil {
-			conf.Log.Infof("start to listen after %d tries", 301-retryCount)
+			conf.Log.Infof("start to listen at %s after %d tries", url, 301-retryCount)
 			return err
 			return err
 		}
 		}
 		retryCount--
 		retryCount--

+ 26 - 16
internal/plugin/portable/runtime/plugin_ins_manager.go

@@ -34,14 +34,23 @@ var PortbleConf = &PortableConfig{
 	SendTimeout: 1000,
 	SendTimeout: 1000,
 }
 }
 
 
-type pluginIns struct {
+type PluginIns struct {
 	process      *os.Process
 	process      *os.Process
 	ctrlChan     ControlChannel
 	ctrlChan     ControlChannel
 	runningCount int
 	runningCount int
 	name         string
 	name         string
 }
 }
 
 
-func (i *pluginIns) StartSymbol(ctx api.StreamContext, ctrl *Control) error {
+func NewPluginIns(name string, ctrlChan ControlChannel, process *os.Process) *PluginIns {
+	return &PluginIns{
+		process:      process,
+		ctrlChan:     ctrlChan,
+		runningCount: 0,
+		name:         name,
+	}
+}
+
+func (i *PluginIns) StartSymbol(ctx api.StreamContext, ctrl *Control) error {
 	arg, err := json.Marshal(ctrl)
 	arg, err := json.Marshal(ctrl)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -62,7 +71,7 @@ func (i *pluginIns) StartSymbol(ctx api.StreamContext, ctrl *Control) error {
 	return err
 	return err
 }
 }
 
 
-func (i *pluginIns) StopSymbol(ctx api.StreamContext, ctrl *Control) error {
+func (i *PluginIns) StopSymbol(ctx api.StreamContext, ctrl *Control) error {
 	arg, err := json.Marshal(ctrl)
 	arg, err := json.Marshal(ctrl)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -89,28 +98,33 @@ func (i *pluginIns) StopSymbol(ctx api.StreamContext, ctrl *Control) error {
 	return err
 	return err
 }
 }
 
 
-func (i *pluginIns) Stop() error {
-	_ = i.ctrlChan.Close()
-	err := i.process.Kill()
+func (i *PluginIns) Stop() error {
+	var err error
+	if i.ctrlChan != nil {
+		err = i.ctrlChan.Close()
+	}
+	if i.process != nil {
+		err = i.process.Kill()
+	}
 	return err
 	return err
 }
 }
 
 
 // Manager plugin process and control socket
 // Manager plugin process and control socket
 type pluginInsManager struct {
 type pluginInsManager struct {
-	instances map[string]*pluginIns
+	instances map[string]*PluginIns
 	sync.RWMutex
 	sync.RWMutex
 }
 }
 
 
 func GetPluginInsManager() *pluginInsManager {
 func GetPluginInsManager() *pluginInsManager {
 	once.Do(func() {
 	once.Do(func() {
 		pm = &pluginInsManager{
 		pm = &pluginInsManager{
-			instances: make(map[string]*pluginIns),
+			instances: make(map[string]*PluginIns),
 		}
 		}
 	})
 	})
 	return pm
 	return pm
 }
 }
 
 
-func (p *pluginInsManager) getPluginIns(name string) (*pluginIns, bool) {
+func (p *pluginInsManager) getPluginIns(name string) (*PluginIns, bool) {
 	p.RLock()
 	p.RLock()
 	defer p.RUnlock()
 	defer p.RUnlock()
 	ins, ok := p.instances[name]
 	ins, ok := p.instances[name]
@@ -123,7 +137,7 @@ func (p *pluginInsManager) deletePluginIns(name string) {
 	delete(p.instances, name)
 	delete(p.instances, name)
 }
 }
 
 
-func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *PortableConfig) (*pluginIns, error) {
+func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *PortableConfig) (*PluginIns, error) {
 	p.Lock()
 	p.Lock()
 	defer p.Unlock()
 	defer p.Unlock()
 	if ins, ok := p.instances[pluginMeta.Name]; ok {
 	if ins, ok := p.instances[pluginMeta.Name]; ok {
@@ -181,11 +195,7 @@ func (p *pluginInsManager) getOrStartProcess(pluginMeta *PluginMeta, pconf *Port
 		return nil, fmt.Errorf("plugin %s control handshake error: %v", pluginMeta.Executable, err)
 		return nil, fmt.Errorf("plugin %s control handshake error: %v", pluginMeta.Executable, err)
 	}
 	}
 
 
-	ins := &pluginIns{
-		name:     pluginMeta.Name,
-		process:  process,
-		ctrlChan: ctrlChan,
-	}
+	ins := NewPluginIns(pluginMeta.Name, ctrlChan, process)
 	p.instances[pluginMeta.Name] = ins
 	p.instances[pluginMeta.Name] = ins
 	conf.Log.Println("plugin start running")
 	conf.Log.Println("plugin start running")
 	return ins, nil
 	return ins, nil
@@ -210,7 +220,7 @@ func (p *pluginInsManager) KillAll() error {
 	for _, ins := range p.instances {
 	for _, ins := range p.instances {
 		_ = ins.Stop()
 		_ = ins.Stop()
 	}
 	}
-	p.instances = make(map[string]*pluginIns)
+	p.instances = make(map[string]*PluginIns)
 	return nil
 	return nil
 }
 }
 
 

+ 1 - 1
internal/plugin/portable/runtime/plugin_ins_manager_test.go

@@ -49,7 +49,7 @@ func TestPluginInstance(t *testing.T) {
 		t.Errorf("can't ack handshake: %s", err.Error())
 		t.Errorf("can't ack handshake: %s", err.Error())
 		return
 		return
 	}
 	}
-	ins := &pluginIns{
+	ins := &PluginIns{
 		name:     "test",
 		name:     "test",
 		process:  nil,
 		process:  nil,
 		ctrlChan: ch,
 		ctrlChan: ch,

+ 148 - 0
tools/plugin_server/plugin_test_server.go

@@ -0,0 +1,148 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/gorilla/handlers"
+	"github.com/gorilla/mux"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/plugin/portable/runtime"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/state"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"net/http"
+	"time"
+)
+
+func main() {
+	var err error
+	ins, err = startPluginIns()
+	if err != nil {
+		panic(err)
+	}
+	defer ins.Stop()
+	c := context.WithValue(context.Background(), context.LoggerKey, conf.Log)
+	ctx = c.WithMeta("rule1", "op1", &state.MemoryStore{}).WithInstance(1)
+	server := createRestServer("127.0.0.1", 33333)
+	server.ListenAndServe()
+}
+
+const (
+	pluginName = "$$test"
+)
+
+var (
+	ctx api.StreamContext
+	ins *runtime.PluginIns
+)
+
+func startPluginIns() (*runtime.PluginIns, error) {
+	conf.Log.Infof("create control channel")
+	ctrlChan, err := runtime.CreateControlChannel(pluginName)
+	if err != nil {
+		return nil, fmt.Errorf("can't create new control channel: %s", err.Error())
+	}
+	conf.Log.Println("waiting handshake")
+	err = ctrlChan.Handshake()
+	if err != nil {
+		return nil, fmt.Errorf("plugin %s control handshake error: %v", pluginName, err)
+	}
+	conf.Log.Println("plugin start running")
+	return runtime.NewPluginIns(pluginName, ctrlChan, nil), nil
+}
+
+func createRestServer(ip string, port int) *http.Server {
+	r := mux.NewRouter()
+	r.HandleFunc("/symbol/start", startSymbolHandler).Methods(http.MethodPost)
+	r.HandleFunc("/symbol/stop", stopSymbolHandler).Methods(http.MethodPost)
+	server := &http.Server{
+		Addr: fmt.Sprintf("%s:%d", ip, port),
+		// Good practice to set timeouts to avoid Slowloris attacks.
+		WriteTimeout: time.Second * 60 * 5,
+		ReadTimeout:  time.Second * 60 * 5,
+		IdleTimeout:  time.Second * 60,
+		Handler:      handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
+	}
+	server.SetKeepAlivesEnabled(false)
+	return server
+}
+
+func startSymbolHandler(w http.ResponseWriter, r *http.Request) {
+	ctrl, err := decode(r)
+	if err != nil {
+		http.Error(w, fmt.Sprintf("Invalid body: decode error %v", err), http.StatusBadRequest)
+		return
+	}
+	err = ins.StartSymbol(ctx, ctrl)
+	if err != nil {
+		http.Error(w, fmt.Sprintf("start symbol error: %v", err), http.StatusBadRequest)
+		return
+	}
+	go receive(ctx)
+	w.WriteHeader(http.StatusOK)
+	w.Write([]byte("ok"))
+}
+
+func stopSymbolHandler(w http.ResponseWriter, r *http.Request) {
+	ctrl, err := decode(r)
+	if err != nil {
+		http.Error(w, fmt.Sprintf("Invalid body: decode error %v", err), http.StatusBadRequest)
+		return
+	}
+	err = ins.StopSymbol(ctx, ctrl)
+	if err != nil {
+		http.Error(w, fmt.Sprintf("start symbol error: %v", err), http.StatusBadRequest)
+		return
+	}
+	w.WriteHeader(http.StatusOK)
+	w.Write([]byte("ok"))
+}
+
+func decode(r *http.Request) (*runtime.Control, error) {
+	defer r.Body.Close()
+	ctrl := &runtime.Control{}
+	err := json.NewDecoder(r.Body).Decode(ctrl)
+	return ctrl, err
+}
+
+func receive(ctx api.StreamContext) {
+	dataCh, err := runtime.CreateSourceChannel(ctx)
+	if err != nil {
+		fmt.Printf("cannot create source channel: %s\n", err.Error())
+	}
+	for {
+		var msg []byte
+		msg, err := dataCh.Recv()
+		if err != nil {
+			fmt.Printf("cannot receive from mangos Socket: %s\n", err.Error())
+			return
+		}
+		result := &api.DefaultSourceTuple{}
+		e := json.Unmarshal(msg, result)
+		if e != nil {
+			ctx.GetLogger().Errorf("Invalid data format, cannot decode %s to json format with error %s", string(msg), e)
+			continue
+		}
+		fmt.Println(result)
+		select {
+		case <-ctx.Done():
+			ctx.GetLogger().Info("stop source")
+			return
+		default:
+		}
+	}
+}