plugin_init.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build plugin || !core
  15. // +build plugin !core
  16. package server
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. "net/http"
  21. "runtime"
  22. "strings"
  23. "github.com/gorilla/mux"
  24. "github.com/lf-edge/ekuiper/internal/binder"
  25. "github.com/lf-edge/ekuiper/internal/conf"
  26. "github.com/lf-edge/ekuiper/internal/plugin"
  27. "github.com/lf-edge/ekuiper/internal/plugin/native"
  28. "github.com/lf-edge/ekuiper/pkg/errorx"
  29. )
  30. var nativeManager *native.Manager
  31. func init() {
  32. components["plugin"] = pluginComp{}
  33. }
  34. type pluginComp struct{}
  35. func (p pluginComp) register() {
  36. var err error
  37. nativeManager, err = native.InitManager()
  38. if err != nil {
  39. panic(err)
  40. }
  41. entries = append(entries, binder.FactoryEntry{Name: "native plugin", Factory: nativeManager, Weight: 9})
  42. }
  43. func (p pluginComp) rest(r *mux.Router) {
  44. r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
  45. r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
  46. r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
  47. r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
  48. r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
  49. r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
  50. r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
  51. r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
  52. r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
  53. r.HandleFunc("/plugins/functions/{name}/register", functionRegisterHandler).Methods(http.MethodPost)
  54. r.HandleFunc("/plugins/udfs", functionsListHandler).Methods(http.MethodGet)
  55. r.HandleFunc("/plugins/udfs/{name}", functionsGetHandler).Methods(http.MethodGet)
  56. }
  57. func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  58. defer r.Body.Close()
  59. switch r.Method {
  60. case http.MethodGet:
  61. content := nativeManager.List(t)
  62. jsonResponse(content, w, logger)
  63. case http.MethodPost:
  64. sd := plugin.NewPluginByType(t)
  65. err := json.NewDecoder(r.Body).Decode(sd)
  66. // Problems decoding
  67. if err != nil {
  68. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugin.PluginTypes[t]), logger)
  69. return
  70. }
  71. err = nativeManager.Register(t, sd)
  72. if err != nil {
  73. handleError(w, err, fmt.Sprintf("%s plugins create command error", plugin.PluginTypes[t]), logger)
  74. return
  75. }
  76. w.WriteHeader(http.StatusCreated)
  77. fmt.Fprintf(w, "%s plugin %s is created", plugin.PluginTypes[t], sd.GetName())
  78. }
  79. }
  80. func pluginHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  81. defer r.Body.Close()
  82. vars := mux.Vars(r)
  83. name := vars["name"]
  84. cb := r.URL.Query().Get("stop")
  85. switch r.Method {
  86. case http.MethodDelete:
  87. r := cb == "1"
  88. err := nativeManager.Delete(t, name, r)
  89. if err != nil {
  90. handleError(w, err, fmt.Sprintf("delete %s plugin %s error", plugin.PluginTypes[t], name), logger)
  91. return
  92. }
  93. w.WriteHeader(http.StatusOK)
  94. result := fmt.Sprintf("%s plugin %s is deleted", plugin.PluginTypes[t], name)
  95. if r {
  96. result = fmt.Sprintf("%s and Kuiper will be stopped", result)
  97. } else {
  98. result = fmt.Sprintf("%s and Kuiper must restart for the change to take effect.", result)
  99. }
  100. w.Write([]byte(result))
  101. case http.MethodGet:
  102. j, ok := nativeManager.GetPluginInfo(t, name)
  103. if !ok {
  104. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", plugin.PluginTypes[t], name), logger)
  105. return
  106. }
  107. jsonResponse(j, w, logger)
  108. }
  109. }
  110. // list or create source plugin
  111. func sourcesHandler(w http.ResponseWriter, r *http.Request) {
  112. pluginsHandler(w, r, plugin.SOURCE)
  113. }
  114. // delete a source plugin
  115. func sourceHandler(w http.ResponseWriter, r *http.Request) {
  116. pluginHandler(w, r, plugin.SOURCE)
  117. }
  118. // list or create sink plugin
  119. func sinksHandler(w http.ResponseWriter, r *http.Request) {
  120. pluginsHandler(w, r, plugin.SINK)
  121. }
  122. // delete a sink plugin
  123. func sinkHandler(w http.ResponseWriter, r *http.Request) {
  124. pluginHandler(w, r, plugin.SINK)
  125. }
  126. // list or create function plugin
  127. func functionsHandler(w http.ResponseWriter, r *http.Request) {
  128. pluginsHandler(w, r, plugin.FUNCTION)
  129. }
  130. // list all user defined functions in all function plugins
  131. func functionsListHandler(w http.ResponseWriter, _ *http.Request) {
  132. content := nativeManager.ListSymbols()
  133. jsonResponse(content, w, logger)
  134. }
  135. func functionsGetHandler(w http.ResponseWriter, r *http.Request) {
  136. vars := mux.Vars(r)
  137. name := vars["name"]
  138. j, ok := nativeManager.GetPluginBySymbol(plugin.FUNCTION, name)
  139. if !ok {
  140. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  141. return
  142. }
  143. jsonResponse(map[string]string{"name": name, "plugin": j}, w, logger)
  144. }
  145. // delete a function plugin
  146. func functionHandler(w http.ResponseWriter, r *http.Request) {
  147. pluginHandler(w, r, plugin.FUNCTION)
  148. }
  149. type functionList struct {
  150. Functions []string `json:"functions,omitempty"`
  151. }
  152. // register function list for function plugin. If a plugin exports multiple functions, the function list must be registered
  153. // either by create or register. If the function plugin has been loaded because of auto load through so file, the function
  154. // list MUST be registered by this API or only the function with the same name as the plugin can be used.
  155. func functionRegisterHandler(w http.ResponseWriter, r *http.Request) {
  156. defer r.Body.Close()
  157. vars := mux.Vars(r)
  158. name := vars["name"]
  159. _, ok := nativeManager.GetPluginInfo(plugin.FUNCTION, name)
  160. if !ok {
  161. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("register %s plugin %s error", plugin.PluginTypes[plugin.FUNCTION], name), logger)
  162. return
  163. }
  164. sd := functionList{}
  165. err := json.NewDecoder(r.Body).Decode(&sd)
  166. // Problems decoding
  167. if err != nil {
  168. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the function list json %s", r.Body), logger)
  169. return
  170. }
  171. err = nativeManager.RegisterFuncs(name, sd.Functions)
  172. if err != nil {
  173. handleError(w, err, fmt.Sprintf("function plugins %s regiser functions error", name), logger)
  174. return
  175. }
  176. w.WriteHeader(http.StatusOK)
  177. fmt.Fprintf(w, "function plugin %s function list is registered", name)
  178. }
  179. func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
  180. prebuildPluginsHandler(w, r, plugin.SOURCE)
  181. }
  182. func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
  183. prebuildPluginsHandler(w, r, plugin.SINK)
  184. }
  185. func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
  186. prebuildPluginsHandler(w, r, plugin.FUNCTION)
  187. }
  188. func prebuildPluginsHandler(w http.ResponseWriter, _ *http.Request, t plugin.PluginType) {
  189. emsg := "It's strongly recommended to install plugins at linux. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
  190. if runtime.GOOS == "linux" {
  191. osrelease, err := Read()
  192. if err != nil {
  193. handleError(w, err, "", logger)
  194. return
  195. }
  196. prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
  197. os := "debian"
  198. if strings.Contains(prettyName, "ALPINE") {
  199. os = "alpine"
  200. }
  201. hosts := conf.Config.Basic.PluginHosts
  202. if plugins, err := fetchPluginList(t, hosts, os, runtime.GOARCH); err != nil {
  203. handleError(w, err, "", logger)
  204. } else {
  205. jsonResponse(plugins, w, logger)
  206. }
  207. } else {
  208. handleError(w, fmt.Errorf(emsg), "", logger)
  209. }
  210. }
  211. var (
  212. NativeSourcePlugin = []string{"random", "zmq", "sql", "video"}
  213. NativeSinkPlugin = []string{"image", "influx", "influx2", "tdengine", "zmq", "kafka", "sql"}
  214. NativeFunctionPlugin = []string{"accumulateWordCount", "countPlusOne", "echo", "geohash", "image", "labelImage", "tfLite"}
  215. )
  216. func fetchPluginList(t plugin.PluginType, hosts, os, arch string) (result map[string]string, err error) {
  217. ptype := "sources"
  218. plugins := NativeSourcePlugin
  219. if t == plugin.SINK {
  220. ptype = "sinks"
  221. plugins = NativeSinkPlugin
  222. } else if t == plugin.FUNCTION {
  223. ptype = "functions"
  224. plugins = NativeFunctionPlugin
  225. }
  226. if hosts == "" || ptype == "" || os == "" {
  227. logger.Errorf("Invalid parameter value: hosts %s, ptype %s or os: %s should not be empty.", hosts, ptype, os)
  228. return nil, fmt.Errorf("invalid configuration for plugin host in kuiper.yaml")
  229. }
  230. result = make(map[string]string)
  231. hostsArr := strings.Split(hosts, ",")
  232. for _, host := range hostsArr {
  233. host := strings.Trim(host, " ")
  234. tmp := []string{host, "kuiper-plugins", version, os, ptype}
  235. // The url is similar to http://host:port/kuiper-plugins/0.9.1/debian/sinks/
  236. url := strings.Join(tmp, "/")
  237. for _, p := range plugins {
  238. result[p] = url + "/" + p + "_" + arch + ".zip"
  239. }
  240. }
  241. return
  242. }
  243. func pluginReset() {
  244. nativeManager.UninstallAllPlugins()
  245. }
  246. func pluginExport() map[string]string {
  247. return nativeManager.GetAllPlugins()
  248. }
  249. func pluginStatusExport() map[string]string {
  250. return nativeManager.GetAllPluginsStatus()
  251. }
  252. func pluginImport(plugins map[string]string) error {
  253. return nativeManager.PluginImport(plugins)
  254. }
  255. func pluginPartialImport(plugins map[string]string) map[string]string {
  256. return nativeManager.PluginPartialImport(plugins)
  257. }