functionRuntime.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package xsql
  2. import (
  3. "github.com/emqx/kuiper/internal/topo/context"
  4. "github.com/emqx/kuiper/pkg/api"
  5. "github.com/emqx/kuiper/pkg/errorx"
  6. "sync"
  7. )
  8. //Manage the function plugin instances
  9. //Each operator has a single instance of this to hold the context
  10. type funcRuntime struct {
  11. sync.Mutex
  12. regs map[string]*funcReg
  13. parentCtx api.StreamContext
  14. funcRegisters []FunctionRegister
  15. }
  16. type funcReg struct {
  17. ins api.Function
  18. ctx api.FunctionContext
  19. }
  20. func NewFuncRuntime(ctx api.StreamContext, registers []FunctionRegister) *funcRuntime {
  21. return &funcRuntime{
  22. parentCtx: ctx,
  23. funcRegisters: registers,
  24. }
  25. }
  26. func (fp *funcRuntime) Get(name string) (api.Function, api.FunctionContext, error) {
  27. fp.Lock()
  28. defer fp.Unlock()
  29. if fp.regs == nil {
  30. fp.regs = make(map[string]*funcReg)
  31. }
  32. if reg, ok := fp.regs[name]; !ok {
  33. var (
  34. nf api.Function
  35. err error
  36. )
  37. // Check service extension and plugin extension if set
  38. for _, r := range fp.funcRegisters {
  39. if r.HasFunction(name) {
  40. nf, err = r.Function(name)
  41. if err != nil {
  42. return nil, nil, err
  43. }
  44. break
  45. }
  46. }
  47. if nf == nil {
  48. return nil, nil, errorx.NotFoundErr
  49. }
  50. fctx := context.NewDefaultFuncContext(fp.parentCtx, len(fp.regs))
  51. fp.regs[name] = &funcReg{
  52. ins: nf,
  53. ctx: fctx,
  54. }
  55. return nf, fctx, nil
  56. } else {
  57. return reg.ins, reg.ctx, nil
  58. }
  59. }