123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- // Copyright 2021-2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package planner
- import (
- "fmt"
- "sort"
- "strings"
- "github.com/lf-edge/ekuiper/pkg/ast"
- "github.com/lf-edge/ekuiper/pkg/message"
- )
- type DataSourcePlan struct {
- baseLogicalPlan
- name ast.StreamName
- // calculated properties
- // initialized with stream definition, pruned with rule
- metaFields []string
- // pass-on and converted state. For schemaless, the value is always nil
- streamFields map[string]*ast.JsonStreamField
- // pass-on properties
- isSchemaless bool
- streamStmt *ast.StreamStmt
- allMeta bool
- isBinary bool
- iet bool
- timestampFormat string
- timestampField string
- // intermediate status
- isWildCard bool
- fields map[string]*ast.JsonStreamField
- metaMap map[string]string
- pruneFields []string
- }
- func (p DataSourcePlan) Init() *DataSourcePlan {
- p.baseLogicalPlan.self = &p
- p.baseLogicalPlan.setPlanType(DATASOURCE)
- return &p
- }
- func (p *DataSourcePlan) BuildExplainInfo(id int64) {
- info := ""
- if p.name != "" {
- info += "StreamName: " + string(p.name)
- }
- if p.fields != nil && len(p.fields) != 0 {
- info += ", Fields:[ "
- keys := make([]string, 0, len(p.fields))
- for k := range p.fields {
- keys = append(keys, k)
- }
- sort.Strings(keys)
- for i := 0; i < len(keys); i++ {
- info += keys[i]
- if i != len(keys)-1 {
- info += ", "
- }
- }
- info += " ]"
- }
- if p.streamFields != nil && len(p.streamFields) != 0 {
- info += ", StreamFields:[ "
- keys := make([]string, 0, len(p.streamFields))
- for k := range p.streamFields {
- keys = append(keys, k)
- }
- sort.Strings(keys)
- for i := 0; i < len(keys); i++ {
- info += keys[i]
- if i != len(keys)-1 {
- info += ", "
- }
- }
- info += " ]"
- }
- p.baseLogicalPlan.ExplainInfo.ID = id
- p.baseLogicalPlan.ExplainInfo.Info = info
- }
- // PushDownPredicate Presume no children for data source
- func (p *DataSourcePlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPlan) {
- if p.streamStmt.StreamType == ast.TypeTable {
- return condition, p.self
- }
- owned, other := p.extract(condition)
- if owned != nil {
- // Add a filter plan for children
- f := FilterPlan{
- condition: owned,
- }.Init()
- f.SetChildren([]LogicalPlan{p})
- return other, f
- }
- return other, p
- }
- func (p *DataSourcePlan) extract(expr ast.Expr) (ast.Expr, ast.Expr) {
- s, hasDefault := getRefSources(expr)
- l := len(s)
- if hasDefault {
- l += 1
- }
- switch len(s) {
- case 0:
- return expr, nil
- case 1:
- if s[0] == p.name || s[0] == ast.DefaultStream {
- return expr, nil
- } else {
- return nil, expr
- }
- default:
- if be, ok := expr.(*ast.BinaryExpr); ok && be.OP == ast.AND {
- ul, pl := p.extract(be.LHS)
- ur, pr := p.extract(be.RHS)
- owned := combine(ul, ur)
- other := combine(pl, pr)
- return owned, other
- }
- return nil, expr
- }
- }
- func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error {
- // init values
- err := p.getProps()
- if err != nil {
- return err
- }
- p.fields = make(map[string]*ast.JsonStreamField)
- p.pruneFields = make([]string, 0)
- if !p.allMeta {
- p.metaMap = make(map[string]string)
- }
- if p.timestampField != "" {
- if !p.isSchemaless {
- tsf, ok := p.streamFields[p.timestampField]
- if !ok {
- return fmt.Errorf("timestamp field %s not found", p.timestampField)
- }
- p.fields[p.timestampField] = tsf
- } else {
- p.fields[p.timestampField] = nil
- }
- }
- for _, field := range fields {
- switch f := field.(type) {
- case *ast.Wildcard:
- if len(f.Except) == 0 && len(f.Replace) == 0 {
- p.isWildCard = true
- } else {
- for _, except := range f.Except {
- p.pruneFields = append(p.pruneFields, except)
- }
- for _, replace := range f.Replace {
- p.pruneFields = append(p.pruneFields, replace.AName)
- }
- }
- case *ast.FieldRef:
- if !p.isWildCard && (f.StreamName == ast.DefaultStream || f.StreamName == p.name) {
- if _, ok := p.fields[f.Name]; !ok {
- sf, err := p.getField(f.Name, f.StreamName == p.name)
- if err != nil {
- return err
- }
- if p.isSchemaless || sf != nil {
- p.fields[f.Name] = sf
- }
- }
- }
- case *ast.MetaRef:
- if p.allMeta {
- break
- }
- if f.StreamName == ast.DefaultStream || f.StreamName == p.name {
- if f.Name == "*" {
- p.allMeta = true
- p.metaMap = nil
- } else if !p.allMeta {
- p.metaMap[strings.ToLower(f.Name)] = f.Name
- }
- }
- case *ast.SortField:
- if !p.isWildCard {
- sf, err := p.getField(f.Name, f.StreamName == p.name)
- if err != nil {
- return err
- }
- if p.isSchemaless || sf != nil {
- p.fields[f.Name] = sf
- }
- }
- default:
- return fmt.Errorf("unsupported field %v", field)
- }
- }
- p.getAllFields()
- return nil
- }
- func (p *DataSourcePlan) getField(name string, strict bool) (*ast.JsonStreamField, error) {
- if !p.isSchemaless {
- r, ok := p.streamFields[name]
- if !ok {
- if strict {
- return nil, fmt.Errorf("field %s not found in stream %s", name, p.name)
- }
- } else {
- return r, nil
- }
- }
- // always return nil for schemaless
- return nil, nil
- }
- // Do not prune fields now for preprocessor
- // TODO provide field information to the source for it to prune
- func (p *DataSourcePlan) getAllFields() {
- if !p.isWildCard {
- if len(p.pruneFields) == 0 {
- p.streamFields = p.fields
- } else {
- for _, pf := range p.pruneFields {
- prune := true
- for f := range p.fields {
- if pf == f {
- prune = false
- break
- }
- }
- if prune {
- delete(p.streamFields, pf)
- }
- }
- }
- }
- p.metaFields = make([]string, 0, len(p.metaMap))
- for _, v := range p.metaMap {
- p.metaFields = append(p.metaFields, v)
- }
- // for consistency of results for testing
- sort.Strings(p.metaFields)
- p.fields = nil
- p.metaMap = nil
- }
- func (p *DataSourcePlan) getProps() error {
- if p.iet {
- if p.streamStmt.Options.TIMESTAMP != "" {
- p.timestampField = p.streamStmt.Options.TIMESTAMP
- } else {
- return fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
- }
- }
- if p.streamStmt.Options.TIMESTAMP_FORMAT != "" {
- p.timestampFormat = p.streamStmt.Options.TIMESTAMP_FORMAT
- }
- if strings.EqualFold(p.streamStmt.Options.FORMAT, message.FormatBinary) {
- p.isBinary = true
- }
- return nil
- }
|