Jelajahi Sumber

fix(join): flat join output

ngjaying 3 tahun lalu
induk
melakukan
d371dfe8ca

+ 95 - 16
xstream/operators/join_multi_test.go

@@ -305,6 +305,89 @@ func TestMultiJoinPlan_Apply(t *testing.T) {
 		},
 
 		{
+			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 right join src3 on src2.id2 = src3.id3",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v3"},
+						},
+					},
+				},
+
+				xsql.WindowTuples{
+					Emitter: "src2",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 1, "f2": "w1"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 1, "f2": "w3"},
+						},
+					},
+				},
+
+				xsql.WindowTuples{
+					Emitter: "src3",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src3",
+							Message: xsql.Message{"id3": 1, "f3": "x1"},
+						}, {
+							Emitter: "src3",
+							Message: xsql.Message{"id3": 5, "f3": "x5"},
+						},
+					},
+				},
+			},
+			result: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+					},
+				},
+
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w3"}},
+					},
+				},
+
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v3"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+					},
+				},
+
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src3", Message: xsql.Message{"id3": 1, "f3": "x1"}},
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v3"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w3"}},
+					},
+				},
+
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
+					},
+				},
+			},
+		},
+
+		{
 			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 cross join src3",
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
@@ -352,33 +435,29 @@ func TestMultiJoinPlan_Apply(t *testing.T) {
 						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
 						{Emitter: "src3", Message: xsql.Message{"id3": 2, "f3": "x1"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
 						{Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
 					},
 				},
 
-				//xsql.JoinTuple{
-				//	Tuples: []xsql.Tuple{
-				//		{Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" },},
-				//		{Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1" },},
-				//		{Emitter: "src3", Message: xsql.Message{ "id3" : 5, "f3" : "x5" },},
-				//	},
-				//},
-
 				xsql.JoinTuple{
 					Tuples: []xsql.Tuple{
 						{Emitter: "src1", Message: xsql.Message{"id1": 5, "f1": "v5"}},
 						{Emitter: "src3", Message: xsql.Message{"id3": 2, "f3": "x1"}},
-						{Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
 					},
 				},
 
-				//xsql.JoinTuple{
-				//	Tuples: []xsql.Tuple{
-				//		{Emitter: "src1", Message: xsql.Message{ "id1" : 5, "f1" : "v5" },},
-				//		{Emitter: "src3", Message: xsql.Message{ "id3" : 5, "f3" : "x5" },},
-				//	},
-				//},
-
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 5, "f1": "v5"}},
+						{Emitter: "src3", Message: xsql.Message{"id3": 5, "f3": "x5"}},
+					},
+				},
 			},
 		},
 	}

+ 60 - 36
xstream/operators/join_operator.go

@@ -112,12 +112,15 @@ func (jp *JoinOp) evalSet(input xsql.WindowTuplesSet, join xsql.Join, fv *xsql.F
 		return jp.evalSetWithRightJoin(input, join, false, fv)
 	}
 	for _, left := range lefts {
-		merged := &xsql.JoinTuple{}
-		if join.JoinType == xsql.LEFT_JOIN || join.JoinType == xsql.FULL_JOIN || join.JoinType == xsql.CROSS_JOIN {
-			merged.AddTuple(left)
-		}
-		for _, right := range rights {
+		leftJoined := false
+		for index, right := range rights {
+			tupleJoined := false
+			merged := &xsql.JoinTuple{}
+			if join.JoinType == xsql.LEFT_JOIN || join.JoinType == xsql.FULL_JOIN || join.JoinType == xsql.CROSS_JOIN {
+				merged.AddTuple(left)
+			}
 			if join.JoinType == xsql.CROSS_JOIN {
+				tupleJoined = true
 				merged.AddTuple(right)
 			} else {
 				temp := &xsql.JoinTuple{}
@@ -131,11 +134,11 @@ func (jp *JoinOp) evalSet(input xsql.WindowTuplesSet, join xsql.Join, fv *xsql.F
 					return nil, val
 				case bool:
 					if val {
+						leftJoined = true
+						tupleJoined = true
 						if join.JoinType == xsql.INNER_JOIN {
 							merged.AddTuple(left)
 							merged.AddTuple(right)
-							sets = append(sets, *merged)
-							merged = &xsql.JoinTuple{}
 						} else {
 							merged.AddTuple(right)
 						}
@@ -144,8 +147,15 @@ func (jp *JoinOp) evalSet(input xsql.WindowTuplesSet, join xsql.Join, fv *xsql.F
 					return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
 				}
 			}
+			if tupleJoined || (!leftJoined && index == len(rights)-1 && len(merged.Tuples) > 0) {
+				leftJoined = true
+				sets = append(sets, *merged)
+			}
 		}
-		if len(merged.Tuples) > 0 {
+		// If no messages in the right
+		if !leftJoined && join.JoinType != xsql.INNER_JOIN {
+			merged := &xsql.JoinTuple{}
+			merged.AddTuple(left)
 			sets = append(sets, *merged)
 		}
 	}
@@ -191,11 +201,11 @@ func (jp *JoinOp) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.Joi
 	sets := xsql.JoinTupleSets{}
 
 	for _, right := range rights {
-		merged := &xsql.JoinTuple{}
-		merged.AddTuple(right)
 		isJoint := false
-
-		for _, left := range lefts {
+		for index, left := range lefts {
+			tupleJoined := false
+			merged := &xsql.JoinTuple{}
+			merged.AddTuple(right)
 			temp := &xsql.JoinTuple{}
 			temp.AddTuple(right)
 			temp.AddTuple(left)
@@ -208,20 +218,21 @@ func (jp *JoinOp) evalSetWithRightJoin(input xsql.WindowTuplesSet, join xsql.Joi
 				if val {
 					merged.AddTuple(left)
 					isJoint = true
+					tupleJoined = true
 				}
 			default:
 				return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
 			}
-		}
-		if excludeJoint {
-			if len(merged.Tuples) > 0 && (!isJoint) {
-				sets = append(sets, *merged)
-			}
-		} else {
-			if len(merged.Tuples) > 0 {
+			if !excludeJoint && (tupleJoined || (!isJoint && index == len(lefts)-1 && len(merged.Tuples) > 0)) {
+				isJoint = true
 				sets = append(sets, *merged)
 			}
 		}
+		if !isJoint {
+			merged := &xsql.JoinTuple{}
+			merged.AddTuple(right)
+			sets = append(sets, *merged)
+		}
 	}
 	return sets, nil
 }
@@ -241,13 +252,16 @@ func (jp *JoinOp) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesS
 		return jp.evalRightJoinSets(set, input, join, false, fv)
 	}
 	for _, left := range *set {
-		merged := &xsql.JoinTuple{}
-		if join.JoinType == xsql.LEFT_JOIN || join.JoinType == xsql.FULL_JOIN || join.JoinType == xsql.CROSS_JOIN {
-			merged.AddTuples(left.Tuples)
-		}
+		leftJoined := false
 		innerAppend := false
-		for _, right := range rights {
+		for index, right := range rights {
+			tupleJoined := false
+			merged := &xsql.JoinTuple{}
+			if join.JoinType == xsql.LEFT_JOIN || join.JoinType == xsql.FULL_JOIN || join.JoinType == xsql.CROSS_JOIN {
+				merged.AddTuples(left.Tuples)
+			}
 			if join.JoinType == xsql.CROSS_JOIN {
+				tupleJoined = true
 				merged.AddTuple(right)
 			} else {
 				ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&left, &right, fv)}
@@ -257,6 +271,8 @@ func (jp *JoinOp) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesS
 					return nil, val
 				case bool:
 					if val {
+						leftJoined = true
+						tupleJoined = true
 						if join.JoinType == xsql.INNER_JOIN && !innerAppend {
 							merged.AddTuples(left.Tuples)
 							innerAppend = true
@@ -267,9 +283,15 @@ func (jp *JoinOp) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesS
 					return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
 				}
 			}
+			if tupleJoined || (!leftJoined && index == len(rights)-1 && len(merged.Tuples) > 0) {
+				leftJoined = true
+				newSets = append(newSets, *merged)
+			}
 		}
 
-		if len(merged.Tuples) > 0 {
+		if !leftJoined && join.JoinType != xsql.INNER_JOIN {
+			merged := &xsql.JoinTuple{}
+			merged.AddTuples(left.Tuples)
 			newSets = append(newSets, *merged)
 		}
 	}
@@ -297,10 +319,11 @@ func (jp *JoinOp) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTu
 	newSets := xsql.JoinTupleSets{}
 
 	for _, right := range rights {
-		merged := &xsql.JoinTuple{}
-		merged.AddTuple(right)
 		isJoint := false
-		for _, left := range *set {
+		for index, left := range *set {
+			tupleJoined := false
+			merged := &xsql.JoinTuple{}
+			merged.AddTuple(right)
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&right, &left, fv)}
 			result := evalOn(join, ve, &left, &right)
 			switch val := result.(type) {
@@ -309,22 +332,23 @@ func (jp *JoinOp) evalRightJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTu
 			case bool:
 				if val {
 					isJoint = true
+					tupleJoined = true
 					merged.AddTuples(left.Tuples)
 				}
 			default:
 				return nil, fmt.Errorf("invalid join condition that returns non-bool value %[1]T(%[1]v)", val)
 			}
-		}
-
-		if excludeJoint {
-			if len(merged.Tuples) > 0 && (!isJoint) {
-				newSets = append(newSets, *merged)
-			}
-		} else {
-			if len(merged.Tuples) > 0 {
+			if !excludeJoint && (tupleJoined || (!isJoint && index == len(*set)-1 && len(merged.Tuples) > 0)) {
+				isJoint = true
 				newSets = append(newSets, *merged)
 			}
 		}
+
+		if !isJoint {
+			merged := &xsql.JoinTuple{}
+			merged.AddTuple(right)
+			newSets = append(newSets, *merged)
+		}
 	}
 	return newSets, nil
 }

+ 262 - 8
xstream/operators/join_test.go

@@ -18,7 +18,7 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 		data   xsql.WindowTuplesSet
 		result interface{}
 	}{
-		{
+		{ //0
 			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
@@ -73,7 +73,7 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 				},
 			},
 		},
-		{
+		{ // 1
 			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
@@ -127,7 +127,7 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 				},
 			},
 		},
-		{
+		{ // 2
 			sql: "SELECT id1 FROM src1 left join src2 on src1.ts = src2.ts",
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
@@ -182,7 +182,7 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 				},
 			},
 		},
-		{
+		{ // 3
 			sql: "SELECT id1 FROM src1 inner join src2 on src1.id1 = src2.id2",
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
@@ -220,7 +220,7 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 			result: nil,
 		},
 
-		{
+		{ // 4
 			sql: "SELECT id1 FROM src1 As s1 left join src2 as s2 on s1.id1 = s2.id2",
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
@@ -276,7 +276,7 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 			},
 		},
 
-		{
+		{ // 5
 			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
@@ -307,13 +307,18 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 					Tuples: []xsql.Tuple{
 						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
 					},
 				},
 			},
 		},
 
-		{
+		{ // 6
 			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
 			data: xsql.WindowTuplesSet{
 				xsql.WindowTuples{
@@ -647,6 +652,152 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 				},
 			},
 		},
+		{
+			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v3"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v4"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 4, "f1": "v5"},
+						},
+					},
+				},
+
+				xsql.WindowTuples{
+					Emitter: "src2",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 1, "f2": "w1"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 3, "f2": "w2"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 3, "f2": "w3"},
+						},
+					},
+				},
+			},
+			result: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v2"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 3, "f2": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 3, "f2": "w3"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v4"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 3, "f2": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v4"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 3, "f2": "w3"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 4, "f1": "v5"}},
+					},
+				},
+			},
+		},
+		{
+			sql: "SELECT id1 FROM src1 full join src2 on src1.id1 = src2.id2",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "src1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"id1": 3, "f1": "v3"},
+						},
+					},
+				},
+
+				xsql.WindowTuples{
+					Emitter: "src2",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 1, "f2": "w1"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 2, "f2": "w2"},
+						}, {
+							Emitter: "src2",
+							Message: xsql.Message{"id2": 2, "f2": "w3"},
+						},
+					},
+				},
+			},
+			result: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w3"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
+					},
+				},
+			},
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -1113,6 +1264,62 @@ func TestInnerJoinPlan_Apply(t *testing.T) {
 				},
 			},
 		},
+		{
+			sql: "SELECT id1 FROM src1 As s1 inner join src2 as s2 on s1.id1 = s2.id2",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "s1",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "s1",
+							Message: xsql.Message{"id1": 1, "f1": "v1"},
+						}, {
+							Emitter: "s1",
+							Message: xsql.Message{"id1": 2, "f1": "v2"},
+						}, {
+							Emitter: "s1",
+							Message: xsql.Message{"id1": 3, "f1": "v3"},
+						},
+					},
+				},
+
+				xsql.WindowTuples{
+					Emitter: "s2",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "s2",
+							Message: xsql.Message{"id2": 1, "f2": "w1"},
+						}, {
+							Emitter: "s2",
+							Message: xsql.Message{"id2": 2, "f2": "w2"},
+						}, {
+							Emitter: "s2",
+							Message: xsql.Message{"id2": 2, "f2": "w3"},
+						},
+					},
+				},
+			},
+			result: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "s1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
+						{Emitter: "s2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
+						{Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "s1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
+						{Emitter: "s2", Message: xsql.Message{"id2": 2, "f2": "w3"}},
+					},
+				},
+			},
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -1215,7 +1422,7 @@ func TestRightJoinPlan_Apply(t *testing.T) {
 							Message: xsql.Message{"id1": 2, "f1": "v2"},
 						}, {
 							Emitter: "src1",
-							Message: xsql.Message{"id1": 3, "f1": "v3"},
+							Message: xsql.Message{"id1": 1, "f1": "v3"},
 						},
 					},
 				},
@@ -1243,6 +1450,12 @@ func TestRightJoinPlan_Apply(t *testing.T) {
 						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
 					},
 				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v3"}},
+					},
+				},
 
 				xsql.JoinTuple{
 					Tuples: []xsql.Tuple{
@@ -1378,6 +1591,12 @@ func TestFullJoinPlan_Apply(t *testing.T) {
 					Tuples: []xsql.Tuple{
 						{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+					},
+				},
+
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w4"}},
 					},
 				},
@@ -1619,7 +1838,17 @@ func TestCrossJoinPlan_Apply(t *testing.T) {
 					Tuples: []xsql.Tuple{
 						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
 					},
 				},
@@ -1628,7 +1857,17 @@ func TestCrossJoinPlan_Apply(t *testing.T) {
 					Tuples: []xsql.Tuple{
 						{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 2, "f1": "v2"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
 					},
 				},
@@ -1637,7 +1876,17 @@ func TestCrossJoinPlan_Apply(t *testing.T) {
 					Tuples: []xsql.Tuple{
 						{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 2, "f2": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 3, "f1": "v3"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 4, "f2": "w3"}},
 					},
 				},
@@ -1675,6 +1924,11 @@ func TestCrossJoinPlan_Apply(t *testing.T) {
 					Tuples: []xsql.Tuple{
 						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}},
 						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w2"}},
 					},
 				},