Browse Source

fix(watermark): watermark tuple must propagate

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang committed 1 year ago
parent
commit
37b1b480bf

+ 3 - 6
internal/topo/node/event_window_trigger.go

@@ -123,15 +123,12 @@ func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.T
 				o.statManager.IncTotalExceptions("input channel closed")
 				break
 			}
-			if _, ok := item.(*xsql.WatermarkTuple); !ok {
-				processed := false
-				if item, processed = o.preprocess(item); processed {
-					break
-				}
+			processed := false
+			if item, processed = o.preprocess(item); processed {
+				break
 			}
 			switch d := item.(type) {
 			case error:
-				o.statManager.IncTotalRecordsIn()
 				_ = o.Broadcast(d)
 				o.statManager.IncTotalExceptions(d.Error())
 			case *xsql.WatermarkTuple:

+ 2 - 0
internal/topo/node/join_align_node.go

@@ -107,6 +107,8 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 					case error:
 						_ = n.Broadcast(d)
 						n.statManager.IncTotalExceptions(d.Error())
+					case *xsql.WatermarkTuple:
+						_ = n.Broadcast(d)
 					case *xsql.Tuple:
 						log.Debugf("JoinAlignNode receive tuple input %s", d)
 						temp := &xsql.WindowTuples{

+ 10 - 8
internal/topo/node/lookup_node.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -128,18 +128,20 @@ func (n *LookupNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 					}
 					switch d := item.(type) {
 					case error:
-						n.Broadcast(d)
+						_ = n.Broadcast(d)
 						n.statManager.IncTotalExceptions(d.Error())
+					case *xsql.WatermarkTuple:
+						_ = n.Broadcast(d)
 					case xsql.TupleRow:
 						log.Debugf("Lookup Node receive tuple input %s", d)
 						n.statManager.ProcessTimeStart()
 						sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0)}
 						err := n.lookup(ctx, d, fv, ns, sets, c)
 						if err != nil {
-							n.Broadcast(err)
+							_ = n.Broadcast(err)
 							n.statManager.IncTotalExceptions(err.Error())
 						} else {
-							n.Broadcast(sets)
+							_ = n.Broadcast(sets)
 							n.statManager.IncTotalRecordsOut()
 						}
 						n.statManager.ProcessTimeEnd()
@@ -160,17 +162,17 @@ func (n *LookupNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 							return true, nil
 						})
 						if err != nil {
-							n.Broadcast(err)
+							_ = n.Broadcast(err)
 							n.statManager.IncTotalExceptions(err.Error())
 						} else {
-							n.Broadcast(sets)
+							_ = n.Broadcast(sets)
 							n.statManager.IncTotalRecordsOut()
 						}
 						n.statManager.ProcessTimeEnd()
 						n.statManager.SetBufferLength(int64(len(n.input)))
 					default:
 						e := fmt.Errorf("run lookup node error: invalid input type but got %[1]T(%[1]v)", d)
-						n.Broadcast(e)
+						_ = n.Broadcast(e)
 						n.statManager.IncTotalExceptions(e.Error())
 					}
 				case <-ctx.Done():
@@ -281,7 +283,7 @@ func (n *LookupNode) merge(ctx api.StreamContext, d xsql.TupleRow, r []map[strin
 		sets.Content = append(sets.Content, merged)
 	}
 
-	n.Broadcast(sets)
+	_ = n.Broadcast(sets)
 	n.statManager.ProcessTimeEnd()
 	n.statManager.IncTotalRecordsOut()
 	n.statManager.SetBufferLength(int64(len(n.input)))

+ 0 - 5
internal/topo/node/node.go

@@ -177,11 +177,6 @@ func (o *defaultSinkNode) preprocess(data interface{}) (interface{}, bool) {
 			}
 		}
 	}
-	// Filter all the watermark tuples.
-	// Only event time window op needs this, so handle it there
-	if _, ok := data.(*xsql.WatermarkTuple); ok {
-		return nil, true
-	}
 	return data, false
 }
 

+ 10 - 0
internal/topo/node/operations.go

@@ -127,6 +127,16 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 			if item, processed = o.preprocess(item); processed {
 				break
 			}
+			switch d := item.(type) {
+			case error:
+				_ = o.Broadcast(d)
+				stats.IncTotalExceptions(d.Error())
+				continue
+			case *xsql.WatermarkTuple:
+				_ = o.Broadcast(d)
+				continue
+			}
+
 			stats.IncTotalRecordsIn()
 			stats.ProcessTimeStart()
 			result := o.op.Apply(exeCtx, item, fv, afv)

+ 4 - 6
internal/topo/node/sink_node.go

@@ -179,9 +179,8 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 							for {
 								select {
 								case data := <-m.input:
-									if temp, processed := m.preprocess(data); !processed {
-										data = temp
-									} else {
+									processed := false
+									if data, processed = m.preprocess(data); processed {
 										break
 									}
 									stats.SetBufferLength(int64(len(m.input)))
@@ -206,9 +205,8 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 								for {
 									select {
 									case data := <-m.input:
-										if temp, processed := m.preprocess(data); !processed {
-											data = temp
-										} else {
+										processed := false
+										if data, processed = m.preprocess(data); processed {
 											break
 										}
 										stats.IncTotalRecordsIn()

+ 3 - 3
internal/topo/node/source_node.go

@@ -182,12 +182,12 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 									continue
 								case error:
 									logger.Errorf("Source %s preprocess error: %s", ctx.GetOpId(), val)
-									m.Broadcast(val)
+									_ = m.Broadcast(val)
 									stats.IncTotalExceptions(val.Error())
 								default:
-									m.Broadcast(val)
+									_ = m.Broadcast(val)
+									stats.IncTotalRecordsOut()
 								}
-								stats.IncTotalRecordsOut()
 								stats.SetBufferLength(int64(buffer.GetLength()))
 								if rw, ok := si.source.(api.Rewindable); ok {
 									if offset, err := rw.GetOffset(); err != nil {

+ 5 - 3
internal/topo/node/switch_node.go

@@ -110,8 +110,10 @@ func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 					var ve *xsql.ValuerEval
 					switch d := item.(type) {
 					case error:
+						_ = n.Broadcast(d)
 						n.statManager.IncTotalExceptions(d.Error())
-						break
+					case *xsql.WatermarkTuple:
+						_ = n.Broadcast(d)
 					case xsql.TupleRow:
 						ctx.GetLogger().Debugf("SwitchNode receive tuple input %s", d)
 						ve = &xsql.ValuerEval{Valuer: xsql.MultiValuer(d, fv)}
@@ -121,7 +123,7 @@ func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 						ve = &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(d, fv, d, fv, afv, &xsql.WildcardValuer{Data: d})}
 					default:
 						e := fmt.Errorf("run switch node error: invalid input type but got %[1]T(%[1]v)", d)
-						n.Broadcast(e)
+						_ = n.Broadcast(e)
 						n.statManager.IncTotalExceptions(e.Error())
 						break
 					}
@@ -134,7 +136,7 @@ func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 							n.statManager.IncTotalExceptions(r.Error())
 						case bool:
 							if r {
-								n.outputNodes[i].Broadcast(item)
+								_ = n.outputNodes[i].Broadcast(item)
 								if n.conf.StopAtFirstMatch {
 									break caseLoop
 								}

+ 4 - 4
internal/topo/topotest/rule_test.go

@@ -373,7 +373,7 @@ func TestSingleSQL(t *testing.T) {
 			M: map[string]interface{}{
 				"op_3_project_0_exceptions_total":   int64(2),
 				"op_3_project_0_process_latency_us": int64(0),
-				"op_3_project_0_records_in_total":   int64(4),
+				"op_3_project_0_records_in_total":   int64(2),
 				"op_3_project_0_records_out_total":  int64(2),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
@@ -382,11 +382,11 @@ func TestSingleSQL(t *testing.T) {
 
 				"source_demoError_0_exceptions_total":  int64(2),
 				"source_demoError_0_records_in_total":  int64(5),
-				"source_demoError_0_records_out_total": int64(5),
+				"source_demoError_0_records_out_total": int64(3),
 
 				"op_2_filter_0_exceptions_total":   int64(2),
 				"op_2_filter_0_process_latency_us": int64(0),
-				"op_2_filter_0_records_in_total":   int64(5),
+				"op_2_filter_0_records_in_total":   int64(3),
 				"op_2_filter_0_records_out_total":  int64(2),
 			},
 		},
@@ -937,7 +937,7 @@ func TestSingleSQLError(t *testing.T) {
 			M: map[string]interface{}{
 				"op_3_project_0_exceptions_total":   int64(1),
 				"op_3_project_0_process_latency_us": int64(0),
-				"op_3_project_0_records_in_total":   int64(3),
+				"op_3_project_0_records_in_total":   int64(2),
 				"op_3_project_0_records_out_total":  int64(2),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),

+ 9 - 9
internal/topo/topotest/window_rule_test.go

@@ -452,7 +452,7 @@ func TestWindow(t *testing.T) {
 			M: map[string]interface{}{
 				"op_3_project_0_exceptions_total":   int64(3),
 				"op_3_project_0_process_latency_us": int64(0),
-				"op_3_project_0_records_in_total":   int64(7),
+				"op_3_project_0_records_in_total":   int64(4),
 				"op_3_project_0_records_out_total":  int64(4),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
@@ -461,7 +461,7 @@ func TestWindow(t *testing.T) {
 
 				"source_demoError_0_exceptions_total":  int64(3),
 				"source_demoError_0_records_in_total":  int64(5),
-				"source_demoError_0_records_out_total": int64(5),
+				"source_demoError_0_records_out_total": int64(2),
 
 				"op_2_window_0_exceptions_total":   int64(3),
 				"op_2_window_0_process_latency_us": int64(0),
@@ -1095,7 +1095,7 @@ func TestEventWindow(t *testing.T) {
 			M: map[string]interface{}{
 				"op_4_project_0_exceptions_total":   int64(1),
 				"op_4_project_0_process_latency_us": int64(0),
-				"op_4_project_0_records_in_total":   int64(6),
+				"op_4_project_0_records_in_total":   int64(5),
 				"op_4_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
@@ -1104,11 +1104,11 @@ func TestEventWindow(t *testing.T) {
 
 				"source_demoErr_0_exceptions_total":  int64(1),
 				"source_demoErr_0_records_in_total":  int64(6),
-				"source_demoErr_0_records_out_total": int64(6),
+				"source_demoErr_0_records_out_total": int64(5),
 
 				"op_3_window_0_exceptions_total":   int64(1),
 				"op_3_window_0_process_latency_us": int64(0),
-				"op_3_window_0_records_in_total":   int64(4),
+				"op_3_window_0_records_in_total":   int64(3),
 				"op_3_window_0_records_out_total":  int64(5),
 			},
 		}, {
@@ -1317,7 +1317,7 @@ func TestWindowError(t *testing.T) {
 			M: map[string]interface{}{
 				"op_4_project_0_exceptions_total":   int64(1),
 				"op_4_project_0_process_latency_us": int64(0),
-				"op_4_project_0_records_in_total":   int64(5),
+				"op_4_project_0_records_in_total":   int64(4),
 				"op_4_project_0_records_out_total":  int64(4),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
@@ -1371,7 +1371,7 @@ func TestWindowError(t *testing.T) {
 			M: map[string]interface{}{
 				"op_5_project_0_exceptions_total":   int64(3),
 				"op_5_project_0_process_latency_us": int64(0),
-				"op_5_project_0_records_in_total":   int64(8),
+				"op_5_project_0_records_in_total":   int64(5),
 				"op_5_project_0_records_out_total":  int64(5),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
@@ -1415,7 +1415,7 @@ func TestWindowError(t *testing.T) {
 			M: map[string]interface{}{
 				"op_6_project_0_exceptions_total":   int64(3),
 				"op_6_project_0_process_latency_us": int64(0),
-				"op_6_project_0_records_in_total":   int64(5),
+				"op_6_project_0_records_in_total":   int64(2),
 				"op_6_project_0_records_out_total":  int64(2),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),
@@ -1459,7 +1459,7 @@ func TestWindowError(t *testing.T) {
 			M: map[string]interface{}{
 				"op_4_project_0_exceptions_total":   int64(1),
 				"op_4_project_0_process_latency_us": int64(0),
-				"op_4_project_0_records_in_total":   int64(4),
+				"op_4_project_0_records_in_total":   int64(3),
 				"op_4_project_0_records_out_total":  int64(3),
 
 				"sink_mockSink_0_exceptions_total":  int64(0),

+ 1 - 1
internal/xsql/valuer.go

@@ -271,7 +271,7 @@ func (v *ValuerEval) Eval(expr ast.Expr) interface{} {
 		}
 		return &BracketEvalResult{Start: ii, End: ii}
 	case *ast.Call:
-		// The analytic function are calculated prior to all ops, so just get the cached field value
+		// The analytic functions are calculated prior to all ops, so just get the cached field value
 		if expr.Cached {
 			val, ok := v.Valuer.Value(expr.CachedField, "")
 			if ok {