123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- // Copyright 2021-2022 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package main
- import (
- context2 "context"
- "encoding/json"
- "fmt"
- "net/http"
- "sync"
- "time"
- "github.com/gorilla/handlers"
- "github.com/gorilla/mux"
- "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/internal/plugin/portable"
- "github.com/lf-edge/ekuiper/internal/plugin/portable/runtime"
- "github.com/lf-edge/ekuiper/internal/topo/context"
- "github.com/lf-edge/ekuiper/internal/topo/state"
- "github.com/lf-edge/ekuiper/pkg/api"
- "github.com/lf-edge/ekuiper/pkg/cast"
- )
- // Only support to test a single plugin Testing process.
- // 0. Edit the testingPlugin variable to match your plugin meta.
- // 1. Start this server, and wait for handshake.
- // 2. Start or debug your plugin. Make sure the handshake completed.
- // 3. Issue startSymbol/stopSymbol REST API to debug your plugin symbol.
- // EDIT HERE: Define the plugins that you want to test.
- var testingPlugin = &portable.PluginInfo{
- PluginMeta: runtime.PluginMeta{
- Name: "mirror",
- Version: "v1",
- Language: "go",
- Executable: "mirror.py",
- },
- Sources: []string{"pyjson"},
- Sinks: []string{"print"},
- Functions: []string{"revert"},
- }
- var mockSinkData = []map[string]interface{}{
- {
- "name": "hello",
- "count": 5,
- }, {
- "name": "world",
- "count": 10,
- },
- }
- var mockFuncData = [][]interface{}{
- {"twelve"},
- {"eleven"},
- }
- var (
- ins *runtime.PluginIns
- m *portable.Manager
- ctx api.StreamContext
- cancels sync.Map
- )
- func main() {
- var err error
- m, err = portable.MockManager(map[string]*portable.PluginInfo{testingPlugin.Name: testingPlugin})
- if err != nil {
- panic(err)
- }
- ins, err := startPluginIns(testingPlugin)
- if err != nil {
- panic(err)
- }
- defer ins.Stop()
- runtime.GetPluginInsManager().AddPluginIns(testingPlugin.Name, ins)
- c := context.WithValue(context.Background(), context.LoggerKey, conf.Log)
- ctx = c.WithMeta("rule1", "op1", &state.MemoryStore{}).WithInstance(1)
- server := createRestServer("127.0.0.1", 33333)
- server.ListenAndServe()
- }
- func startPluginIns(info *portable.PluginInfo) (*runtime.PluginIns, error) {
- conf.Log.Infof("create control channel")
- ctrlChan, err := runtime.CreateControlChannel(info.Name)
- if err != nil {
- return nil, fmt.Errorf("can't create new control channel: %s", err.Error())
- }
- conf.Log.Println("waiting handshake")
- err = ctrlChan.Handshake()
- if err != nil {
- return nil, fmt.Errorf("plugin %s control handshake error: %v", info.Name, err)
- }
- conf.Log.Println("plugin start running")
- return runtime.NewPluginInsForTest(info.Name, ctrlChan), nil
- }
- func createRestServer(ip string, port int) *http.Server {
- r := mux.NewRouter()
- r.HandleFunc("/symbol/start", startSymbolHandler).Methods(http.MethodPost)
- r.HandleFunc("/symbol/stop", stopSymbolHandler).Methods(http.MethodPost)
- server := &http.Server{
- Addr: cast.JoinHostPortInt(ip, port),
- // Good practice to set timeouts to avoid Slowloris attacks.
- WriteTimeout: time.Second * 60 * 5,
- ReadTimeout: time.Second * 60 * 5,
- IdleTimeout: time.Second * 60,
- Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
- }
- server.SetKeepAlivesEnabled(false)
- return server
- }
- func startSymbolHandler(w http.ResponseWriter, r *http.Request) {
- ctrl, err := decode(r)
- if err != nil {
- http.Error(w, fmt.Sprintf("Invalid body: decode error %v", err), http.StatusBadRequest)
- return
- }
- switch ctrl.PluginType {
- case runtime.TYPE_SOURCE:
- source, err := m.Source(ctrl.SymbolName)
- if err != nil {
- http.Error(w, fmt.Sprintf("running source %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
- return
- }
- newctx, cancel := ctx.WithCancel()
- if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
- http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
- return
- }
- consumer := make(chan api.SourceTuple)
- errCh := make(chan error)
- go func() {
- defer func() {
- source.Close(newctx)
- cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
- }()
- for {
- select {
- case tuple := <-consumer:
- fmt.Println(tuple)
- case err := <-errCh:
- conf.Log.Error(err)
- return
- case <-newctx.Done():
- return
- }
- }
- }()
- source.Configure("", ctrl.Config)
- go source.Open(newctx, consumer, errCh)
- case runtime.TYPE_SINK:
- sink, err := m.Sink(ctrl.SymbolName)
- if err != nil {
- http.Error(w, fmt.Sprintf("running sink %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
- return
- }
- newctx, cancel := ctx.WithCancel()
- if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
- http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
- return
- }
- sink.Configure(ctrl.Config)
- err = sink.Open(newctx)
- if err != nil {
- http.Error(w, fmt.Sprintf("open sink %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
- return
- }
- go func() {
- defer func() {
- sink.Close(newctx)
- cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
- }()
- for {
- for _, m := range mockSinkData {
- err = sink.Collect(newctx, m)
- if err != nil {
- fmt.Printf("cannot collect data: %v\n", err)
- continue
- }
- select {
- case <-ctx.Done():
- ctx.GetLogger().Info("stop sink")
- return
- default:
- }
- time.Sleep(1 * time.Second)
- }
- }
- }()
- case runtime.TYPE_FUNC:
- f, err := m.Function(ctrl.SymbolName)
- if err != nil {
- http.Error(w, fmt.Sprintf("running function %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
- return
- }
- newctx, cancel := ctx.WithCancel()
- fc := context.NewDefaultFuncContext(newctx, 1)
- if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
- http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
- return
- }
- go func() {
- defer func() {
- cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
- }()
- for {
- for _, m := range mockFuncData {
- r, ok := f.Exec(m, fc)
- if !ok {
- fmt.Print("cannot exec func\n")
- continue
- }
- fmt.Println(r)
- select {
- case <-ctx.Done():
- ctx.GetLogger().Info("stop sink")
- return
- default:
- }
- time.Sleep(1 * time.Second)
- }
- }
- }()
- }
- w.WriteHeader(http.StatusOK)
- w.Write([]byte("ok"))
- }
- func stopSymbolHandler(w http.ResponseWriter, r *http.Request) {
- ctrl, err := decode(r)
- if err != nil {
- http.Error(w, fmt.Sprintf("Invalid body: decode error %v", err), http.StatusBadRequest)
- return
- }
- if cancel, ok := cancels.Load(ctrl.PluginType + ctrl.SymbolName); ok {
- cancel.(context2.CancelFunc)()
- cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
- } else {
- http.Error(w, fmt.Sprintf("Symbol %s already close", ctrl.SymbolName), http.StatusBadRequest)
- return
- }
- w.WriteHeader(http.StatusOK)
- w.Write([]byte("ok"))
- }
- func decode(r *http.Request) (*runtime.Control, error) {
- defer r.Body.Close()
- ctrl := &runtime.Control{}
- err := json.NewDecoder(r.Body).Decode(ctrl)
- return ctrl, err
- }
|