Просмотр исходного кода

feat(sql): add extended wildcard (#2041)

* add wildcard extend sql

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix wildcard eval

Signed-off-by: Rui-Gan <1171530954@qq.com>

* add docs for sql

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix markdownlint

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix message

Signed-off-by: Rui-Gan <1171530954@qq.com>

* delete unnecessary convert

Signed-off-by: Rui-Gan <1171530954@qq.com>

---------

Signed-off-by: Rui-Gan <1171530954@qq.com>
Regina 1 год назад
Родитель
Сommit
10d123dae8

+ 19 - 1
docs/en_US/sqls/query_language_elements.md

@@ -21,7 +21,7 @@ Retrieves rows from input streams and enables the selection of one or many colum
 
 ```sql
 SELECT 
-    *
+    * [EXCEPT | REPLACE]
     | [source_stream.]column_name [AS column_alias]
     | expression
   
@@ -35,6 +35,24 @@ Specifies that all columns from all input streams in the FROM clause should be r
 
 Select all of fields from source stream.
 
+**\* EXCEPT**
+
+Specify one or more fields to be excluded from the result. It allows excluding one or more specific column names from the query result while still including other columns.
+
+```sql
+SELECT * EXCEPT(column_name1, column_name2...)
+FROM stream1
+```
+
+**\* REPLACE**
+
+Replace specific columns in the result. It allows for the replacement of certain columns in the result by specifying new expressions, while other columns are still included in the output.
+
+```sql
+SELECT * REPLACE(expression1 as column_name1, expression2 as column_name2...)
+FROM stream1
+```
+
 **source_stream**
 
 The source stream name or alias name.

+ 19 - 1
docs/zh_CN/sqls/query_language_elements.md

@@ -22,7 +22,7 @@ eKuiper 提供了用于构建查询的各种元素。 总结如下。
 
 ```sql
 SELECT 
-    *
+    * [EXCEPT | REPLACE]
     | [source_stream.]column_name [AS column_alias]
     | expression
   
@@ -36,6 +36,24 @@ SELECT
 
 从源流中选择所有字段。
 
+**\* EXCEPT**
+
+用于指定一个或多个字段,以便从结果中排除这些字段。它允许在查询结果中排除一个或多个特定的列名,而其他列则仍然包含在查询结果中。
+
+```sql
+SELECT * EXCEPT(column_name1, column_name2...)
+FROM stream1
+```
+
+**\* REPLACE**
+
+用于在SQL查询结果中替换指定列。它允许通过指定新的表达式来替换查询结果中的某些列,而其他列则仍然包含在查询结果中。
+
+```sql
+SELECT * REPLACE(expression1 as column_name1, expression2 as column_name2...)
+FROM stream1
+```
+
 **source_stream**
 
 源流名称或别名。

+ 3 - 2
internal/topo/operator/project_operator.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@ type ProjectOp struct {
 	ColNames         [][]string // list of [col, table]
 	AliasNames       []string   // list of alias name
 	ExprNames        []string   // list of expr name
+	ExceptNames      []string   // list of except name
 	AllWildcard      bool
 	WildcardEmitters map[string]bool
 	AliasFields      ast.Fields
@@ -147,7 +148,7 @@ func (pp *ProjectOp) project(row xsql.Row, ve *xsql.ValuerEval) error {
 			pp.alias = append(pp.alias, f.AName, vi)
 		}
 	}
-	row.Pick(pp.AllWildcard, pp.ColNames, pp.WildcardEmitters)
+	row.Pick(pp.AllWildcard, pp.ColNames, pp.WildcardEmitters, pp.ExceptNames)
 	for i := 0; i < len(pp.kvs); i += 2 {
 		row.Set(pp.kvs[i].(string), pp.kvs[i+1])
 	}

+ 170 - 0
internal/topo/operator/project_test.go

@@ -39,6 +39,11 @@ func parseStmt(p *ProjectOp, fields ast.Fields) {
 			switch ft := field.Expr.(type) {
 			case *ast.Wildcard:
 				p.AllWildcard = true
+				p.ExceptNames = ft.Except
+				for _, replace := range ft.Replace {
+					p.AliasNames = append(p.AliasNames, replace.AName)
+					p.AliasFields = append(p.AliasFields, replace)
+				}
 			case *ast.FieldRef:
 				if ft.Name == "*" {
 					p.WildcardEmitters[string(ft.StreamName)] = true
@@ -606,6 +611,66 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"f1":  -12,
 			}},
 		},
+		// 36
+		{
+			sql: `SELECT * EXCEPT(a, b) from test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": map[string]interface{}{
+						"b": "test",
+					},
+					"b": "b",
+					"c": "c",
+				},
+			},
+			result: []map[string]interface{}{
+				{
+					"c": "c",
+				},
+			},
+		},
+		// 37
+		{
+			sql: `SELECT * REPLACE(a->b as a) from test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": map[string]interface{}{
+						"b": "test",
+					},
+					"b": "b",
+					"c": "c",
+				},
+			},
+			result: []map[string]interface{}{
+				{
+					"a": "test",
+					"b": "b",
+					"c": "c",
+				},
+			},
+		},
+		// 38
+		{
+			sql: `SELECT * EXCEPT(c) REPLACE("test" as b, a->b as a) from test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": map[string]interface{}{
+						"b": "test",
+					},
+					"b": "b",
+					"c": 1,
+				},
+			},
+			result: []map[string]interface{}{
+				{
+					"a": "test",
+					"b": "test",
+				},
+			},
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -1213,6 +1278,81 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"f1":  "v1",
 			}},
 		},
+		// 19
+		{
+			sql: `SELECT * EXCEPT(a, b) from test WHERE f1 = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
+			data: &xsql.WindowTuples{
+				Content: []xsql.TupleRow{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "a": map[string]interface{}{"b": "test"}, "b": "b", "c": "c"},
+					}, &xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2", "a": map[string]interface{}{"b": "test"}, "b": "b", "c": "c"},
+					}, &xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1", "a": map[string]interface{}{"b": "test"}, "b": "b", "c": "c"},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"id1": 1, "c": "c", "f1": "v1",
+			}, {
+				"id1": 2, "c": "c", "f1": "v2",
+			}, {
+				"id1": 3, "c": "c", "f1": "v1",
+			}},
+		},
+		// 20
+		{
+			sql: `SELECT * REPLACE(a->b as a) from test WHERE f1 = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
+			data: &xsql.WindowTuples{
+				Content: []xsql.TupleRow{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "a": map[string]interface{}{"b": "test"}, "b": "b", "c": "c"},
+					}, &xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2", "a": map[string]interface{}{"b": "test"}, "b": "b", "c": "c"},
+					}, &xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1", "a": map[string]interface{}{"b": "test"}, "b": "b", "c": "c"},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"id1": 1, "c": "c", "a": "test", "b": "b", "f1": "v1",
+			}, {
+				"id1": 2, "c": "c", "a": "test", "b": "b", "f1": "v2",
+			}, {
+				"id1": 3, "c": "c", "a": "test", "b": "b", "f1": "v1",
+			}},
+		},
+		// 21
+		{
+			sql: `SELECT * EXCEPT(c) REPLACE("test" as b, a->b as a) from test WHERE f1 = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
+			data: &xsql.WindowTuples{
+				Content: []xsql.TupleRow{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1", "a": map[string]interface{}{"b": "test"}, "b": "test"},
+					}, &xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2", "a": map[string]interface{}{"b": "test"}, "b": "test"},
+					}, &xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1", "a": map[string]interface{}{"b": "test"}, "b": "test"},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"id1": 1, "a": "test", "b": "test", "f1": "v1",
+			}, {
+				"id1": 2, "a": "test", "b": "test", "f1": "v2",
+			}, {
+				"id1": 3, "a": "test", "b": "test", "f1": "v1",
+			}},
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -2227,6 +2367,36 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"max3": int64(100),
 			}},
 		},
+		// 22
+		{
+			sql: "SELECT count(* EXCEPT(a, b)) as all  FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
+			data: &xsql.JoinTuples{
+				Content: []*xsql.JoinTuple{
+					{
+						Tuples: []xsql.TupleRow{
+							&xsql.Tuple{Emitter: "test", Message: xsql.Message{"id": 1, "a": "a", "b": "b"}},
+							&xsql.Tuple{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+						},
+					},
+					{
+						Tuples: []xsql.TupleRow{
+							&xsql.Tuple{Emitter: "test", Message: xsql.Message{"id": 1, "a": "a", "b": "b"}},
+							&xsql.Tuple{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+						},
+					},
+					{
+						Tuples: []xsql.TupleRow{
+							&xsql.Tuple{Emitter: "test", Message: xsql.Message{"id": 5, "a": "a", "b": "b"}},
+							&xsql.Tuple{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+						},
+					},
+				},
+			},
+
+			result: []map[string]interface{}{{
+				"all": 3,
+			}},
+		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	contextLogger := conf.Log.WithField("rule", "TestProjectPlan_AggFuncs")

+ 1 - 1
internal/topo/planner/planner.go

@@ -177,7 +177,7 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 	case *OrderPlan:
 		op = Transform(&operator.OrderOp{SortFields: t.SortFields}, fmt.Sprintf("%d_order", newIndex), options)
 	case *ProjectPlan:
-		op = Transform(&operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, fmt.Sprintf("%d_project", newIndex), options)
+		op = Transform(&operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, ExceptNames: t.exceptNames, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, fmt.Sprintf("%d_project", newIndex), options)
 	case *ProjectSetPlan:
 		op = Transform(&operator.ProjectSetOperator{SrfMapping: t.SrfMapping}, fmt.Sprintf("%d_projectset", newIndex), options)
 	default:

+ 1 - 1
internal/topo/planner/planner_graph.go

@@ -692,7 +692,7 @@ func parsePick(props map[string]interface{}, sourceNames []string) (*operator.Pr
 		fields:      stmt.Fields,
 		isAggregate: xsql.WithAggFields(stmt),
 	}.Init()
-	return &operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, nil
+	return &operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, ExceptNames: t.exceptNames, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, nil
 }
 
 func parseFunc(props map[string]interface{}, sourceNames []string) (*operator.FuncOp, error) {

+ 8 - 1
internal/topo/planner/projectPlan.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ type ProjectPlan struct {
 	colNames         [][]string
 	aliasNames       []string
 	exprNames        []string
+	exceptNames      []string
 	wildcardEmitters map[string]bool
 	aliasFields      ast.Fields
 	exprFields       ast.Fields
@@ -41,6 +42,12 @@ func (p ProjectPlan) Init() *ProjectPlan {
 			switch ft := field.Expr.(type) {
 			case *ast.Wildcard:
 				p.allWildcard = true
+				// TODO: fix Prunecolums
+				p.exceptNames = ft.Except
+				for _, replace := range ft.Replace {
+					p.aliasFields = append(p.aliasFields, replace)
+					p.aliasNames = append(p.aliasNames, replace.AName)
+				}
 			case *ast.FieldRef:
 				if ft.Name == "*" {
 					p.wildcardEmitters[string(ft.StreamName)] = true

+ 4 - 4
internal/xsql/collection.go

@@ -274,11 +274,11 @@ func (w *WindowTuples) ToMaps() []map[string]interface{} {
 	}
 }
 
-func (w *WindowTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
+func (w *WindowTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string) {
 	cols = w.AffiliateRow.Pick(cols)
 	for i, t := range w.Content {
 		tc := t.Clone()
-		tc.Pick(allWildcard, cols, wildcardEmitters)
+		tc.Pick(allWildcard, cols, wildcardEmitters, except)
 		w.Content[i] = tc.(TupleRow)
 	}
 }
@@ -412,11 +412,11 @@ func (s *JoinTuples) ToMaps() []map[string]interface{} {
 	}
 }
 
-func (s *JoinTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
+func (s *JoinTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string) {
 	cols = s.AffiliateRow.Pick(cols)
 	for i, t := range s.Content {
 		tc := t.Clone().(*JoinTuple)
-		tc.Pick(allWildcard, cols, wildcardEmitters)
+		tc.Pick(allWildcard, cols, wildcardEmitters, except)
 		s.Content[i] = tc
 	}
 }

+ 4 - 0
internal/xsql/lexical.go

@@ -227,6 +227,10 @@ func (s *Scanner) ScanIdent() (tok ast.Token, lit string) {
 		return ast.OVER, lit
 	case "PARTITION":
 		return ast.PARTITION, lit
+	case "REPLACE":
+		return ast.REPLACE, lit
+	case "EXCEPT":
+		return ast.EXCEPT, lit
 	case "TRUE":
 		return ast.TRUE, lit
 	case "FALSE":

+ 63 - 4
internal/xsql/parser.go

@@ -1568,12 +1568,71 @@ func (p *Parser) parseFilter() (ast.Expr, error) {
 }
 
 func (p *Parser) parseAsterisk() (ast.Expr, error) {
-	switch p.inFunc {
-	case "mqtt", "meta":
+	if p.inFunc == "mqtt" || p.inFunc == "meta" {
+		tok, _ := p.scanIgnoreWhitespace()
+		if tok == ast.EXCEPT || tok == ast.REPLACE {
+			return nil, fmt.Errorf("%q is not supported in meta function", tok)
+		}
+		p.unscan()
 		return &ast.MetaRef{StreamName: ast.DefaultStream, Name: "*"}, nil
-	default:
-		return &ast.Wildcard{Token: ast.ASTERISK}, nil
 	}
+
+	w := ast.Wildcard{Token: ast.ASTERISK}
+loop:
+	for {
+		tok, _ := p.scanIgnoreWhitespace()
+		switch tok {
+		case ast.EXCEPT:
+			if tok1, lit := p.scanIgnoreWhitespace(); tok1 != ast.LPAREN {
+				return nil, fmt.Errorf("Found %q after EXCEPT, expect left parentheses.", lit)
+			}
+			fieldNames := make([]string, 0)
+		except:
+			for {
+				tok, lit := p.scanIgnoreWhitespace()
+				switch tok {
+				case ast.IDENT:
+					fieldNames = append(fieldNames, lit)
+				case ast.COMMA:
+					continue except
+				case ast.RPAREN:
+					break except
+				default:
+					return nil, fmt.Errorf("Found %q in EXCEPT", lit)
+				}
+			}
+			w.Except = fieldNames
+		case ast.REPLACE:
+			if tok1, lit := p.scanIgnoreWhitespace(); tok1 != ast.LPAREN {
+				return nil, fmt.Errorf("Found %q after REPLACE, expect left parentheses.", lit)
+			}
+			var fields ast.Fields
+		replace:
+			for {
+				field, err := p.parseField()
+
+				if err != nil {
+					return nil, err
+				} else {
+					fields = append(fields, *field)
+				}
+
+				tok, lit := p.scanIgnoreWhitespace()
+				if tok == ast.RPAREN {
+					break replace
+				}
+				if tok != ast.COMMA {
+					return nil, fmt.Errorf("Found % q in REPLACE", lit)
+				}
+			}
+			w.Replace = fields
+		default:
+			p.unscan()
+			break loop
+		}
+	}
+
+	return &w, nil
 }
 
 func (p *Parser) inmeta() bool {

+ 276 - 0
internal/xsql/parser_test.go

@@ -340,6 +340,59 @@ func TestParser_ParseStatement(t *testing.T) {
 			},
 		},
 		{
+			s: `SELECT * EXCEPT(a, b, c) FROM tbl`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						Expr:  &ast.Wildcard{Token: ast.ASTERISK, Except: []string{"a", "b", "c"}},
+						Name:  "*",
+						AName: "",
+					},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
+			},
+		},
+		{
+			s: `SELECT * REPLACE(a * 2 AS a, b / 2 AS b) FROM tbl`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						Expr: &ast.Wildcard{Token: ast.ASTERISK, Replace: []ast.Field{
+							{
+								AName: "a",
+								Expr: &ast.BinaryExpr{
+									LHS: &ast.FieldRef{
+										Name:       "a",
+										StreamName: ast.DefaultStream,
+									},
+									OP: ast.MUL,
+									RHS: &ast.IntegerLiteral{
+										Val: 2,
+									},
+								},
+							},
+							{
+								AName: "b",
+								Expr: &ast.BinaryExpr{
+									LHS: &ast.FieldRef{
+										Name:       "b",
+										StreamName: ast.DefaultStream,
+									},
+									OP: ast.DIV,
+									RHS: &ast.IntegerLiteral{
+										Val: 2,
+									},
+								},
+							},
+						}},
+						Name:  "*",
+						AName: "",
+					},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
+			},
+		},
+		{
 			s: `SELECT a,b FROM tbl`,
 			stmt: &ast.SelectStatement{
 				Fields: []ast.Field{
@@ -660,6 +713,69 @@ func TestParser_ParseStatement(t *testing.T) {
 		},
 
 		{
+			s: `SELECT count(* EXCEPT(a, b, c)) FROM tbl`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						AName: "",
+						Name:  "count",
+						Expr: &ast.Call{
+							Name:     "count",
+							Args:     []ast.Expr{&ast.Wildcard{Token: ast.ASTERISK, Except: []string{"a", "b", "c"}}},
+							FuncType: ast.FuncTypeAgg,
+						},
+					},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
+			},
+		},
+
+		{
+			s: `SELECT count(* REPLACE(a * 2 AS a, b / 2 AS b)) FROM tbl`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						AName: "",
+						Name:  "count",
+						Expr: &ast.Call{
+							Name: "count",
+							Args: []ast.Expr{&ast.Wildcard{Token: ast.ASTERISK, Replace: []ast.Field{
+								{
+									AName: "a",
+									Expr: &ast.BinaryExpr{
+										LHS: &ast.FieldRef{
+											Name:       "a",
+											StreamName: ast.DefaultStream,
+										},
+										OP: ast.MUL,
+										RHS: &ast.IntegerLiteral{
+											Val: 2,
+										},
+									},
+								},
+								{
+									AName: "b",
+									Expr: &ast.BinaryExpr{
+										LHS: &ast.FieldRef{
+											Name:       "b",
+											StreamName: ast.DefaultStream,
+										},
+										OP: ast.DIV,
+										RHS: &ast.IntegerLiteral{
+											Val: 2,
+										},
+									},
+								},
+							}}},
+							FuncType: ast.FuncTypeAgg,
+						},
+					},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
+			},
+		},
+
+		{
 			s:    `SELECT count(*, f1) FROM tbl`,
 			stmt: nil,
 			err:  `Expect 1 arguments but found 2.`,
@@ -1952,6 +2068,63 @@ func TestParser_ParseStatement(t *testing.T) {
 		},
 
 		{
+			s: `SELECT * EXCEPT(a, b, c) FROM topic/sensor1 ORDER BY name DESC`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						Expr:  &ast.Wildcard{Token: ast.ASTERISK, Except: []string{"a", "b", "c"}},
+						Name:  "*",
+						AName: "",
+					},
+				},
+				Sources:    []ast.Source{&ast.Table{Name: "topic/sensor1"}},
+				SortFields: []ast.SortField{{Uname: "name", Name: "name", Ascending: false, FieldExpr: &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream}}},
+			},
+		},
+
+		{
+			s: `SELECT * REPLACE(a * 2 as a, b / 2 as b) FROM topic/sensor1 ORDER BY name DESC`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						Expr: &ast.Wildcard{Token: ast.ASTERISK, Replace: []ast.Field{
+							{
+								AName: "a",
+								Expr: &ast.BinaryExpr{
+									LHS: &ast.FieldRef{
+										Name:       "a",
+										StreamName: ast.DefaultStream,
+									},
+									OP: ast.MUL,
+									RHS: &ast.IntegerLiteral{
+										Val: 2,
+									},
+								},
+							},
+							{
+								AName: "b",
+								Expr: &ast.BinaryExpr{
+									LHS: &ast.FieldRef{
+										Name:       "b",
+										StreamName: ast.DefaultStream,
+									},
+									OP: ast.DIV,
+									RHS: &ast.IntegerLiteral{
+										Val: 2,
+									},
+								},
+							},
+						}},
+						Name:  "*",
+						AName: "",
+					},
+				},
+				Sources:    []ast.Source{&ast.Table{Name: "topic/sensor1"}},
+				SortFields: []ast.SortField{{Uname: "name", Name: "name", Ascending: false, FieldExpr: &ast.FieldRef{Name: "name", StreamName: ast.DefaultStream}}},
+			},
+		},
+
+		{
 			s: `SELECT * FROM topic/sensor1 ORDER BY name DESC, name2 ASC`,
 			stmt: &ast.SelectStatement{
 				Fields: []ast.Field{
@@ -2597,6 +2770,109 @@ func TestParser_ParseStatement(t *testing.T) {
 			},
 		},
 		{
+			s: `SELECT changed_cols("",true,a,* EXCEPT(a, b, c),c) FROM tbl`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						AName: "",
+						Name:  "changed_cols",
+						Expr: &ast.Call{
+							Name: "changed_cols",
+							Args: []ast.Expr{
+								&ast.ColFuncField{
+									Name: "",
+									Expr: &ast.StringLiteral{Val: ""},
+								},
+								&ast.ColFuncField{
+									Name: "",
+									Expr: &ast.BooleanLiteral{Val: true},
+								},
+								&ast.ColFuncField{Name: "a", Expr: &ast.FieldRef{
+									StreamName: ast.DefaultStream,
+									Name:       "a",
+								}},
+								&ast.ColFuncField{Name: "*", Expr: &ast.Wildcard{
+									Token:  ast.ASTERISK,
+									Except: []string{"a", "b", "c"},
+								}},
+								&ast.ColFuncField{Name: "c", Expr: &ast.FieldRef{
+									StreamName: ast.DefaultStream,
+									Name:       "c",
+								}},
+							},
+							FuncType: ast.FuncTypeCols,
+						},
+					},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
+			},
+		},
+		{
+			s: `SELECT changed_cols("",true,a,* REPLACE(a * 2 as a, b / 2 as b),c) FROM tbl`,
+			stmt: &ast.SelectStatement{
+				Fields: []ast.Field{
+					{
+						AName: "",
+						Name:  "changed_cols",
+						Expr: &ast.Call{
+							Name: "changed_cols",
+							Args: []ast.Expr{
+								&ast.ColFuncField{
+									Name: "",
+									Expr: &ast.StringLiteral{Val: ""},
+								},
+								&ast.ColFuncField{
+									Name: "",
+									Expr: &ast.BooleanLiteral{Val: true},
+								},
+								&ast.ColFuncField{Name: "a", Expr: &ast.FieldRef{
+									StreamName: ast.DefaultStream,
+									Name:       "a",
+								}},
+								&ast.ColFuncField{Name: "*", Expr: &ast.Wildcard{
+									Token: ast.ASTERISK,
+									Replace: []ast.Field{
+										{
+											AName: "a",
+											Expr: &ast.BinaryExpr{
+												LHS: &ast.FieldRef{
+													Name:       "a",
+													StreamName: ast.DefaultStream,
+												},
+												OP: ast.MUL,
+												RHS: &ast.IntegerLiteral{
+													Val: 2,
+												},
+											},
+										},
+										{
+											AName: "b",
+											Expr: &ast.BinaryExpr{
+												LHS: &ast.FieldRef{
+													Name:       "b",
+													StreamName: ast.DefaultStream,
+												},
+												OP: ast.DIV,
+												RHS: &ast.IntegerLiteral{
+													Val: 2,
+												},
+											},
+										},
+									},
+								}},
+								&ast.ColFuncField{Name: "c", Expr: &ast.FieldRef{
+									StreamName: ast.DefaultStream,
+									Name:       "c",
+								}},
+							},
+							FuncType: ast.FuncTypeCols,
+						},
+					},
+				},
+				Sources: []ast.Source{&ast.Table{Name: "tbl"}},
+			},
+		},
+		{
 			s: `SELECT changed_cols('',true,a,*,c) FROM tbl`,
 			stmt: &ast.SelectStatement{
 				Fields: []ast.Field{

+ 15 - 7
internal/xsql/row.go

@@ -26,7 +26,7 @@ import (
 // The tuple clone should be cheap.
 
 /*
- * Interfaces definition
+ *  Interfaces definition
  */
 
 type Wildcarder interface {
@@ -55,7 +55,7 @@ type Row interface {
 	ToMap() map[string]interface{}
 	// Pick the columns and discard others. It replaces the underlying message with a new value. There are 3 types to pick: column, alias and annonymous expressions.
 	// cols is a list [columnname, tablename]
-	Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool)
+	Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string)
 }
 
 type CloneAbleRow interface {
@@ -421,7 +421,7 @@ func (t *Tuple) FuncValue(key string) (interface{}, bool) {
 	}
 }
 
-func (t *Tuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
+func (t *Tuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string) {
 	cols = t.AffiliateRow.Pick(cols)
 	if !allWildcard && wildcardEmitters[t.Emitter] {
 		allWildcard = true
@@ -442,6 +442,14 @@ func (t *Tuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[str
 			// invalidate cache, will calculate again
 			t.cachedMap = nil
 		}
+	} else if len(except) > 0 {
+		pickedMap := make(map[string]interface{})
+		for key, mess := range t.Message {
+			if !contains(except, key) {
+				pickedMap[key] = mess
+			}
+		}
+		t.Message = pickedMap
 	}
 }
 
@@ -548,7 +556,7 @@ func (jt *JoinTuple) ToMap() map[string]interface{} {
 	return jt.cachedMap
 }
 
-func (jt *JoinTuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
+func (jt *JoinTuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string) {
 	cols = jt.AffiliateRow.Pick(cols)
 	if !allWildcard {
 		if len(cols) > 0 {
@@ -557,7 +565,7 @@ func (jt *JoinTuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters ma
 					continue
 				}
 				nt := tuple.Clone().(TupleRow)
-				nt.Pick(allWildcard, cols, wildcardEmitters)
+				nt.Pick(allWildcard, cols, wildcardEmitters, except)
 				jt.Tuples[i] = nt
 			}
 		} else {
@@ -620,9 +628,9 @@ func (s *GroupedTuples) Clone() CloneAbleRow {
 	return c
 }
 
-func (s *GroupedTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool) {
+func (s *GroupedTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string) {
 	cols = s.AffiliateRow.Pick(cols)
 	sc := s.Content[0].Clone().(TupleRow)
-	sc.Pick(allWildcard, cols, wildcardEmitters)
+	sc.Pick(allWildcard, cols, wildcardEmitters, except)
 	s.Content[0] = sc
 }

+ 9 - 3
internal/xsql/sqlValidator.go

@@ -204,14 +204,20 @@ func validateExpr(expr ast.Expr, streamName []string) ast.Expr {
 	case *ast.ColFuncField:
 		e.Expr = validateExpr(e.Expr, streamName)
 		return e
+	case *ast.Wildcard:
+		for i, replace := range e.Replace {
+			e.Replace[i].Expr = validateExpr(replace.Expr, streamName)
+		}
+		return e
 	default:
 		return expr
 	}
 }
 
-func contains(streamName []string, name string) bool {
-	for _, s := range streamName {
-		if s == name {
+// Checks whether a slice contains an element
+func contains(s []string, n string) bool {
+	for _, val := range s {
+		if val == n {
 			return true
 		}
 	}

+ 19 - 1
internal/xsql/valuer.go

@@ -462,7 +462,25 @@ func (v *ValuerEval) Eval(expr ast.Expr) interface{} {
 			return nil
 		}
 	case *ast.Wildcard:
-		val, _ := v.Valuer.Value("*", "")
+		// TODO: optimize this to avoid copy twice
+		all, _ := v.Valuer.Value("*", "")
+		al, ok := all.(map[string]interface{})
+		if !ok {
+			return fmt.Errorf("unexpected wildcard value %v", all)
+		}
+		val := make(map[string]interface{})
+		for k, v := range al {
+			if !contains(expr.Except, k) {
+				val[k] = v
+			}
+		}
+		for _, field := range expr.Replace {
+			vi := v.Eval(field.Expr)
+			if e, ok := vi.(error); ok {
+				return e
+			}
+			val[field.AName] = vi
+		}
 		return val
 	case *ast.CaseExpr:
 		return v.evalCase(expr)

+ 3 - 1
pkg/ast/expr.go

@@ -75,7 +75,9 @@ type NumberLiteral struct {
 }
 
 type Wildcard struct {
-	Token Token
+	Token   Token
+	Replace []Field
+	Except  []string
 }
 
 func (pe *ParenExpr) expr() {}

+ 4 - 0
pkg/ast/token.go

@@ -65,6 +65,8 @@ const (
 	NOTBETWEEN
 	LIKE
 	NOTLIKE
+	REPLACE
+	EXCEPT
 
 	operatorEnd
 
@@ -192,6 +194,8 @@ var Tokens = []string{
 	NOTBETWEEN: "NOT BETWEEN",
 	LIKE:       "LIKE",
 	NOTLIKE:    "NOT LIKE",
+	REPLACE:    "REPLACE",
+	EXCEPT:     "EXCEPT",
 
 	DD: "DD",
 	HH: "HH",