windowfunc_operator.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/xsql"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "github.com/lf-edge/ekuiper/pkg/ast"
  20. )
  21. type WindowFuncOperator struct {
  22. WindowFuncFields ast.Fields
  23. }
  24. type windowFuncHandle interface {
  25. handleTuple(input xsql.TupleRow)
  26. handleCollection(input xsql.Collection)
  27. }
  // rowNumberFuncHandle is the windowFuncHandle returned for the "row_number"
  // window function.
  type rowNumberFuncHandle struct {
  	// name is the key under which the row number is set on each row.
  	name string
  }
  31. func (rh *rowNumberFuncHandle) handleTuple(input xsql.TupleRow) {
  32. input.Set(rh.name, 1)
  33. }
  34. func (rh *rowNumberFuncHandle) handleCollection(input xsql.Collection) {
  35. index := 1
  36. input.RangeSet(func(i int, r xsql.Row) (bool, error) {
  37. r.Set(rh.name, index)
  38. index++
  39. return true, nil
  40. })
  41. }
  42. func (wf *WindowFuncOperator) Apply(_ api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  43. for _, windowFuncField := range wf.WindowFuncFields {
  44. name := windowFuncField.Name
  45. if windowFuncField.AName != "" {
  46. name = windowFuncField.AName
  47. }
  48. var funcName string
  49. switch c := windowFuncField.Expr.(type) {
  50. case *ast.Call:
  51. funcName = c.Name
  52. case *ast.FieldRef:
  53. funcName = c.AliasRef.Expression.(*ast.Call).Name
  54. }
  55. wh, err := getWindowFuncHandle(funcName, name)
  56. if err != nil {
  57. return err
  58. }
  59. switch input := data.(type) {
  60. case xsql.TupleRow:
  61. wh.handleTuple(input)
  62. case xsql.Collection:
  63. wh.handleCollection(input)
  64. }
  65. }
  66. return data
  67. }
  68. func getWindowFuncHandle(funcName, colName string) (windowFuncHandle, error) {
  69. switch funcName {
  70. case "row_number":
  71. return &rowNumberFuncHandle{name: colName}, nil
  72. }
  73. return nil, fmt.Errorf("")
  74. }