portable_init.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build portable || !core
  15. // +build portable !core
  16. package server
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. "net/http"
  21. "github.com/gorilla/mux"
  22. "github.com/lf-edge/ekuiper/internal/binder"
  23. "github.com/lf-edge/ekuiper/internal/conf"
  24. "github.com/lf-edge/ekuiper/internal/plugin"
  25. "github.com/lf-edge/ekuiper/internal/plugin/portable"
  26. "github.com/lf-edge/ekuiper/pkg/errorx"
  27. )
  28. var portableManager *portable.Manager
  29. func init() {
  30. components["portable"] = portableComp{}
  31. }
  32. type portableComp struct{}
  33. func (p portableComp) register() {
  34. var err error
  35. portableManager, err = portable.InitManager()
  36. if err != nil {
  37. panic(err)
  38. }
  39. entries = append(entries, binder.FactoryEntry{Name: "portable plugin", Factory: portableManager, Weight: 8})
  40. }
  41. func (p portableComp) rest(r *mux.Router) {
  42. r.HandleFunc("/plugins/portables", portablesHandler).Methods(http.MethodGet, http.MethodPost)
  43. r.HandleFunc("/plugins/portables/{name}", portableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  44. }
  45. func portablesHandler(w http.ResponseWriter, r *http.Request) {
  46. defer r.Body.Close()
  47. switch r.Method {
  48. case http.MethodGet:
  49. content := portableManager.List()
  50. jsonResponse(content, w, logger)
  51. case http.MethodPost:
  52. sd := plugin.NewPluginByType(plugin.PORTABLE)
  53. err := json.NewDecoder(r.Body).Decode(sd)
  54. // Problems decoding
  55. if err != nil {
  56. handleError(w, err, "Invalid body: Error decoding the portable plugin json", logger)
  57. return
  58. }
  59. err = portableManager.Register(sd)
  60. if err != nil {
  61. handleError(w, err, "portable plugin create command error", logger)
  62. return
  63. }
  64. w.WriteHeader(http.StatusCreated)
  65. w.Write([]byte(fmt.Sprintf("portable plugin %s is created", sd.GetName())))
  66. }
  67. }
  68. func portableHandler(w http.ResponseWriter, r *http.Request) {
  69. defer r.Body.Close()
  70. vars := mux.Vars(r)
  71. name := vars["name"]
  72. switch r.Method {
  73. case http.MethodDelete:
  74. err := portableManager.Delete(name)
  75. if err != nil {
  76. handleError(w, err, fmt.Sprintf("delete portable plugin %s error", name), logger)
  77. return
  78. }
  79. w.WriteHeader(http.StatusOK)
  80. result := fmt.Sprintf("portable plugin %s is deleted", name)
  81. w.Write([]byte(result))
  82. case http.MethodGet:
  83. j, ok := portableManager.GetPluginInfo(name)
  84. if !ok {
  85. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe portable plugin %s error", name), logger)
  86. return
  87. }
  88. jsonResponse(j, w, logger)
  89. case http.MethodPut:
  90. sd := plugin.NewPluginByType(plugin.PORTABLE)
  91. err := json.NewDecoder(r.Body).Decode(sd)
  92. // Problems decoding
  93. if err != nil {
  94. handleError(w, err, "Invalid body: Error decoding the portable plugin json", logger)
  95. return
  96. }
  97. err = portableManager.Delete(name)
  98. if err != nil {
  99. conf.Log.Errorf("delete portable plugin %s error: %v", name, err)
  100. }
  101. err = portableManager.Register(sd)
  102. if err != nil {
  103. handleError(w, err, "portable plugin update command error", logger)
  104. return
  105. }
  106. w.WriteHeader(http.StatusOK)
  107. w.Write([]byte(fmt.Sprintf("portable plugin %s is updated", sd.GetName())))
  108. }
  109. }
  110. func portablePluginsReset() {
  111. portableManager.UninstallAllPlugins()
  112. }
  113. func portablePluginExport() map[string]string {
  114. return portableManager.GetAllPlugins()
  115. }
  116. func portablePluginStatusExport() map[string]string {
  117. return portableManager.GetAllPlugins()
  118. }
  119. func portablePluginImport(plugins map[string]string) map[string]string {
  120. return portableManager.PluginImport(plugins)
  121. }
  122. func portablePluginPartialImport(plugins map[string]string) map[string]string {
  123. return portableManager.PluginPartialImport(plugins)
  124. }