Explorar el Código

feat(plan): join plan error handling

ngjaying hace 5 años
padre
commit
accdd904b0
Se han modificado 2 ficheros con 119 adiciones y 20 borrados
  1. 45 20
      xsql/plans/join_operator.go
  2. 74 0
      xsql/plans/join_test.go

+ 45 - 20
xsql/plans/join_operator.go

@@ -2,7 +2,6 @@ package plans
 
 import (
 	"fmt"
-	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 )
@@ -25,19 +24,21 @@ func (jp *JoinPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
 		input = v
 		log.Debugf("join plan receive %v", data)
 	default:
-		return fmt.Errorf("join is only supported in window")
+		return fmt.Errorf("run Join error: join is only supported in window")
 	}
 	result := xsql.JoinTupleSets{}
 	for i, join := range jp.Joins {
 		if i == 0 {
 			v, err := jp.evalSet(input, join)
 			if err != nil {
-				fmt.Println(err)
-				return nil
+				return fmt.Errorf("run Join error: %s", err)
 			}
 			result = v
 		} else {
-			r1, _ := jp.evalJoinSets(&result, input, join)
+			r1, err := jp.evalJoinSets(&result, input, join)
+			if err != nil {
+				return fmt.Errorf("run Join error: %s", err)
+			}
 			if v1, ok := r1.(xsql.JoinTupleSets); ok {
 				result = v1
 			}
@@ -113,8 +114,12 @@ func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.Jo
 				temp.AddTuple(left)
 				temp.AddTuple(right)
 				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, &xsql.FunctionValuer{})}
-				if r, ok := ve.Eval(join.Expr).(bool); ok {
-					if r {
+				result := ve.Eval(join.Expr)
+				switch val := result.(type) {
+				case error:
+					return nil, val
+				case bool:
+					if val {
 						if join.JoinType == xsql.INNER_JOIN {
 							merged.AddTuple(left)
 							merged.AddTuple(right)
@@ -124,8 +129,8 @@ func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.Jo
 							merged.AddTuple(right)
 						}
 					}
-				} else {
-					common.Log.Infoln("Evaluation error for set.")
+				default:
+					return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
 				}
 			}
 		}
@@ -135,10 +140,14 @@ func (jp *JoinPlan) evalSet(input xsql.WindowTuplesSet, join xsql.Join) (xsql.Jo
 	}
 
 	if join.JoinType == xsql.FULL_JOIN {
-		if rightJoinSet, err := jp.evalSetWithRightJoin(input, join, true); err == nil && len(rightJoinSet) > 0 {
-			for _, jt := range rightJoinSet {
-				sets = append(sets, jt)
+		if rightJoinSet, err := jp.evalSetWithRightJoin(input, join, true); err == nil {
+			if len(rightJoinSet) > 0 {
+				for _, jt := range rightJoinSet {
+					sets = append(sets, jt)
+				}
 			}
+		} else {
+			return nil, err
 		}
 	}
 	return sets, nil
@@ -168,13 +177,17 @@ func (jp *JoinPlan) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.J
 			temp.AddTuple(right)
 			temp.AddTuple(left)
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(temp, &xsql.FunctionValuer{})}
-			if r, ok := ve.Eval(join.Expr).(bool); ok {
-				if r {
+			result := ve.Eval(join.Expr)
+			switch val := result.(type) {
+			case error:
+				return nil, val
+			case bool:
+				if val {
 					merged.AddTuple(left)
 					isJoint = true
 				}
-			} else {
-				common.Log.Infoln("Evaluation error for set.")
+			default:
+				return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
 			}
 		}
 		if excludeJoint {
@@ -215,14 +228,20 @@ func (jp *JoinPlan) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuple
 				merged.AddTuple(right)
 			} else {
 				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&left, &right, &xsql.FunctionValuer{})}
-				if r, ok := ve.Eval(join.Expr).(bool); ok {
-					if r {
+				result := ve.Eval(join.Expr)
+				switch val := result.(type) {
+				case error:
+					return nil, val
+				case bool:
+					if val {
 						if join.JoinType == xsql.INNER_JOIN && !innerAppend {
 							merged.AddTuples(left.Tuples)
 							innerAppend = true
 						}
 						merged.AddTuple(right)
 					}
+				default:
+					return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
 				}
 			}
 		}
@@ -260,11 +279,17 @@ func (jp *JoinPlan) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.Window
 		isJoint := false
 		for _, left := range *set {
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&right, &left, &xsql.FunctionValuer{})}
-			if r, ok := ve.Eval(join.Expr).(bool); ok {
-				if r {
+			result := ve.Eval(join.Expr)
+			switch val := result.(type) {
+			case error:
+				return nil, val
+			case bool:
+				if val {
 					isJoint = true
 					merged.AddTuples(left.Tuples)
 				}
+			default:
+				return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
 			}
 		}
 

+ 74 - 0
xsql/plans/join_test.go

@@ -2,6 +2,7 @@ package plans
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
@@ -1696,6 +1697,79 @@ func TestCrossJoinPlan_Apply(t *testing.T) {
 	}
 }
 
+func TestCrossJoinPlanError(t *testing.T) {
+	var tests = []struct {
+		sql    string
+		data   interface{}
+		result interface{}
+	}{
+		{
+			sql:    "SELECT id1 FROM src1 cross join src2",
+			data:   errors.New("an error from upstream"),
+			result: errors.New("an error from upstream"),
+		}, {
+			sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v3"},
+						},
+					},
+				},
+
+				xsql.WindowTuples{
+					Emitter: "src2",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 1, "f2": "w1"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"id2": "3", "f2": "w2"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 4, "f2": "w3"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 2, "f2": "w4"},
+						},
+					},
+				},
+			},
+			result: errors.New("run Join error: invalid operation int64(1) = string(3)"),
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestCrossJoinPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil {
+			t.Errorf("statement parse error %s", err)
+			break
+		}
+
+		if table, ok := stmt.Sources[0].(*xsql.Table); !ok {
+			t.Errorf("statement source is not a table")
+		} else {
+			pp := &JoinPlan{Joins: stmt.Joins, From: table}
+			result := pp.Apply(ctx, tt.data)
+			if !reflect.DeepEqual(tt.result, result) {
+				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
+			}
+		}
+	}
+}
+
 func str2Map(s string) map[string]interface{} {
 	var input map[string]interface{}
 	if err := json.Unmarshal([]byte(s), &input); err != nil {