ソースを参照

fix(node): should add statManagers (#2242)

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 1 年間 前
コミット
38b7872234

+ 1 - 10
internal/topo/node/join_align_node.go

@@ -68,6 +68,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 		return
 	}
 	n.statManager = stats
+	n.statManagers = []metric.StatManager{stats}
 	go func() {
 		err := infra.SafeRun(func() error {
 			// restore batch state
@@ -179,13 +180,3 @@ func (n *JoinAlignNode) alignBatch(_ api.StreamContext, input any) {
 	n.statManager.IncTotalRecordsOut()
 	n.statManager.SetBufferLength(int64(len(n.input)))
 }
-
-func (n *JoinAlignNode) GetMetrics() [][]interface{} {
-	if n.statManager != nil {
-		return [][]interface{}{
-			n.statManager.GetMetrics(),
-		}
-	} else {
-		return nil
-	}
-}

+ 1 - 10
internal/topo/node/lookup_node.go

@@ -97,6 +97,7 @@ func (n *LookupNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 		return
 	}
 	n.statManager = stats
+	n.statManagers = []metric.StatManager{stats}
 	go func() {
 		err := infra.SafeRun(func() error {
 			ns, err := lookup.Attach(n.name)
@@ -247,16 +248,6 @@ func (n *LookupNode) lookup(ctx api.StreamContext, d xsql.TupleRow, fv *xsql.Fun
 	}
 }
 
-func (n *LookupNode) GetMetrics() [][]interface{} {
-	if n.statManager != nil {
-		return [][]interface{}{
-			n.statManager.GetMetrics(),
-		}
-	} else {
-		return nil
-	}
-}
-
 func (n *LookupNode) merge(ctx api.StreamContext, d xsql.TupleRow, r []map[string]interface{}) {
 	n.statManager.ProcessTimeStart()
 	sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0)}

+ 1 - 10
internal/topo/node/switch_node.go

@@ -85,6 +85,7 @@ func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 		return
 	}
 	n.statManager = stats
+	n.statManagers = []metric.StatManager{stats}
 	n.ctx = ctx
 	for i := range n.outputNodes {
 		n.outputNodes[i].ctx = ctx
@@ -163,13 +164,3 @@ func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 		}
 	}()
 }
-
-func (n *SwitchNode) GetMetrics() [][]interface{} {
-	if n.statManager != nil {
-		return [][]interface{}{
-			n.statManager.GetMetrics(),
-		}
-	} else {
-		return nil
-	}
-}

+ 1 - 10
internal/topo/node/watermark_op.go

@@ -80,6 +80,7 @@ func (w *WatermarkOp) Exec(ctx api.StreamContext, errCh chan<- error) {
 		return
 	}
 	w.statManager = stats
+	w.statManagers = []metric.StatManager{stats}
 	w.ctx = ctx
 	// restore state
 	if s, err := ctx.GetState(WatermarkKey); err == nil && s != nil {
@@ -232,13 +233,3 @@ func (w *WatermarkOp) computeWatermarkTs() int64 {
 	}
 	return ts - w.lateTolerance
 }
-
-func (w *WatermarkOp) GetMetrics() [][]interface{} {
-	if w.statManager != nil {
-		return [][]interface{}{
-			w.statManager.GetMetrics(),
-		}
-	} else {
-		return nil
-	}
-}

+ 2 - 7
internal/topo/node/window_op.go

@@ -123,6 +123,7 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
 		return
 	}
 	o.statManager = stats
+	o.statManagers = []metric.StatManager{stats}
 	var inputs []*xsql.Tuple
 	if s, err := ctx.GetState(WindowInputsKey); err == nil {
 		switch st := s.(type) {
@@ -633,13 +634,7 @@ func (o *WindowOperator) calDelta(triggerTime int64, log api.Logger) int64 {
 }
 
 func (o *WindowOperator) GetMetrics() [][]interface{} {
-	if o.statManager != nil {
-		return [][]interface{}{
-			o.statManager.GetMetrics(),
-		}
-	} else {
-		return nil
-	}
+	return o.defaultNode.GetMetrics()
 }
 
 func (o *WindowOperator) isMatchCondition(ctx api.StreamContext, d *xsql.Tuple) bool {