util.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import "github.com/lf-edge/ekuiper/pkg/ast"
  16. func getRefSources(node ast.Node) ([]ast.StreamName, bool) {
  17. result := make(map[ast.StreamName]bool)
  18. keys := make([]ast.StreamName, 0, len(result))
  19. if node == nil {
  20. return keys, false
  21. }
  22. hasDefault := false
  23. ast.WalkFunc(node, func(n ast.Node) bool {
  24. if f, ok := n.(*ast.FieldRef); ok {
  25. for _, sn := range f.RefSources() {
  26. if sn == ast.DefaultStream {
  27. hasDefault = true
  28. }
  29. result[sn] = true
  30. }
  31. return false
  32. }
  33. return true
  34. })
  35. for k := range result {
  36. keys = append(keys, k)
  37. }
  38. return keys, hasDefault
  39. }
  40. func combine(l ast.Expr, r ast.Expr) ast.Expr {
  41. if l != nil && r != nil {
  42. return &ast.BinaryExpr{
  43. OP: ast.AND,
  44. LHS: l,
  45. RHS: r,
  46. }
  47. } else if l != nil {
  48. return l
  49. } else {
  50. return r
  51. }
  52. }
  53. func getFields(node ast.Node) []ast.Expr {
  54. result := make([]ast.Expr, 0)
  55. ast.WalkFunc(node, func(n ast.Node) bool {
  56. switch t := n.(type) {
  57. case *ast.FieldRef:
  58. if t.IsColumn() {
  59. result = append(result, t)
  60. }
  61. case *ast.Wildcard:
  62. result = append(result, t)
  63. case *ast.MetaRef:
  64. if t.StreamName != "" {
  65. result = append(result, t)
  66. }
  67. case *ast.SortField:
  68. result = append(result, t)
  69. }
  70. return true
  71. })
  72. return result
  73. }