function_runtime.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package xsql
  2. import (
  3. "github.com/emqx/kuiper/xstream/api"
  4. "github.com/emqx/kuiper/xstream/contexts"
  5. "sync"
  6. )
  7. //Manage the function plugin instances
  8. //Each operator has a single instance of this to hold the context
  9. type funcRuntime struct {
  10. sync.Mutex
  11. regs map[string]*funcReg
  12. parentCtx api.StreamContext
  13. funcRegisters []FunctionRegister
  14. }
  15. type funcReg struct {
  16. ins api.Function
  17. ctx api.FunctionContext
  18. }
  19. func NewFuncRuntime(ctx api.StreamContext, registers []FunctionRegister) *funcRuntime {
  20. return &funcRuntime{
  21. parentCtx: ctx,
  22. funcRegisters: registers,
  23. }
  24. }
  25. func (fp *funcRuntime) getCustom(name string) (api.Function, api.FunctionContext, error) {
  26. fp.Lock()
  27. defer fp.Unlock()
  28. if fp.regs == nil {
  29. fp.regs = make(map[string]*funcReg)
  30. }
  31. if reg, ok := fp.regs[name]; !ok {
  32. var (
  33. nf api.Function
  34. err error
  35. )
  36. // Check service extension and plugin extension if set
  37. for _, r := range fp.funcRegisters {
  38. if r.HasFunction(name) {
  39. nf, err = r.Function(name)
  40. if err != nil {
  41. return nil, nil, err
  42. }
  43. break
  44. }
  45. }
  46. if nf == nil {
  47. return nil, nil, NotFoundErr
  48. }
  49. fctx := contexts.NewDefaultFuncContext(fp.parentCtx, len(fp.regs))
  50. fp.regs[name] = &funcReg{
  51. ins: nf,
  52. ctx: fctx,
  53. }
  54. return nf, fctx, nil
  55. } else {
  56. return reg.ins, reg.ctx, nil
  57. }
  58. }