plugin_init.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build plugin || !core
  15. package server
  16. import (
  17. "encoding/json"
  18. "fmt"
  19. "io"
  20. "net/http"
  21. "runtime"
  22. "strings"
  23. "github.com/gorilla/mux"
  24. "github.com/lf-edge/ekuiper/internal/binder"
  25. "github.com/lf-edge/ekuiper/internal/conf"
  26. "github.com/lf-edge/ekuiper/internal/plugin"
  27. "github.com/lf-edge/ekuiper/internal/plugin/native"
  28. "github.com/lf-edge/ekuiper/pkg/errorx"
  29. )
  30. var nativeManager *native.Manager
  31. func init() {
  32. components["plugin"] = pluginComp{}
  33. }
  34. type pluginComp struct{}
  35. func (p pluginComp) register() {
  36. var err error
  37. nativeManager, err = native.InitManager()
  38. if err != nil {
  39. panic(err)
  40. }
  41. entries = append(entries, binder.FactoryEntry{Name: "native plugin", Factory: nativeManager, Weight: 9})
  42. }
  43. func (p pluginComp) rest(r *mux.Router) {
  44. r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
  45. r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
  46. r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
  47. r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
  48. r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  49. r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
  50. r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  51. r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
  52. r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  53. r.HandleFunc("/plugins/functions/{name}/register", functionRegisterHandler).Methods(http.MethodPost)
  54. r.HandleFunc("/plugins/udfs", functionsListHandler).Methods(http.MethodGet)
  55. r.HandleFunc("/plugins/udfs/{name}", functionsGetHandler).Methods(http.MethodGet)
  56. }
  57. func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  58. defer func(Body io.ReadCloser) { _ = Body.Close() }(r.Body)
  59. switch r.Method {
  60. case http.MethodGet:
  61. content := nativeManager.List(t)
  62. jsonResponse(content, w, logger)
  63. case http.MethodPost:
  64. sd := plugin.NewPluginByType(t)
  65. err := json.NewDecoder(r.Body).Decode(sd)
  66. // Problems decoding
  67. if err != nil {
  68. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugin.PluginTypes[t]), logger)
  69. return
  70. }
  71. err = nativeManager.Register(t, sd)
  72. if err != nil {
  73. handleError(w, err, fmt.Sprintf("%s plugins create command error", plugin.PluginTypes[t]), logger)
  74. return
  75. }
  76. w.WriteHeader(http.StatusCreated)
  77. _, _ = fmt.Fprintf(w, "%s plugin %s is created", plugin.PluginTypes[t], sd.GetName())
  78. }
  79. }
  80. func pluginHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  81. defer func(Body io.ReadCloser) { _ = Body.Close() }(r.Body)
  82. vars := mux.Vars(r)
  83. name := vars["name"]
  84. cb := r.URL.Query().Get("stop")
  85. switch r.Method {
  86. case http.MethodDelete:
  87. r := cb == "1"
  88. err := nativeManager.Delete(t, name, r)
  89. if err != nil {
  90. handleError(w, err, fmt.Sprintf("delete %s plugin %s error", plugin.PluginTypes[t], name), logger)
  91. return
  92. }
  93. w.WriteHeader(http.StatusOK)
  94. result := fmt.Sprintf("%s plugin %s is deleted", plugin.PluginTypes[t], name)
  95. if r {
  96. result = fmt.Sprintf("%s and eKuiper will be stopped", result)
  97. } else {
  98. result = fmt.Sprintf("%s and eKuiper must restart for the change to take effect.", result)
  99. }
  100. _, _ = fmt.Fprint(w, result)
  101. case http.MethodGet:
  102. j, ok := nativeManager.GetPluginInfo(t, name)
  103. if !ok {
  104. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", plugin.PluginTypes[t], name), logger)
  105. return
  106. }
  107. jsonResponse(j, w, logger)
  108. case http.MethodPut:
  109. sd := plugin.NewPluginByType(t)
  110. err := json.NewDecoder(r.Body).Decode(sd)
  111. // Problems decoding
  112. if err != nil {
  113. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugin.PluginTypes[t]), logger)
  114. return
  115. }
  116. err = nativeManager.Delete(t, name, false)
  117. if err != nil {
  118. handleError(w, err, fmt.Sprintf("update %s plugin %s error, cannot delete old version", plugin.PluginTypes[t], name), logger)
  119. return
  120. }
  121. err = nativeManager.Register(t, sd)
  122. if err != nil {
  123. handleError(w, err, fmt.Sprintf("%s plugins create command error", plugin.PluginTypes[t]), logger)
  124. return
  125. }
  126. w.WriteHeader(http.StatusOK)
  127. _, _ = fmt.Fprintf(w, "plugin %s is updated and eKuiper must restart for the change to take effect.", sd.GetName())
  128. }
  129. }
  130. // list or create source plugin
  131. func sourcesHandler(w http.ResponseWriter, r *http.Request) {
  132. pluginsHandler(w, r, plugin.SOURCE)
  133. }
  134. // delete a source plugin
  135. func sourceHandler(w http.ResponseWriter, r *http.Request) {
  136. pluginHandler(w, r, plugin.SOURCE)
  137. }
  138. // list or create sink plugin
  139. func sinksHandler(w http.ResponseWriter, r *http.Request) {
  140. pluginsHandler(w, r, plugin.SINK)
  141. }
  142. // delete a sink plugin
  143. func sinkHandler(w http.ResponseWriter, r *http.Request) {
  144. pluginHandler(w, r, plugin.SINK)
  145. }
  146. // list or create function plugin
  147. func functionsHandler(w http.ResponseWriter, r *http.Request) {
  148. pluginsHandler(w, r, plugin.FUNCTION)
  149. }
  150. // list all user-defined functions in all function plugins
  151. func functionsListHandler(w http.ResponseWriter, _ *http.Request) {
  152. content := nativeManager.ListSymbols()
  153. jsonResponse(content, w, logger)
  154. }
  155. func functionsGetHandler(w http.ResponseWriter, r *http.Request) {
  156. vars := mux.Vars(r)
  157. name := vars["name"]
  158. j, ok := nativeManager.GetPluginBySymbol(plugin.FUNCTION, name)
  159. if !ok {
  160. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  161. return
  162. }
  163. jsonResponse(map[string]string{"name": name, "plugin": j}, w, logger)
  164. }
  165. // delete a function plugin
  166. func functionHandler(w http.ResponseWriter, r *http.Request) {
  167. pluginHandler(w, r, plugin.FUNCTION)
  168. }
  169. type functionList struct {
  170. Functions []string `json:"functions,omitempty"`
  171. }
  172. // Register function list for function plugin. If a plugin exports multiple functions, the function list must be registered
  173. // either by create or register. If the function plugin has been loaded because of auto load through so file, the function
  174. // list MUST be registered by this API or only the function with the same name as the plugin can be used.
  175. func functionRegisterHandler(w http.ResponseWriter, r *http.Request) {
  176. defer func(Body io.ReadCloser) { _ = Body.Close() }(r.Body)
  177. vars := mux.Vars(r)
  178. name := vars["name"]
  179. _, ok := nativeManager.GetPluginInfo(plugin.FUNCTION, name)
  180. if !ok {
  181. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("register %s plugin %s error", plugin.PluginTypes[plugin.FUNCTION], name), logger)
  182. return
  183. }
  184. sd := functionList{}
  185. err := json.NewDecoder(r.Body).Decode(&sd)
  186. // Problems decoding
  187. if err != nil {
  188. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the function list json %s", r.Body), logger)
  189. return
  190. }
  191. err = nativeManager.RegisterFuncs(name, sd.Functions)
  192. if err != nil {
  193. handleError(w, err, fmt.Sprintf("function plugins %s regiser functions error", name), logger)
  194. return
  195. }
  196. w.WriteHeader(http.StatusOK)
  197. _, _ = fmt.Fprintf(w, "function plugin %s function list is registered", name)
  198. }
  199. func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
  200. prebuildPluginsHandler(w, r, plugin.SOURCE)
  201. }
  202. func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
  203. prebuildPluginsHandler(w, r, plugin.SINK)
  204. }
  205. func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
  206. prebuildPluginsHandler(w, r, plugin.FUNCTION)
  207. }
  208. func prebuildPluginsHandler(w http.ResponseWriter, _ *http.Request, t plugin.PluginType) {
  209. emsg := "It's strongly recommended to install plugins at linux. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
  210. if runtime.GOOS == "linux" {
  211. osrelease, err := Read()
  212. if err != nil {
  213. handleError(w, err, "", logger)
  214. return
  215. }
  216. prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
  217. os := "debian"
  218. if strings.Contains(prettyName, "ALPINE") {
  219. os = "alpine"
  220. }
  221. hosts := conf.Config.Basic.PluginHosts
  222. if plugins, err := fetchPluginList(t, hosts, os, runtime.GOARCH); err != nil {
  223. handleError(w, err, "", logger)
  224. } else {
  225. jsonResponse(plugins, w, logger)
  226. }
  227. } else {
  228. handleError(w, fmt.Errorf(emsg), "", logger)
  229. }
  230. }
  231. var (
  232. NativeSourcePlugin = []string{"random", "zmq", "sql", "video"}
  233. NativeSinkPlugin = []string{"image", "influx", "influx2", "tdengine", "zmq", "kafka", "sql"}
  234. NativeFunctionPlugin = []string{"accumulateWordCount", "countPlusOne", "echo", "geohash", "image", "labelImage", "tfLite"}
  235. )
  236. func fetchPluginList(t plugin.PluginType, hosts, os, arch string) (result map[string]string, err error) {
  237. ptype := "sources"
  238. plugins := NativeSourcePlugin
  239. if t == plugin.SINK {
  240. ptype = "sinks"
  241. plugins = NativeSinkPlugin
  242. } else if t == plugin.FUNCTION {
  243. ptype = "functions"
  244. plugins = NativeFunctionPlugin
  245. }
  246. if hosts == "" || ptype == "" || os == "" {
  247. logger.Errorf("Invalid parameter value: hosts %s, ptype %s or os: %s should not be empty.", hosts, ptype, os)
  248. return nil, fmt.Errorf("invalid configuration for plugin host in kuiper.yaml")
  249. }
  250. result = make(map[string]string)
  251. hostsArr := strings.Split(hosts, ",")
  252. for _, host := range hostsArr {
  253. host := strings.Trim(host, " ")
  254. tmp := []string{host, "kuiper-plugins", version, os, ptype}
  255. // The url is similar to http://host:port/kuiper-plugins/0.9.1/debian/sinks/
  256. url := strings.Join(tmp, "/")
  257. for _, p := range plugins {
  258. result[p] = url + "/" + p + "_" + arch + ".zip"
  259. }
  260. }
  261. return
  262. }
  263. func pluginReset() {
  264. nativeManager.UninstallAllPlugins()
  265. }
  266. func pluginExport() map[string]string {
  267. return nativeManager.GetAllPlugins()
  268. }
  269. func pluginStatusExport() map[string]string {
  270. return nativeManager.GetAllPluginsStatus()
  271. }
  272. func pluginImport(plugins map[string]string) error {
  273. return nativeManager.PluginImport(plugins)
  274. }
  275. func pluginPartialImport(plugins map[string]string) map[string]string {
  276. return nativeManager.PluginPartialImport(plugins)
  277. }