projectPlan.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import "github.com/lf-edge/ekuiper/pkg/ast"
  16. type ProjectPlan struct {
  17. baseLogicalPlan
  18. isAggregate bool
  19. allWildcard bool
  20. sendMeta bool
  21. fields ast.Fields
  22. colNames [][]string
  23. aliasNames []string
  24. exprNames []string
  25. exceptNames []string
  26. windowFuncNames map[string]struct{}
  27. wildcardEmitters map[string]bool
  28. aliasFields ast.Fields
  29. exprFields ast.Fields
  30. enableLimit bool
  31. limitCount int
  32. }
  33. func (p ProjectPlan) Init() *ProjectPlan {
  34. p.allWildcard = false
  35. p.wildcardEmitters = make(map[string]bool)
  36. for _, field := range p.fields {
  37. if field.AName != "" {
  38. p.aliasFields = append(p.aliasFields, field)
  39. p.aliasNames = append(p.aliasNames, field.AName)
  40. } else {
  41. switch ft := field.Expr.(type) {
  42. case *ast.Wildcard:
  43. p.allWildcard = true
  44. p.exceptNames = ft.Except
  45. for _, replace := range ft.Replace {
  46. p.aliasFields = append(p.aliasFields, replace)
  47. p.aliasNames = append(p.aliasNames, replace.AName)
  48. }
  49. case *ast.FieldRef:
  50. if ft.Name == "*" {
  51. p.wildcardEmitters[string(ft.StreamName)] = true
  52. } else {
  53. p.colNames = append(p.colNames, []string{ft.Name, string(ft.StreamName)})
  54. }
  55. default:
  56. p.exprNames = append(p.exprNames, field.Name)
  57. p.exprFields = append(p.exprFields, field)
  58. }
  59. }
  60. }
  61. p.baseLogicalPlan.self = &p
  62. if len(p.windowFuncNames) < 1 {
  63. p.windowFuncNames = nil
  64. }
  65. return &p
  66. }
  67. func (p *ProjectPlan) PruneColumns(fields []ast.Expr) error {
  68. f := getFields(p.fields)
  69. return p.baseLogicalPlan.PruneColumns(append(fields, f...))
  70. }