浏览代码

feat(window): pass preprocessor error immediately

ngjaying 5 年之前
父节点
当前提交
2ba0a8a134
共有 3 个文件被更改,包括 209 次插入10 次删除
  1. 190 0
      xsql/processors/xsql_processor_test.go
  2. 10 4
      xstream/operators/watermark.go
  3. 9 6
      xstream/operators/window_op.go

+ 190 - 0
xsql/processors/xsql_processor_test.go

@@ -1464,6 +1464,64 @@ func TestWindow(t *testing.T) {
 				"op_join_0_records_in_total":   int64(10),
 				"op_join_0_records_out_total":  int64(8),
 			},
+		}, {
+			name: `rule7`,
+			sql:  `SELECT * FROM demoE GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
+			size: 5,
+			r: [][]map[string]interface{}{
+				{{
+					"error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
+				}},
+				{{
+					"color": "blue",
+					"size":  float64(6),
+					"ts":    float64(1541152486822),
+				}},
+				{{
+					"color": "blue",
+					"size":  float64(6),
+					"ts":    float64(1541152486822),
+				}, {
+					"color": "blue",
+					"size":  float64(2),
+					"ts":    float64(1541152487632),
+				}},
+				{{
+					"error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
+				}},
+				{{
+					"color": "blue",
+					"size":  float64(2),
+					"ts":    float64(1541152487632),
+				}},
+				{{
+					"error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
+				}},
+			},
+			m: map[string]interface{}{
+				"op_preprocessor_demoE_0_exceptions_total":   int64(3),
+				"op_preprocessor_demoE_0_process_latency_ms": int64(0),
+				"op_preprocessor_demoE_0_records_in_total":   int64(5),
+				"op_preprocessor_demoE_0_records_out_total":  int64(2),
+
+				"op_project_0_exceptions_total":   int64(3),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(6),
+				"op_project_0_records_out_total":  int64(3),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(6),
+				"sink_mockSink_0_records_out_total": int64(6),
+
+				"source_demoE_0_exceptions_total":  int64(0),
+				"source_demoE_0_records_in_total":  int64(5),
+				"source_demoE_0_records_out_total": int64(5),
+
+				"op_window_0_exceptions_total":   int64(3),
+				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_records_in_total":   int64(5),
+				"op_window_0_records_out_total":  int64(3),
+			},
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -1879,6 +1937,15 @@ func createEventStreams(t *testing.T) {
 	if err != nil {
 		t.Log(err)
 	}
+	demoErr := `CREATE STREAM demoErr (
+					color STRING,
+					size BIGINT,
+					ts BIGINT
+				) WITH (DATASOURCE="demoErr", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
+	_, err = p.ExecStmt(demoErr)
+	if err != nil {
+		t.Log(err)
+	}
 }
 
 func dropEventStreams(t *testing.T) {
@@ -1898,6 +1965,11 @@ func dropEventStreams(t *testing.T) {
 	if err != nil {
 		t.Log(err)
 	}
+	demoErr := `DROP STREAM demoErr`
+	_, err = p.ExecStmt(demoErr)
+	if err != nil {
+		t.Log(err)
+	}
 }
 
 func getEventMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
@@ -2128,6 +2200,63 @@ func getEventMockSource(name string, done <-chan int, size int) *nodes.SourceNod
 				Timestamp: 1541152499202,
 			},
 		}
+	case "demoErr":
+		data = []*xsql.Tuple{
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": "red",
+					"size":  3,
+					"ts":    1541152486013,
+				},
+				Timestamp: 1541152486013,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": 2,
+					"size":  "blue",
+					"ts":    1541152487632,
+				},
+				Timestamp: 1541152487632,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": "red",
+					"size":  1,
+					"ts":    1541152489252,
+				},
+				Timestamp: 1541152489252,
+			},
+			{ //dropped item
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": "blue",
+					"size":  6,
+					"ts":    1541152486822,
+				},
+				Timestamp: 1541152486822,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": "yellow",
+					"size":  4,
+					"ts":    1541152488442,
+				},
+				Timestamp: 1541152488442,
+			},
+			{ //To lift the watermark and issue all windows
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": "yellow",
+					"size":  4,
+					"ts":    1541152492342,
+				},
+				Timestamp: 1541152488442,
+			},
+		}
 	}
 	return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, true), map[string]string{
 		"DATASOURCE": name,
@@ -2490,11 +2619,72 @@ func TestEventWindow(t *testing.T) {
 				"op_join_0_records_in_total":   int64(5),
 				"op_join_0_records_out_total":  int64(5),
 			},
+		}, {
+			name: `rule7`,
+			sql:  `SELECT * FROM demoErr GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
+			size: 6,
+			r: [][]map[string]interface{}{
+				{{
+					"error": "error in preprocessor: invalid data type for color, expect string but found int(2)",
+				}},
+				{{
+					"color": "red",
+					"size":  float64(3),
+					"ts":    float64(1541152486013),
+				}},
+				{{
+					"color": "red",
+					"size":  float64(3),
+					"ts":    float64(1541152486013),
+				}},
+				{{
+					"color": "yellow",
+					"size":  float64(4),
+					"ts":    float64(1541152488442),
+				}}, {{
+					"color": "yellow",
+					"size":  float64(4),
+					"ts":    float64(1541152488442),
+				}, {
+					"color": "red",
+					"size":  float64(1),
+					"ts":    float64(1541152489252),
+				}}, {{
+					"color": "red",
+					"size":  float64(1),
+					"ts":    float64(1541152489252),
+				}},
+			},
+			m: map[string]interface{}{
+				"op_preprocessor_demoErr_0_exceptions_total":   int64(1),
+				"op_preprocessor_demoErr_0_process_latency_ms": int64(0),
+				"op_preprocessor_demoErr_0_records_in_total":   int64(6),
+				"op_preprocessor_demoErr_0_records_out_total":  int64(5),
+
+				"op_project_0_exceptions_total":   int64(1),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(6),
+				"op_project_0_records_out_total":  int64(5),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(6),
+				"sink_mockSink_0_records_out_total": int64(6),
+
+				"source_demoErr_0_exceptions_total":  int64(0),
+				"source_demoErr_0_records_in_total":  int64(6),
+				"source_demoErr_0_records_out_total": int64(6),
+
+				"op_window_0_exceptions_total":   int64(1),
+				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_records_in_total":   int64(6),
+				"op_window_0_records_out_total":  int64(5),
+			},
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	createEventStreams(t)
 	defer dropEventStreams(t)
+	common.InitConf()
 	for i, tt := range tests {
 		test.ResetClock(1541152486000)
 		p := NewRuleProcessor(DbDir)

+ 10 - 4
xstream/operators/watermark.go

@@ -7,6 +7,7 @@ import (
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
+	"github.com/emqx/kuiper/xstream/nodes"
 	"math"
 	"sort"
 	"time"
@@ -204,11 +205,12 @@ func (o *WindowOperator) execEventWindow(ctx api.StreamContext, errCh chan<- err
 				o.statManager.IncTotalExceptions()
 				break
 			}
-			if d, ok := item.(xsql.Event); !ok {
-				log.Errorf("Expect xsql.Event type")
+			switch d := item.(type) {
+			case error:
+				o.statManager.IncTotalRecordsIn()
+				nodes.Broadcast(o.outputs, d, ctx)
 				o.statManager.IncTotalExceptions()
-				break
-			} else {
+			case xsql.Event:
 				if d.IsWatermark() {
 					watermarkTs := d.GetTimestamp()
 					windowEndTs := nextWindowEndTs
@@ -238,6 +240,10 @@ func (o *WindowOperator) execEventWindow(ctx api.StreamContext, errCh chan<- err
 					}
 				}
 				o.statManager.ProcessTimeEnd()
+			default:
+				o.statManager.IncTotalRecordsIn()
+				nodes.Broadcast(o.outputs, fmt.Errorf("run Window error: expect xsql.Event type but got %[1]T(%[1]v)", d), ctx)
+				o.statManager.IncTotalExceptions()
 			}
 		// is cancelling
 		case <-ctx.Done():

+ 9 - 6
xstream/operators/window_op.go

@@ -141,11 +141,11 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 				o.statManager.IncTotalExceptions()
 				break
 			}
-			if d, ok := item.(*xsql.Tuple); !ok {
-				log.Errorf("Expect xsql.Tuple type")
+			switch d := item.(type) {
+			case error:
+				nodes.Broadcast(o.outputs, d, ctx)
 				o.statManager.IncTotalExceptions()
-				break
-			} else {
+			case *xsql.Tuple:
 				log.Debugf("Event window receive tuple %s", d.Message)
 				inputs = append(inputs, d)
 				switch o.window.Type {
@@ -162,9 +162,12 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 						timeout = timeoutTicker.C
 					}
 				}
+				o.statManager.ProcessTimeEnd()
+				o.statManager.SetBufferLength(int64(len(o.input)))
+			default:
+				nodes.Broadcast(o.outputs, fmt.Errorf("run Window error: expect xsql.Tuple type but got %[1]T(%[1]v)", d), ctx)
+				o.statManager.IncTotalExceptions()
 			}
-			o.statManager.ProcessTimeEnd()
-			o.statManager.SetBufferLength(int64(len(o.input)))
 		case now := <-c:
 			if len(inputs) > 0 {
 				o.statManager.ProcessTimeStart()