|
@@ -1,4 +1,4 @@
|
|
|
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
|
|
|
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
|
|
|
//
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
// you may not use this file except in compliance with the License.
|
|
@@ -40,9 +40,10 @@ type DataSourcePlan struct {
|
|
|
timestampFormat string
|
|
|
timestampField string
|
|
|
// intermediate status
|
|
|
- isWildCard bool
|
|
|
- fields map[string]*ast.JsonStreamField
|
|
|
- metaMap map[string]string
|
|
|
+ isWildCard bool
|
|
|
+ fields map[string]*ast.JsonStreamField
|
|
|
+ metaMap map[string]string
|
|
|
+ pruneFields []string
|
|
|
}
|
|
|
|
|
|
func (p DataSourcePlan) Init() *DataSourcePlan {
|
|
@@ -141,6 +142,7 @@ func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error {
|
|
|
return err
|
|
|
}
|
|
|
p.fields = make(map[string]*ast.JsonStreamField)
|
|
|
+ p.pruneFields = make([]string, 0)
|
|
|
if !p.allMeta {
|
|
|
p.metaMap = make(map[string]string)
|
|
|
}
|
|
@@ -158,7 +160,16 @@ func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error {
|
|
|
for _, field := range fields {
|
|
|
switch f := field.(type) {
|
|
|
case *ast.Wildcard:
|
|
|
- p.isWildCard = true
|
|
|
+ if len(f.Except) == 0 && len(f.Replace) == 0 {
|
|
|
+ p.isWildCard = true
|
|
|
+ } else {
|
|
|
+ for _, except := range f.Except {
|
|
|
+ p.pruneFields = append(p.pruneFields, except)
|
|
|
+ }
|
|
|
+ for _, replace := range f.Replace {
|
|
|
+ p.pruneFields = append(p.pruneFields, replace.AName)
|
|
|
+ }
|
|
|
+ }
|
|
|
case *ast.FieldRef:
|
|
|
if !p.isWildCard && (f.StreamName == ast.DefaultStream || f.StreamName == p.name) {
|
|
|
if _, ok := p.fields[f.Name]; !ok {
|
|
@@ -220,7 +231,22 @@ func (p *DataSourcePlan) getField(name string, strict bool) (*ast.JsonStreamFiel
|
|
|
// TODO provide field information to the source for it to prune
|
|
|
func (p *DataSourcePlan) getAllFields() {
|
|
|
if !p.isWildCard {
|
|
|
- p.streamFields = p.fields
|
|
|
+ if len(p.pruneFields) == 0 {
|
|
|
+ p.streamFields = p.fields
|
|
|
+ } else {
|
|
|
+ for _, pf := range p.pruneFields {
|
|
|
+ prune := true
|
|
|
+ for f := range p.fields {
|
|
|
+ if pf == f {
|
|
|
+ prune = false
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if prune {
|
|
|
+ delete(p.streamFields, pf)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
p.metaFields = make([]string, 0, len(p.metaMap))
|
|
|
for _, v := range p.metaMap {
|