function_plugin.go 980 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. package xsql
  2. import (
  3. "github.com/emqx/kuiper/plugins"
  4. "github.com/emqx/kuiper/xstream/api"
  5. "github.com/emqx/kuiper/xstream/contexts"
  6. )
  7. //Manage the function plugin instances
  8. //Each operator has a single instance of this
  9. type funcPlugins struct {
  10. plugins map[string]*funcReg
  11. parentCtx api.StreamContext
  12. }
  13. type funcReg struct {
  14. ins api.Function
  15. ctx api.FunctionContext
  16. }
  17. func NewFuncPlugins(ctx api.StreamContext) *funcPlugins {
  18. return &funcPlugins{
  19. parentCtx: ctx,
  20. }
  21. }
  22. func (fp *funcPlugins) GetFuncFromPlugin(name string) (api.Function, api.FunctionContext, error) {
  23. if fp.plugins == nil {
  24. fp.plugins = make(map[string]*funcReg)
  25. }
  26. if reg, ok := fp.plugins[name]; !ok {
  27. nf, err := plugins.GetFunction(name)
  28. if err != nil {
  29. return nil, nil, err
  30. }
  31. fctx := contexts.NewDefaultFuncContext(fp.parentCtx, len(fp.plugins))
  32. fp.plugins[name] = &funcReg{
  33. ins: nf,
  34. ctx: fctx,
  35. }
  36. return nf, fctx, nil
  37. } else {
  38. return reg.ins, reg.ctx, nil
  39. }
  40. }