script_operator.go 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build script
  15. package operator
  16. import (
  17. "fmt"
  18. "github.com/dop251/goja"
  19. "github.com/lf-edge/ekuiper/internal/xsql"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. )
  22. type ScriptOp struct {
  23. vm *goja.Runtime
  24. jsfunc goja.Callable
  25. isAgg bool
  26. }
  27. func NewScriptOp(script string, isAgg bool) (*ScriptOp, error) {
  28. vm := goja.New()
  29. _, err := vm.RunString(script)
  30. if err != nil {
  31. return nil, fmt.Errorf("failed to interprete script: %v", err)
  32. }
  33. exec, ok := goja.AssertFunction(vm.Get("exec"))
  34. if !ok {
  35. return nil, fmt.Errorf("cannot find function \"exec\" in script")
  36. }
  37. n := &ScriptOp{
  38. vm: vm,
  39. jsfunc: exec,
  40. isAgg: isAgg,
  41. }
  42. return n, nil
  43. }
  44. func (p *ScriptOp) Apply(ctx api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  45. ctx.GetLogger().Debugf("ScriptOp receive: %v", data)
  46. switch input := data.(type) {
  47. case error:
  48. return input
  49. case *xsql.Tuple:
  50. val, err := p.jsfunc(goja.Undefined(), p.vm.ToValue(input.ToMap()), p.vm.ToValue(input.Metadata))
  51. if err != nil {
  52. return fmt.Errorf("failed to execute script: %v", err)
  53. } else {
  54. nm, ok := val.Export().(map[string]interface{})
  55. if !ok {
  56. return fmt.Errorf("script exec result is not a map: %v", val.Export())
  57. } else {
  58. return &xsql.Tuple{Message: nm, Metadata: input.Metadata, Emitter: input.Emitter, Timestamp: input.Timestamp}
  59. }
  60. }
  61. case xsql.Collection:
  62. val, err := p.jsfunc(goja.Undefined(), p.vm.ToValue(input.ToMaps()))
  63. if err != nil {
  64. return fmt.Errorf("failed to execute script: %v", err)
  65. } else {
  66. switch nm := val.Export().(type) {
  67. case map[string]interface{}:
  68. if !p.isAgg {
  69. return fmt.Errorf("script node is not aggregate but exec result is aggregated: %v", val.Export())
  70. }
  71. return &xsql.Tuple{Message: nm}
  72. case []map[string]interface{}:
  73. if p.isAgg {
  74. return fmt.Errorf("script node is aggregate but exec result is not aggreagated: %v", val.Export())
  75. }
  76. w := &xsql.WindowTuples{}
  77. for _, v := range nm {
  78. if v != nil {
  79. w.Content = append(w.Content, &xsql.Tuple{Message: v})
  80. }
  81. }
  82. return w
  83. default:
  84. return fmt.Errorf("script exec result is not a map or array of map: %v", val.Export())
  85. }
  86. }
  87. default:
  88. return fmt.Errorf("run script op invalid input allow tuple only but got %[1]T(%[1]v)", input)
  89. }
  90. }