|
@@ -18,6 +18,7 @@ import (
|
|
|
"bytes"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
+ "github.com/lf-edge/ekuiper/internal/pkg/model"
|
|
|
"github.com/lf-edge/ekuiper/internal/plugin"
|
|
|
"github.com/lf-edge/ekuiper/internal/service"
|
|
|
"github.com/lf-edge/ekuiper/internal/topo/sink"
|
|
@@ -89,7 +90,7 @@ func (t *Server) Stream(stream string, reply *string) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (t *Server) CreateRule(rule *RPCArgDesc, reply *string) error {
|
|
|
+func (t *Server) CreateRule(rule *model.RPCArgDesc, reply *string) error {
|
|
|
r, err := ruleProcessor.ExecCreate(rule.Name, rule.Json)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("Create rule error : %s.", err)
|
|
@@ -200,7 +201,7 @@ func (t *Server) DropRule(name string, reply *string) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (t *Server) CreatePlugin(arg *PluginDesc, reply *string) error {
|
|
|
+func (t *Server) CreatePlugin(arg *model.PluginDesc, reply *string) error {
|
|
|
pt := plugin.PluginType(arg.Type)
|
|
|
p, err := getPluginByJson(arg, pt)
|
|
|
if err != nil {
|
|
@@ -218,7 +219,7 @@ func (t *Server) CreatePlugin(arg *PluginDesc, reply *string) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (t *Server) RegisterPlugin(arg *PluginDesc, reply *string) error {
|
|
|
+func (t *Server) RegisterPlugin(arg *model.PluginDesc, reply *string) error {
|
|
|
p, err := getPluginByJson(arg, plugin.FUNCTION)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("Register plugin functions error: %s", err)
|
|
@@ -235,7 +236,7 @@ func (t *Server) RegisterPlugin(arg *PluginDesc, reply *string) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (t *Server) DropPlugin(arg *PluginDesc, reply *string) error {
|
|
|
+func (t *Server) DropPlugin(arg *model.PluginDesc, reply *string) error {
|
|
|
pt := plugin.PluginType(arg.Type)
|
|
|
p, err := getPluginByJson(arg, pt)
|
|
|
if err != nil {
|
|
@@ -282,7 +283,7 @@ func (t *Server) ShowUdfs(_ int, reply *string) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (t *Server) DescPlugin(arg *PluginDesc, reply *string) error {
|
|
|
+func (t *Server) DescPlugin(arg *model.PluginDesc, reply *string) error {
|
|
|
pt := plugin.PluginType(arg.Type)
|
|
|
p, err := getPluginByJson(arg, pt)
|
|
|
if err != nil {
|
|
@@ -319,7 +320,7 @@ func (t *Server) DescUdf(arg string, reply *string) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (t *Server) CreateService(arg *RPCArgDesc, reply *string) error {
|
|
|
+func (t *Server) CreateService(arg *model.RPCArgDesc, reply *string) error {
|
|
|
sd := &service.ServiceCreationRequest{}
|
|
|
if arg.Json != "" {
|
|
|
if err := json.Unmarshal([]byte(arg.Json), sd); err != nil {
|
|
@@ -424,7 +425,7 @@ func marshalDesc(m interface{}) (string, error) {
|
|
|
return dst.String(), nil
|
|
|
}
|
|
|
|
|
|
-func getPluginByJson(arg *PluginDesc, pt plugin.PluginType) (plugin.Plugin, error) {
|
|
|
+func getPluginByJson(arg *model.PluginDesc, pt plugin.PluginType) (plugin.Plugin, error) {
|
|
|
p := plugin.NewPluginByType(pt)
|
|
|
if arg.Json != "" {
|
|
|
if err := json.Unmarshal([]byte(arg.Json), p); err != nil {
|