浏览代码

test(stream): add test for errors during stream execution

ngjaying 5 年之前
父节点
当前提交
c01c88b521
共有 1 个文件被更改,包括 303 次插入4 次删除
  1. 303 4
      xsql/processors/xsql_processor_test.go

+ 303 - 4
xsql/processors/xsql_processor_test.go

@@ -743,7 +743,7 @@ func getMockSourceL(name string, done <-chan int, size int) *nodes.SourceNode {
 			{
 				Emitter: name,
 				Message: map[string]interface{}{
-					"size": 2,
+					"size": 3,
 					"ts":   1541152487632,
 				},
 				Timestamp: 1541152487632,
@@ -766,7 +766,7 @@ func getMockSourceL(name string, done <-chan int, size int) *nodes.SourceNode {
 				Timestamp: 1541152489252,
 			},
 		}
-	case "demo1":
+	case "ldemo1":
 		data = []*xsql.Tuple{
 			{
 				Emitter: name,
@@ -800,7 +800,7 @@ func getMockSourceL(name string, done <-chan int, size int) *nodes.SourceNode {
 				Message: map[string]interface{}{
 					"temp": 27.4,
 					"hum":  80,
-					"ts":   1541152488442,
+					"ts":   "1541152488442",
 				},
 				Timestamp: 1541152488442,
 			},
@@ -814,7 +814,7 @@ func getMockSourceL(name string, done <-chan int, size int) *nodes.SourceNode {
 				Timestamp: 1541152489252,
 			},
 		}
-	case "sessionDemo":
+	case "lsessionDemo":
 		data = []*xsql.Tuple{
 			{
 				Emitter: name,
@@ -1551,6 +1551,305 @@ func TestWindow(t *testing.T) {
 	}
 }
 
+func TestWindowError(t *testing.T) {
+	var tests = []struct {
+		name string
+		sql  string
+		size int
+		r    [][]map[string]interface{}
+		m    map[string]interface{}
+	}{
+		{
+			name: `rule1`,
+			sql:  `SELECT size * 3 FROM ldemo GROUP BY TUMBLINGWINDOW(ss, 2)`,
+			size: 5,
+			r: [][]map[string]interface{}{
+				{{
+					"error": "run Select error: invalid operation string(string) * int64(3)",
+				}},
+			},
+			m: map[string]interface{}{
+				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
+				"op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
+				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
+
+				"op_project_0_exceptions_total":   int64(1),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(1),
+				"op_project_0_records_out_total":  int64(0),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(1),
+				"sink_mockSink_0_records_out_total": int64(1),
+
+				"source_ldemo_0_exceptions_total":  int64(0),
+				"source_ldemo_0_records_in_total":  int64(5),
+				"source_ldemo_0_records_out_total": int64(5),
+
+				"op_window_0_exceptions_total":   int64(0),
+				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_records_in_total":   int64(5),
+				"op_window_0_records_out_total":  int64(1),
+			},
+		}, {
+			name: `rule2`,
+			sql:  `SELECT color, ts FROM ldemo where size > 2 GROUP BY tumblingwindow(ss, 1)`,
+			size: 5,
+			r: [][]map[string]interface{}{
+				{{
+					"error": "run Where error: invalid operation string(string) > int64(2)",
+				}}, {{
+					"ts": float64(1541152487632),
+				}},
+			},
+			m: map[string]interface{}{
+				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
+				"op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
+				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
+
+				"op_project_0_exceptions_total":   int64(1),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(2),
+				"op_project_0_records_out_total":  int64(1),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(2),
+				"sink_mockSink_0_records_out_total": int64(2),
+
+				"source_ldemo_0_exceptions_total":  int64(0),
+				"source_ldemo_0_records_in_total":  int64(5),
+				"source_ldemo_0_records_out_total": int64(5),
+
+				"op_window_0_exceptions_total":   int64(0),
+				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_records_in_total":   int64(5),
+				"op_window_0_records_out_total":  int64(3),
+
+				"op_filter_0_exceptions_total":   int64(1),
+				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_records_in_total":   int64(3),
+				"op_filter_0_records_out_total":  int64(1),
+			},
+		}, {
+			name: `rule3`,
+			sql:  `SELECT color, temp, ts FROM ldemo INNER JOIN ldemo1 ON ldemo.ts = ldemo1.ts GROUP BY SlidingWindow(ss, 1)`,
+			size: 5,
+			r: [][]map[string]interface{}{
+				{{
+					"color": "red",
+					"temp":  25.5,
+					"ts":    float64(1541152486013),
+				}}, {{
+					"color": "red",
+					"temp":  25.5,
+					"ts":    float64(1541152486013),
+				}}, {{
+					"color": "red",
+					"temp":  25.5,
+					"ts":    float64(1541152486013),
+				}}, {{
+					"temp": 28.1,
+					"ts":   float64(1541152487632),
+				}}, {{
+					"temp": 28.1,
+					"ts":   float64(1541152487632),
+				}}, {{
+					"error": "run Join error: invalid operation int64(1541152487632) = string(1541152488442)",
+				}}, {{
+					"error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
+				}}, {{
+					"error": "run Join error: invalid operation int64(1541152488442) = string(1541152488442)",
+				}},
+			},
+			m: map[string]interface{}{
+				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
+				"op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
+				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
+
+				"op_preprocessor_ldemo1_0_exceptions_total":   int64(0),
+				"op_preprocessor_ldemo1_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo1_0_records_in_total":   int64(5),
+				"op_preprocessor_ldemo1_0_records_out_total":  int64(5),
+
+				"op_project_0_exceptions_total":   int64(3),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(8),
+				"op_project_0_records_out_total":  int64(5),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(8),
+				"sink_mockSink_0_records_out_total": int64(8),
+
+				"source_ldemo_0_exceptions_total":  int64(0),
+				"source_ldemo_0_records_in_total":  int64(5),
+				"source_ldemo_0_records_out_total": int64(5),
+
+				"source_ldemo1_0_exceptions_total":  int64(0),
+				"source_ldemo1_0_records_in_total":  int64(5),
+				"source_ldemo1_0_records_out_total": int64(5),
+
+				"op_window_0_exceptions_total":   int64(0),
+				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_records_in_total":   int64(10),
+				"op_window_0_records_out_total":  int64(10),
+
+				"op_join_0_exceptions_total":   int64(3),
+				"op_join_0_process_latency_ms": int64(0),
+				"op_join_0_records_in_total":   int64(10),
+				"op_join_0_records_out_total":  int64(5),
+			},
+			//}, {
+			//	name: `rule4`,
+			//	sql:  `SELECT color FROM ldemo GROUP BY SlidingWindow(ss, 2), color having color > 5`,
+			//	size: 5,
+			//	r: [][]map[string]interface{}{
+			//		{{
+			//			"color": "red",
+			//		}}, {{
+			//			"color": "blue",
+			//		}, {
+			//			"color": "red",
+			//		}}, {{
+			//			"color": "blue",
+			//		}, {
+			//			"color": "red",
+			//		}}, {{
+			//			"color": "blue",
+			//		}, {
+			//			"color": "yellow",
+			//		}}, {{
+			//			"color": "blue",
+			//		}, {
+			//			"color": "red",
+			//		}, {
+			//			"color": "yellow",
+			//		}},
+			//	},
+			//	m: map[string]interface{}{
+			//		"op_preprocessor_demo_0_exceptions_total":   int64(0),
+			//		"op_preprocessor_demo_0_process_latency_ms": int64(0),
+			//		"op_preprocessor_demo_0_records_in_total":   int64(5),
+			//		"op_preprocessor_demo_0_records_out_total":  int64(5),
+			//
+			//		"op_project_0_exceptions_total":   int64(0),
+			//		"op_project_0_process_latency_ms": int64(0),
+			//		"op_project_0_records_in_total":   int64(5),
+			//		"op_project_0_records_out_total":  int64(5),
+			//
+			//		"sink_mockSink_0_exceptions_total":  int64(0),
+			//		"sink_mockSink_0_records_in_total":  int64(5),
+			//		"sink_mockSink_0_records_out_total": int64(5),
+			//
+			//		"source_demo_0_exceptions_total":  int64(0),
+			//		"source_demo_0_records_in_total":  int64(5),
+			//		"source_demo_0_records_out_total": int64(5),
+			//
+			//		"op_window_0_exceptions_total":   int64(0),
+			//		"op_window_0_process_latency_ms": int64(0),
+			//		"op_window_0_records_in_total":   int64(5),
+			//		"op_window_0_records_out_total":  int64(5),
+			//
+			//		"op_aggregate_0_exceptions_total":   int64(0),
+			//		"op_aggregate_0_process_latency_ms": int64(0),
+			//		"op_aggregate_0_records_in_total":   int64(5),
+			//		"op_aggregate_0_records_out_total":  int64(5),
+			//
+			//		"op_order_0_exceptions_total":   int64(0),
+			//		"op_order_0_process_latency_ms": int64(0),
+			//		"op_order_0_records_in_total":   int64(5),
+			//		"op_order_0_records_out_total":  int64(5),
+			//	},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	createSchemalessStreams(t)
+	defer dropSchemalessStreams(t)
+	common.InitConf()
+	for i, tt := range tests {
+		test.ResetClock(1541152486000)
+		p := NewRuleProcessor(DbDir)
+		parser := xsql.NewParser(strings.NewReader(tt.sql))
+		var (
+			sources []*nodes.SourceNode
+			syncs   []chan int
+		)
+		if stmt, err := xsql.Language.Parse(parser); err != nil {
+			t.Errorf("parse sql %s error: %s", tt.sql, err)
+		} else {
+			if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
+				t.Errorf("sql %s is not a select statement", tt.sql)
+			} else {
+				streams := xsql.GetStreams(selectStmt)
+				for _, stream := range streams {
+					next := make(chan int)
+					syncs = append(syncs, next)
+					source := getMockSourceL(stream, next, tt.size)
+					sources = append(sources, source)
+				}
+			}
+		}
+		tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql}, sources)
+		if err != nil {
+			t.Error(err)
+		}
+		mockSink := test.NewMockSink()
+		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
+		tp.AddSink(inputs, sink)
+		errCh := tp.Open()
+		func() {
+			for i := 0; i < tt.size*len(syncs); i++ {
+				syncs[i%len(syncs)] <- i
+				for {
+					time.Sleep(1)
+					if getMetric(tp, "op_window_0_records_in_total") == (i + 1) {
+						break
+					}
+				}
+				select {
+				case err = <-errCh:
+					t.Log(err)
+					tp.Cancel()
+					return
+				default:
+				}
+			}
+			retry := 100
+			for ; retry > 0; retry-- {
+				if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
+					break
+				}
+				t.Logf("wait to try another %d times", retry)
+				time.Sleep(time.Duration(retry) * time.Millisecond)
+			}
+			if retry == 0 {
+				err := compareMetrics(tp, tt.m, tt.sql)
+				t.Errorf("could not get correct metrics: %v", err)
+			}
+		}()
+		results := mockSink.GetResults()
+		var maps [][]map[string]interface{}
+		for _, v := range results {
+			var mapRes []map[string]interface{}
+			err := json.Unmarshal(v, &mapRes)
+			if err != nil {
+				t.Errorf("Failed to parse the input into map")
+				continue
+			}
+			maps = append(maps, mapRes)
+		}
+		if !reflect.DeepEqual(tt.r, maps) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
+		}
+		if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
+			t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
+		}
+		tp.Cancel()
+	}
+}
+
 func createEventStreams(t *testing.T) {
 	p := NewStreamProcessor(path.Join(DbDir, "stream"))
 	demo := `CREATE STREAM demoE (