Quellcode durchsuchen

feat(valuer): return error for incompatible types

Modify nil manipulation
Add test cases
ngjaying vor 5 Jahren
Ursprung
Commit
c7ac1d6612
2 geänderte Dateien mit 352 neuen und 105 gelöschten Zeilen
  1. 111 105
      xsql/ast.go
  2. 241 0
      xsql/valuer_eval_test.go

+ 111 - 105
xsql/ast.go

@@ -287,7 +287,7 @@ type StreamStmt struct {
 
 
 func (ss *StreamStmt) node() {}
 func (ss *StreamStmt) node() {}
 func (ss *StreamStmt) Stmt() {}
 func (ss *StreamStmt) Stmt() {}
-func (ss *StreamStmt) isSchemaless() bool{
+func (ss *StreamStmt) isSchemaless() bool {
 	return ss.StreamFields == nil
 	return ss.StreamFields == nil
 }
 }
 
 
@@ -997,18 +997,6 @@ func (v *ValuerEval) evalBinaryExpr(expr *BinaryExpr) interface{} {
 	}
 	}
 
 
 	rhs := v.Eval(expr.RHS)
 	rhs := v.Eval(expr.RHS)
-	if lhs == nil && rhs != nil {
-		// When the LHS is nil and the RHS is a boolean, implicitly cast the
-		// nil to false.
-		if _, ok := rhs.(bool); ok {
-			lhs = false
-		}
-	} else if lhs != nil && rhs == nil {
-		// Implicit cast of the RHS nil to false when the LHS is a boolean.
-		if _, ok := lhs.(bool); ok {
-			rhs = false
-		}
-	}
 	return v.simpleDataEval(lhs, rhs, expr.OP)
 	return v.simpleDataEval(lhs, rhs, expr.OP)
 }
 }
 
 
@@ -1020,12 +1008,10 @@ func (v *ValuerEval) evalJsonExpr(result interface{}, op Token, expr Expr) inter
 				ve := &ValuerEval{Valuer: Message(val)}
 				ve := &ValuerEval{Valuer: Message(val)}
 				return ve.Eval(exp)
 				return ve.Eval(exp)
 			} else {
 			} else {
-				fmt.Printf("The right expression is not a field reference node.\n")
-				return nil
+				return fmt.Errorf("the right expression is not a field reference node")
 			}
 			}
 		default:
 		default:
-			fmt.Printf("%v is an invalid operation.\n", op)
-			return nil
+			return fmt.Errorf("%v is an invalid operation for %T", op, val)
 		}
 		}
 	}
 	}
 
 
@@ -1036,56 +1022,77 @@ func (v *ValuerEval) evalJsonExpr(result interface{}, op Token, expr Expr) inter
 			if berVal, ok1 := ber.(*BracketEvalResult); ok1 {
 			if berVal, ok1 := ber.(*BracketEvalResult); ok1 {
 				if berVal.isIndex() {
 				if berVal.isIndex() {
 					if berVal.Start >= len(val) {
 					if berVal.Start >= len(val) {
-						fmt.Printf("Out of index: %d of %d.\n", berVal.Start, len(val))
-						return nil
+						return fmt.Errorf("out of index: %d of %d", berVal.Start, len(val))
 					}
 					}
 					return val[berVal.Start]
 					return val[berVal.Start]
 				} else {
 				} else {
 					if berVal.Start >= len(val) {
 					if berVal.Start >= len(val) {
-						fmt.Printf("Start value is out of index: %d of %d.\n", berVal.Start, len(val))
-						return nil
+						return fmt.Errorf("start value is out of index: %d of %d", berVal.Start, len(val))
 					}
 					}
 
 
 					if berVal.End >= len(val) {
 					if berVal.End >= len(val) {
-						fmt.Printf("End value is out of index: %d of %d.\n", berVal.End, len(val))
-						return nil
+						return fmt.Errorf("end value is out of index: %d of %d", berVal.End, len(val))
 					}
 					}
 					return val[berVal.Start:berVal.End]
 					return val[berVal.Start:berVal.End]
 				}
 				}
 			} else {
 			} else {
-				fmt.Printf("Invalid evaluation result - %v.\n", berVal)
-				return nil
+				return fmt.Errorf("invalid evaluation result - %v", berVal)
 			}
 			}
 		default:
 		default:
-			fmt.Printf("%v is an invalid operation.\n", op)
-			return nil
+			return fmt.Errorf("%v is an invalid operation for %T", op, val)
 		}
 		}
 	}
 	}
 	return nil
 	return nil
 }
 }
 
 
+//lhs and rhs are non-nil
 func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{} {
 func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{} {
+	if lhs == nil || rhs == nil {
+		switch op {
+		case EQ, LTE, GTE:
+			if lhs == nil && rhs == nil {
+				return true
+			} else {
+				return false
+			}
+		case NEQ:
+			if lhs == nil && rhs == nil {
+				return false
+			} else {
+				return true
+			}
+		case LT, GT:
+			return false
+		default:
+			return nil
+		}
+	}
 	lhs = convertNum(lhs)
 	lhs = convertNum(lhs)
 	rhs = convertNum(rhs)
 	rhs = convertNum(rhs)
 	// Evaluate if both sides are simple types.
 	// Evaluate if both sides are simple types.
 	switch lhs := lhs.(type) {
 	switch lhs := lhs.(type) {
 	case bool:
 	case bool:
 		rhs, ok := rhs.(bool)
 		rhs, ok := rhs.(bool)
+		if !ok {
+			return invalidOpError(lhs, op, rhs)
+		}
 		switch op {
 		switch op {
 		case AND:
 		case AND:
-			return ok && (lhs && rhs)
+			return lhs && rhs
 		case OR:
 		case OR:
-			return ok && (lhs || rhs)
+			return lhs || rhs
 		case BITWISE_AND:
 		case BITWISE_AND:
-			return ok && (lhs && rhs)
+			return lhs && rhs
 		case BITWISE_OR:
 		case BITWISE_OR:
-			return ok && (lhs || rhs)
+			return lhs || rhs
 		case BITWISE_XOR:
 		case BITWISE_XOR:
-			return ok && (lhs != rhs)
+			return lhs != rhs
 		case EQ:
 		case EQ:
-			return ok && (lhs == rhs)
+			return lhs == rhs
 		case NEQ:
 		case NEQ:
-			return ok && (lhs != rhs)
+			return lhs != rhs
+		default:
+			return invalidOpError(lhs, op, rhs)
 		}
 		}
 	case float64:
 	case float64:
 		// Try the rhs as a float64, int64, or uint64
 		// Try the rhs as a float64, int64, or uint64
@@ -1098,48 +1105,41 @@ func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{}
 				rhsf, ok = float64(val), true
 				rhsf, ok = float64(val), true
 			}
 			}
 		}
 		}
-
+		if !ok {
+			return invalidOpError(lhs, op, rhs)
+		}
 		rhs := rhsf
 		rhs := rhsf
 		switch op {
 		switch op {
 		case EQ:
 		case EQ:
-			return ok && (lhs == rhs)
+			return lhs == rhs
 		case NEQ:
 		case NEQ:
-			return ok && (lhs != rhs)
+			return lhs != rhs
 		case LT:
 		case LT:
-			return ok && (lhs < rhs)
+			return lhs < rhs
 		case LTE:
 		case LTE:
-			return ok && (lhs <= rhs)
+			return lhs <= rhs
 		case GT:
 		case GT:
-			return ok && (lhs > rhs)
+			return lhs > rhs
 		case GTE:
 		case GTE:
-			return ok && (lhs >= rhs)
+			return lhs >= rhs
 		case ADD:
 		case ADD:
-			if !ok {
-				return nil
-			}
 			return lhs + rhs
 			return lhs + rhs
 		case SUB:
 		case SUB:
-			if !ok {
-				return nil
-			}
 			return lhs - rhs
 			return lhs - rhs
 		case MUL:
 		case MUL:
-			if !ok {
-				return nil
-			}
 			return lhs * rhs
 			return lhs * rhs
 		case DIV:
 		case DIV:
-			if !ok {
-				return nil
-			} else if rhs == 0 {
-				return float64(0)
+			if rhs == 0 {
+				return fmt.Errorf("divided by zero")
 			}
 			}
 			return lhs / rhs
 			return lhs / rhs
 		case MOD:
 		case MOD:
-			if !ok {
-				return nil
+			if rhs == 0 {
+				return fmt.Errorf("divided by zero")
 			}
 			}
 			return math.Mod(lhs, rhs)
 			return math.Mod(lhs, rhs)
+		default:
+			return invalidOpError(lhs, op, rhs)
 		}
 		}
 	case int64:
 	case int64:
 		// Try as a float64 to see if a float cast is required.
 		// Try as a float64 to see if a float cast is required.
@@ -1167,11 +1167,16 @@ func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{}
 				return lhs * rhs
 				return lhs * rhs
 			case DIV:
 			case DIV:
 				if rhs == 0 {
 				if rhs == 0 {
-					return float64(0)
+					return fmt.Errorf("divided by zero")
 				}
 				}
 				return lhs / rhs
 				return lhs / rhs
 			case MOD:
 			case MOD:
+				if rhs == 0 {
+					return fmt.Errorf("divided by zero")
+				}
 				return math.Mod(lhs, rhs)
 				return math.Mod(lhs, rhs)
+			default:
+				return invalidOpError(lhs, op, rhs)
 			}
 			}
 		case int64:
 		case int64:
 			switch op {
 			switch op {
@@ -1196,18 +1201,18 @@ func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{}
 			case DIV:
 			case DIV:
 				if v.IntegerFloatDivision {
 				if v.IntegerFloatDivision {
 					if rhs == 0 {
 					if rhs == 0 {
-						return float64(0)
+						return fmt.Errorf("divided by zero")
 					}
 					}
 					return float64(lhs) / float64(rhs)
 					return float64(lhs) / float64(rhs)
 				}
 				}
 
 
 				if rhs == 0 {
 				if rhs == 0 {
-					return int64(0)
+					return fmt.Errorf("divided by zero")
 				}
 				}
 				return lhs / rhs
 				return lhs / rhs
 			case MOD:
 			case MOD:
 				if rhs == 0 {
 				if rhs == 0 {
-					return int64(0)
+					return fmt.Errorf("divided by zero")
 				}
 				}
 				return lhs % rhs
 				return lhs % rhs
 			case BITWISE_AND:
 			case BITWISE_AND:
@@ -1216,6 +1221,8 @@ func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{}
 				return lhs | rhs
 				return lhs | rhs
 			case BITWISE_XOR:
 			case BITWISE_XOR:
 				return lhs ^ rhs
 				return lhs ^ rhs
+			default:
+				return invalidOpError(lhs, op, rhs)
 			}
 			}
 		case uint64:
 		case uint64:
 			switch op {
 			switch op {
@@ -1251,12 +1258,12 @@ func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{}
 				return uint64(lhs) * rhs
 				return uint64(lhs) * rhs
 			case DIV:
 			case DIV:
 				if rhs == 0 {
 				if rhs == 0 {
-					return uint64(0)
+					return fmt.Errorf("divided by zero")
 				}
 				}
 				return uint64(lhs) / rhs
 				return uint64(lhs) / rhs
 			case MOD:
 			case MOD:
 				if rhs == 0 {
 				if rhs == 0 {
-					return uint64(0)
+					return fmt.Errorf("divided by zero")
 				}
 				}
 				return uint64(lhs) % rhs
 				return uint64(lhs) % rhs
 			case BITWISE_AND:
 			case BITWISE_AND:
@@ -1265,7 +1272,11 @@ func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{}
 				return uint64(lhs) | rhs
 				return uint64(lhs) | rhs
 			case BITWISE_XOR:
 			case BITWISE_XOR:
 				return uint64(lhs) ^ rhs
 				return uint64(lhs) ^ rhs
+			default:
+				return invalidOpError(lhs, op, rhs)
 			}
 			}
+		default:
+			return invalidOpError(lhs, op, rhs)
 		}
 		}
 	case uint64:
 	case uint64:
 		// Try as a float64 to see if a float cast is required.
 		// Try as a float64 to see if a float cast is required.
@@ -1293,11 +1304,16 @@ func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{}
 				return lhs * rhs
 				return lhs * rhs
 			case DIV:
 			case DIV:
 				if rhs == 0 {
 				if rhs == 0 {
-					return float64(0)
+					return fmt.Errorf("divided by zero")
 				}
 				}
 				return lhs / rhs
 				return lhs / rhs
 			case MOD:
 			case MOD:
+				if rhs == 0 {
+					return fmt.Errorf("divided by zero")
+				}
 				return math.Mod(lhs, rhs)
 				return math.Mod(lhs, rhs)
+			default:
+				return invalidOpError(lhs, op, rhs)
 			}
 			}
 		case int64:
 		case int64:
 			switch op {
 			switch op {
@@ -1333,12 +1349,12 @@ func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{}
 				return lhs * uint64(rhs)
 				return lhs * uint64(rhs)
 			case DIV:
 			case DIV:
 				if rhs == 0 {
 				if rhs == 0 {
-					return uint64(0)
+					return fmt.Errorf("divided by zero")
 				}
 				}
 				return lhs / uint64(rhs)
 				return lhs / uint64(rhs)
 			case MOD:
 			case MOD:
 				if rhs == 0 {
 				if rhs == 0 {
-					return uint64(0)
+					return fmt.Errorf("divided by zero")
 				}
 				}
 				return lhs % uint64(rhs)
 				return lhs % uint64(rhs)
 			case BITWISE_AND:
 			case BITWISE_AND:
@@ -1347,6 +1363,8 @@ func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{}
 				return lhs | uint64(rhs)
 				return lhs | uint64(rhs)
 			case BITWISE_XOR:
 			case BITWISE_XOR:
 				return lhs ^ uint64(rhs)
 				return lhs ^ uint64(rhs)
+			default:
+				return invalidOpError(lhs, op, rhs)
 			}
 			}
 		case uint64:
 		case uint64:
 			switch op {
 			switch op {
@@ -1370,12 +1388,12 @@ func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{}
 				return lhs * rhs
 				return lhs * rhs
 			case DIV:
 			case DIV:
 				if rhs == 0 {
 				if rhs == 0 {
-					return uint64(0)
+					return fmt.Errorf("divided by zero")
 				}
 				}
 				return lhs / rhs
 				return lhs / rhs
 			case MOD:
 			case MOD:
 				if rhs == 0 {
 				if rhs == 0 {
-					return uint64(0)
+					return fmt.Errorf("divided by zero")
 				}
 				}
 				return lhs % rhs
 				return lhs % rhs
 			case BITWISE_AND:
 			case BITWISE_AND:
@@ -1384,51 +1402,37 @@ func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{}
 				return lhs | rhs
 				return lhs | rhs
 			case BITWISE_XOR:
 			case BITWISE_XOR:
 				return lhs ^ rhs
 				return lhs ^ rhs
+			default:
+				return invalidOpError(lhs, op, rhs)
 			}
 			}
+		default:
+			return invalidOpError(lhs, op, rhs)
 		}
 		}
 	case string:
 	case string:
+		rhss, ok := rhs.(string)
+		if !ok {
+			return invalidOpError(lhs, op, rhs)
+		}
 		switch op {
 		switch op {
 		case EQ:
 		case EQ:
-			rhs, ok := rhs.(string)
-			if !ok {
-				return false
-			}
-			return lhs == rhs
+			return lhs == rhss
 		case NEQ:
 		case NEQ:
-			rhs, ok := rhs.(string)
-			if !ok {
-				return false
-			}
-			return lhs != rhs
+			return lhs != rhss
 		case LT:
 		case LT:
-			rhs, ok := rhs.(string)
-			if !ok {
-				return false
-			}
-			return lhs < rhs
+			return lhs < rhss
 		case LTE:
 		case LTE:
-			rhs, ok := rhs.(string)
-			if !ok {
-				return false
-			}
-			return lhs <= rhs
+			return lhs <= rhss
 		case GT:
 		case GT:
-			rhs, ok := rhs.(string)
-			if !ok {
-				return false
-			}
-			return lhs > rhs
+			return lhs > rhss
 		case GTE:
 		case GTE:
-			rhs, ok := rhs.(string)
-			if !ok {
-				return false
-			}
-			return lhs >= rhs
+			return lhs >= rhss
+		default:
+			return invalidOpError(lhs, op, rhs)
 		}
 		}
 	case time.Time:
 	case time.Time:
 		rt, err := common.InterfaceToTime(rhs, "")
 		rt, err := common.InterfaceToTime(rhs, "")
 		if err != nil {
 		if err != nil {
-			return false
+			return invalidOpError(lhs, op, rhs)
 		}
 		}
 		switch op {
 		switch op {
 		case EQ:
 		case EQ:
@@ -1443,16 +1447,18 @@ func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{}
 			return lhs.After(rt)
 			return lhs.After(rt)
 		case GTE:
 		case GTE:
 			return lhs.After(rt) || lhs.Equal(rt)
 			return lhs.After(rt) || lhs.Equal(rt)
+		default:
+			return invalidOpError(lhs, op, rhs)
 		}
 		}
+	default:
+		return invalidOpError(lhs, op, rhs)
 	}
 	}
 
 
-	// The types were not comparable. If our operation was an equality operation,
-	// return false instead of true.
-	switch op {
-	case EQ, NEQ, LT, LTE, GT, GTE:
-		return false
-	}
-	return nil
+	return invalidOpError(lhs, op, rhs)
+}
+
+func invalidOpError(lhs interface{}, op Token, rhs interface{}) error {
+	return fmt.Errorf("invalid operation %T %s %T", lhs, tokens[op], rhs)
 }
 }
 
 
 func convertNum(para interface{}) interface{} {
 func convertNum(para interface{}) interface{} {

+ 241 - 0
xsql/valuer_eval_test.go

@@ -0,0 +1,241 @@
+package xsql
+
+import (
+	"errors"
+	"fmt"
+	"github.com/emqx/kuiper/common"
+	"reflect"
+	"strings"
+	"testing"
+)
+
+func TestComparison(t *testing.T) {
+	testTime, _ := common.InterfaceToTime(1541152488442, "")
+	data := []struct {
+		m Message
+		r []interface{}
+	}{
+		{
+			m: map[string]interface{}{
+				"a": float64(32),
+				"b": float64(72),
+			},
+			r: []interface{}{
+				false, true, errors.New("invalid operation float64 = string"),
+				false, true, false, true,
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": int64(32),
+				"b": int64(72),
+			},
+			r: []interface{}{
+				false, true, errors.New("invalid operation int64 = string"),
+				false, true, false, true,
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": "32",
+				"b": "72",
+			},
+			r: []interface{}{
+				errors.New("invalid operation string > int64"), errors.New("invalid operation string <= int64"), false,
+				false, true, false, true,
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": []interface{}{32, 72},
+				"b": []interface{}{32, 72},
+			},
+			r: []interface{}{
+				errors.New("> is an invalid operation for []interface {}"), errors.New("<= is an invalid operation for []interface {}"), errors.New("= is an invalid operation for []interface {}"),
+				errors.New(">= is an invalid operation for []interface {}"), errors.New("< is an invalid operation for []interface {}"), errors.New("= is an invalid operation for []interface {}"), errors.New("!= is an invalid operation for []interface {}"),
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": map[string]interface{}{"c": 5},
+				"b": map[string]interface{}{"d": 5},
+			},
+			r: []interface{}{
+				errors.New("> is an invalid operation for map[string]interface {}"), errors.New("<= is an invalid operation for map[string]interface {}"), errors.New("= is an invalid operation for map[string]interface {}"),
+				errors.New(">= is an invalid operation for map[string]interface {}"), errors.New("< is an invalid operation for map[string]interface {}"), errors.New("= is an invalid operation for map[string]interface {}"), errors.New("!= is an invalid operation for map[string]interface {}"),
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": float64(55),
+				"b": int64(55),
+			},
+			r: []interface{}{
+				false, false, errors.New("invalid operation float64 = string"),
+				true, false, true, false,
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": testTime,
+				"b": int64(1541152388442),
+			},
+			r: []interface{}{
+				true, false, errors.New("invalid operation time.Time = string"),
+				true, false, false, true,
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": testTime,
+				"b": "2020-02-26T02:37:21.822Z",
+			},
+			r: []interface{}{
+				true, false, errors.New("invalid operation time.Time = string"),
+				false, true, false, true,
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": int64(1541152388442),
+				"b": testTime,
+			},
+			r: []interface{}{
+				true, false, errors.New("invalid operation int64 = string"),
+				errors.New("invalid operation int64 >= time.Time"), errors.New("invalid operation int64 < time.Time"), errors.New("invalid operation int64 = time.Time"), errors.New("invalid operation int64 != time.Time"),
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": "2020-02-26T02:37:21.822Z",
+				"b": testTime,
+			},
+			r: []interface{}{
+				errors.New("invalid operation string > int64"), errors.New("invalid operation string <= int64"), false,
+				errors.New("invalid operation string >= time.Time"), errors.New("invalid operation string < time.Time"), errors.New("invalid operation string = time.Time"), errors.New("invalid operation string != time.Time"),
+			},
+		}, {
+			m: map[string]interface{}{
+				"c": "nothing",
+			},
+			r: []interface{}{
+				false, false, false,
+				true, false, true, false,
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": 12,
+				"c": "nothing",
+			},
+			r: []interface{}{
+				false, true, errors.New("invalid operation int64 = string"),
+				false, false, false, true,
+			},
+		},
+	}
+	sqls := []string{
+		"select * from src where a > 72",
+		"select * from src where a <= 32",
+		"select * from src where a = \"string literal\"",
+		"select * from src where a >= b",
+		"select * from src where a < b",
+		"select * from src where a = b",
+		"select * from src where a != b",
+	}
+	var conditions []Expr
+	for _, sql := range sqls {
+		stmt, _ := NewParser(strings.NewReader(sql)).Parse()
+		conditions = append(conditions, stmt.Condition)
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(data)*len(sqls))
+	for i, tt := range data {
+		for j, c := range conditions {
+			tuple := &Tuple{Emitter: "src", Message: tt.m, Timestamp: common.GetNowInMilli(), Metadata: nil}
+			ve := &ValuerEval{Valuer: MultiValuer(tuple, &FunctionValuer{})}
+			result := ve.Eval(c)
+			if !reflect.DeepEqual(tt.r[j], result) {
+				t.Errorf("%d-%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, j, tt.r[j], result)
+			}
+		}
+	}
+}
+
+func TestCalculation(t *testing.T) {
+	data := []struct {
+		m Message
+		r []interface{}
+	}{
+		{
+			m: map[string]interface{}{
+				"a": float64(32),
+				"b": float64(72),
+			},
+			r: []interface{}{
+				float64(104), float64(96), float64(0.4444444444444444), float64(32),
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": int64(32),
+				"b": int64(72),
+			},
+			r: []interface{}{
+				int64(104), int64(96), int64(0), int64(32),
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": "32",
+				"b": "72",
+			},
+			r: []interface{}{
+				errors.New("invalid operation string + string"), errors.New("invalid operation string * int64"),
+				errors.New("invalid operation string / string"), errors.New("invalid operation string % string"),
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": float64(55),
+				"b": int64(55),
+			},
+			r: []interface{}{
+				float64(110), float64(165), float64(1), float64(0),
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": int64(55),
+				"b": float64(0),
+			},
+			r: []interface{}{
+				float64(55), int64(165), errors.New("divided by zero"), errors.New("divided by zero"),
+			},
+		}, {
+			m: map[string]interface{}{
+				"c": "nothing",
+			},
+			r: []interface{}{
+				nil, nil, nil, nil,
+			},
+		}, {
+			m: map[string]interface{}{
+				"a": 12,
+				"c": "nothing",
+			},
+			r: []interface{}{
+				nil, int64(36), nil, nil,
+			},
+		},
+	}
+	sqls := []string{
+		"select a + b as t from src",
+		"select a * 3 as t from src",
+		"select a / b as t from src",
+		"select a % b as t from src",
+	}
+	var projects []Expr
+	for _, sql := range sqls {
+		stmt, _ := NewParser(strings.NewReader(sql)).Parse()
+		projects = append(projects, stmt.Fields[0].Expr)
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(data)*len(sqls))
+	for i, tt := range data {
+		for j, c := range projects {
+			tuple := &Tuple{Emitter: "src", Message: tt.m, Timestamp: common.GetNowInMilli(), Metadata: nil}
+			ve := &ValuerEval{Valuer: MultiValuer(tuple, &FunctionValuer{})}
+			result := ve.Eval(c)
+			if !reflect.DeepEqual(tt.r[j], result) {
+				t.Errorf("%d-%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, j, tt.r[j], result)
+			}
+		}
+	}
+}