|
@@ -21,6 +21,7 @@ import (
|
|
"github.com/lf-edge/ekuiper/internal/pkg/model"
|
|
"github.com/lf-edge/ekuiper/internal/pkg/model"
|
|
"github.com/lf-edge/ekuiper/internal/plugin"
|
|
"github.com/lf-edge/ekuiper/internal/plugin"
|
|
"github.com/lf-edge/ekuiper/internal/plugin/native"
|
|
"github.com/lf-edge/ekuiper/internal/plugin/native"
|
|
|
|
+ "github.com/lf-edge/ekuiper/internal/plugin/portable"
|
|
"github.com/lf-edge/ekuiper/internal/service"
|
|
"github.com/lf-edge/ekuiper/internal/service"
|
|
"github.com/lf-edge/ekuiper/internal/topo/sink"
|
|
"github.com/lf-edge/ekuiper/internal/topo/sink"
|
|
"strings"
|
|
"strings"
|
|
@@ -211,7 +212,11 @@ func (t *Server) CreatePlugin(arg *model.PluginDesc, reply *string) error {
|
|
if p.GetFile() == "" {
|
|
if p.GetFile() == "" {
|
|
return fmt.Errorf("Create plugin error: Missing plugin file url.")
|
|
return fmt.Errorf("Create plugin error: Missing plugin file url.")
|
|
}
|
|
}
|
|
- err = native.GetManager().Register(pt, p)
|
|
|
|
|
|
+ if pt == plugin.PORTABLE {
|
|
|
|
+ err = portable.GetManager().Register(p)
|
|
|
|
+ } else {
|
|
|
|
+ err = native.GetManager().Register(pt, p)
|
|
|
|
+ }
|
|
if err != nil {
|
|
if err != nil {
|
|
return fmt.Errorf("Create plugin error: %s", err)
|
|
return fmt.Errorf("Create plugin error: %s", err)
|
|
} else {
|
|
} else {
|
|
@@ -243,16 +248,24 @@ func (t *Server) DropPlugin(arg *model.PluginDesc, reply *string) error {
|
|
if err != nil {
|
|
if err != nil {
|
|
return fmt.Errorf("Drop plugin error: %s", err)
|
|
return fmt.Errorf("Drop plugin error: %s", err)
|
|
}
|
|
}
|
|
- err = native.GetManager().Delete(pt, p.GetName(), arg.Stop)
|
|
|
|
- if err != nil {
|
|
|
|
- return fmt.Errorf("Drop plugin error: %s", err)
|
|
|
|
|
|
+ if pt == plugin.PORTABLE {
|
|
|
|
+ err = portable.GetManager().Delete(p.GetName())
|
|
|
|
+ if err != nil {
|
|
|
|
+ return fmt.Errorf("Drop plugin error: %s", err)
|
|
|
|
+ } else {
|
|
|
|
+ *reply = fmt.Sprintf("Plugin %s is dropped .", p.GetName())
|
|
|
|
+ }
|
|
} else {
|
|
} else {
|
|
- if arg.Stop {
|
|
|
|
- *reply = fmt.Sprintf("Plugin %s is dropped and Kuiper will be stopped.", p.GetName())
|
|
|
|
|
|
+ err = native.GetManager().Delete(pt, p.GetName(), arg.Stop)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return fmt.Errorf("Drop plugin error: %s", err)
|
|
} else {
|
|
} else {
|
|
- *reply = fmt.Sprintf("Plugin %s is dropped and Kuiper must restart for the change to take effect.", p.GetName())
|
|
|
|
|
|
+ if arg.Stop {
|
|
|
|
+ *reply = fmt.Sprintf("Plugin %s is dropped and Kuiper will be stopped.", p.GetName())
|
|
|
|
+ } else {
|
|
|
|
+ *reply = fmt.Sprintf("Plugin %s is dropped and Kuiper must restart for the change to take effect.", p.GetName())
|
|
|
|
+ }
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
@@ -282,7 +295,13 @@ func (t *Server) DescPlugin(arg *model.PluginDesc, reply *string) error {
|
|
if err != nil {
|
|
if err != nil {
|
|
return fmt.Errorf("Describe plugin error: %s", err)
|
|
return fmt.Errorf("Describe plugin error: %s", err)
|
|
}
|
|
}
|
|
- m, ok := native.GetManager().GetPluginInfo(pt, p.GetName())
|
|
|
|
|
|
+ var m interface{}
|
|
|
|
+ var ok bool
|
|
|
|
+ if pt == plugin.PORTABLE {
|
|
|
|
+ m, ok = portable.GetManager().GetPluginInfo(p.GetName())
|
|
|
|
+ } else {
|
|
|
|
+ m, ok = native.GetManager().GetPluginInfo(pt, p.GetName())
|
|
|
|
+ }
|
|
if !ok {
|
|
if !ok {
|
|
return fmt.Errorf("Describe plugin error: not found")
|
|
return fmt.Errorf("Describe plugin error: not found")
|
|
} else {
|
|
} else {
|