|
@@ -8,10 +8,13 @@ import (
|
|
|
"github.com/emqx/kuiper/xstream/api"
|
|
|
"github.com/gorilla/handlers"
|
|
|
"github.com/gorilla/mux"
|
|
|
+ "golang.org/x/net/html"
|
|
|
"io"
|
|
|
"io/ioutil"
|
|
|
"net/http"
|
|
|
+ "os"
|
|
|
"runtime"
|
|
|
+ "strings"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
@@ -82,10 +85,14 @@ func createRestServer(port int) *http.Server {
|
|
|
r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
|
|
|
|
|
|
r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
|
|
|
+ r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
|
|
|
r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
|
|
|
+
|
|
|
r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
|
|
|
+ r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
|
|
|
r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
|
|
|
r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
|
|
|
+ r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
|
|
|
r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
|
|
|
|
|
|
r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
|
|
@@ -413,6 +420,130 @@ func functionHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
pluginHandler(w, r, plugins.FUNCTION)
|
|
|
}
|
|
|
|
|
|
+func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
|
|
|
+ prebuildPluginsHandler(w, r, plugins.SOURCE)
|
|
|
+}
|
|
|
+
|
|
|
+func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
|
|
|
+ prebuildPluginsHandler(w, r, plugins.SINK)
|
|
|
+}
|
|
|
+
|
|
|
+func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
|
|
|
+ prebuildPluginsHandler(w, r, plugins.FUNCTION)
|
|
|
+}
|
|
|
+
|
|
|
+func isOffcialDockerImage() bool {
|
|
|
+ if strings.ToLower(os.Getenv("MAINTAINER")) != "emqx.io" {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
|
|
|
+ if runtime.GOOS != "linux" {
|
|
|
+ handleError(w, fmt.Errorf("Plugins can be only installed at Linux."), "", logger)
|
|
|
+ return
|
|
|
+ } else if !isOffcialDockerImage() {
|
|
|
+ handleError(w, fmt.Errorf("Plugins can be only installed at official released Docker images."), "", logger)
|
|
|
+ return
|
|
|
+ } else if runtime.GOOS == "linux" {
|
|
|
+ osrelease, err := common.Read()
|
|
|
+ if err != nil {
|
|
|
+ logger.Infof("")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
|
|
|
+ if strings.Contains(prettyName, "ALPINE") || strings.Contains(prettyName, "DEBIAN") {
|
|
|
+ hosts := common.Config.Basic.PluginHosts
|
|
|
+ ptype := "sources"
|
|
|
+ if t == plugins.SINK {
|
|
|
+ ptype = "sinks"
|
|
|
+ } else if t == plugins.FUNCTION {
|
|
|
+ ptype = "functions"
|
|
|
+ }
|
|
|
+ if err, plugins := fetchPluginList(hosts, ptype, strings.ToLower(prettyName), runtime.GOARCH); err != nil {
|
|
|
+ handleError(w, err, "", logger)
|
|
|
+ } else {
|
|
|
+ jsonResponse(plugins, w, logger)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ handleError(w, fmt.Errorf("Only ALPINE & DEBIAN docker images are supported."), "", logger)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ handleError(w, fmt.Errorf("Please use official Kuiper docker images to install the plugins."), "", logger)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func fetchPluginList(hosts, ptype, os, arch string) (err error, result map[string]string) {
|
|
|
+ if hosts == "" || ptype == "" || os == "" {
|
|
|
+ return fmt.Errorf("Invalid parameter value: hosts, ptype and os value should not be empty."), nil
|
|
|
+ }
|
|
|
+ result = make(map[string]string)
|
|
|
+ hostsArr := strings.Split(hosts, ",")
|
|
|
+ for _, host := range hostsArr {
|
|
|
+ host := strings.Trim(host, " ")
|
|
|
+ tmp := []string{host, "kuiper-plugins", version, ptype, os}
|
|
|
+ //The url is similar to http://host:port/kuiper-plugins/0.9.1/sinks/alpine
|
|
|
+ url := strings.Join(tmp, "/")
|
|
|
+ resp, err := http.Get(url)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return err, nil
|
|
|
+ }
|
|
|
+ defer resp.Body.Close()
|
|
|
+
|
|
|
+ if resp.StatusCode != http.StatusOK {
|
|
|
+ return fmt.Errorf("Status error: %v", resp.StatusCode), nil
|
|
|
+ }
|
|
|
+ data, err := ioutil.ReadAll(resp.Body)
|
|
|
+ if err != nil {
|
|
|
+ return err, nil
|
|
|
+ }
|
|
|
+ plugins := extractFromHtml(string(data), arch)
|
|
|
+ for _, p := range plugins {
|
|
|
+ //If already existed, using the existed.
|
|
|
+ if _, ok := result[p]; !ok {
|
|
|
+ result[p] = url + "/" + p + "_" + arch + ".zip"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func extractFromHtml(content, arch string) []string {
|
|
|
+ plugins := []string{}
|
|
|
+ htmlTokens := html.NewTokenizer(strings.NewReader(content))
|
|
|
+loop:
|
|
|
+ for {
|
|
|
+ tt := htmlTokens.Next()
|
|
|
+ switch tt {
|
|
|
+ case html.ErrorToken:
|
|
|
+ break loop
|
|
|
+ case html.StartTagToken:
|
|
|
+ t := htmlTokens.Token()
|
|
|
+ isAnchor := t.Data == "a"
|
|
|
+ if isAnchor {
|
|
|
+ found := false
|
|
|
+ for _, prop := range t.Attr {
|
|
|
+ if strings.ToUpper(prop.Key) == "HREF" {
|
|
|
+ if strings.HasSuffix(prop.Val, "_"+arch+".zip") {
|
|
|
+ if index := strings.LastIndex(prop.Val, "_"); index != -1 {
|
|
|
+ plugins = append(plugins, prop.Val[0:index])
|
|
|
+ }
|
|
|
+ }
|
|
|
+ found = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if !found {
|
|
|
+ logger.Infof("Invalid plugin download link %s", t)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return plugins
|
|
|
+}
|
|
|
+
|
|
|
//list sink plugin
|
|
|
func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
defer r.Body.Close()
|
|
@@ -503,6 +634,7 @@ func sourceConfKeysHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
|
//Add del confkey
|
|
|
func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
+
|
|
|
defer r.Body.Close()
|
|
|
var ret interface{}
|
|
|
var err error
|
|
@@ -513,12 +645,12 @@ func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
case http.MethodDelete:
|
|
|
err = plugins.DelSourceConfKey(pluginName, confKey)
|
|
|
case http.MethodPost:
|
|
|
- v, err := decodeStatementDescriptor(r.Body)
|
|
|
+ v, err := ioutil.ReadAll(r.Body)
|
|
|
if err != nil {
|
|
|
handleError(w, err, "Invalid body", logger)
|
|
|
return
|
|
|
}
|
|
|
- err = plugins.AddSourceConfKey(pluginName, confKey, v.Sql)
|
|
|
+ err = plugins.AddSourceConfKey(pluginName, confKey, v)
|
|
|
}
|
|
|
if err != nil {
|
|
|
handleError(w, err, "metadata error", logger)
|
|
@@ -538,16 +670,16 @@ func sourceConfKeyFieldsHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
vars := mux.Vars(r)
|
|
|
pluginName := vars["name"]
|
|
|
confKey := vars["confKey"]
|
|
|
- v, err := decodeStatementDescriptor(r.Body)
|
|
|
+ v, err := ioutil.ReadAll(r.Body)
|
|
|
if err != nil {
|
|
|
handleError(w, err, "Invalid body", logger)
|
|
|
return
|
|
|
}
|
|
|
switch r.Method {
|
|
|
case http.MethodDelete:
|
|
|
- err = plugins.DelSourceConfKeyField(pluginName, confKey, v.Sql)
|
|
|
+ err = plugins.DelSourceConfKeyField(pluginName, confKey, v)
|
|
|
case http.MethodPost:
|
|
|
- err = plugins.AddSourceConfKeyField(pluginName, confKey, v.Sql)
|
|
|
+ err = plugins.AddSourceConfKeyField(pluginName, confKey, v)
|
|
|
}
|
|
|
if err != nil {
|
|
|
handleError(w, err, "metadata error", logger)
|