plugin_test_server.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. context2 "context"
  17. "encoding/json"
  18. "fmt"
  19. "net/http"
  20. "sync"
  21. "time"
  22. "github.com/gorilla/handlers"
  23. "github.com/gorilla/mux"
  24. "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/plugin/portable"
  26. "github.com/lf-edge/ekuiper/internal/plugin/portable/runtime"
  27. "github.com/lf-edge/ekuiper/internal/topo/context"
  28. "github.com/lf-edge/ekuiper/internal/topo/state"
  29. "github.com/lf-edge/ekuiper/pkg/api"
  30. "github.com/lf-edge/ekuiper/pkg/cast"
  31. )
// This server supports testing a single plugin at a time. Testing process:
// 0. Edit the testingPlugin variable to match your plugin meta.
// 1. Start this server and wait for the handshake.
// 2. Start or debug your plugin. Make sure the handshake has completed.
// 3. Call the /symbol/start and /symbol/stop REST APIs to debug your plugin symbols.
  37. // EDIT HERE: Define the plugins that you want to test.
  38. var testingPlugin = &portable.PluginInfo{
  39. PluginMeta: runtime.PluginMeta{
  40. Name: "mirror",
  41. Version: "v1",
  42. Language: "go",
  43. Executable: "mirror.py",
  44. },
  45. Sources: []string{"pyjson"},
  46. Sinks: []string{"print"},
  47. Functions: []string{"revert"},
  48. }
  49. var mockSinkData = []map[string]interface{}{
  50. {
  51. "name": "hello",
  52. "count": 5,
  53. }, {
  54. "name": "world",
  55. "count": 10,
  56. },
  57. }
  58. var mockFuncData = [][]interface{}{
  59. {"twelve"},
  60. {"eleven"},
  61. }
  62. var (
  63. ins *runtime.PluginIns
  64. m *portable.Manager
  65. ctx api.StreamContext
  66. cancels sync.Map
  67. )
  68. func main() {
  69. var err error
  70. m, err = portable.MockManager(map[string]*portable.PluginInfo{testingPlugin.Name: testingPlugin})
  71. if err != nil {
  72. panic(err)
  73. }
  74. ins, err := startPluginIns(testingPlugin)
  75. if err != nil {
  76. panic(err)
  77. }
  78. defer ins.Stop()
  79. runtime.GetPluginInsManager().AddPluginIns(testingPlugin.Name, ins)
  80. c := context.WithValue(context.Background(), context.LoggerKey, conf.Log)
  81. ctx = c.WithMeta("rule1", "op1", &state.MemoryStore{}).WithInstance(1)
  82. server := createRestServer("127.0.0.1", 33333)
  83. server.ListenAndServe()
  84. }
  85. func startPluginIns(info *portable.PluginInfo) (*runtime.PluginIns, error) {
  86. conf.Log.Infof("create control channel")
  87. ctrlChan, err := runtime.CreateControlChannel(info.Name)
  88. if err != nil {
  89. return nil, fmt.Errorf("can't create new control channel: %s", err.Error())
  90. }
  91. conf.Log.Println("waiting handshake")
  92. err = ctrlChan.Handshake()
  93. if err != nil {
  94. return nil, fmt.Errorf("plugin %s control handshake error: %v", info.Name, err)
  95. }
  96. conf.Log.Println("plugin start running")
  97. return runtime.NewPluginInsForTest(info.Name, ctrlChan), nil
  98. }
  99. func createRestServer(ip string, port int) *http.Server {
  100. r := mux.NewRouter()
  101. r.HandleFunc("/symbol/start", startSymbolHandler).Methods(http.MethodPost)
  102. r.HandleFunc("/symbol/stop", stopSymbolHandler).Methods(http.MethodPost)
  103. server := &http.Server{
  104. Addr: cast.JoinHostPortInt(ip, port),
  105. // Good practice to set timeouts to avoid Slowloris attacks.
  106. WriteTimeout: time.Second * 60 * 5,
  107. ReadTimeout: time.Second * 60 * 5,
  108. IdleTimeout: time.Second * 60,
  109. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
  110. }
  111. server.SetKeepAlivesEnabled(false)
  112. return server
  113. }
  114. func startSymbolHandler(w http.ResponseWriter, r *http.Request) {
  115. ctrl, err := decode(r)
  116. if err != nil {
  117. http.Error(w, fmt.Sprintf("Invalid body: decode error %v", err), http.StatusBadRequest)
  118. return
  119. }
  120. switch ctrl.PluginType {
  121. case runtime.TYPE_SOURCE:
  122. source, err := m.Source(ctrl.SymbolName)
  123. if err != nil {
  124. http.Error(w, fmt.Sprintf("running source %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
  125. return
  126. }
  127. newctx, cancel := ctx.WithCancel()
  128. if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
  129. http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
  130. return
  131. }
  132. consumer := make(chan api.SourceTuple)
  133. errCh := make(chan error)
  134. go func() {
  135. defer func() {
  136. source.Close(newctx)
  137. cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
  138. }()
  139. for {
  140. select {
  141. case tuple := <-consumer:
  142. fmt.Println(tuple)
  143. case err := <-errCh:
  144. conf.Log.Error(err)
  145. return
  146. case <-newctx.Done():
  147. return
  148. }
  149. }
  150. }()
  151. source.Configure("", ctrl.Config)
  152. go source.Open(newctx, consumer, errCh)
  153. case runtime.TYPE_SINK:
  154. sink, err := m.Sink(ctrl.SymbolName)
  155. if err != nil {
  156. http.Error(w, fmt.Sprintf("running sink %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
  157. return
  158. }
  159. newctx, cancel := ctx.WithCancel()
  160. if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
  161. http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
  162. return
  163. }
  164. sink.Configure(ctrl.Config)
  165. err = sink.Open(newctx)
  166. if err != nil {
  167. http.Error(w, fmt.Sprintf("open sink %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
  168. return
  169. }
  170. go func() {
  171. defer func() {
  172. sink.Close(newctx)
  173. cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
  174. }()
  175. for {
  176. for _, m := range mockSinkData {
  177. err = sink.Collect(newctx, m)
  178. if err != nil {
  179. fmt.Printf("cannot collect data: %v\n", err)
  180. continue
  181. }
  182. select {
  183. case <-ctx.Done():
  184. ctx.GetLogger().Info("stop sink")
  185. return
  186. default:
  187. }
  188. time.Sleep(1 * time.Second)
  189. }
  190. }
  191. }()
  192. case runtime.TYPE_FUNC:
  193. f, err := m.Function(ctrl.SymbolName)
  194. if err != nil {
  195. http.Error(w, fmt.Sprintf("running function %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
  196. return
  197. }
  198. newctx, cancel := ctx.WithCancel()
  199. fc := context.NewDefaultFuncContext(newctx, 1)
  200. if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
  201. http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
  202. return
  203. }
  204. go func() {
  205. defer func() {
  206. cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
  207. }()
  208. for {
  209. for _, m := range mockFuncData {
  210. r, ok := f.Exec(m, fc)
  211. if !ok {
  212. fmt.Print("cannot exec func\n")
  213. continue
  214. }
  215. fmt.Println(r)
  216. select {
  217. case <-ctx.Done():
  218. ctx.GetLogger().Info("stop sink")
  219. return
  220. default:
  221. }
  222. time.Sleep(1 * time.Second)
  223. }
  224. }
  225. }()
  226. }
  227. w.WriteHeader(http.StatusOK)
  228. w.Write([]byte("ok"))
  229. }
  230. func stopSymbolHandler(w http.ResponseWriter, r *http.Request) {
  231. ctrl, err := decode(r)
  232. if err != nil {
  233. http.Error(w, fmt.Sprintf("Invalid body: decode error %v", err), http.StatusBadRequest)
  234. return
  235. }
  236. if cancel, ok := cancels.Load(ctrl.PluginType + ctrl.SymbolName); ok {
  237. cancel.(context2.CancelFunc)()
  238. cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
  239. } else {
  240. http.Error(w, fmt.Sprintf("Symbol %s already close", ctrl.SymbolName), http.StatusBadRequest)
  241. return
  242. }
  243. w.WriteHeader(http.StatusOK)
  244. w.Write([]byte("ok"))
  245. }
  246. func decode(r *http.Request) (*runtime.Control, error) {
  247. defer r.Body.Close()
  248. ctrl := &runtime.Control{}
  249. err := json.NewDecoder(r.Body).Decode(ctrl)
  250. return ctrl, err
  251. }