Browse Source

feat(planner): merge preprocessor into source node

Allow the shared source to be shared in preprocess level

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 năm trước cách đây
mục cha
commit
b593a495d9

+ 1 - 4
docs/en_US/cli/rules.md

@@ -212,14 +212,11 @@ Sample result:
     "source_stream"
   ],
   "edges": {
-    "op_preprocessor_stream": [
-      "op_project"
-    ],
     "op_project": [
       "sink_log"
     ],
     "source_stream": [
-      "op_preprocessor_stream"
+      "op_project"
     ]
   }
 }

+ 0 - 6
docs/en_US/edgex/edgex_rule_engine_tutorial.md

@@ -265,12 +265,6 @@ Connecting to 127.0.0.1:20498...
   "source_demo_0_process_latency_ms": 0,
   "source_demo_0_buffer_length": 0,
   "source_demo_0_last_invocation": "2020-04-17T10:30:09.294337",
-  "op_preprocessor_demo_0_records_in_total": 29,
-  "op_preprocessor_demo_0_records_out_total": 29,
-  "op_preprocessor_demo_0_exceptions_total": 0,
-  "op_preprocessor_demo_0_process_latency_ms": 0,
-  "op_preprocessor_demo_0_buffer_length": 0,
-  "op_preprocessor_demo_0_last_invocation": "2020-04-17T10:30:09.294355",
   "op_filter_0_records_in_total": 29,
   "op_filter_0_records_out_total": 21,
   "op_filter_0_exceptions_total": 0,

+ 1 - 4
docs/en_US/restapi/rules.md

@@ -179,14 +179,11 @@ Response Sample:
     "source_stream"
   ],
   "edges": {
-    "op_preprocessor_stream": [
-      "op_project"
-    ],
     "op_project": [
       "sink_log"
     ],
     "source_stream": [
-      "op_preprocessor_stream"
+      "op_project"
     ]
   }
 }

+ 0 - 6
docs/zh_CN/edgex/edgex_rule_engine_tutorial.md

@@ -252,12 +252,6 @@ Connecting to 127.0.0.1:20498...
   "source_demo_0_process_latency_ms": 0,
   "source_demo_0_buffer_length": 0,
   "source_demo_0_last_invocation": "2020-04-17T10:30:09.294337",
-  "op_preprocessor_demo_0_records_in_total": 29,
-  "op_preprocessor_demo_0_records_out_total": 29,
-  "op_preprocessor_demo_0_exceptions_total": 0,
-  "op_preprocessor_demo_0_process_latency_ms": 0,
-  "op_preprocessor_demo_0_buffer_length": 0,
-  "op_preprocessor_demo_0_last_invocation": "2020-04-17T10:30:09.294355",
   "op_filter_0_records_in_total": 29,
   "op_filter_0_records_out_total": 21,
   "op_filter_0_exceptions_total": 0,

+ 16 - 3
internal/topo/node/source_node.go

@@ -32,9 +32,10 @@ type SourceNode struct {
 	props        map[string]interface{}
 	mutex        sync.RWMutex
 	sources      []api.Source
+	preprocessOp UnOperation
 }
 
-func NewSourceNode(name string, st ast.StreamType, options *ast.Options) *SourceNode {
+func NewSourceNode(name string, st ast.StreamType, op UnOperation, options *ast.Options, sendError bool) *SourceNode {
 	t := options.TYPE
 	if t == "" {
 		if st == ast.TypeStream {
@@ -50,8 +51,10 @@ func NewSourceNode(name string, st ast.StreamType, options *ast.Options) *Source
 			name:        name,
 			outputs:     make(map[string]chan<- interface{}),
 			concurrency: 1,
+			sendError:   sendError,
 		},
-		options: options,
+		preprocessOp: op,
+		options:      options,
 	}
 }
 
@@ -131,10 +134,20 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 						stats.IncTotalRecordsIn()
 						stats.ProcessTimeStart()
 						tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: conf.GetNowInMilli(), Metadata: data.Meta()}
+						processedData := m.preprocessOp.Apply(ctx, tuple, nil, nil)
 						stats.ProcessTimeEnd()
 						logger.Debugf("source node %s is sending tuple %+v of timestamp %d", m.name, tuple, tuple.Timestamp)
 						//blocking
-						m.Broadcast(tuple)
+						switch val := processedData.(type) {
+						case nil:
+							continue
+						case error:
+							logger.Errorf("Source %s preprocess error: %s", ctx.GetOpId(), val)
+							m.Broadcast(val)
+							stats.IncTotalExceptions()
+						default:
+							m.Broadcast(val)
+						}
 						stats.IncTotalRecordsOut()
 						stats.SetBufferLength(int64(buffer.GetLength()))
 						if rw, ok := si.source.(api.Rewindable); ok {

+ 4 - 4
internal/topo/node/source_node_test.go

@@ -42,10 +42,10 @@ func TestGetConf_Apply(t *testing.T) {
 			},
 		},
 	}
-	n := NewSourceNode("test", ast.TypeStream, &ast.Options{
+	n := NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
 		DATASOURCE: "RFC_READ_TABLE",
 		TYPE:       "test",
-	})
+	}, false)
 	contextLogger := conf.Log.WithField("rule", "test")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	conf := getSourceConf(ctx, n.sourceType, n.options)
@@ -64,11 +64,11 @@ func TestGetConfAndConvert_Apply(t *testing.T) {
 		},
 		"deduplicate": 50,
 	}
-	n := NewSourceNode("test", ast.TypeStream, &ast.Options{
+	n := NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
 		DATASOURCE: "test",
 		TYPE:       "random",
 		CONF_KEY:   "dedup",
-	})
+	}, false)
 	contextLogger := conf.Log.WithField("rule", "test")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	conf := getSourceConf(ctx, n.sourceType, n.options)

+ 6 - 6
internal/topo/node/source_pool_test.go

@@ -24,30 +24,30 @@ import (
 )
 
 func TestSourcePool(t *testing.T) {
-	n := NewSourceNode("test", ast.TypeStream, &ast.Options{
+	n := NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
 		DATASOURCE: "demo",
 		TYPE:       "mock",
 		SHARED:     true,
-	})
+	}, false)
 	n.concurrency = 2
 	contextLogger := conf.Log.WithField("rule", "mockRule0")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	n.ctx = ctx.WithMeta("mockRule0", "test", tempStore)
-	n1 := NewSourceNode("test", ast.TypeStream, &ast.Options{
+	n1 := NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
 		DATASOURCE: "demo1",
 		TYPE:       "mock",
 		SHARED:     true,
-	})
+	}, false)
 
 	contextLogger = conf.Log.WithField("rule", "mockRule1")
 	ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	tempStore, _ = state.CreateStore("mockRule1", api.AtMostOnce)
 	n1.ctx = ctx.WithMeta("mockRule1", "test1", tempStore)
-	n2 := NewSourceNode("test2", ast.TypeStream, &ast.Options{
+	n2 := NewSourceNode("test2", ast.TypeStream, nil, &ast.Options{
 		DATASOURCE: "demo1",
 		TYPE:       "mock",
-	})
+	}, false)
 	contextLogger = conf.Log.WithField("rule", "mockRule2")
 	ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	tempStore, _ = state.CreateStore("mockRule2", api.AtMostOnce)

+ 2 - 2
internal/topo/operator/preprocessor.go

@@ -44,7 +44,7 @@ func NewPreprocessor(fields []interface{}, allMeta bool, metaFields []string, ie
  *	input: *xsql.Tuple
  *	output: *xsql.Tuple
  */
-func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
+func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 	log := ctx.GetLogger()
 	tuple, ok := data.(*xsql.Tuple)
 	if !ok {
@@ -53,7 +53,7 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.F
 
 	log.Debugf("preprocessor receive %s", tuple.Message)
 
-	result, err := p.processField(tuple, fv)
+	result, err := p.processField(tuple, nil)
 	if err != nil {
 		return fmt.Errorf("error in preprocessor: %s", err)
 	}

+ 8 - 6
internal/topo/planner/planner.go

@@ -110,7 +110,7 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 	}
 	newIndex++
 	var (
-		op  node.OperatorNode
+		op  api.Emitter
 		err error
 	)
 	switch t := lp.(type) {
@@ -123,7 +123,7 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 			}
 			var srcNode *node.SourceNode
 			if len(sources) == 0 {
-				sourceNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, t.streamStmt.Options)
+				sourceNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options.SendError)
 				srcNode = sourceNode
 			} else {
 				srcNode = getMockSource(sources, string(t.name))
@@ -132,8 +132,8 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 				}
 			}
 			tp.AddSrc(srcNode)
-			op = Transform(pp, fmt.Sprintf("%d_preprocessor_%s", newIndex, t.name), options)
 			inputs = []api.Emitter{srcNode}
+			op = srcNode
 		case ast.TypeTable:
 			pp, err := operator.NewTableProcessor(string(t.name), t.streamFields, t.streamStmt.Options)
 			if err != nil {
@@ -144,11 +144,11 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 				srcNode = getMockSource(sources, string(t.name))
 			}
 			if srcNode == nil {
-				srcNode = node.NewSourceNode(string(t.name), t.streamStmt.StreamType, t.streamStmt.Options)
+				srcNode = node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options.SendError)
 			}
 			tp.AddSrc(srcNode)
-			op = Transform(pp, fmt.Sprintf("%d_tableprocessor_%s", newIndex, t.name), options)
 			inputs = []api.Emitter{srcNode}
+			op = srcNode
 		}
 	case *WindowPlan:
 		if t.condition != nil {
@@ -186,7 +186,9 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 	if uop, ok := op.(*node.UnaryOperator); ok {
 		uop.SetConcurrency(options.Concurrency)
 	}
-	tp.AddOperator(inputs, op)
+	if onode, ok := op.(node.OperatorNode); ok {
+		tp.AddOperator(inputs, onode)
+	}
 	return op, newIndex, nil
 }
 

+ 0 - 6
internal/topo/topotest/checkpoint_test.go

@@ -72,9 +72,6 @@ func TestCheckpoint(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_records_in_total":  int64(3),
-				"op_1_preprocessor_demo_0_records_out_total": int64(3),
-
 				"op_3_project_0_records_in_total":  int64(3),
 				"op_3_project_0_records_out_total": int64(3),
 
@@ -91,9 +88,6 @@ func TestCheckpoint(t *testing.T) {
 		PauseSize: 3,
 		Cc:        2,
 		PauseMetric: map[string]interface{}{
-			"op_1_preprocessor_demo_0_records_in_total":  int64(3),
-			"op_1_preprocessor_demo_0_records_out_total": int64(3),
-
 			"op_3_project_0_records_in_total":  int64(1),
 			"op_3_project_0_records_out_total": int64(1),
 

+ 0 - 15
internal/topo/topotest/plugin_rule_test.go

@@ -211,11 +211,6 @@ func TestFuncState(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_text_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_text_0_process_latency_us": int64(0),
-				"op_1_preprocessor_text_0_records_in_total":   int64(8),
-				"op_1_preprocessor_text_0_records_out_total":  int64(8),
-
 				"op_2_project_0_exceptions_total":   int64(0),
 				"op_2_project_0_process_latency_us": int64(0),
 				"op_2_project_0_records_in_total":   int64(8),
@@ -276,11 +271,6 @@ func TestFuncStateCheckpoint(t *testing.T) {
 					}},
 				},
 				M: map[string]interface{}{
-					"op_1_preprocessor_text_0_exceptions_total":   int64(0),
-					"op_1_preprocessor_text_0_process_latency_us": int64(0),
-					"op_1_preprocessor_text_0_records_in_total":   int64(6),
-					"op_1_preprocessor_text_0_records_out_total":  int64(6),
-
 					"op_2_project_0_exceptions_total":   int64(0),
 					"op_2_project_0_process_latency_us": int64(0),
 					"op_2_project_0_records_in_total":   int64(6),
@@ -298,11 +288,6 @@ func TestFuncStateCheckpoint(t *testing.T) {
 			PauseSize: 3,
 			Cc:        1,
 			PauseMetric: map[string]interface{}{
-				"op_1_preprocessor_text_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_text_0_process_latency_us": int64(0),
-				"op_1_preprocessor_text_0_records_in_total":   int64(3),
-				"op_1_preprocessor_text_0_records_out_total":  int64(3),
-
 				"op_2_project_0_exceptions_total":   int64(0),
 				"op_2_project_0_process_latency_us": int64(0),
 				"op_2_project_0_records_in_total":   int64(3),

+ 7 - 113
internal/topo/topotest/rule_test.go

@@ -59,11 +59,6 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_2_project_0_exceptions_total":   int64(0),
 				"op_2_project_0_process_latency_us": int64(0),
 				"op_2_project_0_records_in_total":   int64(5),
@@ -80,9 +75,8 @@ func TestSingleSQL(t *testing.T) {
 			T: &topo.PrintableTopo{
 				Sources: []string{"source_demo"},
 				Edges: map[string][]string{
-					"source_demo":            {"op_1_preprocessor_demo"},
-					"op_1_preprocessor_demo": {"op_2_project"},
-					"op_2_project":           {"sink_mockSink"},
+					"source_demo":  {"op_2_project"},
+					"op_2_project": {"sink_mockSink"},
 				},
 			},
 		}, {
@@ -99,11 +93,6 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_3_project_0_exceptions_total":   int64(0),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(2),
@@ -136,11 +125,6 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_3_project_0_exceptions_total":   int64(0),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(2),
@@ -179,11 +163,6 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demoError_0_exceptions_total":   int64(2),
-				"op_1_preprocessor_demoError_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demoError_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demoError_0_records_out_total":  int64(3),
-
 				"op_3_project_0_exceptions_total":   int64(2),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(4),
@@ -193,7 +172,7 @@ func TestSingleSQL(t *testing.T) {
 				"sink_mockSink_0_records_in_total":  int64(4),
 				"sink_mockSink_0_records_out_total": int64(4),
 
-				"source_demoError_0_exceptions_total":  int64(0),
+				"source_demoError_0_exceptions_total":  int64(2),
 				"source_demoError_0_records_in_total":  int64(5),
 				"source_demoError_0_records_out_total": int64(5),
 
@@ -228,11 +207,6 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_2_project_0_exceptions_total":   int64(0),
 				"op_2_project_0_process_latency_us": int64(0),
 				"op_2_project_0_records_in_total":   int64(5),
@@ -260,11 +234,6 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_3_project_0_exceptions_total":   int64(0),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(2),
@@ -304,11 +273,6 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo1_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo1_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo1_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo1_0_records_out_total":  int64(5),
-
 				"op_2_project_0_exceptions_total":   int64(0),
 				"op_2_project_0_process_latency_us": int64(0),
 				"op_2_project_0_records_in_total":   int64(5),
@@ -340,11 +304,6 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo1_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo1_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo1_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo1_0_records_out_total":  int64(5),
-
 				"op_3_project_0_exceptions_total":   int64(0),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(2),
@@ -394,11 +353,6 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_2_project_0_exceptions_total":   int64(0),
 				"op_2_project_0_process_latency_us": int64(0),
 				"op_2_project_0_records_in_total":   int64(5),
@@ -415,9 +369,8 @@ func TestSingleSQL(t *testing.T) {
 			T: &topo.PrintableTopo{
 				Sources: []string{"source_demo"},
 				Edges: map[string][]string{
-					"source_demo":            {"op_1_preprocessor_demo"},
-					"op_1_preprocessor_demo": {"op_2_project"},
-					"op_2_project":           {"sink_mockSink"},
+					"source_demo":  {"op_2_project"},
+					"op_2_project": {"sink_mockSink"},
 				},
 			},
 		}, {
@@ -448,14 +401,6 @@ func TestSingleSQL(t *testing.T) {
 			},
 			W: 15,
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":  int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":  int64(5),
-				"op_1_preprocessor_demo_0_records_out_total": int64(5),
-
-				"op_2_tableprocessor_table1_0_exceptions_total":  int64(0),
-				"op_2_tableprocessor_table1_0_records_in_total":  int64(4),
-				"op_2_tableprocessor_table1_0_records_out_total": int64(1),
-
 				"op_3_join_aligner_0_records_in_total":  int64(6),
 				"op_3_join_aligner_0_records_out_total": int64(5),
 
@@ -477,7 +422,7 @@ func TestSingleSQL(t *testing.T) {
 
 				"source_table1_0_exceptions_total":  int64(0),
 				"source_table1_0_records_in_total":  int64(4),
-				"source_table1_0_records_out_total": int64(4),
+				"source_table1_0_records_out_total": int64(1),
 			},
 		}, {
 			Name: `TestSingleSQLRule11`,
@@ -494,14 +439,6 @@ func TestSingleSQL(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":  int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":  int64(5),
-				"op_1_preprocessor_demo_0_records_out_total": int64(5),
-
-				"op_2_tableprocessor_demoTable_0_exceptions_total":  int64(0),
-				"op_2_tableprocessor_demoTable_0_records_in_total":  int64(5),
-				"op_2_tableprocessor_demoTable_0_records_out_total": int64(5),
-
 				"op_3_join_aligner_0_records_in_total":  int64(10),
 				"op_3_join_aligner_0_records_out_total": int64(5),
 
@@ -544,14 +481,6 @@ func TestSingleSQL(t *testing.T) {
 			},
 			W: 15,
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":  int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":  int64(5),
-				"op_1_preprocessor_demo_0_records_out_total": int64(5),
-
-				"op_2_tableprocessor_table1_0_exceptions_total":  int64(0),
-				"op_2_tableprocessor_table1_0_records_in_total":  int64(4),
-				"op_2_tableprocessor_table1_0_records_out_total": int64(1),
-
 				"op_3_join_aligner_0_records_in_total":  int64(6),
 				"op_3_join_aligner_0_records_out_total": int64(5),
 
@@ -573,7 +502,7 @@ func TestSingleSQL(t *testing.T) {
 
 				"source_table1_0_exceptions_total":  int64(0),
 				"source_table1_0_records_in_total":  int64(4),
-				"source_table1_0_records_out_total": int64(4),
+				"source_table1_0_records_out_total": int64(1),
 			},
 		},
 	}
@@ -621,11 +550,6 @@ func TestSingleSQLError(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_ldemo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_ldemo_0_records_out_total":  int64(5),
-
 				"op_3_project_0_exceptions_total":   int64(1),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(3),
@@ -663,11 +587,6 @@ func TestSingleSQLError(t *testing.T) {
 				{{}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_ldemo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_ldemo_0_records_out_total":  int64(5),
-
 				"op_2_project_0_exceptions_total":   int64(1),
 				"op_2_project_0_process_latency_us": int64(0),
 				"op_2_project_0_records_in_total":   int64(5),
@@ -709,11 +628,6 @@ func TestSingleSQLOmitError(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_ldemo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_ldemo_0_records_out_total":  int64(5),
-
 				"op_3_project_0_exceptions_total":   int64(0),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(2),
@@ -748,11 +662,6 @@ func TestSingleSQLOmitError(t *testing.T) {
 				{{}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_ldemo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_ldemo_0_records_out_total":  int64(5),
-
 				"op_2_project_0_exceptions_total":   int64(1),
 				"op_2_project_0_process_latency_us": int64(0),
 				"op_2_project_0_records_in_total":   int64(5),
@@ -807,11 +716,6 @@ func TestSingleSQLTemplate(t *testing.T) {
 				},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_2_project_0_exceptions_total":   int64(0),
 				"op_2_project_0_process_latency_us": int64(0),
 				"op_2_project_0_records_in_total":   int64(5),
@@ -866,11 +770,6 @@ func TestNoneSingleSQLTemplate(t *testing.T) {
 				[]byte("<div>results</div><ul><li>red - 1</li></ul>"),
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_2_project_0_exceptions_total":   int64(0),
 				"op_2_project_0_process_latency_us": int64(0),
 				"op_2_project_0_records_in_total":   int64(5),
@@ -913,11 +812,6 @@ func TestSingleSQLForBinary(t *testing.T) {
 			},
 			W: 50,
 			M: map[string]interface{}{
-				"op_1_preprocessor_binDemo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_binDemo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_binDemo_0_records_in_total":   int64(1),
-				"op_1_preprocessor_binDemo_0_records_out_total":  int64(1),
-
 				"op_2_project_0_exceptions_total":   int64(0),
 				"op_2_project_0_process_latency_us": int64(0),
 				"op_2_project_0_records_in_total":   int64(1),

+ 8 - 168
internal/topo/topotest/window_rule_test.go

@@ -71,11 +71,6 @@ func TestWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_3_project_0_exceptions_total":   int64(0),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(4),
@@ -111,11 +106,6 @@ func TestWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_4_project_0_exceptions_total":   int64(0),
 				"op_4_project_0_process_latency_us": int64(0),
 				"op_4_project_0_records_in_total":   int64(2),
@@ -206,16 +196,6 @@ func TestWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
-				"op_2_preprocessor_demo1_0_exceptions_total":   int64(0),
-				"op_2_preprocessor_demo1_0_process_latency_us": int64(0),
-				"op_2_preprocessor_demo1_0_records_in_total":   int64(5),
-				"op_2_preprocessor_demo1_0_records_out_total":  int64(5),
-
 				"op_5_project_0_exceptions_total":   int64(0),
 				"op_5_project_0_process_latency_us": int64(0),
 				"op_5_project_0_records_in_total":   int64(8),
@@ -246,13 +226,11 @@ func TestWindow(t *testing.T) {
 			T: &topo.PrintableTopo{
 				Sources: []string{"source_demo", "source_demo1"},
 				Edges: map[string][]string{
-					"source_demo":             {"op_1_preprocessor_demo"},
-					"source_demo1":            {"op_2_preprocessor_demo1"},
-					"op_1_preprocessor_demo":  {"op_3_window"},
-					"op_2_preprocessor_demo1": {"op_3_window"},
-					"op_3_window":             {"op_4_join"},
-					"op_4_join":               {"op_5_project"},
-					"op_5_project":            {"sink_mockSink"},
+					"source_demo":  {"op_3_window"},
+					"source_demo1": {"op_3_window"},
+					"op_3_window":  {"op_4_join"},
+					"op_4_join":    {"op_5_project"},
+					"op_5_project": {"sink_mockSink"},
 				},
 			},
 		}, {
@@ -292,11 +270,6 @@ func TestWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_5_project_0_exceptions_total":   int64(0),
 				"op_5_project_0_process_latency_us": int64(0),
 				"op_5_project_0_records_in_total":   int64(5),
@@ -348,11 +321,6 @@ func TestWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_sessionDemo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_sessionDemo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_sessionDemo_0_records_in_total":   int64(11),
-				"op_1_preprocessor_sessionDemo_0_records_out_total":  int64(11),
-
 				"op_3_project_0_exceptions_total":   int64(0),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(4),
@@ -418,16 +386,6 @@ func TestWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
-				"op_2_preprocessor_demo1_0_exceptions_total":   int64(0),
-				"op_2_preprocessor_demo1_0_process_latency_us": int64(0),
-				"op_2_preprocessor_demo1_0_records_in_total":   int64(5),
-				"op_2_preprocessor_demo1_0_records_out_total":  int64(5),
-
 				"op_5_project_0_exceptions_total":   int64(0),
 				"op_5_project_0_process_latency_us": int64(0),
 				"op_5_project_0_records_in_total":   int64(8),
@@ -489,11 +447,6 @@ func TestWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demoError_0_exceptions_total":   int64(3),
-				"op_1_preprocessor_demoError_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demoError_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demoError_0_records_out_total":  int64(2),
-
 				"op_3_project_0_exceptions_total":   int64(3),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(6),
@@ -503,7 +456,7 @@ func TestWindow(t *testing.T) {
 				"sink_mockSink_0_records_in_total":  int64(6),
 				"sink_mockSink_0_records_out_total": int64(6),
 
-				"source_demoError_0_exceptions_total":  int64(0),
+				"source_demoError_0_exceptions_total":  int64(3),
 				"source_demoError_0_records_in_total":  int64(5),
 				"source_demoError_0_records_out_total": int64(5),
 
@@ -525,11 +478,6 @@ func TestWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_5_project_0_exceptions_total":   int64(0),
 				"op_5_project_0_process_latency_us": int64(0),
 				"op_5_project_0_records_in_total":   int64(1),
@@ -592,11 +540,6 @@ func TestWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_4_project_0_exceptions_total":   int64(0),
 				"op_4_project_0_process_latency_us": int64(0),
 				"op_4_project_0_records_in_total":   int64(4),
@@ -624,11 +567,6 @@ func TestWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_3_project_0_exceptions_total":   int64(0),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(1),
@@ -660,11 +598,6 @@ func TestWindow(t *testing.T) {
 				}}, {{}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_demo_0_records_out_total":  int64(5),
-
 				"op_3_project_0_exceptions_total":   int64(0),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(5),
@@ -710,14 +643,6 @@ func TestWindow(t *testing.T) {
 				"op_2_filter_0_records_in_total":   int64(5),
 				"op_2_filter_0_records_out_total":  int64(3),
 
-				"op_1_preprocessor_demo_0_exceptions_total":  int64(0),
-				"op_1_preprocessor_demo_0_records_in_total":  int64(5),
-				"op_1_preprocessor_demo_0_records_out_total": int64(5),
-
-				"op_4_tableprocessor_table1_0_exceptions_total":  int64(0),
-				"op_4_tableprocessor_table1_0_records_in_total":  int64(4),
-				"op_4_tableprocessor_table1_0_records_out_total": int64(1),
-
 				"op_5_filter_0_exceptions_total":  int64(0),
 				"op_5_filter_0_records_in_total":  int64(1),
 				"op_5_filter_0_records_out_total": int64(1),
@@ -743,7 +668,7 @@ func TestWindow(t *testing.T) {
 
 				"source_table1_0_exceptions_total":  int64(0),
 				"source_table1_0_records_in_total":  int64(4),
-				"source_table1_0_records_out_total": int64(4),
+				"source_table1_0_records_out_total": int64(1),
 			},
 		},
 	}
@@ -815,11 +740,6 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demoE_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demoE_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demoE_0_records_in_total":   int64(6),
-				"op_1_preprocessor_demoE_0_records_out_total":  int64(6),
-
 				"op_3_project_0_exceptions_total":   int64(0),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(5),
@@ -856,11 +776,6 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demoE_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demoE_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demoE_0_records_in_total":   int64(6),
-				"op_1_preprocessor_demoE_0_records_out_total":  int64(6),
-
 				"op_4_project_0_exceptions_total":   int64(0),
 				"op_4_project_0_process_latency_us": int64(0),
 				"op_4_project_0_records_in_total":   int64(2),
@@ -919,16 +834,6 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demoE_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demoE_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demoE_0_records_in_total":   int64(6),
-				"op_1_preprocessor_demoE_0_records_out_total":  int64(6),
-
-				"op_2_preprocessor_demo1E_0_exceptions_total":   int64(0),
-				"op_2_preprocessor_demo1E_0_process_latency_us": int64(0),
-				"op_2_preprocessor_demo1E_0_records_in_total":   int64(6),
-				"op_2_preprocessor_demo1E_0_records_out_total":  int64(6),
-
 				"op_5_project_0_exceptions_total":   int64(0),
 				"op_5_project_0_process_latency_us": int64(0),
 				"op_5_project_0_records_in_total":   int64(5),
@@ -995,11 +900,6 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demoE_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demoE_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demoE_0_records_in_total":   int64(6),
-				"op_1_preprocessor_demoE_0_records_out_total":  int64(6),
-
 				"op_5_project_0_exceptions_total":   int64(0),
 				"op_5_project_0_process_latency_us": int64(0),
 				"op_5_project_0_records_in_total":   int64(4),
@@ -1055,11 +955,6 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_sessionDemoE_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_sessionDemoE_0_process_latency_us": int64(0),
-				"op_1_preprocessor_sessionDemoE_0_records_in_total":   int64(12),
-				"op_1_preprocessor_sessionDemoE_0_records_out_total":  int64(12),
-
 				"op_3_project_0_exceptions_total":   int64(0),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(4),
@@ -1100,16 +995,6 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demoE_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demoE_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demoE_0_records_in_total":   int64(6),
-				"op_1_preprocessor_demoE_0_records_out_total":  int64(6),
-
-				"op_2_preprocessor_demo1E_0_exceptions_total":   int64(0),
-				"op_2_preprocessor_demo1E_0_process_latency_us": int64(0),
-				"op_2_preprocessor_demo1E_0_records_in_total":   int64(6),
-				"op_2_preprocessor_demo1E_0_records_out_total":  int64(6),
-
 				"op_5_project_0_exceptions_total":   int64(0),
 				"op_5_project_0_process_latency_us": int64(0),
 				"op_5_project_0_records_in_total":   int64(5),
@@ -1172,11 +1057,6 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demoErr_0_exceptions_total":   int64(1),
-				"op_1_preprocessor_demoErr_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demoErr_0_records_in_total":   int64(6),
-				"op_1_preprocessor_demoErr_0_records_out_total":  int64(5),
-
 				"op_3_project_0_exceptions_total":   int64(1),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(6),
@@ -1186,7 +1066,7 @@ func TestEventWindow(t *testing.T) {
 				"sink_mockSink_0_records_in_total":  int64(6),
 				"sink_mockSink_0_records_out_total": int64(6),
 
-				"source_demoErr_0_exceptions_total":  int64(0),
+				"source_demoErr_0_exceptions_total":  int64(1),
 				"source_demoErr_0_records_in_total":  int64(6),
 				"source_demoErr_0_records_out_total": int64(6),
 
@@ -1242,11 +1122,6 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_sessionDemoE_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_sessionDemoE_0_process_latency_us": int64(0),
-				"op_1_preprocessor_sessionDemoE_0_records_in_total":   int64(12),
-				"op_1_preprocessor_sessionDemoE_0_records_out_total":  int64(12),
-
 				"op_3_project_0_exceptions_total":   int64(0),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(4),
@@ -1306,11 +1181,6 @@ func TestEventWindow(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_demoE_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_demoE_0_process_latency_us": int64(0),
-				"op_1_preprocessor_demoE_0_records_in_total":   int64(6),
-				"op_1_preprocessor_demoE_0_records_out_total":  int64(6),
-
 				"op_3_project_0_exceptions_total":   int64(0),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(5),
@@ -1375,11 +1245,6 @@ func TestWindowError(t *testing.T) {
 				}, {}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_ldemo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_ldemo_0_records_out_total":  int64(5),
-
 				"op_3_project_0_exceptions_total":   int64(1),
 				"op_3_project_0_process_latency_us": int64(0),
 				"op_3_project_0_records_in_total":   int64(2),
@@ -1412,11 +1277,6 @@ func TestWindowError(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_ldemo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_ldemo_0_records_out_total":  int64(5),
-
 				"op_4_project_0_exceptions_total":   int64(1),
 				"op_4_project_0_process_latency_us": int64(0),
 				"op_4_project_0_records_in_total":   int64(3),
@@ -1471,16 +1331,6 @@ func TestWindowError(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_ldemo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_ldemo_0_records_out_total":  int64(5),
-
-				"op_2_preprocessor_ldemo1_0_exceptions_total":   int64(0),
-				"op_2_preprocessor_ldemo1_0_process_latency_us": int64(0),
-				"op_2_preprocessor_ldemo1_0_records_in_total":   int64(5),
-				"op_2_preprocessor_ldemo1_0_records_out_total":  int64(5),
-
 				"op_5_project_0_exceptions_total":   int64(3),
 				"op_5_project_0_process_latency_us": int64(0),
 				"op_5_project_0_records_in_total":   int64(8),
@@ -1525,11 +1375,6 @@ func TestWindowError(t *testing.T) {
 				}, {}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_ldemo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_ldemo_0_records_out_total":  int64(5),
-
 				"op_6_project_0_exceptions_total":   int64(3),
 				"op_6_project_0_process_latency_us": int64(0),
 				"op_6_project_0_records_in_total":   int64(5),
@@ -1574,11 +1419,6 @@ func TestWindowError(t *testing.T) {
 				}},
 			},
 			M: map[string]interface{}{
-				"op_1_preprocessor_ldemo_0_exceptions_total":   int64(0),
-				"op_1_preprocessor_ldemo_0_process_latency_us": int64(0),
-				"op_1_preprocessor_ldemo_0_records_in_total":   int64(5),
-				"op_1_preprocessor_ldemo_0_records_out_total":  int64(5),
-
 				"op_4_project_0_exceptions_total":   int64(1),
 				"op_4_project_0_process_latency_us": int64(0),
 				"op_4_project_0_records_in_total":   int64(4),