plugin_test_server.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. context2 "context"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/gorilla/handlers"
  20. "github.com/gorilla/mux"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/internal/plugin/portable"
  23. "github.com/lf-edge/ekuiper/internal/plugin/portable/runtime"
  24. "github.com/lf-edge/ekuiper/internal/topo/context"
  25. "github.com/lf-edge/ekuiper/internal/topo/state"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. "net/http"
  28. "sync"
  29. "time"
  30. )
  31. // Only support to test a single plugin Testing process.
  32. // 0. Edit the testingPlugin variable to match your plugin meta.
  33. // 1. Start this server, and wait for handshake.
  34. // 2. Start or debug your plugin. Make sure the handshake completed.
  35. // 3. Issue startSymbol/stopSymbol REST API to debug your plugin symbol.
  36. // EDIT HERE: Define the plugins that you want to test.
  37. var testingPlugin = &portable.PluginInfo{
  38. PluginMeta: runtime.PluginMeta{
  39. Name: "mirror",
  40. Version: "v1",
  41. Language: "go",
  42. Executable: "mirror.py",
  43. },
  44. Sources: []string{"pyjson"},
  45. Sinks: []string{"print"},
  46. Functions: []string{"revert"},
  47. }
  48. var mockSinkData = []map[string]interface{}{
  49. {
  50. "name": "hello",
  51. "count": 5,
  52. }, {
  53. "name": "world",
  54. "count": 10,
  55. },
  56. }
  57. var mockFuncData = [][]interface{}{
  58. {"twelve"},
  59. {"eleven"},
  60. }
  61. var (
  62. ins *runtime.PluginIns
  63. m *portable.Manager
  64. ctx api.StreamContext
  65. cancels sync.Map
  66. )
  67. func main() {
  68. var err error
  69. m, err = portable.MockManager(map[string]*portable.PluginInfo{testingPlugin.Name: testingPlugin})
  70. if err != nil {
  71. panic(err)
  72. }
  73. ins, err := startPluginIns(testingPlugin)
  74. if err != nil {
  75. panic(err)
  76. }
  77. defer ins.Stop()
  78. runtime.GetPluginInsManager().AddPluginIns(testingPlugin.Name, ins)
  79. c := context.WithValue(context.Background(), context.LoggerKey, conf.Log)
  80. ctx = c.WithMeta("rule1", "op1", &state.MemoryStore{}).WithInstance(1)
  81. server := createRestServer("127.0.0.1", 33333)
  82. server.ListenAndServe()
  83. }
  84. func startPluginIns(info *portable.PluginInfo) (*runtime.PluginIns, error) {
  85. conf.Log.Infof("create control channel")
  86. ctrlChan, err := runtime.CreateControlChannel(info.Name)
  87. if err != nil {
  88. return nil, fmt.Errorf("can't create new control channel: %s", err.Error())
  89. }
  90. conf.Log.Println("waiting handshake")
  91. err = ctrlChan.Handshake()
  92. if err != nil {
  93. return nil, fmt.Errorf("plugin %s control handshake error: %v", info.Name, err)
  94. }
  95. conf.Log.Println("plugin start running")
  96. return runtime.NewPluginInsForTest(info.Name, ctrlChan), nil
  97. }
  98. func createRestServer(ip string, port int) *http.Server {
  99. r := mux.NewRouter()
  100. r.HandleFunc("/symbol/start", startSymbolHandler).Methods(http.MethodPost)
  101. r.HandleFunc("/symbol/stop", stopSymbolHandler).Methods(http.MethodPost)
  102. server := &http.Server{
  103. Addr: fmt.Sprintf("%s:%d", ip, port),
  104. // Good practice to set timeouts to avoid Slowloris attacks.
  105. WriteTimeout: time.Second * 60 * 5,
  106. ReadTimeout: time.Second * 60 * 5,
  107. IdleTimeout: time.Second * 60,
  108. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
  109. }
  110. server.SetKeepAlivesEnabled(false)
  111. return server
  112. }
  113. func startSymbolHandler(w http.ResponseWriter, r *http.Request) {
  114. ctrl, err := decode(r)
  115. if err != nil {
  116. http.Error(w, fmt.Sprintf("Invalid body: decode error %v", err), http.StatusBadRequest)
  117. return
  118. }
  119. switch ctrl.PluginType {
  120. case runtime.TYPE_SOURCE:
  121. source, err := m.Source(ctrl.SymbolName)
  122. if err != nil {
  123. http.Error(w, fmt.Sprintf("running source %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
  124. return
  125. }
  126. newctx, cancel := ctx.WithCancel()
  127. if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
  128. http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
  129. return
  130. }
  131. consumer := make(chan api.SourceTuple)
  132. errCh := make(chan error)
  133. go func() {
  134. defer func() {
  135. source.Close(newctx)
  136. cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
  137. }()
  138. for {
  139. select {
  140. case tuple := <-consumer:
  141. fmt.Println(tuple)
  142. case err := <-errCh:
  143. conf.Log.Error(err)
  144. return
  145. case <-newctx.Done():
  146. return
  147. }
  148. }
  149. }()
  150. source.Configure("", ctrl.Config)
  151. go source.Open(newctx, consumer, errCh)
  152. case runtime.TYPE_SINK:
  153. sink, err := m.Sink(ctrl.SymbolName)
  154. if err != nil {
  155. http.Error(w, fmt.Sprintf("running sink %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
  156. return
  157. }
  158. newctx, cancel := ctx.WithCancel()
  159. if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
  160. http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
  161. return
  162. }
  163. sink.Configure(ctrl.Config)
  164. err = sink.Open(newctx)
  165. if err != nil {
  166. http.Error(w, fmt.Sprintf("open sink %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
  167. return
  168. }
  169. go func() {
  170. defer func() {
  171. sink.Close(newctx)
  172. cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
  173. }()
  174. for {
  175. for _, m := range mockSinkData {
  176. err = sink.Collect(newctx, m)
  177. if err != nil {
  178. fmt.Printf("cannot collect data: %v\n", err)
  179. continue
  180. }
  181. select {
  182. case <-ctx.Done():
  183. ctx.GetLogger().Info("stop sink")
  184. return
  185. default:
  186. }
  187. time.Sleep(1 * time.Second)
  188. }
  189. }
  190. }()
  191. case runtime.TYPE_FUNC:
  192. f, err := m.Function(ctrl.SymbolName)
  193. if err != nil {
  194. http.Error(w, fmt.Sprintf("running function %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
  195. return
  196. }
  197. newctx, cancel := ctx.WithCancel()
  198. fc := context.NewDefaultFuncContext(newctx, 1)
  199. if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
  200. http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
  201. return
  202. }
  203. go func() {
  204. defer func() {
  205. cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
  206. }()
  207. for {
  208. for _, m := range mockFuncData {
  209. r, ok := f.Exec(m, fc)
  210. if !ok {
  211. fmt.Print("cannot exec func\n")
  212. continue
  213. }
  214. fmt.Println(r)
  215. select {
  216. case <-ctx.Done():
  217. ctx.GetLogger().Info("stop sink")
  218. return
  219. default:
  220. }
  221. time.Sleep(1 * time.Second)
  222. }
  223. }
  224. }()
  225. }
  226. w.WriteHeader(http.StatusOK)
  227. w.Write([]byte("ok"))
  228. }
  229. func stopSymbolHandler(w http.ResponseWriter, r *http.Request) {
  230. ctrl, err := decode(r)
  231. if err != nil {
  232. http.Error(w, fmt.Sprintf("Invalid body: decode error %v", err), http.StatusBadRequest)
  233. return
  234. }
  235. if cancel, ok := cancels.Load(ctrl.PluginType + ctrl.SymbolName); ok {
  236. cancel.(context2.CancelFunc)()
  237. cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
  238. } else {
  239. http.Error(w, fmt.Sprintf("Symbol %s already close", ctrl.SymbolName), http.StatusBadRequest)
  240. return
  241. }
  242. w.WriteHeader(http.StatusOK)
  243. w.Write([]byte("ok"))
  244. }
  245. func decode(r *http.Request) (*runtime.Control, error) {
  246. defer r.Body.Close()
  247. ctrl := &runtime.Control{}
  248. err := json.NewDecoder(r.Body).Decode(ctrl)
  249. return ctrl, err
  250. }