meta_plugin_init.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build (plugin || !core) && (ui || !core)
  15. // +build plugin !core
  16. // +build ui !core
  17. package server
  18. import (
  19. "fmt"
  20. "github.com/gorilla/mux"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/internal/plugin"
  23. "net/http"
  24. "os"
  25. "runtime"
  26. "strings"
  27. )
  28. // This must be and will be run after meta_init.go init()
  29. func init() {
  30. metaEndpoints = append(metaEndpoints, func(r *mux.Router) {
  31. r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
  32. r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
  33. r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
  34. })
  35. }
  36. func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
  37. prebuildPluginsHandler(w, r, plugin.SOURCE)
  38. }
  39. func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
  40. prebuildPluginsHandler(w, r, plugin.SINK)
  41. }
  42. func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
  43. prebuildPluginsHandler(w, r, plugin.FUNCTION)
  44. }
  45. func isOffcialDockerImage() bool {
  46. if !strings.EqualFold(os.Getenv("MAINTAINER"), "emqx.io") {
  47. return false
  48. }
  49. return true
  50. }
  51. func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  52. emsg := "It's strongly recommended to install plugins at official released Debian Docker images. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
  53. if !isOffcialDockerImage() {
  54. handleError(w, fmt.Errorf(emsg), "", logger)
  55. return
  56. } else if runtime.GOOS == "linux" {
  57. osrelease, err := Read()
  58. if err != nil {
  59. logger.Infof("")
  60. return
  61. }
  62. prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
  63. os := "debian"
  64. if strings.Contains(prettyName, "DEBIAN") {
  65. hosts := conf.Config.Basic.PluginHosts
  66. if err, plugins := fetchPluginList(t, hosts, os, runtime.GOARCH); err != nil {
  67. handleError(w, err, "", logger)
  68. } else {
  69. jsonResponse(plugins, w, logger)
  70. }
  71. } else {
  72. handleError(w, fmt.Errorf(emsg), "", logger)
  73. return
  74. }
  75. } else {
  76. handleError(w, fmt.Errorf(emsg), "", logger)
  77. }
  78. }
  79. var NativeSourcePlugin = []string{"random", "zmq"}
  80. var NativeSinkPlugin = []string{"file", "image", "influx", "redis", "tdengine", "zmq"}
  81. var NativeFunctionPlugin = []string{"accumulateWordCount", "countPlusOne", "echo", "geohash", "image", "labelImage"}
  82. func fetchPluginList(t plugin.PluginType, hosts, os, arch string) (err error, result map[string]string) {
  83. ptype := "sources"
  84. plugins := NativeSourcePlugin
  85. if t == plugin.SINK {
  86. ptype = "sinks"
  87. plugins = NativeSinkPlugin
  88. } else if t == plugin.FUNCTION {
  89. ptype = "functions"
  90. plugins = NativeFunctionPlugin
  91. }
  92. if hosts == "" || ptype == "" || os == "" {
  93. logger.Errorf("Invalid parameter value: hosts %s, ptype %s or os: %s should not be empty.", hosts, ptype, os)
  94. return fmt.Errorf("invalid configruation for plugin host in kuiper.yaml"), nil
  95. }
  96. result = make(map[string]string)
  97. hostsArr := strings.Split(hosts, ",")
  98. for _, host := range hostsArr {
  99. host := strings.Trim(host, " ")
  100. tmp := []string{host, "kuiper-plugins", version, os, ptype}
  101. //The url is similar to http://host:port/kuiper-plugins/0.9.1/debian/sinks/
  102. url := strings.Join(tmp, "/")
  103. for _, p := range plugins {
  104. result[p] = url + "/" + p + "_" + arch + ".zip"
  105. }
  106. }
  107. return
  108. }