ソースを参照

add metrics

Signed-off-by: yisaer <disxiaofei@163.com>
yisaer 1 年前
コミット
46e2d2b883

+ 18 - 11
internal/topo/node/metric/prometheus.go

@@ -37,11 +37,12 @@ func GetPrometheusMetrics() *PrometheusMetrics {
 }
 }
 
 
 type MetricGroup struct {
 type MetricGroup struct {
-	TotalRecordsIn  *prometheus.CounterVec
-	TotalRecordsOut *prometheus.CounterVec
-	TotalExceptions *prometheus.CounterVec
-	ProcessLatency  *prometheus.GaugeVec
-	BufferLength    *prometheus.GaugeVec
+	TotalRecordsIn     *prometheus.CounterVec
+	TotalRecordsOut    *prometheus.CounterVec
+	TotalExceptions    *prometheus.CounterVec
+	ProcessLatencyHist *prometheus.HistogramVec
+	ProcessLatency     *prometheus.GaugeVec
+	BufferLength       *prometheus.GaugeVec
 }
 }
 
 
 type PrometheusMetrics struct {
 type PrometheusMetrics struct {
@@ -72,17 +73,23 @@ func newPrometheusMetrics() *PrometheusMetrics {
 			Name: prefix + "_" + ProcessLatencyUs,
 			Name: prefix + "_" + ProcessLatencyUs,
 			Help: "Process latency in millisecond of " + prefix,
 			Help: "Process latency in millisecond of " + prefix,
 		}, labelNames)
 		}, labelNames)
+		processLatencyHist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Name:    prefix + "_" + ProcessLatencyUsHist,
+			Help:    "Histograms of process latency in millisecond of " + prefix,
+			Buckets: prometheus.ExponentialBuckets(10, 2, 20), // 10us ~ 5s
+		}, labelNames)
 		bufferLength := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		bufferLength := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 			Name: prefix + "_" + BufferLength,
 			Name: prefix + "_" + BufferLength,
 			Help: "The length of the plan buffer which is shared by all instances of " + prefix,
 			Help: "The length of the plan buffer which is shared by all instances of " + prefix,
 		}, labelNames)
 		}, labelNames)
-		prometheus.MustRegister(totalRecordsIn, totalRecordsOut, totalExceptions, processLatency, bufferLength)
+		prometheus.MustRegister(totalRecordsIn, totalRecordsOut, totalExceptions, processLatency, processLatencyHist, bufferLength)
 		vecs = append(vecs, &MetricGroup{
 		vecs = append(vecs, &MetricGroup{
-			TotalRecordsIn:  totalRecordsIn,
-			TotalRecordsOut: totalRecordsOut,
-			TotalExceptions: totalExceptions,
-			ProcessLatency:  processLatency,
-			BufferLength:    bufferLength,
+			TotalRecordsIn:     totalRecordsIn,
+			TotalRecordsOut:    totalRecordsOut,
+			TotalExceptions:    totalExceptions,
+			ProcessLatency:     processLatency,
+			ProcessLatencyHist: processLatencyHist,
+			BufferLength:       bufferLength,
 		})
 		})
 	}
 	}
 	return &PrometheusMetrics{vecs: vecs}
 	return &PrometheusMetrics{vecs: vecs}

+ 9 - 8
internal/topo/node/metric/stats_manager.go

@@ -22,14 +22,15 @@ import (
 )
 )
 
 
 const (
 const (
-	RecordsInTotal    = "records_in_total"
-	RecordsOutTotal   = "records_out_total"
-	ProcessLatencyUs  = "process_latency_us"
-	LastInvocation    = "last_invocation"
-	BufferLength      = "buffer_length"
-	ExceptionsTotal   = "exceptions_total"
-	LastException     = "last_exception"
-	LastExceptionTime = "last_exception_time"
+	RecordsInTotal       = "records_in_total"
+	RecordsOutTotal      = "records_out_total"
+	ProcessLatencyUs     = "process_latency_us"
+	ProcessLatencyUsHist = "process_latency_us_hist"
+	LastInvocation       = "last_invocation"
+	BufferLength         = "buffer_length"
+	ExceptionsTotal      = "exceptions_total"
+	LastException        = "last_exception"
+	LastExceptionTime    = "last_exception_time"
 )
 )
 
 
 var MetricNames = []string{RecordsInTotal, RecordsOutTotal, ProcessLatencyUs, BufferLength, LastInvocation, ExceptionsTotal, LastException, LastExceptionTime}
 var MetricNames = []string{RecordsInTotal, RecordsOutTotal, ProcessLatencyUs, BufferLength, LastInvocation, ExceptionsTotal, LastException, LastExceptionTime}

+ 9 - 5
internal/topo/node/metric/stats_prom.go

@@ -40,12 +40,14 @@ func getStatManager(ctx api.StreamContext, dsm DefaultStatManager) (StatManager,
 		mg.TotalRecordsOut.DeleteLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		mg.TotalRecordsOut.DeleteLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		mg.TotalExceptions.DeleteLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		mg.TotalExceptions.DeleteLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		mg.ProcessLatency.DeleteLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		mg.ProcessLatency.DeleteLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
+		mg.ProcessLatencyHist.DeleteLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		mg.BufferLength.DeleteLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		mg.BufferLength.DeleteLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 
 
 		psm.pTotalRecordsIn = mg.TotalRecordsIn.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		psm.pTotalRecordsIn = mg.TotalRecordsIn.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		psm.pTotalRecordsOut = mg.TotalRecordsOut.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		psm.pTotalRecordsOut = mg.TotalRecordsOut.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		psm.pTotalExceptions = mg.TotalExceptions.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		psm.pTotalExceptions = mg.TotalExceptions.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		psm.pProcessLatency = mg.ProcessLatency.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		psm.pProcessLatency = mg.ProcessLatency.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
+		psm.pProcessLatencyHist = mg.ProcessLatencyHist.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		psm.pBufferLength = mg.BufferLength.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		psm.pBufferLength = mg.BufferLength.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
 		sm = psm
 		sm = psm
 	} else {
 	} else {
@@ -57,11 +59,12 @@ func getStatManager(ctx api.StreamContext, dsm DefaultStatManager) (StatManager,
 type PrometheusStatManager struct {
 type PrometheusStatManager struct {
 	DefaultStatManager
 	DefaultStatManager
 	// prometheus metrics
 	// prometheus metrics
-	pTotalRecordsIn  prometheus.Counter
-	pTotalRecordsOut prometheus.Counter
-	pTotalExceptions prometheus.Counter
-	pProcessLatency  prometheus.Gauge
-	pBufferLength    prometheus.Gauge
+	pTotalRecordsIn     prometheus.Counter
+	pTotalRecordsOut    prometheus.Counter
+	pTotalExceptions    prometheus.Counter
+	pProcessLatency     prometheus.Gauge
+	pProcessLatencyHist prometheus.Observer
+	pBufferLength       prometheus.Gauge
 }
 }
 
 
 func (sm *PrometheusStatManager) IncTotalRecordsIn() {
 func (sm *PrometheusStatManager) IncTotalRecordsIn() {
@@ -83,6 +86,7 @@ func (sm *PrometheusStatManager) ProcessTimeEnd() {
 	if !sm.processTimeStart.IsZero() {
 	if !sm.processTimeStart.IsZero() {
 		sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Microsecond)
 		sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Microsecond)
 		sm.pProcessLatency.Set(float64(sm.processLatency))
 		sm.pProcessLatency.Set(float64(sm.processLatency))
+		sm.pProcessLatencyHist.Observe(float64(sm.processLatency))
 	}
 	}
 }
 }