analyticfuncs_operator.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/xsql"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "github.com/lf-edge/ekuiper/pkg/ast"
  20. )
  21. type AnalyticFuncsOp struct {
  22. Funcs []*ast.Call // Must range from end to start, because the later one may use the result of the former one
  23. }
  24. func (p *AnalyticFuncsOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  25. ctx.GetLogger().Debugf("AnalyticFuncsOp receive: %s", data)
  26. switch input := data.(type) {
  27. case error:
  28. return input
  29. case xsql.TupleRow:
  30. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(input, fv)}
  31. // Must range from end to start, because the later one may use the result of the former one
  32. for i := len(p.Funcs) - 1; i >= 0; i-- {
  33. f := p.Funcs[i]
  34. result := ve.Eval(f)
  35. if e, ok := result.(error); ok {
  36. return e
  37. }
  38. input.Set(f.CachedField, result)
  39. }
  40. case xsql.SingleCollection:
  41. err := input.RangeSet(func(_ int, row xsql.Row) (bool, error) {
  42. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(row, &xsql.WindowRangeValuer{WindowRange: input.GetWindowRange()}, fv, &xsql.WildcardValuer{Data: row})}
  43. for i := len(p.Funcs) - 1; i >= 0; i-- {
  44. f := p.Funcs[i]
  45. result := ve.Eval(f)
  46. if e, ok := result.(error); ok {
  47. return false, e
  48. }
  49. row.Set(f.CachedField, result)
  50. }
  51. return true, nil
  52. })
  53. if err != nil {
  54. return err
  55. }
  56. default:
  57. return fmt.Errorf("run analytic funcs op error: invalid input %[1]T(%[1]v)", input)
  58. }
  59. return data
  60. }