portable_init.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build portable || !core
  15. // +build portable !core
  16. package server
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. "net/http"
  21. "github.com/gorilla/mux"
  22. "github.com/lf-edge/ekuiper/internal/binder"
  23. "github.com/lf-edge/ekuiper/internal/conf"
  24. "github.com/lf-edge/ekuiper/internal/plugin"
  25. "github.com/lf-edge/ekuiper/internal/plugin/portable"
  26. "github.com/lf-edge/ekuiper/pkg/errorx"
  27. )
// portableManager is the package-wide portable plugin manager. It is assigned
// by portableComp.register and used by the REST handlers and the import/export
// helpers in this file.
var portableManager *portable.Manager
  29. func init() {
  30. components["portable"] = portableComp{}
  31. }
// portableComp is the stateless component registered under "portable"; its
// methods set up the portable plugin manager and its REST routes.
type portableComp struct{}
  33. func (p portableComp) register() {
  34. var err error
  35. portableManager, err = portable.InitManager()
  36. if err != nil {
  37. panic(err)
  38. }
  39. entries = append(entries, binder.FactoryEntry{Name: "portable plugin", Factory: portableManager, Weight: 8})
  40. }
  41. func (p portableComp) rest(r *mux.Router) {
  42. r.HandleFunc("/plugins/portables", portablesHandler).Methods(http.MethodGet, http.MethodPost)
  43. r.HandleFunc("/plugins/portables/{name}", portableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  44. }
  45. func portablesHandler(w http.ResponseWriter, r *http.Request) {
  46. defer r.Body.Close()
  47. switch r.Method {
  48. case http.MethodGet:
  49. content := portableManager.List()
  50. jsonResponse(content, w, logger)
  51. case http.MethodPost:
  52. sd := plugin.NewPluginByType(plugin.PORTABLE)
  53. err := json.NewDecoder(r.Body).Decode(sd)
  54. // Problems decoding
  55. if err != nil {
  56. handleError(w, err, "Invalid body: Error decoding the portable plugin json", logger)
  57. return
  58. }
  59. err = portableManager.Register(sd)
  60. if err != nil {
  61. handleError(w, err, "portable plugin create command error", logger)
  62. return
  63. }
  64. w.WriteHeader(http.StatusCreated)
  65. fmt.Fprintf(w, "portable plugin %s is created", sd.GetName())
  66. }
  67. }
  68. func portableHandler(w http.ResponseWriter, r *http.Request) {
  69. defer r.Body.Close()
  70. vars := mux.Vars(r)
  71. name := vars["name"]
  72. switch r.Method {
  73. case http.MethodDelete:
  74. err := portableManager.Delete(name)
  75. if err != nil {
  76. handleError(w, err, fmt.Sprintf("delete portable plugin %s error", name), logger)
  77. return
  78. }
  79. w.WriteHeader(http.StatusOK)
  80. fmt.Fprintf(w, "portable plugin %s is deleted", name)
  81. case http.MethodGet:
  82. j, ok := portableManager.GetPluginInfo(name)
  83. if !ok {
  84. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe portable plugin %s error", name), logger)
  85. return
  86. }
  87. jsonResponse(j, w, logger)
  88. case http.MethodPut:
  89. sd := plugin.NewPluginByType(plugin.PORTABLE)
  90. err := json.NewDecoder(r.Body).Decode(sd)
  91. // Problems decoding
  92. if err != nil {
  93. handleError(w, err, "Invalid body: Error decoding the portable plugin json", logger)
  94. return
  95. }
  96. err = portableManager.Delete(name)
  97. if err != nil {
  98. conf.Log.Errorf("delete portable plugin %s error: %v", name, err)
  99. }
  100. err = portableManager.Register(sd)
  101. if err != nil {
  102. handleError(w, err, "portable plugin update command error", logger)
  103. return
  104. }
  105. w.WriteHeader(http.StatusOK)
  106. fmt.Fprintf(w, "portable plugin %s is updated", sd.GetName())
  107. }
  108. }
// portablePluginsReset asks the portable manager to uninstall all plugins.
func portablePluginsReset() {
	portableManager.UninstallAllPlugins()
}
  112. func portablePluginExport() map[string]string {
  113. return portableManager.GetAllPlugins()
  114. }
  115. func portablePluginStatusExport() map[string]string {
  116. return portableManager.GetAllPlugins()
  117. }
  118. func portablePluginImport(plugins map[string]string) map[string]string {
  119. return portableManager.PluginImport(plugins)
  120. }
  121. func portablePluginPartialImport(plugins map[string]string) map[string]string {
  122. return portableManager.PluginPartialImport(plugins)
  123. }