瀏覽代碼

feat(stream): passing error for all plans

ngjaying 5 年之前
父節點
當前提交
33252e6459

+ 8 - 0
xsql/ast.go

@@ -963,6 +963,9 @@ func (v *ValuerEval) Eval(expr Expr) interface{} {
 				} else {
 					for i := range expr.Args {
 						args[i] = v.Eval(expr.Args[i])
+						if _, ok := args[i].(error); ok {
+							return args[i]
+						}
 					}
 				}
 			}
@@ -994,9 +997,14 @@ func (v *ValuerEval) evalBinaryExpr(expr *BinaryExpr) interface{} {
 		return v.evalJsonExpr(val, expr.OP, expr.RHS)
 	case []interface{}:
 		return v.evalJsonExpr(val, expr.OP, expr.RHS)
+	case error:
+		return val
 	}
 
 	rhs := v.Eval(expr.RHS)
+	if _, ok := rhs.(error); ok {
+		return rhs
+	}
 	return v.simpleDataEval(lhs, rhs, expr.OP)
 }
 

+ 9 - 3
xsql/plans/aggregate_operator.go

@@ -19,6 +19,8 @@ func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}) interface
 	log.Debugf("aggregate plan receive %s", data)
 	var ms []xsql.DataValuer
 	switch input := data.(type) {
+	case error:
+		return input
 	case xsql.DataValuer:
 		ms = append(ms, input)
 	case xsql.WindowTuplesSet:
@@ -39,8 +41,7 @@ func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}) interface
 			ms[i] = &t
 		}
 	default:
-		log.Errorf("Expect xsql.Valuer or its array type.")
-		return nil
+		return fmt.Errorf("expect xsql.Valuer or its array type.")
 	}
 
 	result := make(map[string]xsql.GroupedTuples)
@@ -48,7 +49,12 @@ func (p *AggregatePlan) Apply(ctx api.StreamContext, data interface{}) interface
 		var name string
 		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(m, &xsql.FunctionValuer{})}
 		for _, d := range p.Dimensions {
-			name += fmt.Sprintf("%v,", ve.Eval(d.Expr))
+			r := ve.Eval(d.Expr)
+			if _, ok := r.(error); ok {
+				return r
+			} else {
+				name += fmt.Sprintf("%v,", r)
+			}
 		}
 		if ts, ok := result[name]; !ok {
 			result[name] = xsql.GroupedTuples{m}

+ 54 - 0
xsql/plans/aggregate_test.go

@@ -1,6 +1,7 @@
 package plans
 
 import (
+	"errors"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
@@ -289,3 +290,56 @@ func TestAggregatePlan_Apply(t *testing.T) {
 		}
 	}
 }
+
+func TestAggregatePlanError(t *testing.T) {
+	tests := []struct {
+		sql    string
+		data   interface{}
+		result error
+	}{
+		{
+			sql:    "SELECT abc FROM tbl group by abc",
+			data:   errors.New("an error from upstream"),
+			result: errors.New("an error from upstream"),
+		},
+
+		{
+			sql: "SELECT abc FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 * 2",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: errors.New("invalid operation string * int64"),
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestFilterPlanError")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("statement parse error %s", err)
+			break
+		}
+
+		pp := &AggregatePlan{Dimensions: stmt.Dimensions.GetGroups()}
+		result := pp.Apply(ctx, tt.data)
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
+		}
+	}
+}

+ 29 - 21
xsql/plans/filter_operator.go

@@ -1,6 +1,7 @@
 package plans
 
 import (
+	"fmt"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 )
@@ -17,33 +18,39 @@ func (p *FilterPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 	log := ctx.GetLogger()
 	log.Debugf("filter plan receive %s", data)
 	switch input := data.(type) {
+	case error:
+		return input
 	case xsql.Valuer:
 		ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(input, &xsql.FunctionValuer{})}
-		result, ok := ve.Eval(p.Condition).(bool)
-		if ok {
-			if result {
+		result := ve.Eval(p.Condition)
+		switch r := result.(type) {
+		case error:
+			return r
+		case bool:
+			if r {
 				return input
 			}
-		} else {
-			log.Errorf("invalid condition that returns non-bool value")
+		default:
+			return fmt.Errorf("invalid condition that returns non-bool value")
 		}
 	case xsql.WindowTuplesSet:
 		if len(input) != 1 {
-			log.Infof("WindowTuplesSet with multiple tuples cannot be evaluated")
-			return nil
+			return fmt.Errorf("WindowTuplesSet with multiple tuples cannot be evaluated")
 		}
 		ms := input[0].Tuples
 		r := ms[:0]
 		for _, v := range ms {
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
-			result, ok := ve.Eval(p.Condition).(bool)
-			if ok {
-				if result {
+			result := ve.Eval(p.Condition)
+			switch val := result.(type) {
+			case error:
+				return val
+			case bool:
+				if val {
 					r = append(r, v)
 				}
-			} else {
-				log.Errorf("invalid condition that returns non-bool value")
-				return nil
+			default:
+				return fmt.Errorf("invalid condition that returns non-bool value")
 			}
 		}
 		if len(r) > 0 {
@@ -55,22 +62,23 @@ func (p *FilterPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 		r := ms[:0]
 		for _, v := range ms {
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
-			result, ok := ve.Eval(p.Condition).(bool)
-			if ok {
-				if result {
+			result := ve.Eval(p.Condition)
+			switch val := result.(type) {
+			case error:
+				return val
+			case bool:
+				if val {
 					r = append(r, v)
 				}
-			} else {
-				log.Errorf("invalid condition that returns non-bool value")
-				return nil
+			default:
+				return fmt.Errorf("invalid condition that returns non-bool value")
 			}
 		}
 		if len(r) > 0 {
 			return r
 		}
 	default:
-		log.Errorf("Expect xsql.Valuer or its array type.")
-		return nil
+		return fmt.Errorf("Expect xsql.Valuer or its array type.")
 	}
 	return nil
 }

+ 140 - 1
xsql/plans/filter_test.go

@@ -1,6 +1,7 @@
 package plans
 
 import (
+	"errors"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
@@ -26,7 +27,22 @@ func TestFilterPlan_Apply(t *testing.T) {
 			},
 			result: nil,
 		},
-
+		// nil equals nil?
+		{
+			sql: "SELECT a FROM tbl WHERE def = ghi",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"a": int64(6),
+				},
+			},
+			result: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"a": int64(6),
+				},
+			},
+		},
 		{
 			sql: "SELECT * FROM tbl WHERE abc > def and abc <= ghi",
 			data: &xsql.Tuple{
@@ -234,3 +250,126 @@ func TestFilterPlan_Apply(t *testing.T) {
 		}
 	}
 }
+
+func TestFilterPlanError(t *testing.T) {
+	tests := []struct {
+		sql    string
+		data   interface{}
+		result interface{}
+	}{
+		{
+			sql: "SELECT a FROM tbl WHERE a = b",
+			data: &xsql.Tuple{
+				Emitter: "tbl",
+				Message: xsql.Message{
+					"a": int64(6),
+					"b": "astring",
+				},
+			},
+			result: errors.New("invalid operation int64 = string"),
+		},
+		{
+			sql:    "SELECT a FROM tbl WHERE def = ghi",
+			data:   errors.New("an error from upstream"),
+			result: errors.New("an error from upstream"),
+		},
+		{
+			sql: "SELECT abc FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v1"},
+						},
+					},
+				},
+				xsql.WindowTuples{
+					Emitter: "src2",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: errors.New("WindowTuplesSet with multiple tuples cannot be evaluated"),
+		},
+
+		{
+			sql: "SELECT abc FROM src1 WHERE f1 = \"v8\" GROUP BY TUMBLINGWINDOW(ss, 10)",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": 3},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: errors.New("invalid operation int64 = string"),
+		},
+		{
+			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
+			data: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": 50}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v1"}},
+					},
+				},
+			},
+			result: errors.New("invalid operation int64 = string"),
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestAggregatePlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("statement parse error %s", err)
+			break
+		}
+
+		pp := &FilterPlan{Condition: stmt.Condition}
+		result := pp.Apply(ctx, tt.data)
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
+		}
+	}
+}

+ 29 - 23
xsql/plans/having_operator.go

@@ -1,6 +1,7 @@
 package plans
 
 import (
+	"fmt"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 )
@@ -13,42 +14,46 @@ func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 	log := ctx.GetLogger()
 	log.Debugf("having plan receive %s", data)
 	switch input := data.(type) {
+	case error:
+		return input
 	case xsql.GroupedTuplesSet:
 		r := xsql.GroupedTuplesSet{}
 		for _, v := range input {
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: v})}
-			result, ok := ve.Eval(p.Condition).(bool)
-			if ok {
-				if result {
+			result := ve.Eval(p.Condition)
+			switch val := result.(type) {
+			case error:
+				return val
+			case bool:
+				if val {
 					r = append(r, v)
 				}
-			} else {
-				log.Errorf("invalid condition that returns non-bool value")
-				return nil
+			default:
+				return fmt.Errorf("invalid condition that returns non-bool value")
 			}
-
 		}
 		if len(r) > 0 {
 			return r
 		}
 	case xsql.WindowTuplesSet:
 		if len(input) != 1 {
-			log.Infof("WindowTuplesSet with multiple tuples cannot be evaluated")
-			return nil
+			return fmt.Errorf("WindowTuplesSet with multiple tuples cannot be evaluated")
 		}
 		ms := input[0].Tuples
 		r := ms[:0]
 		for _, v := range ms {
 			//ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, &v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: input}, &xsql.WildcardValuer{Data: &v})}
-			result, ok := ve.Eval(p.Condition).(bool)
-			if ok {
-				if result {
+			result := ve.Eval(p.Condition)
+			switch val := result.(type) {
+			case error:
+				return val
+			case bool:
+				if val {
 					r = append(r, v)
 				}
-			} else {
-				log.Errorf("invalid condition that returns non-bool value")
-				return nil
+			default:
+				return fmt.Errorf("invalid condition that returns non-bool value")
 			}
 		}
 		if len(r) > 0 {
@@ -61,22 +66,23 @@ func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 		for _, v := range ms {
 			//ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, &v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: input}, &xsql.WildcardValuer{Data: &v})}
-			result, ok := ve.Eval(p.Condition).(bool)
-			if ok {
-				if result {
+			result := ve.Eval(p.Condition)
+			switch val := result.(type) {
+			case error:
+				return val
+			case bool:
+				if val {
 					r = append(r, v)
 				}
-			} else {
-				log.Errorf("invalid condition that returns non-bool value")
-				return nil
+			default:
+				return fmt.Errorf("invalid condition that returns non-bool value")
 			}
 		}
 		if len(r) > 0 {
 			return r
 		}
 	default:
-		log.Errorf("Expect xsql.Valuer or its array type.")
-		return nil
+		return fmt.Errorf("expect xsql.Valuer or its array type")
 	}
 	return nil
 }

+ 52 - 0
xsql/plans/having_test.go

@@ -1,6 +1,7 @@
 package plans
 
 import (
+	"errors"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
@@ -156,3 +157,54 @@ func TestHavingPlan_Apply(t *testing.T) {
 		}
 	}
 }
+
+func TestHavingPlanError(t *testing.T) {
+	var tests = []struct {
+		sql    string
+		data   interface{}
+		result interface{}
+	}{
+		{
+			sql: `SELECT id1 FROM src1 HAVING avg(id1) > "str"`,
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 5, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: errors.New("invalid operation int64 > string"),
+		}, {
+			sql:    `SELECT id1 FROM src1 HAVING avg(id1) > "str"`,
+			data:   errors.New("an error from upstream"),
+			result: errors.New("an error from upstream"),
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestHavingPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("statement parse error %s", err)
+			break
+		}
+
+		pp := &HavingPlan{Condition: stmt.Having}
+		result := pp.Apply(ctx, tt.data)
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
+		}
+	}
+}

+ 8 - 8
xsql/plans/join_operator.go

@@ -18,16 +18,16 @@ type JoinPlan struct {
 func (jp *JoinPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
 	log := ctx.GetLogger()
 	var input xsql.WindowTuplesSet
-	if d, ok := data.(xsql.WindowTuplesSet); !ok {
-		log.Errorf("Expect WindowTuplesSet type.\n")
-		return nil
-	} else {
-		log.Debugf("join plan receive %v", d)
-		input = d
+	switch v := data.(type) {
+	case error:
+		return input
+	case xsql.WindowTuplesSet:
+		input = v
+		log.Debugf("join plan receive %v", data)
+	default:
+		return fmt.Errorf("join is only supported in window")
 	}
-
 	result := xsql.JoinTupleSets{}
-
 	for i, join := range jp.Joins {
 		if i == 0 {
 			v, err := jp.evalSet(input, join)

+ 152 - 0
xsql/plans/join_test.go

@@ -73,6 +73,60 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 			},
 		},
 		{
+			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v3"},
+						},
+					},
+				},
+
+				xsql.WindowTuples{
+					Emitter: "src2",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 1, "f2": "w1"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"f2": "w2"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 4, "f2": "w3"},
+						},
+					},
+				},
+			},
+			result: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
+					},
+				},
+			},
+		},
+		{
 			sql: "SELECT id1 FROM src1 left join src2 on src1.ts = src2.ts",
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
@@ -670,7 +724,50 @@ func TestInnerJoinPlan_Apply(t *testing.T) {
 				},
 			},
 		},
+		{
+			sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v3"},
+						},
+					},
+				},
 
+				xsql.WindowTuples{
+					Emitter: "src2",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 1, "f2": "w1"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"f2": "w2"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 4, "f2": "w3"},
+						},
+					},
+				},
+			},
+			result: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+					},
+				},
+			},
+		},
 		{
 			sql: "SELECT id1 FROM src1 As s1 inner join src2 as s2 on s1.id1 = s2.id2",
 			data: xsql.WindowTuplesSet{
@@ -1099,7 +1196,62 @@ func TestRightJoinPlan_Apply(t *testing.T) {
 				},
 			},
 		},
+		{
+			sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v3"},
+						},
+					},
+				},
 
+				xsql.WindowTuples{
+					Emitter: "src2",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 1, "f2": "w1"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"f2": "w2"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 4, "f2": "w3"},
+						},
+					},
+				},
+			},
+			result: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+					},
+				},
+
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src2", Message: xsql.Message{"f2": "w2"}},
+					},
+				},
+
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
+					},
+				},
+			},
+		},
 		{
 			sql: "SELECT id1 FROM src1 right join src2 on src1.id1 = src2.id2",
 			data: xsql.WindowTuplesSet{

+ 4 - 2
xsql/plans/order_operator.go

@@ -1,6 +1,7 @@
 package plans
 
 import (
+	"fmt"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 )
@@ -18,13 +19,14 @@ func (p *OrderPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
 	log.Debugf("order plan receive %s", data)
 	sorter := xsql.OrderedBy(p.SortFields)
 	switch input := data.(type) {
+	case error:
+		return input
 	case xsql.Valuer:
 		return input
 	case xsql.SortingData:
 		sorter.Sort(input)
 		return input
 	default:
-		log.Errorf("Expect xsql.Valuer or its array type.")
-		return nil
+		return fmt.Errorf("Expect xsql.Valuer or its array type.")
 	}
 }

+ 8 - 9
xsql/plans/preprocessor.go

@@ -44,8 +44,7 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{
 	log := ctx.GetLogger()
 	tuple, ok := data.(*xsql.Tuple)
 	if !ok {
-		log.Errorf("Expect tuple data type")
-		return nil
+		return fmt.Errorf("expect tuple data type")
 	}
 
 	log.Debugf("preprocessor receive %s", tuple.Message)
@@ -55,8 +54,7 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{
 		for _, f := range p.streamStmt.StreamFields {
 			fname := strings.ToLower(f.Name)
 			if e := p.addRecField(f.FieldType, result, tuple.Message, fname); e != nil {
-				log.Errorf("error in preprocessor: %s", e)
-				return nil
+				return fmt.Errorf("error in preprocessor: %s", e)
 			}
 		}
 	} else {
@@ -68,7 +66,10 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{
 	for _, f := range p.fields {
 		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) {
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{})}
-			if v := ve.Eval(f.Expr); v != nil {
+			v := ve.Eval(f.Expr)
+			if _, ok := v.(error); ok {
+				return v
+			} else {
 				result[strings.ToLower(f.AName)] = v
 			}
 		}
@@ -78,15 +79,13 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{
 	if p.isEventTime {
 		if t, ok := result[p.timestampField]; ok {
 			if ts, err := common.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
-				log.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
-				return nil
+				return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
 			} else {
 				tuple.Timestamp = ts
 				log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
 			}
 		} else {
-			log.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
-			return nil
+			return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
 		}
 	}
 	return tuple

+ 79 - 6
xsql/plans/preprocessor_test.go

@@ -2,6 +2,7 @@ package plans
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
@@ -28,7 +29,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 				},
 			},
 			data:   []byte(`{"a": 6}`),
-			result: nil,
+			result: errors.New("error in preprocessor: invalid data map[a:%!s(float64=6)], field abc not found"),
 		},
 		{
 			stmt: &xsql.StreamStmt{
@@ -117,7 +118,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 				},
 			},
 			data:   []byte(`{"abc": 77, "def" : "hello"}`),
-			result: nil,
+			result: errors.New("error in preprocessor: invalid data type for def, expect boolean but found hello"),
 		},
 		{
 			stmt: &xsql.StreamStmt{
@@ -128,7 +129,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 				},
 			},
 			data:   []byte(`{"a": {"b" : "hello"}}`),
-			result: nil,
+			result: errors.New("error in preprocessor: invalid data map[a:map[b:hello]], field abc not found"),
 		},
 		{
 			stmt: &xsql.StreamStmt{
@@ -378,7 +379,7 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 				},
 			},
 			data:   []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
-			result: nil,
+			result: errors.New("error in preprocessor: invalid data type for abc, cannot convert to datetime: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
 		},
 		{
 			stmt: &xsql.StreamStmt{
@@ -537,7 +538,7 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 				},
 			},
 			data:   []byte(`{"abc": true}`),
-			result: nil,
+			result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
 		},
 		{
 			stmt: &xsql.StreamStmt{
@@ -611,7 +612,7 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 				},
 			},
 			data:   []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
-			result: nil,
+			result: errors.New("cannot convert timestamp field def to timestamp with error parsing time \"2019-09-23AT02:47:29\" as \"2006-01-02PM15:04:05\": cannot parse \"02:47:29\" as \"PM\""),
 		},
 	}
 
@@ -647,3 +648,75 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 
 	}
 }
+
+func TestPreprocessorError(t *testing.T) {
+	tests := []struct {
+		stmt   *xsql.StreamStmt
+		data   []byte
+		result interface{}
+	}{
+		{
+			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
+				},
+			},
+			data:   []byte(`{"abc": "dafsad"}`),
+			result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found dafsad"),
+		}, {
+			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "a", FieldType: &xsql.RecType{
+						StreamFields: []xsql.StreamField{
+							{Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
+						},
+					}},
+				},
+			},
+			data:   []byte(`{"a": {"d" : "hello"}}`),
+			result: errors.New("error in preprocessor: invalid data map[d:hello], field b not found"),
+		}, {
+			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
+				},
+				Options: map[string]string{
+					"DATASOURCE":       "users",
+					"FORMAT":           "AVRO",
+					"KEY":              "USERID",
+					"CONF_KEY":         "srv1",
+					"TYPE":             "MQTT",
+					"TIMESTAMP":        "abc",
+					"TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
+				},
+			},
+			data:   []byte(`{"abc": "not a time"}`),
+			result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found not a time"),
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+
+	defer common.CloseLogger()
+	contextLogger := common.Log.WithField("rule", "TestPreprocessorError")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	for i, tt := range tests {
+
+		pp := &Preprocessor{streamStmt: tt.stmt}
+
+		dm := make(map[string]interface{})
+		if e := json.Unmarshal(tt.data, &dm); e != nil {
+			log.Fatal(e)
+			return
+		} else {
+			tuple := &xsql.Tuple{Message: dm}
+			result := pp.Apply(ctx, tuple)
+			if !reflect.DeepEqual(tt.result, result) {
+				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
+			}
+		}
+
+	}
+}

+ 33 - 11
xsql/plans/project_operator.go

@@ -25,9 +25,15 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	log.Debugf("project plan receive %s", data)
 	var results []map[string]interface{}
 	switch input := data.(type) {
+	case error:
+		return input
 	case *xsql.Tuple:
 		ve := pp.getVE(input, input)
-		results = append(results, project(pp.Fields, ve, pp.isTest))
+		if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
+			return err
+		} else {
+			results = append(results, r)
+		}
 	case xsql.WindowTuplesSet:
 		if len(input) != 1 {
 			log.Infof("WindowTuplesSet with multiple tuples cannot be evaluated")
@@ -36,7 +42,11 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 		ms := input[0].Tuples
 		for _, v := range ms {
 			ve := pp.getVE(&v, input)
-			results = append(results, project(pp.Fields, ve, pp.isTest))
+			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
+				return err
+			} else {
+				results = append(results, r)
+			}
 			if pp.IsAggregate {
 				break
 			}
@@ -45,7 +55,11 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 		ms := input
 		for _, v := range ms {
 			ve := pp.getVE(&v, input)
-			results = append(results, project(pp.Fields, ve, pp.isTest))
+			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
+				return err
+			} else {
+				results = append(results, r)
+			}
 			if pp.IsAggregate {
 				break
 			}
@@ -53,18 +67,20 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	case xsql.GroupedTuplesSet:
 		for _, v := range input {
 			ve := pp.getVE(v[0], v)
-			results = append(results, project(pp.Fields, ve, pp.isTest))
+			if r, err := project(pp.Fields, ve, pp.isTest); err != nil {
+				return err
+			} else {
+				results = append(results, r)
+			}
 		}
 	default:
-		log.Errorf("Expect xsql.Valuer or its array type")
-		return nil
+		return fmt.Errorf("Expect xsql.Valuer or its array type")
 	}
 
 	if ret, err := json.Marshal(results); err == nil {
 		return ret
 	} else {
-		fmt.Printf("Found error: %v", err)
-		return nil
+		return fmt.Errorf("Found error: %v", err)
 	}
 }
 
@@ -76,16 +92,22 @@ func (pp *ProjectPlan) getVE(tuple xsql.DataValuer, agg xsql.AggregateData) *xsq
 	}
 }
 
-func project(fs xsql.Fields, ve *xsql.ValuerEval, isTest bool) map[string]interface{} {
+func project(fs xsql.Fields, ve *xsql.ValuerEval, isTest bool) (map[string]interface{}, error) {
 	result := make(map[string]interface{})
 	for _, f := range fs {
 		//Avoid to re-evaluate for non-agg field has alias name, which was already evaluated in pre-processor operator.
 		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) && !isTest {
 			fr := &xsql.FieldRef{StreamName: "", Name: f.AName}
 			v := ve.Eval(fr)
+			if e, ok := v.(error); ok {
+				return nil, e
+			}
 			result[f.AName] = v
 		} else {
 			v := ve.Eval(f.Expr)
+			if e, ok := v.(error); ok {
+				return nil, e
+			}
 			if _, ok := f.Expr.(*xsql.Wildcard); ok || f.Name == "*" {
 				switch val := v.(type) {
 				case map[string]interface{}:
@@ -101,7 +123,7 @@ func project(fs xsql.Fields, ve *xsql.ValuerEval, isTest bool) map[string]interf
 						}
 					}
 				default:
-					fmt.Printf("Wildcarder does not return map")
+					return nil, fmt.Errorf("wildcarder does not return map")
 				}
 			} else {
 				if v != nil {
@@ -113,7 +135,7 @@ func project(fs xsql.Fields, ve *xsql.ValuerEval, isTest bool) map[string]interf
 			}
 		}
 	}
-	return result
+	return result, nil
 }
 
 const DEFAULT_FIELD_NAME_PREFIX string = "rengine_field_"

+ 292 - 4
xsql/plans/project_test.go

@@ -2,6 +2,7 @@ package plans
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
@@ -42,6 +43,18 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"ts": "2019-09-19T00:56:13.431Z",
 			}},
 		},
+		//Schemaless may return a message without selecting column
+		{
+			sql: "SELECT ts FROM test",
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a":   "val_a",
+					"ts2": common.TimeFromUnixMilli(1568854573431),
+				},
+			},
+			result: []map[string]interface{}{{}},
+		},
 		{
 			sql: "SELECT A FROM test",
 			data: &xsql.Tuple{
@@ -130,7 +143,26 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"ab": "hello",
 			}},
 		},
-
+		{
+			sql: `SELECT a->b AS ab FROM test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"name": "name",
+				},
+			},
+			result: []map[string]interface{}{{}},
+		},
+		{
+			sql: `SELECT a->b AS ab FROM test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": "commonstring",
+				},
+			},
+			result: []map[string]interface{}{{}},
+		},
 		{
 			sql: `SELECT a[0]->b AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -146,7 +178,6 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"ab": "hello1",
 			}},
 		},
-
 		{
 			sql: `SELECT a->c->d AS f1 FROM test`,
 			data: &xsql.Tuple{
@@ -164,6 +195,33 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"f1": 35.2,
 			}},
 		},
+		{
+			sql: `SELECT a->c->d AS f1 FROM test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": map[string]interface{}{
+						"b": "hello",
+						"c": map[string]interface{}{
+							"e": 35.2,
+						},
+					},
+				},
+			},
+			result: []map[string]interface{}{{}},
+		},
+		{
+			sql: `SELECT a->c->d AS f1 FROM test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": map[string]interface{}{
+						"b": "hello",
+					},
+				},
+			},
+			result: []map[string]interface{}{{}},
+		},
 
 		//The int type is not supported yet, the json parser returns float64 for int values
 		{
@@ -289,7 +347,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
 			}
 		} else {
-			t.Errorf("The returned result is not type of []byte\n")
+			t.Errorf("%d. The returned result is not type of []byte\n", i)
 		}
 	}
 }
@@ -355,6 +413,31 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 			}},
 		},
 		{
+			sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id2": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"id1": float64(1),
+			}, {}, {
+				"id1": float64(3),
+			}},
+		},
+		{
 			sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
@@ -385,6 +468,36 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 			}},
 		},
 		{
+			sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id2": 2, "f2": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v1"},
+						},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"id1": float64(1),
+				"f1":  "v1",
+			}, {
+				"id2": float64(2),
+				"f2":  "v2",
+			}, {
+				"id1": float64(3),
+				"f1":  "v1",
+			}},
+		},
+		{
 			sql: "SELECT src1.* FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
@@ -443,7 +556,33 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"id1": float64(3),
 			}},
 		},
-
+		{
+			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
+			data: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id2": 3, "f1": "v1"}},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"id1": float64(1),
+			}, {
+				"id1": float64(2),
+			}, {}},
+		},
 		{
 			sql: "SELECT abc FROM tbl group by abc",
 			data: xsql.GroupedTuplesSet{
@@ -462,6 +601,20 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 			}},
 		},
 		{
+			sql: "SELECT abc FROM tbl group by abc",
+			data: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "tbl",
+						Message: xsql.Message{
+							"def": "hello",
+						},
+					},
+				},
+			},
+			result: []map[string]interface{}{{}},
+		},
+		{
 			sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
 			data: xsql.GroupedTuplesSet{
 				{
@@ -488,6 +641,30 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 			}},
 		},
 		{
+			sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
+			data: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1"},
+					},
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1"},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id2": 2, "f1": "v2"},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"id1": float64(1),
+			}, {}},
+		},
+		{
 			sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
 			data: xsql.GroupedTuplesSet{
 				{
@@ -770,6 +947,30 @@ func TestProjectPlan_Funcs(t *testing.T) {
 				"r": float64(123124),
 			}},
 		}, {
+			sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "test",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"a": 53.1},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"b": 27.4},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"a": 123123.7},
+						},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"r": float64(53),
+			}, {}, {
+				"r": float64(123124),
+			}},
+		}, {
 			sql: "SELECT round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.JoinTupleSets{
 				xsql.JoinTuple{
@@ -1048,6 +1249,48 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			result: []map[string]interface{}{{
 				"sum": float64(123203),
 			}},
+		}, {
+			sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "test",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"b": 53},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"a": 27},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"a": 123123},
+						},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"sum": float64(123150),
+			}},
+		}, {
+			sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "test",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"a": "nan"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"a": 27},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"a": 123123},
+						},
+					},
+				},
+			},
+			result: []map[string]interface{}{{}},
 		},
 	}
 
@@ -1080,3 +1323,48 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 		}
 	}
 }
+
+func TestProjectPlanError(t *testing.T) {
+	var tests = []struct {
+		sql    string
+		data   interface{}
+		result interface{}
+	}{
+		{
+			sql:    "SELECT a FROM test",
+			data:   errors.New("an error from upstream"),
+			result: errors.New("an error from upstream"),
+		}, {
+			sql: "SELECT a * 5 FROM test",
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": "val_a",
+				},
+			},
+			result: errors.New("invalid operation string * int64"),
+		}, {
+			sql: `SELECT a[0]->b AS ab FROM test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": "common string",
+				},
+			},
+			result: errors.New("invalid operation string  *xsql.BracketEvalResult"),
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestProjectPlanError")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	for i, tt := range tests {
+		stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+
+		pp := &ProjectPlan{Fields: stmt.Fields}
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
+		}
+	}
+}