Quellcode durchsuchen

fix(node): typo and metric missing

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang vor 2 Jahren
Ursprung
Commit
18d73ac53c
2 geänderte Dateien mit 16 neuen und 4 gelöschten Zeilen
  1. 2 2
      internal/topo/node/join_align_node.go
  2. 14 2
      internal/topo/node/switch_node.go

+ 2 - 2
internal/topo/node/join_align_node.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -63,7 +63,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 	}
 	stats, err := metric.NewStatManager(ctx, "op")
 	if err != nil {
-		infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
+		infra.DrainError(ctx, fmt.Errorf("fail to create stat manager"), errCh)
 		return
 	}
 	n.statManager = stats

+ 14 - 2
internal/topo/node/switch_node.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -114,7 +114,6 @@ func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 					case xsql.TupleRow:
 						ctx.GetLogger().Debugf("SwitchNode receive tuple input %s", d)
 						ve = &xsql.ValuerEval{Valuer: xsql.MultiValuer(d, fv)}
-
 					case xsql.SingleCollection:
 						ctx.GetLogger().Debugf("SwitchNode receive window input %s", d)
 						afv.SetData(d)
@@ -147,6 +146,9 @@ func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 							n.statManager.IncTotalExceptions(m)
 						}
 					}
+					n.statManager.ProcessTimeEnd()
+					n.statManager.IncTotalRecordsOut()
+					n.statManager.SetBufferLength(int64(len(n.input)))
 				case <-ctx.Done():
 					ctx.GetLogger().Infoln("Cancelling switch node....")
 					return nil
@@ -158,3 +160,13 @@ func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 		}
 	}()
 }
+
+func (n *SwitchNode) GetMetrics() [][]interface{} {
+	if n.statManager != nil {
+		return [][]interface{}{
+			n.statManager.GetMetrics(),
+		}
+	} else {
+		return nil
+	}
+}