Explorar o código

fix(operator): inner join for multiple values

Inner join streams with multiple matches, the matches after the first match will miss the left tuple. Check the test case update for detail.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang %!s(int64=3) %!d(string=hai) anos
pai
achega
640a68863b

+ 71 - 0
internal/topo/operator/join_multi_test.go

@@ -508,6 +508,77 @@ func TestMultiJoinPlan_Apply(t *testing.T) {
 					},
 				},
 			},
+		}, {
+			sql: "SELECT id1 FROM src1 inner join src2 on src1.id = src2.id inner join src3 on src1.id = src3.id",
+			data: xsql.WindowTuplesSet{
+				Content: []xsql.WindowTuples{
+					{
+						Emitter: "src1",
+						Tuples: []xsql.Tuple{
+							{
+								Emitter: "src1",
+								Message: xsql.Message{"id": 1, "f1": "v1"},
+							}, {
+								Emitter: "src1",
+								Message: xsql.Message{"id": 2, "f1": "v5"},
+							}, {
+								Emitter: "src1",
+								Message: xsql.Message{"id": 3, "f1": "v3"},
+							},
+						},
+					},
+
+					{
+						Emitter: "src2",
+						Tuples: []xsql.Tuple{
+							{
+								Emitter: "src2",
+								Message: xsql.Message{"id": 1, "f2": "w1"},
+							}, {
+								Emitter: "src2",
+								Message: xsql.Message{"id": 2, "f2": "w2"},
+							}, {
+								Emitter: "src2",
+								Message: xsql.Message{"id": 4, "f2": "w3"},
+							},
+						},
+					},
+
+					{
+						Emitter: "src3",
+						Tuples: []xsql.Tuple{
+							{
+								Emitter: "src3",
+								Message: xsql.Message{"id": 1, "f3": "x1"},
+							}, {
+								Emitter: "src3",
+								Message: xsql.Message{"id": 1, "f3": "x3"},
+							}, {
+								Emitter: "src3",
+								Message: xsql.Message{"id": 5, "f3": "x5"},
+							},
+						},
+					},
+				},
+			},
+			result: &xsql.JoinTupleSets{
+				Content: []xsql.JoinTuple{
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id": 1, "f1": "v1"}},
+							{Emitter: "src2", Message: xsql.Message{"id": 1, "f2": "w1"}},
+							{Emitter: "src3", Message: xsql.Message{"id": 1, "f3": "x1"}},
+						},
+					},
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id": 1, "f1": "v1"}},
+							{Emitter: "src2", Message: xsql.Message{"id": 1, "f2": "w1"}},
+							{Emitter: "src3", Message: xsql.Message{"id": 1, "f3": "x3"}},
+						},
+					},
+				},
+			},
 		},
 	}
 

+ 1 - 3
internal/topo/operator/join_operator.go

@@ -278,7 +278,6 @@ func (jp *JoinOp) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesS
 	}
 	for _, left := range set.Content {
 		leftJoined := false
-		innerAppend := false
 		for index, right := range rights {
 			tupleJoined := false
 			merged := &xsql.JoinTuple{}
@@ -299,9 +298,8 @@ func (jp *JoinOp) evalJoinSets(set *xsql.JoinTupleSets, input xsql.WindowTuplesS
 					if val {
 						leftJoined = true
 						tupleJoined = true
-						if join.JoinType == ast.INNER_JOIN && !innerAppend {
+						if join.JoinType == ast.INNER_JOIN {
 							merged.AddTuples(left.Tuples)
-							innerAppend = true
 						}
 						merged.AddTuple(right)
 					}