Quellcode durchsuchen

test(ut): add more multi full join case

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran vor 3 Jahren
Ursprung
Commit
5650e5b01d

+ 34 - 0
internal/topo/operator/aggregate_test.go

@@ -422,6 +422,40 @@ func TestAggregatePlan_Apply(t *testing.T) {
 				},
 			},
 		},
+
+		{
+			sql: "SELECT * FROM A FULL JOIN B on A.module=B.module FULL JOIN C on A.module=C.module GROUP BY A.module, TUMBLINGWINDOW(ss, 10)",
+			data: &xsql.JoinTupleSets{
+				Content: []xsql.JoinTuple{
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "B", Message: xsql.Message{"module": 1, "topic": "moduleB topic", "value": 1}},
+						},
+					},
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "C", Message: xsql.Message{"module": 1, "topic": "moduleC topic", "value": 100}},
+						},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					Content: []xsql.DataValuer{
+						&xsql.JoinTuple{
+							Tuples: []xsql.Tuple{
+								{Emitter: "B", Message: xsql.Message{"module": 1, "topic": "moduleB topic", "value": 1}},
+							},
+						},
+						&xsql.JoinTuple{
+							Tuples: []xsql.Tuple{
+								{Emitter: "C", Message: xsql.Message{"module": 1, "topic": "moduleC topic", "value": 100}},
+							},
+						},
+					},
+				},
+			},
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 164 - 0
internal/topo/operator/join_multi_test.go

@@ -1269,6 +1269,170 @@ func TestMultiJoinPlan_Apply(t *testing.T) {
 			},
 			result: nil,
 		},
+
+		{ //19
+			sql: "SELECT id1 FROM src1 full join src2 on src1.id = src2.id full join src3 on src1.id = src3.id",
+			data: xsql.WindowTuplesSet{
+				Content: []xsql.WindowTuples{
+					{
+						Emitter: "src1",
+						Tuples: []xsql.Tuple{
+							{
+								Emitter: "src1",
+								Message: xsql.Message{"id": 1, "f1": "v1"},
+							},
+						},
+					},
+					{
+						Emitter: "src2",
+						Tuples: []xsql.Tuple{
+							{
+								Emitter: "src2",
+								Message: xsql.Message{"id": 1, "f2": "w1"},
+							},
+						},
+					},
+					{
+						Emitter: "src3",
+						Tuples: []xsql.Tuple{
+							{
+								Emitter: "src3",
+								Message: xsql.Message{"id": 1, "f3": "x1"},
+							},
+						},
+					},
+				},
+			},
+			result: &xsql.JoinTupleSets{
+				Content: []xsql.JoinTuple{
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id": 1, "f1": "v1"}},
+							{Emitter: "src2", Message: xsql.Message{"id": 1, "f2": "w1"}},
+							{Emitter: "src3", Message: xsql.Message{"id": 1, "f3": "x1"}},
+						},
+					},
+				},
+			},
+		},
+		{ //20
+			sql: "SELECT id1 FROM src1 full join src2 on src1.id = src2.id full join src3 on src1.id = src3.id",
+			data: xsql.WindowTuplesSet{
+				Content: []xsql.WindowTuples{
+					{
+						Emitter: "src1",
+						Tuples:  nil,
+					},
+
+					{
+						Emitter: "src2",
+						Tuples: []xsql.Tuple{
+							{
+								Emitter: "src2",
+								Message: xsql.Message{"id": 1, "f2": "w1"},
+							},
+						},
+					},
+
+					{
+						Emitter: "src3",
+						Tuples: []xsql.Tuple{
+							{
+								Emitter: "src3",
+								Message: xsql.Message{"id": 1, "f3": "x1"},
+							},
+						},
+					},
+				},
+			},
+			result: &xsql.JoinTupleSets{
+				Content: []xsql.JoinTuple{
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src2", Message: xsql.Message{"id": 1, "f2": "w1"}},
+						},
+					},
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src3", Message: xsql.Message{"id": 1, "f3": "x1"}},
+						},
+					},
+				},
+			},
+		},
+		{ //21
+			sql: "SELECT id1 FROM src1 full join src2 on src1.id = src2.id full join src3 on src1.id = src3.id",
+			data: xsql.WindowTuplesSet{
+				Content: []xsql.WindowTuples{
+					{
+						Emitter: "src1",
+						Tuples: []xsql.Tuple{
+							{
+								Emitter: "src1",
+								Message: xsql.Message{"id": 1, "f1": "v1"},
+							},
+						},
+					},
+
+					{
+						Emitter: "src2",
+						Tuples: []xsql.Tuple{
+							{
+								Emitter: "src2",
+								Message: xsql.Message{"id": 1, "f2": "w1"},
+							},
+						},
+					},
+				},
+			},
+			result: &xsql.JoinTupleSets{
+				Content: []xsql.JoinTuple{
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id": 1, "f1": "v1"}},
+							{Emitter: "src2", Message: xsql.Message{"id": 1, "f2": "w1"}},
+						},
+					},
+				},
+			},
+		},
+
+		{ //22
+			sql: "SELECT id1 FROM src1 full join src2 on src1.id = src2.id full join src3 on src1.id = src3.id",
+			data: xsql.WindowTuplesSet{
+				Content: []xsql.WindowTuples{
+					{
+						Emitter: "src1",
+						Tuples: []xsql.Tuple{
+							{
+								Emitter: "src1",
+								Message: xsql.Message{"id": 1, "f1": "v1"},
+							},
+						},
+					},
+
+					{
+						Emitter: "src3",
+						Tuples: []xsql.Tuple{
+							{
+								Emitter: "src3",
+								Message: xsql.Message{"id": 1, "f3": "x1"},
+							},
+						},
+					},
+				},
+			},
+			result: &xsql.JoinTupleSets{
+				Content: []xsql.JoinTuple{
+					{
+						Tuples: []xsql.Tuple{
+							{Emitter: "src1", Message: xsql.Message{"id": 1, "f1": "v1"}},
+							{Emitter: "src3", Message: xsql.Message{"id": 1, "f3": "x1"}},
+						},
+					},
+				},
+			},
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 169 - 55
internal/topo/operator/project_test.go

@@ -34,7 +34,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 		data   *xsql.Tuple
 		result []map[string]interface{}
 	}{
-		{
+		{ //0
 			sql: "SELECT a FROM test",
 			data: &xsql.Tuple{
 				Emitter: "test",
@@ -54,7 +54,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 			}},
 		},
-		{
+		{ //1
 			sql: "SELECT b FROM test",
 			data: &xsql.Tuple{
 				Emitter: "test",
@@ -64,7 +64,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 			},
 			result: []map[string]interface{}{{}},
 		},
-		{
+		{ //2
 			sql: "SELECT ts FROM test",
 			data: &xsql.Tuple{
 				Emitter: "test",
@@ -78,7 +78,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 			}},
 		},
 		//Schemaless may return a message without selecting column
-		{
+		{ //3
 			sql: "SELECT ts FROM test",
 			data: &xsql.Tuple{
 				Emitter: "test",
@@ -89,7 +89,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 			},
 			result: []map[string]interface{}{{}},
 		},
-		{
+		{ //4
 			sql: "SELECT A FROM test",
 			data: &xsql.Tuple{
 				Emitter: "test",
@@ -101,7 +101,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"A": "val_a",
 			}},
 		},
-
+		//5
 		{
 			sql: `SELECT "value" FROM test`,
 			data: &xsql.Tuple{
@@ -112,7 +112,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"": "value",
 			}},
 		},
-
+		//6
 		{
 			sql: `SELECT 3.4 FROM test`,
 			data: &xsql.Tuple{
@@ -123,7 +123,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"": 3.4,
 			}},
 		},
-
+		//7
 		{
 			sql: `SELECT 5 FROM test`,
 			data: &xsql.Tuple{
@@ -134,7 +134,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"": 5.0,
 			}},
 		},
-
+		//8
 		{
 			sql: `SELECT a, "value" AS b FROM test`,
 			data: &xsql.Tuple{
@@ -148,7 +148,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"b": "value",
 			}},
 		},
-
+		//9
 		{
 			sql: `SELECT a, "value" AS b, 3.14 as Pi, 0 as Zero FROM test`,
 			data: &xsql.Tuple{
@@ -164,7 +164,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"Zero": 0.0,
 			}},
 		},
-
+		//10
 		{
 			sql: `SELECT a->b AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -177,6 +177,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"ab": "hello",
 			}},
 		},
+		//11
 		{
 			sql: `SELECT a->b AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -187,6 +188,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 			},
 			result: []map[string]interface{}{{}},
 		},
+		//12
 		{
 			sql: `SELECT a->b AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -197,6 +199,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 			},
 			result: []map[string]interface{}{{}},
 		},
+		//13
 		{
 			sql: `SELECT a->b AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -207,6 +210,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 			},
 			result: []map[string]interface{}{{}},
 		},
+		//14
 		{
 			sql: `SELECT a[0]->b AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -222,6 +226,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"ab": "hello1",
 			}},
 		},
+		//15
 		{
 			sql: `SELECT a[0]->b AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -237,6 +242,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"ab": "hello1",
 			}},
 		},
+		//16
 		{
 			sql: `SELECT a[2:4] AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -258,7 +264,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 			}},
 		},
-
+		//17
 		{
 			sql: `SELECT a[2:] AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -281,7 +287,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 			}},
 		},
-
+		//18
 		{
 			sql: `SELECT a[2:] AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -298,7 +304,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 			}},
 		},
-
+		//19
 		{
 			sql: `SELECT a[:4] AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -315,7 +321,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 			}},
 		},
-
+		//20
 		{
 			sql: `SELECT a[:4] AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -332,7 +338,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 			}},
 		},
-
+		//21
 		{
 			sql: `SELECT a->b[:4] AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -349,7 +355,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 			}},
 		},
-
+		//22
 		{
 			sql: `SELECT a->b[0:1] AS ab FROM test`,
 			data: &xsql.Tuple{
@@ -366,7 +372,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 			}},
 		},
-
+		//23
 		{
 			sql: `SELECT a->c->d AS f1 FROM test`,
 			data: &xsql.Tuple{
@@ -384,6 +390,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"f1": 35.2,
 			}},
 		},
+		//24
 		{
 			sql: `SELECT a->c->d AS f1 FROM test`,
 			data: &xsql.Tuple{
@@ -399,6 +406,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 			},
 			result: []map[string]interface{}{{}},
 		},
+		//25
 		{
 			sql: `SELECT a->c->d AS f1 FROM test`,
 			data: &xsql.Tuple{
@@ -411,7 +419,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 			},
 			result: []map[string]interface{}{{}},
 		},
-
+		//26
 		//The int type is not supported yet, the json parser returns float64 for int values
 		{
 			sql: `SELECT a->c->d AS f1 FROM test`,
@@ -430,7 +438,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"f1": float64(35),
 			}},
 		},
-
+		//27
 		{
 			sql: "SELECT a FROM test",
 			data: &xsql.Tuple{
@@ -441,7 +449,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				{},
 			},
 		},
-
+		//28
 		{
 			sql: "SELECT * FROM test",
 			data: &xsql.Tuple{
@@ -452,7 +460,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				{},
 			},
 		},
-
+		//29
 		{
 			sql: `SELECT * FROM test`,
 			data: &xsql.Tuple{
@@ -475,7 +483,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 			}},
 		},
-
+		//30
 		{
 			sql: `SELECT * FROM test`,
 			data: &xsql.Tuple{
@@ -490,7 +498,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"b": 3.14,
 			}},
 		},
-
+		//31
 		{
 			sql: `SELECT 3*4 AS f1 FROM test`,
 			data: &xsql.Tuple{
@@ -501,7 +509,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"f1": float64(12),
 			}},
 		},
-
+		//32
 		{
 			sql: `SELECT 4.5*2 AS f1 FROM test`,
 			data: &xsql.Tuple{
@@ -512,6 +520,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"f1": float64(9),
 			}},
 		},
+		//33
 		{
 			sql: "SELECT `a.b.c` FROM test",
 			data: &xsql.Tuple{
@@ -524,6 +533,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"a.b.c": "val_a",
 			}},
 		},
+		//34
 		{
 			sql: `SELECT CASE a WHEN 10 THEN "true" END AS b FROM test`,
 			data: &xsql.Tuple{
@@ -573,7 +583,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 		sql    string
 		data   interface{}
 		result []map[string]interface{}
-	}{
+	}{ //0
 		{
 			sql: "SELECT * FROM tbl WHERE abc*2+3 > 12 AND abc < 20",
 			data: &xsql.Tuple{
@@ -586,7 +596,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"abc": float64(6), //json marshall problem
 			}},
 		},
-
+		//1
 		{
 			sql: "SELECT abc FROM tbl WHERE abc*2+3 > 12 OR def = \"hello\"",
 			data: &xsql.Tuple{
@@ -600,7 +610,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"abc": float64(34),
 			}},
 		},
-
+		//2
 		{
 			sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
 			data: xsql.WindowTuplesSet{
@@ -630,6 +640,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"id1": float64(3),
 			}},
 		},
+		//3
 		{
 			sql: "SELECT id1 FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
 			data: xsql.WindowTuplesSet{
@@ -657,6 +668,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"id1": float64(3),
 			}},
 		},
+		//4
 		{
 			sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
 			data: xsql.WindowTuplesSet{
@@ -689,6 +701,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"f1":  "v1",
 			}},
 		},
+		//5
 		{
 			sql: "SELECT * FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
 			data: xsql.WindowTuplesSet{
@@ -721,6 +734,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"f1":  "v1",
 			}},
 		},
+		//6
 		{
 			sql: "SELECT src1.* FROM src1 WHERE f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
 			data: xsql.WindowTuplesSet{
@@ -753,6 +767,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"f1":  "v1",
 			}},
 		},
+		//7
 		{
 			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
 			data: &xsql.JoinTupleSets{
@@ -784,6 +799,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"id1": float64(3),
 			}},
 		},
+		//8
 		{
 			sql: "SELECT id1 FROM src1 left join src2 on src1.id1 = src2.id2 WHERE src1.f1 = \"v1\" GROUP BY TUMBLINGWINDOW(ss, 10)",
 			data: &xsql.JoinTupleSets{
@@ -813,6 +829,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"id1": float64(2),
 			}, {}},
 		},
+		//9
 		{
 			sql: "SELECT abc FROM tbl group by abc",
 			data: xsql.GroupedTuplesSet{
@@ -832,6 +849,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"abc": float64(6),
 			}},
 		},
+		//10
 		{
 			sql: "SELECT abc FROM tbl group by abc",
 			data: xsql.GroupedTuplesSet{
@@ -848,6 +866,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 			},
 			result: []map[string]interface{}{{}},
 		},
+		//11
 		{
 			sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
 			data: xsql.GroupedTuplesSet{
@@ -878,6 +897,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"id1": float64(2),
 			}},
 		},
+		//12
 		{
 			sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
 			data: xsql.GroupedTuplesSet{
@@ -906,6 +926,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"id1": float64(1),
 			}, {}},
 		},
+		//13
 		{
 			sql: "SELECT src2.id2 FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
 			data: xsql.GroupedTuplesSet{
@@ -945,6 +966,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"id2": float64(4),
 			}, {}},
 		},
+		//14
 		{
 			sql: "SELECT src1.*, f2 FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
 			data: &xsql.JoinTupleSets{
@@ -981,6 +1003,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"f1":  "v1",
 			}},
 		},
+		//15
 		{
 			sql: "SELECT * FROM src1 left join src2 GROUP BY TUMBLINGWINDOW(ss, 10)",
 			data: &xsql.JoinTupleSets{
@@ -1017,6 +1040,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"f1": "v1",
 			}},
 		},
+		//16
 		{
 			sql: "SELECT src1.* FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1",
 			data: xsql.GroupedTuplesSet{
@@ -1049,6 +1073,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"f1":  "v2",
 			}},
 		},
+		//17
 		{
 			sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
 			data: xsql.GroupedTuplesSet{
@@ -1095,6 +1120,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				"f1":  "v1",
 			}},
 		},
+		//18
 		{
 			sql: "SELECT src2.id2, src1.* FROM src1 left join src2 on src1.id1 = src2.id2 GROUP BY src2.f2, TUMBLINGWINDOW(ss, 10)",
 			data: xsql.GroupedTuplesSet{
@@ -1177,6 +1203,7 @@ func TestProjectPlan_Funcs(t *testing.T) {
 		data   interface{}
 		result []map[string]interface{}
 	}{
+		//0
 		{
 			sql: "SELECT round(a) as r FROM test",
 			data: &xsql.Tuple{
@@ -1188,7 +1215,9 @@ func TestProjectPlan_Funcs(t *testing.T) {
 			result: []map[string]interface{}{{
 				"r": float64(48),
 			}},
-		}, {
+		},
+		//1
+		{
 			sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{
@@ -1216,7 +1245,9 @@ func TestProjectPlan_Funcs(t *testing.T) {
 			}, {
 				"r": float64(123124),
 			}},
-		}, {
+		},
+		//2
+		{
 			sql: "SELECT round(a) as r FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{
@@ -1244,7 +1275,9 @@ func TestProjectPlan_Funcs(t *testing.T) {
 			}, {
 				"r": float64(123124),
 			}},
-		}, {
+		},
+		//3
+		{
 			sql: "SELECT round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
 			data: &xsql.JoinTupleSets{
 				Content: []xsql.JoinTuple{
@@ -1275,7 +1308,9 @@ func TestProjectPlan_Funcs(t *testing.T) {
 			}, {
 				"r": float64(89),
 			}},
-		}, {
+		},
+		//4
+		{
 			sql: "SELECT CONCAT(test.id, test.a, test1.b) as concat FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
 			data: &xsql.JoinTupleSets{
 				Content: []xsql.JoinTuple{
@@ -1306,7 +1341,9 @@ func TestProjectPlan_Funcs(t *testing.T) {
 			}, {
 				"concat": "388.886",
 			}},
-		}, {
+		},
+		//5
+		{
 			sql: "SELECT count(a) as r FROM test",
 			data: &xsql.Tuple{
 				Emitter: "test",
@@ -1317,7 +1354,9 @@ func TestProjectPlan_Funcs(t *testing.T) {
 			result: []map[string]interface{}{{
 				"r": float64(1),
 			}},
-		}, {
+		},
+		//6
+		{
 			sql: "SELECT meta(test.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
 			data: &xsql.JoinTupleSets{
 				Content: []xsql.JoinTuple{
@@ -1387,6 +1426,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 		data   interface{}
 		result []map[string]interface{}
 	}{
+		//0
 		{
 			sql: "SELECT count(*) as c, round(a) as r, window_start() as ws, window_end() as we FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
@@ -1443,6 +1483,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"we": float64(1541152487013),
 			}},
 		},
+		//1
 		{
 			sql: "SELECT count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
@@ -1493,6 +1534,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"max": 89.03,
 			}},
 		},
+		//2
 		{
 			sql: "SELECT avg(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
@@ -1547,6 +1589,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"avg": 51.815,
 			}},
 		},
+		//3
 		{
 			sql: "SELECT max(a) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
@@ -1595,6 +1638,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"max": 89.03,
 			}},
 		},
+		//4
 		{
 			sql: "SELECT min(a), window_start(), window_end() FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
 			data: &xsql.JoinTupleSets{
@@ -1629,7 +1673,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"window_start": float64(1541152486013),
 				"window_end":   float64(1541152487013),
 			}},
-		}, {
+		},
+		//5
+		{
 			sql: "SELECT count(*) as all, count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10)",
 			data: &xsql.JoinTupleSets{
 				Content: []xsql.JoinTuple{
@@ -1662,7 +1708,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"min": 68.55,
 				"max": 177.51,
 			}},
-		}, {
+		},
+		//6
+		{
 			sql: "SELECT sum(a), window_start() as ws, window_end() FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{{
@@ -1691,7 +1739,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"ws":         float64(1541152486013),
 				"window_end": float64(1541152487013),
 			}},
-		}, {
+		},
+		//7
+		{
 			sql: "SELECT sum(a) as s FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{{
@@ -1714,7 +1764,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			result: []map[string]interface{}{{
 				"s": float64(123203),
 			}},
-		}, {
+		},
+		//8
+		{
 			sql: "SELECT sum(a) FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{{
@@ -1737,7 +1789,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			result: []map[string]interface{}{{
 				"sum": float64(123203),
 			}},
-		}, {
+		},
+		//9
+		{
 			sql: "SELECT count(*) as all, count(a) as c, avg(a) as a, sum(a) as s, min(a) as min, max(a) as max  FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{{
@@ -1766,6 +1820,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"max": float64(53),
 			}},
 		},
+		//10
 		{
 			sql: "SELECT count(*), meta(test1.device) FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
@@ -1810,6 +1865,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"meta":  "devicec",
 			}},
 		},
+		//11
 		{
 			sql: "SELECT count(*) as c, meta(test1.device) as d FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
@@ -1853,7 +1909,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"c": float64(2),
 				"d": "devicec",
 			}},
-		}, {
+		},
+		//12
+		{
 			sql: "SELECT * FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
@@ -1910,7 +1968,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				"id":    float64(2),
 				"r":     float64(89),
 			}},
-		}, {
+		},
+		//13
+		{
 			sql: "SELECT collect(a) as r1 FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
@@ -1949,7 +2009,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			result: []map[string]interface{}{{
 				"r1": []interface{}{122.33, 177.51},
 			}, {"r1": []interface{}{89.03, 14.6}}},
-		}, {
+		},
+		//14
+		{
 			sql: "SELECT collect(*)[1] as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{
@@ -1979,7 +2041,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 					"a": float64(27),
 				},
 			}},
-		}, {
+		},
+		//15
+		{
 			sql: "SELECT collect(*)[1]->a as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{{
@@ -2002,7 +2066,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			result: []map[string]interface{}{{
 				"c1": float64(27),
 			}},
-		}, {
+		},
+		//16
+		{
 			sql: "SELECT collect(*)[1]->sl[0] as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{{
@@ -2025,7 +2091,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			result: []map[string]interface{}{{
 				"c1": "new",
 			}},
-		}, {
+		},
+		//17
+		{
 			sql: "SELECT deduplicate(id, true) as r1 FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
@@ -2072,7 +2140,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 						map[string]interface{}{"a": 14.6, "color": "w1", "id": float64(4)}},
 				},
 			},
-		}, {
+		},
+		//18
+		{
 			sql: "SELECT deduplicate(a, false)->a as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{{
@@ -2095,7 +2165,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			result: []map[string]interface{}{{
 				"c1": float64(123123),
 			}},
-		}, {
+		},
+		//19
+		{
 			sql: "SELECT deduplicate(a, false) as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{{
@@ -2116,7 +2188,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 			},
 			result: []map[string]interface{}{{}},
-		}, {
+		},
+		//20
+		{
 			sql: "SELECT deduplicate(a, false) as c1 FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{{
@@ -2138,6 +2212,31 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			},
 			result: []map[string]interface{}{{}},
 		},
+		//21  when got column after group by operation, return the first tuple's column
+		{
+			sql: "SELECT A.module, A.topic , max(A.value), B.topic as var2, max(B.value) as max2, C.topic as var3, max(C.value) as max3 FROM A FULL JOIN B on A.module=B.module FULL JOIN C on A.module=C.module GROUP BY A.module, TUMBLINGWINDOW(ss, 10)",
+			data: xsql.GroupedTuplesSet{
+				{
+					Content: []xsql.DataValuer{
+						&xsql.JoinTuple{
+							Tuples: []xsql.Tuple{
+								{Emitter: "B", Message: xsql.Message{"module": 1, "topic": "moduleB topic", "value": 1}},
+							},
+						},
+						&xsql.JoinTuple{
+							Tuples: []xsql.Tuple{
+								{Emitter: "C", Message: xsql.Message{"module": 1, "topic": "moduleC topic", "value": 100}},
+							},
+						},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"var2": "moduleB topic",
+				"max2": float64(1),
+				"max3": float64(100),
+			}},
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -2176,11 +2275,14 @@ func TestProjectPlanError(t *testing.T) {
 		data   interface{}
 		result interface{}
 	}{
+		//0
 		{
 			sql:    "SELECT a FROM test",
 			data:   errors.New("an error from upstream"),
 			result: errors.New("an error from upstream"),
-		}, {
+		},
+		//1
+		{
 			sql: "SELECT a * 5 FROM test",
 			data: &xsql.Tuple{
 				Emitter: "test",
@@ -2189,7 +2291,9 @@ func TestProjectPlanError(t *testing.T) {
 				},
 			},
 			result: errors.New("run Select error: invalid operation string(val_a) * int64(5)"),
-		}, {
+		},
+		//2
+		{
 			sql: `SELECT a[0]->b AS ab FROM test`,
 			data: &xsql.Tuple{
 				Emitter: "test",
@@ -2198,7 +2302,9 @@ func TestProjectPlanError(t *testing.T) {
 				},
 			},
 			result: errors.New("run Select error: invalid operation string(common string) [] *xsql.BracketEvalResult(&{0 0})"),
-		}, {
+		},
+		//3
+		{
 			sql: `SELECT round(a) as r FROM test`,
 			data: &xsql.Tuple{
 				Emitter: "test",
@@ -2207,7 +2313,9 @@ func TestProjectPlanError(t *testing.T) {
 				},
 			},
 			result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
-		}, {
+		},
+		//4
+		{
 			sql: `SELECT round(a) as r FROM test`,
 			data: &xsql.Tuple{
 				Emitter: "test",
@@ -2216,7 +2324,9 @@ func TestProjectPlanError(t *testing.T) {
 				},
 			},
 			result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
-		}, {
+		},
+		//5
+		{
 			sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
 				{
@@ -2265,7 +2375,9 @@ func TestProjectPlanError(t *testing.T) {
 				},
 			},
 			result: errors.New("run Select error: call func avg error: requires float64 but found string(dde)"),
-		}, {
+		},
+		//6
+		{
 			sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
 			data: xsql.WindowTuplesSet{
 				Content: []xsql.WindowTuples{{
@@ -2286,7 +2398,9 @@ func TestProjectPlanError(t *testing.T) {
 				},
 			},
 			result: errors.New("run Select error: call func sum error: requires int but found string(ddd)"),
-		}, {
+		},
+		//7
+		{
 			sql: `SELECT a[0]->b AS ab FROM test`,
 			data: &xsql.Tuple{
 				Emitter: "test",