rpc_plugin_native.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build plugin && rpc && core && !portable
  15. // +build plugin,rpc,core,!portable
  16. package server
  17. import (
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/pkg/model"
  20. "github.com/lf-edge/ekuiper/internal/plugin"
  21. "strings"
  22. )
  23. func (t *Server) doRegister(pt plugin.PluginType, p plugin.Plugin) error {
  24. if pt == plugin.PORTABLE {
  25. return fmt.Errorf("portable plugin support is disabled")
  26. } else {
  27. return nativeManager.Register(pt, p)
  28. }
  29. }
  30. func (t *Server) doDelete(pt plugin.PluginType, name string, stopRun bool) error {
  31. if pt == plugin.PORTABLE {
  32. return fmt.Errorf("portable plugin support is disabled")
  33. } else {
  34. return nativeManager.Delete(pt, name, stopRun)
  35. }
  36. }
  37. func (t *Server) doDesc(pt plugin.PluginType, name string) (interface{}, error) {
  38. if pt == plugin.PORTABLE {
  39. return nil, fmt.Errorf("portable plugin support is disabled")
  40. } else {
  41. r, ok := nativeManager.GetPluginInfo(pt, name)
  42. if !ok {
  43. return nil, fmt.Errorf("not found")
  44. }
  45. return r, nil
  46. }
  47. }
  48. func (t *Server) RegisterPlugin(arg *model.PluginDesc, reply *string) error {
  49. p, err := getPluginByJson(arg, plugin.FUNCTION)
  50. if err != nil {
  51. return fmt.Errorf("Register plugin functions error: %s", err)
  52. }
  53. if len(p.GetSymbols()) == 0 {
  54. return fmt.Errorf("Register plugin functions error: Missing function list.")
  55. }
  56. err = nativeManager.RegisterFuncs(p.GetName(), p.GetSymbols())
  57. if err != nil {
  58. return fmt.Errorf("Create plugin error: %s", err)
  59. } else {
  60. *reply = fmt.Sprintf("Plugin %s is created.", p.GetName())
  61. }
  62. return nil
  63. }
  64. func (t *Server) ShowPlugins(arg int, reply *string) error {
  65. pt := plugin.PluginType(arg)
  66. l := nativeManager.List(pt)
  67. if len(l) == 0 {
  68. l = append(l, "No plugin is found.")
  69. }
  70. *reply = strings.Join(l, "\n")
  71. return nil
  72. }
  73. func (t *Server) ShowUdfs(_ int, reply *string) error {
  74. l := nativeManager.ListSymbols()
  75. if len(l) == 0 {
  76. l = append(l, "No udf is found.")
  77. }
  78. *reply = strings.Join(l, "\n")
  79. return nil
  80. }
  81. func (t *Server) DescUdf(arg string, reply *string) error {
  82. m, ok := nativeManager.GetPluginBySymbol(plugin.FUNCTION, arg)
  83. if !ok {
  84. return fmt.Errorf("Describe udf error: not found")
  85. } else {
  86. j := map[string]string{
  87. "name": arg,
  88. "plugin": m,
  89. }
  90. r, err := marshalDesc(j)
  91. if err != nil {
  92. return fmt.Errorf("Describe udf error: %v", err)
  93. }
  94. *reply = r
  95. }
  96. return nil
  97. }