|
@@ -4,11 +4,12 @@ import (
|
|
"crypto/tls"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"fmt"
|
|
"fmt"
|
|
- "github.com/emqx/kuiper/common"
|
|
|
|
- "github.com/emqx/kuiper/plugins"
|
|
|
|
- "github.com/emqx/kuiper/services"
|
|
|
|
- "github.com/emqx/kuiper/xsql"
|
|
|
|
- "github.com/emqx/kuiper/xstream/api"
|
|
|
|
|
|
+ "github.com/emqx/kuiper/internal/conf"
|
|
|
|
+ "github.com/emqx/kuiper/internal/plugin"
|
|
|
|
+ "github.com/emqx/kuiper/internal/service"
|
|
|
|
+ "github.com/emqx/kuiper/pkg/api"
|
|
|
|
+ "github.com/emqx/kuiper/pkg/ast"
|
|
|
|
+ "github.com/emqx/kuiper/pkg/errorx"
|
|
"github.com/gorilla/handlers"
|
|
"github.com/gorilla/handlers"
|
|
"github.com/gorilla/mux"
|
|
"github.com/gorilla/mux"
|
|
"golang.org/x/net/html"
|
|
"golang.org/x/net/html"
|
|
@@ -50,9 +51,9 @@ func handleError(w http.ResponseWriter, err error, prefix string, logger api.Log
|
|
logger.Error(message)
|
|
logger.Error(message)
|
|
var ec int
|
|
var ec int
|
|
switch e := err.(type) {
|
|
switch e := err.(type) {
|
|
- case *common.Error:
|
|
|
|
|
|
+ case *errorx.Error:
|
|
switch e.Code() {
|
|
switch e.Code() {
|
|
- case common.NOT_FOUND:
|
|
|
|
|
|
+ case errorx.NOT_FOUND:
|
|
ec = http.StatusNotFound
|
|
ec = http.StatusNotFound
|
|
default:
|
|
default:
|
|
ec = http.StatusBadRequest
|
|
ec = http.StatusBadRequest
|
|
@@ -157,13 +158,13 @@ func pingHandler(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
w.WriteHeader(http.StatusOK)
|
|
}
|
|
}
|
|
|
|
|
|
-func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st xsql.StreamType) {
|
|
|
|
|
|
+func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
switch r.Method {
|
|
switch r.Method {
|
|
case http.MethodGet:
|
|
case http.MethodGet:
|
|
content, err := streamProcessor.ShowStream(st)
|
|
content, err := streamProcessor.ShowStream(st)
|
|
if err != nil {
|
|
if err != nil {
|
|
- handleError(w, err, fmt.Sprintf("%s command error", strings.Title(xsql.StreamTypeMap[st])), logger)
|
|
|
|
|
|
+ handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
jsonResponse(content, w, logger)
|
|
jsonResponse(content, w, logger)
|
|
@@ -175,7 +176,7 @@ func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st xsql.Stream
|
|
}
|
|
}
|
|
content, err := streamProcessor.ExecStreamSql(v.Sql)
|
|
content, err := streamProcessor.ExecStreamSql(v.Sql)
|
|
if err != nil {
|
|
if err != nil {
|
|
- handleError(w, err, fmt.Sprintf("%s command error", strings.Title(xsql.StreamTypeMap[st])), logger)
|
|
|
|
|
|
+ handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
w.WriteHeader(http.StatusCreated)
|
|
w.WriteHeader(http.StatusCreated)
|
|
@@ -183,7 +184,7 @@ func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st xsql.Stream
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func sourceManageHandler(w http.ResponseWriter, r *http.Request, st xsql.StreamType) {
|
|
|
|
|
|
+func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
vars := mux.Vars(r)
|
|
vars := mux.Vars(r)
|
|
name := vars["name"]
|
|
name := vars["name"]
|
|
@@ -192,14 +193,14 @@ func sourceManageHandler(w http.ResponseWriter, r *http.Request, st xsql.StreamT
|
|
case http.MethodGet:
|
|
case http.MethodGet:
|
|
content, err := streamProcessor.DescStream(name, st)
|
|
content, err := streamProcessor.DescStream(name, st)
|
|
if err != nil {
|
|
if err != nil {
|
|
- handleError(w, err, fmt.Sprintf("describe %s error", xsql.StreamTypeMap[st]), logger)
|
|
|
|
|
|
+ handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
jsonResponse(content, w, logger)
|
|
jsonResponse(content, w, logger)
|
|
case http.MethodDelete:
|
|
case http.MethodDelete:
|
|
content, err := streamProcessor.DropStream(name, st)
|
|
content, err := streamProcessor.DropStream(name, st)
|
|
if err != nil {
|
|
if err != nil {
|
|
- handleError(w, err, fmt.Sprintf("delete %s error", xsql.StreamTypeMap[st]), logger)
|
|
|
|
|
|
+ handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
w.WriteHeader(http.StatusOK)
|
|
w.WriteHeader(http.StatusOK)
|
|
@@ -212,7 +213,7 @@ func sourceManageHandler(w http.ResponseWriter, r *http.Request, st xsql.StreamT
|
|
}
|
|
}
|
|
content, err := streamProcessor.ExecReplaceStream(v.Sql, st)
|
|
content, err := streamProcessor.ExecReplaceStream(v.Sql, st)
|
|
if err != nil {
|
|
if err != nil {
|
|
- handleError(w, err, fmt.Sprintf("%s command error", strings.Title(xsql.StreamTypeMap[st])), logger)
|
|
|
|
|
|
+ handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
w.WriteHeader(http.StatusOK)
|
|
w.WriteHeader(http.StatusOK)
|
|
@@ -222,21 +223,21 @@ func sourceManageHandler(w http.ResponseWriter, r *http.Request, st xsql.StreamT
|
|
|
|
|
|
//list or create streams
|
|
//list or create streams
|
|
func streamsHandler(w http.ResponseWriter, r *http.Request) {
|
|
func streamsHandler(w http.ResponseWriter, r *http.Request) {
|
|
- sourcesManageHandler(w, r, xsql.TypeStream)
|
|
|
|
|
|
+ sourcesManageHandler(w, r, ast.TypeStream)
|
|
}
|
|
}
|
|
|
|
|
|
//describe or delete a stream
|
|
//describe or delete a stream
|
|
func streamHandler(w http.ResponseWriter, r *http.Request) {
|
|
func streamHandler(w http.ResponseWriter, r *http.Request) {
|
|
- sourceManageHandler(w, r, xsql.TypeStream)
|
|
|
|
|
|
+ sourceManageHandler(w, r, ast.TypeStream)
|
|
}
|
|
}
|
|
|
|
|
|
//list or create tables
|
|
//list or create tables
|
|
func tablesHandler(w http.ResponseWriter, r *http.Request) {
|
|
func tablesHandler(w http.ResponseWriter, r *http.Request) {
|
|
- sourcesManageHandler(w, r, xsql.TypeTable)
|
|
|
|
|
|
+ sourcesManageHandler(w, r, ast.TypeTable)
|
|
}
|
|
}
|
|
|
|
|
|
func tableHandler(w http.ResponseWriter, r *http.Request) {
|
|
func tableHandler(w http.ResponseWriter, r *http.Request) {
|
|
- sourceManageHandler(w, r, xsql.TypeTable)
|
|
|
|
|
|
+ sourceManageHandler(w, r, ast.TypeTable)
|
|
}
|
|
}
|
|
|
|
|
|
//list or create rules
|
|
//list or create rules
|
|
@@ -406,35 +407,35 @@ func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
|
|
w.Write([]byte(content))
|
|
w.Write([]byte(content))
|
|
}
|
|
}
|
|
|
|
|
|
-func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
|
|
|
|
|
|
+func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
switch r.Method {
|
|
switch r.Method {
|
|
case http.MethodGet:
|
|
case http.MethodGet:
|
|
content, err := pluginManager.List(t)
|
|
content, err := pluginManager.List(t)
|
|
if err != nil {
|
|
if err != nil {
|
|
- handleError(w, err, fmt.Sprintf("%s plugins list command error", plugins.PluginTypes[t]), logger)
|
|
|
|
|
|
+ handleError(w, err, fmt.Sprintf("%s plugins list command error", plugin.PluginTypes[t]), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
jsonResponse(content, w, logger)
|
|
jsonResponse(content, w, logger)
|
|
case http.MethodPost:
|
|
case http.MethodPost:
|
|
- sd := plugins.NewPluginByType(t)
|
|
|
|
|
|
+ sd := plugin.NewPluginByType(t)
|
|
err := json.NewDecoder(r.Body).Decode(sd)
|
|
err := json.NewDecoder(r.Body).Decode(sd)
|
|
// Problems decoding
|
|
// Problems decoding
|
|
if err != nil {
|
|
if err != nil {
|
|
- handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugins.PluginTypes[t]), logger)
|
|
|
|
|
|
+ handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugin.PluginTypes[t]), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
err = pluginManager.Register(t, sd)
|
|
err = pluginManager.Register(t, sd)
|
|
if err != nil {
|
|
if err != nil {
|
|
- handleError(w, err, fmt.Sprintf("%s plugins create command error", plugins.PluginTypes[t]), logger)
|
|
|
|
|
|
+ handleError(w, err, fmt.Sprintf("%s plugins create command error", plugin.PluginTypes[t]), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
w.WriteHeader(http.StatusCreated)
|
|
w.WriteHeader(http.StatusCreated)
|
|
- w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugins.PluginTypes[t], sd.GetName())))
|
|
|
|
|
|
+ w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugin.PluginTypes[t], sd.GetName())))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func pluginHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
|
|
|
|
|
|
+func pluginHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
vars := mux.Vars(r)
|
|
vars := mux.Vars(r)
|
|
name := vars["name"]
|
|
name := vars["name"]
|
|
@@ -445,11 +446,11 @@ func pluginHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType)
|
|
r := cb == "1"
|
|
r := cb == "1"
|
|
err := pluginManager.Delete(t, name, r)
|
|
err := pluginManager.Delete(t, name, r)
|
|
if err != nil {
|
|
if err != nil {
|
|
- handleError(w, err, fmt.Sprintf("delete %s plugin %s error", plugins.PluginTypes[t], name), logger)
|
|
|
|
|
|
+ handleError(w, err, fmt.Sprintf("delete %s plugin %s error", plugin.PluginTypes[t], name), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
w.WriteHeader(http.StatusOK)
|
|
w.WriteHeader(http.StatusOK)
|
|
- result := fmt.Sprintf("%s plugin %s is deleted", plugins.PluginTypes[t], name)
|
|
|
|
|
|
+ result := fmt.Sprintf("%s plugin %s is deleted", plugin.PluginTypes[t], name)
|
|
if r {
|
|
if r {
|
|
result = fmt.Sprintf("%s and Kuiper will be stopped", result)
|
|
result = fmt.Sprintf("%s and Kuiper will be stopped", result)
|
|
} else {
|
|
} else {
|
|
@@ -459,7 +460,7 @@ func pluginHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType)
|
|
case http.MethodGet:
|
|
case http.MethodGet:
|
|
j, ok := pluginManager.Get(t, name)
|
|
j, ok := pluginManager.Get(t, name)
|
|
if !ok {
|
|
if !ok {
|
|
- handleError(w, common.NewErrorWithCode(common.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", plugins.PluginTypes[t], name), logger)
|
|
|
|
|
|
+ handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", plugin.PluginTypes[t], name), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
jsonResponse(j, w, logger)
|
|
jsonResponse(j, w, logger)
|
|
@@ -468,27 +469,27 @@ func pluginHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType)
|
|
|
|
|
|
//list or create source plugin
|
|
//list or create source plugin
|
|
func sourcesHandler(w http.ResponseWriter, r *http.Request) {
|
|
func sourcesHandler(w http.ResponseWriter, r *http.Request) {
|
|
- pluginsHandler(w, r, plugins.SOURCE)
|
|
|
|
|
|
+ pluginsHandler(w, r, plugin.SOURCE)
|
|
}
|
|
}
|
|
|
|
|
|
//delete a source plugin
|
|
//delete a source plugin
|
|
func sourceHandler(w http.ResponseWriter, r *http.Request) {
|
|
func sourceHandler(w http.ResponseWriter, r *http.Request) {
|
|
- pluginHandler(w, r, plugins.SOURCE)
|
|
|
|
|
|
+ pluginHandler(w, r, plugin.SOURCE)
|
|
}
|
|
}
|
|
|
|
|
|
//list or create sink plugin
|
|
//list or create sink plugin
|
|
func sinksHandler(w http.ResponseWriter, r *http.Request) {
|
|
func sinksHandler(w http.ResponseWriter, r *http.Request) {
|
|
- pluginsHandler(w, r, plugins.SINK)
|
|
|
|
|
|
+ pluginsHandler(w, r, plugin.SINK)
|
|
}
|
|
}
|
|
|
|
|
|
//delete a sink plugin
|
|
//delete a sink plugin
|
|
func sinkHandler(w http.ResponseWriter, r *http.Request) {
|
|
func sinkHandler(w http.ResponseWriter, r *http.Request) {
|
|
- pluginHandler(w, r, plugins.SINK)
|
|
|
|
|
|
+ pluginHandler(w, r, plugin.SINK)
|
|
}
|
|
}
|
|
|
|
|
|
//list or create function plugin
|
|
//list or create function plugin
|
|
func functionsHandler(w http.ResponseWriter, r *http.Request) {
|
|
func functionsHandler(w http.ResponseWriter, r *http.Request) {
|
|
- pluginsHandler(w, r, plugins.FUNCTION)
|
|
|
|
|
|
+ pluginsHandler(w, r, plugin.FUNCTION)
|
|
}
|
|
}
|
|
|
|
|
|
//list all user defined functions in all function plugins
|
|
//list all user defined functions in all function plugins
|
|
@@ -506,7 +507,7 @@ func functionsGetHandler(w http.ResponseWriter, r *http.Request) {
|
|
name := vars["name"]
|
|
name := vars["name"]
|
|
j, ok := pluginManager.GetSymbol(name)
|
|
j, ok := pluginManager.GetSymbol(name)
|
|
if !ok {
|
|
if !ok {
|
|
- handleError(w, common.NewErrorWithCode(common.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
|
|
|
|
|
|
+ handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
jsonResponse(map[string]string{"name": name, "plugin": j}, w, logger)
|
|
jsonResponse(map[string]string{"name": name, "plugin": j}, w, logger)
|
|
@@ -514,7 +515,7 @@ func functionsGetHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
|
//delete a function plugin
|
|
//delete a function plugin
|
|
func functionHandler(w http.ResponseWriter, r *http.Request) {
|
|
func functionHandler(w http.ResponseWriter, r *http.Request) {
|
|
- pluginHandler(w, r, plugins.FUNCTION)
|
|
|
|
|
|
+ pluginHandler(w, r, plugin.FUNCTION)
|
|
}
|
|
}
|
|
|
|
|
|
type functionList struct {
|
|
type functionList struct {
|
|
@@ -529,9 +530,9 @@ func functionRegisterHandler(w http.ResponseWriter, r *http.Request) {
|
|
vars := mux.Vars(r)
|
|
vars := mux.Vars(r)
|
|
name := vars["name"]
|
|
name := vars["name"]
|
|
|
|
|
|
- _, ok := pluginManager.Get(plugins.FUNCTION, name)
|
|
|
|
|
|
+ _, ok := pluginManager.Get(plugin.FUNCTION, name)
|
|
if !ok {
|
|
if !ok {
|
|
- handleError(w, common.NewErrorWithCode(common.NOT_FOUND, "not found"), fmt.Sprintf("register %s plugin %s error", plugins.PluginTypes[plugins.FUNCTION], name), logger)
|
|
|
|
|
|
+ handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("register %s plugin %s error", plugin.PluginTypes[plugin.FUNCTION], name), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
sd := functionList{}
|
|
sd := functionList{}
|
|
@@ -551,15 +552,15 @@ func functionRegisterHandler(w http.ResponseWriter, r *http.Request) {
|
|
}
|
|
}
|
|
|
|
|
|
func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
|
|
func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
|
|
- prebuildPluginsHandler(w, r, plugins.SOURCE)
|
|
|
|
|
|
+ prebuildPluginsHandler(w, r, plugin.SOURCE)
|
|
}
|
|
}
|
|
|
|
|
|
func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
|
|
func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
|
|
- prebuildPluginsHandler(w, r, plugins.SINK)
|
|
|
|
|
|
+ prebuildPluginsHandler(w, r, plugin.SINK)
|
|
}
|
|
}
|
|
|
|
|
|
func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
|
|
func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
|
|
- prebuildPluginsHandler(w, r, plugins.FUNCTION)
|
|
|
|
|
|
+ prebuildPluginsHandler(w, r, plugin.FUNCTION)
|
|
}
|
|
}
|
|
|
|
|
|
func isOffcialDockerImage() bool {
|
|
func isOffcialDockerImage() bool {
|
|
@@ -569,13 +570,13 @@ func isOffcialDockerImage() bool {
|
|
return true
|
|
return true
|
|
}
|
|
}
|
|
|
|
|
|
-func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
|
|
|
|
|
|
+func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
|
|
emsg := "It's strongly recommended to install plugins at official released Debian Docker images. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
|
|
emsg := "It's strongly recommended to install plugins at official released Debian Docker images. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
|
|
if !isOffcialDockerImage() {
|
|
if !isOffcialDockerImage() {
|
|
handleError(w, fmt.Errorf(emsg), "", logger)
|
|
handleError(w, fmt.Errorf(emsg), "", logger)
|
|
return
|
|
return
|
|
} else if runtime.GOOS == "linux" {
|
|
} else if runtime.GOOS == "linux" {
|
|
- osrelease, err := common.Read()
|
|
|
|
|
|
+ osrelease, err := Read()
|
|
if err != nil {
|
|
if err != nil {
|
|
logger.Infof("")
|
|
logger.Infof("")
|
|
return
|
|
return
|
|
@@ -583,11 +584,11 @@ func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugins.Pl
|
|
prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
|
|
prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
|
|
os := "debian"
|
|
os := "debian"
|
|
if strings.Contains(prettyName, "DEBIAN") {
|
|
if strings.Contains(prettyName, "DEBIAN") {
|
|
- hosts := common.Config.Basic.PluginHosts
|
|
|
|
|
|
+ hosts := conf.Config.Basic.PluginHosts
|
|
ptype := "sources"
|
|
ptype := "sources"
|
|
- if t == plugins.SINK {
|
|
|
|
|
|
+ if t == plugin.SINK {
|
|
ptype = "sinks"
|
|
ptype = "sinks"
|
|
- } else if t == plugins.FUNCTION {
|
|
|
|
|
|
+ } else if t == plugin.FUNCTION {
|
|
ptype = "functions"
|
|
ptype = "functions"
|
|
}
|
|
}
|
|
if err, plugins := fetchPluginList(hosts, ptype, os, runtime.GOARCH); err != nil {
|
|
if err, plugins := fetchPluginList(hosts, ptype, os, runtime.GOARCH); err != nil {
|
|
@@ -686,7 +687,7 @@ loop:
|
|
//list sink plugin
|
|
//list sink plugin
|
|
func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
|
|
func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
- sinks := plugins.GetSinks()
|
|
|
|
|
|
+ sinks := plugin.GetSinks()
|
|
jsonResponse(sinks, w, logger)
|
|
jsonResponse(sinks, w, logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
@@ -698,7 +699,7 @@ func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
|
|
pluginName := vars["name"]
|
|
pluginName := vars["name"]
|
|
|
|
|
|
language := getLanguage(r)
|
|
language := getLanguage(r)
|
|
- ptrMetadata, err := plugins.GetSinkMeta(pluginName, language)
|
|
|
|
|
|
+ ptrMetadata, err := plugin.GetSinkMeta(pluginName, language)
|
|
if err != nil {
|
|
if err != nil {
|
|
handleError(w, err, "", logger)
|
|
handleError(w, err, "", logger)
|
|
return
|
|
return
|
|
@@ -709,7 +710,7 @@ func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
|
|
//list functions
|
|
//list functions
|
|
func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
|
|
func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
- sinks := plugins.GetFunctions()
|
|
|
|
|
|
+ sinks := plugin.GetFunctions()
|
|
jsonResponse(sinks, w, logger)
|
|
jsonResponse(sinks, w, logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
@@ -717,7 +718,7 @@ func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
|
|
//list source plugin
|
|
//list source plugin
|
|
func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
|
|
func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
- ret := plugins.GetSources()
|
|
|
|
|
|
+ ret := plugin.GetSources()
|
|
if nil != ret {
|
|
if nil != ret {
|
|
jsonResponse(ret, w, logger)
|
|
jsonResponse(ret, w, logger)
|
|
return
|
|
return
|
|
@@ -730,7 +731,7 @@ func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
|
|
vars := mux.Vars(r)
|
|
vars := mux.Vars(r)
|
|
pluginName := vars["name"]
|
|
pluginName := vars["name"]
|
|
language := getLanguage(r)
|
|
language := getLanguage(r)
|
|
- ret, err := plugins.GetSourceMeta(pluginName, language)
|
|
|
|
|
|
+ ret, err := plugin.GetSourceMeta(pluginName, language)
|
|
if err != nil {
|
|
if err != nil {
|
|
handleError(w, err, "", logger)
|
|
handleError(w, err, "", logger)
|
|
return
|
|
return
|
|
@@ -747,7 +748,7 @@ func sourceConfHandler(w http.ResponseWriter, r *http.Request) {
|
|
vars := mux.Vars(r)
|
|
vars := mux.Vars(r)
|
|
pluginName := vars["name"]
|
|
pluginName := vars["name"]
|
|
language := getLanguage(r)
|
|
language := getLanguage(r)
|
|
- ret, err := plugins.GetSourceConf(pluginName, language)
|
|
|
|
|
|
+ ret, err := plugin.GetSourceConf(pluginName, language)
|
|
if err != nil {
|
|
if err != nil {
|
|
handleError(w, err, "", logger)
|
|
handleError(w, err, "", logger)
|
|
return
|
|
return
|
|
@@ -761,7 +762,7 @@ func sourceConfKeysHandler(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
defer r.Body.Close()
|
|
vars := mux.Vars(r)
|
|
vars := mux.Vars(r)
|
|
pluginName := vars["name"]
|
|
pluginName := vars["name"]
|
|
- ret := plugins.GetSourceConfKeys(pluginName)
|
|
|
|
|
|
+ ret := plugin.GetSourceConfKeys(pluginName)
|
|
if nil != ret {
|
|
if nil != ret {
|
|
jsonResponse(ret, w, logger)
|
|
jsonResponse(ret, w, logger)
|
|
return
|
|
return
|
|
@@ -780,14 +781,14 @@ func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
|
|
language := getLanguage(r)
|
|
language := getLanguage(r)
|
|
switch r.Method {
|
|
switch r.Method {
|
|
case http.MethodDelete:
|
|
case http.MethodDelete:
|
|
- err = plugins.DelSourceConfKey(pluginName, confKey, language)
|
|
|
|
|
|
+ err = plugin.DelSourceConfKey(pluginName, confKey, language)
|
|
case http.MethodPost:
|
|
case http.MethodPost:
|
|
v, err := ioutil.ReadAll(r.Body)
|
|
v, err := ioutil.ReadAll(r.Body)
|
|
if err != nil {
|
|
if err != nil {
|
|
handleError(w, err, "Invalid body", logger)
|
|
handleError(w, err, "Invalid body", logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- err = plugins.AddSourceConfKey(pluginName, confKey, language, v)
|
|
|
|
|
|
+ err = plugin.AddSourceConfKey(pluginName, confKey, language, v)
|
|
}
|
|
}
|
|
if err != nil {
|
|
if err != nil {
|
|
handleError(w, err, "", logger)
|
|
handleError(w, err, "", logger)
|
|
@@ -816,9 +817,9 @@ func sourceConfKeyFieldsHandler(w http.ResponseWriter, r *http.Request) {
|
|
language := getLanguage(r)
|
|
language := getLanguage(r)
|
|
switch r.Method {
|
|
switch r.Method {
|
|
case http.MethodDelete:
|
|
case http.MethodDelete:
|
|
- err = plugins.DelSourceConfKeyField(pluginName, confKey, language, v)
|
|
|
|
|
|
+ err = plugin.DelSourceConfKeyField(pluginName, confKey, language, v)
|
|
case http.MethodPost:
|
|
case http.MethodPost:
|
|
- err = plugins.AddSourceConfKeyField(pluginName, confKey, language, v)
|
|
|
|
|
|
+ err = plugin.AddSourceConfKeyField(pluginName, confKey, language, v)
|
|
}
|
|
}
|
|
if err != nil {
|
|
if err != nil {
|
|
handleError(w, err, "", logger)
|
|
handleError(w, err, "", logger)
|
|
@@ -848,7 +849,7 @@ func servicesHandler(w http.ResponseWriter, r *http.Request) {
|
|
}
|
|
}
|
|
jsonResponse(content, w, logger)
|
|
jsonResponse(content, w, logger)
|
|
case http.MethodPost:
|
|
case http.MethodPost:
|
|
- sd := &services.ServiceCreationRequest{}
|
|
|
|
|
|
+ sd := &service.ServiceCreationRequest{}
|
|
err := json.NewDecoder(r.Body).Decode(sd)
|
|
err := json.NewDecoder(r.Body).Decode(sd)
|
|
// Problems decoding
|
|
// Problems decoding
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -883,12 +884,12 @@ func serviceHandler(w http.ResponseWriter, r *http.Request) {
|
|
case http.MethodGet:
|
|
case http.MethodGet:
|
|
j, err := serviceManager.Get(name)
|
|
j, err := serviceManager.Get(name)
|
|
if err != nil {
|
|
if err != nil {
|
|
- handleError(w, common.NewErrorWithCode(common.NOT_FOUND, "not found"), fmt.Sprintf("describe service %s error", name), logger)
|
|
|
|
|
|
+ handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe service %s error", name), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
jsonResponse(j, w, logger)
|
|
jsonResponse(j, w, logger)
|
|
case http.MethodPut:
|
|
case http.MethodPut:
|
|
- sd := &services.ServiceCreationRequest{}
|
|
|
|
|
|
+ sd := &service.ServiceCreationRequest{}
|
|
err := json.NewDecoder(r.Body).Decode(sd)
|
|
err := json.NewDecoder(r.Body).Decode(sd)
|
|
// Problems decoding
|
|
// Problems decoding
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -920,7 +921,7 @@ func serviceFunctionHandler(w http.ResponseWriter, r *http.Request) {
|
|
name := vars["name"]
|
|
name := vars["name"]
|
|
j, err := serviceManager.GetFunction(name)
|
|
j, err := serviceManager.GetFunction(name)
|
|
if err != nil {
|
|
if err != nil {
|
|
- handleError(w, common.NewErrorWithCode(common.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
|
|
|
|
|
|
+ handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
jsonResponse(j, w, logger)
|
|
jsonResponse(j, w, logger)
|