123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- // Copyright 2021 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package planner
- import "github.com/lf-edge/ekuiper/pkg/ast"
- func getRefSources(node ast.Node) ([]ast.StreamName, bool) {
- result := make(map[ast.StreamName]bool)
- keys := make([]ast.StreamName, 0, len(result))
- if node == nil {
- return keys, false
- }
- hasDefault := false
- ast.WalkFunc(node, func(n ast.Node) bool {
- if f, ok := n.(*ast.FieldRef); ok {
- for _, sn := range f.RefSources() {
- if sn == ast.DefaultStream {
- hasDefault = true
- }
- result[sn] = true
- }
- return false
- }
- return true
- })
- for k := range result {
- keys = append(keys, k)
- }
- return keys, hasDefault
- }
- func combine(l ast.Expr, r ast.Expr) ast.Expr {
- if l != nil && r != nil {
- return &ast.BinaryExpr{
- OP: ast.AND,
- LHS: l,
- RHS: r,
- }
- } else if l != nil {
- return l
- } else {
- return r
- }
- }
- func getFields(node ast.Node) []ast.Expr {
- result := make([]ast.Expr, 0)
- ast.WalkFunc(node, func(n ast.Node) bool {
- switch t := n.(type) {
- case *ast.FieldRef:
- if t.IsColumn() {
- result = append(result, t)
- }
- case *ast.Wildcard:
- result = append(result, t)
- case *ast.MetaRef:
- if t.StreamName != "" {
- result = append(result, t)
- }
- case *ast.SortField:
- result = append(result, t)
- }
- return true
- })
- return result
- }
|