projectPlan.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import "github.com/lf-edge/ekuiper/pkg/ast"
  16. type ProjectPlan struct {
  17. baseLogicalPlan
  18. isAggregate bool
  19. allWildcard bool
  20. sendMeta bool
  21. fields ast.Fields
  22. colNames [][]string
  23. aliasNames []string
  24. exprNames []string
  25. exceptNames []string
  26. wildcardEmitters map[string]bool
  27. aliasFields ast.Fields
  28. exprFields ast.Fields
  29. }
  30. func (p ProjectPlan) Init() *ProjectPlan {
  31. p.allWildcard = false
  32. p.wildcardEmitters = make(map[string]bool)
  33. for _, field := range p.fields {
  34. if field.AName != "" {
  35. p.aliasFields = append(p.aliasFields, field)
  36. p.aliasNames = append(p.aliasNames, field.AName)
  37. } else {
  38. switch ft := field.Expr.(type) {
  39. case *ast.Wildcard:
  40. p.allWildcard = true
  41. // TODO: fix Prunecolums
  42. p.exceptNames = ft.Except
  43. for _, replace := range ft.Replace {
  44. p.aliasFields = append(p.aliasFields, replace)
  45. p.aliasNames = append(p.aliasNames, replace.AName)
  46. }
  47. case *ast.FieldRef:
  48. if ft.Name == "*" {
  49. p.wildcardEmitters[string(ft.StreamName)] = true
  50. } else {
  51. p.colNames = append(p.colNames, []string{ft.Name, string(ft.StreamName)})
  52. }
  53. default:
  54. p.exprNames = append(p.exprNames, field.Name)
  55. p.exprFields = append(p.exprFields, field)
  56. }
  57. }
  58. }
  59. p.baseLogicalPlan.self = &p
  60. return &p
  61. }
  62. func (p *ProjectPlan) PruneColumns(fields []ast.Expr) error {
  63. f := getFields(p.fields)
  64. return p.baseLogicalPlan.PruneColumns(append(fields, f...))
  65. }