Просмотр исходного кода

fix(metrics): remove prometheus metric when rule deleted

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 года назад
Родитель
Коммит
42e5811e63

+ 1 - 2
internal/topo/node/metric/prometheus.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 //go:build prometheus || !core
-// +build prometheus !core
 
 package metric
 

+ 8 - 2
internal/topo/node/metric/stats_manager.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -39,9 +39,11 @@ type StatManager interface {
 	ProcessTimeEnd()
 	SetBufferLength(l int64)
 	GetMetrics() []interface{}
+	// Clean removes all metrics history recorded for the given rule id
+	Clean(ruleId string)
 }
 
-//The statManager is not thread safe. Make sure it is used in only one instance
+// DefaultStatManager is not thread safe. Make sure it is used in only one instance
 type DefaultStatManager struct {
 	//metrics
 	totalRecordsIn    int64
@@ -133,3 +135,7 @@ func (sm *DefaultStatManager) GetMetrics() []interface{} {
 	}
 	return result
 }
+
+func (sm *DefaultStatManager) Clean(_ string) {
+	// do nothing
+}

+ 12 - 1
internal/topo/node/metric/stats_prom.go

@@ -13,7 +13,6 @@
 // limitations under the License.
 
 //go:build prometheus || !core
-// +build prometheus !core
 
 package metric
 
@@ -89,3 +88,15 @@ func (sm *PrometheusStatManager) SetBufferLength(l int64) {
 	sm.bufferLength = l
 	sm.pBufferLength.Set(float64(l))
 }
+
+func (sm *PrometheusStatManager) Clean(ruleId string) {
+	if conf.Config != nil && conf.Config.Basic.Prometheus {
+		mg := GetPrometheusMetrics().GetMetricsGroup(sm.opType)
+		strInId := strconv.Itoa(sm.instanceId)
+		mg.TotalRecordsIn.DeleteLabelValues(ruleId, sm.opType, sm.opId, strInId)
+		mg.TotalRecordsOut.DeleteLabelValues(ruleId, sm.opType, sm.opId, strInId)
+		mg.TotalExceptions.DeleteLabelValues(ruleId, sm.opType, sm.opId, strInId)
+		mg.ProcessLatency.DeleteLabelValues(ruleId, sm.opType, sm.opId, strInId)
+		mg.BufferLength.DeleteLabelValues(ruleId, sm.opType, sm.opId, strInId)
+	}
+}

+ 9 - 1
internal/topo/node/node.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -33,6 +33,7 @@ type OperatorNode interface {
 	AddInputCount()
 	SetQos(api.Qos)
 	SetBarrierHandler(checkpoint.BarrierHandler)
+	RemoveMetrics(name string)
 }
 
 type DataSourceNode interface {
@@ -40,6 +41,7 @@ type DataSourceNode interface {
 	Open(ctx api.StreamContext, errCh chan<- error)
 	GetName() string
 	GetMetrics() [][]interface{}
+	RemoveMetrics(ruleId string)
 	Broadcast(val interface{}) error
 	GetStreamContext() api.StreamContext
 	SetQos(api.Qos)
@@ -87,6 +89,12 @@ func (o *defaultNode) GetMetrics() (result [][]interface{}) {
 	return result
 }
 
+func (o *defaultNode) RemoveMetrics(ruleId string) {
+	for _, stats := range o.statManagers {
+		stats.Clean(ruleId)
+	}
+}
+
 func (o *defaultNode) Broadcast(val interface{}) error {
 	if _, ok := val.(error); ok && !o.sendError {
 		return nil

+ 9 - 9
internal/topo/node/source_node.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -113,6 +113,14 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 							err    error
 						)
 
+						stats, err := metric.NewStatManager(ctx, "source")
+						if err != nil {
+							return err
+						}
+						m.mutex.Lock()
+						m.statManagers = append(m.statManagers, stats)
+						m.mutex.Unlock()
+
 						si, err = getSourceInstance(m, instance)
 						if err != nil {
 							return err
@@ -127,14 +135,6 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 							m.close()
 							buffer.Close()
 						}()
-
-						stats, err := metric.NewStatManager(ctx, "source")
-						if err != nil {
-							return err
-						}
-						m.mutex.Lock()
-						m.statManagers = append(m.statManagers, stats)
-						m.mutex.Unlock()
 						logger.Infof("Start source %s instance %d successfully", m.name, instance)
 						for {
 							select {

+ 4 - 2
internal/topo/rule/ruleState.go

@@ -245,7 +245,6 @@ func (rs *RuleState) Stop() error {
 	rs.triggered = 0
 	if rs.Topology != nil {
 		rs.Topology.Cancel()
-		rs.Topology = nil
 	}
 	rs.ActionCh <- ActionSignalStop
 	return nil
@@ -254,6 +253,9 @@ func (rs *RuleState) Stop() error {
 func (rs *RuleState) Close() error {
 	rs.Lock()
 	defer rs.Unlock()
+	if rs.Topology != nil {
+		rs.Topology.RemoveMetrics()
+	}
 	if rs.triggered == 1 && rs.Topology != nil {
 		rs.Topology.Cancel()
 	}
@@ -276,7 +278,7 @@ func (rs *RuleState) GetState() (string, error) {
 			case nil:
 				result = "Running"
 			case context.Canceled:
-				result = "Stopped: canceled by error."
+				result = "Stopped: canceled manually."
 			case context.DeadlineExceeded:
 				result = "Stopped: deadline exceed."
 			default:

+ 13 - 1
internal/topo/topo.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -226,6 +226,18 @@ func (s *Topo) GetMetrics() (keys []string, values []interface{}) {
 	return
 }
 
+func (s *Topo) RemoveMetrics() {
+	for _, sn := range s.sources {
+		sn.RemoveMetrics(s.name)
+	}
+	for _, so := range s.ops {
+		so.RemoveMetrics(s.name)
+	}
+	for _, sn := range s.sinks {
+		sn.RemoveMetrics(s.name)
+	}
+}
+
 func (s *Topo) GetTopo() *api.PrintableTopo {
 	return s.topo
 }

+ 3 - 3
test/change_rule_status.jmx

@@ -411,7 +411,7 @@
           <hashTree>
             <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
               <collectionProp name="Asserion.test_strings">
-                <stringProp name="1607612103">Stopped: canceled manually or by error.</stringProp>
+                <stringProp name="1607612103">Stopped: canceled manually.</stringProp>
               </collectionProp>
               <stringProp name="Assertion.custom_message"></stringProp>
               <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
@@ -591,7 +591,7 @@
           <hashTree>
             <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
               <collectionProp name="Asserion.test_strings">
-                <stringProp name="1607612103">Stopped: canceled manually or by error.</stringProp>
+                <stringProp name="1607612103">Stopped: canceled manually.</stringProp>
               </collectionProp>
               <stringProp name="Assertion.custom_message"></stringProp>
               <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
@@ -844,7 +844,7 @@
           <hashTree>
             <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
               <collectionProp name="Asserion.test_strings">
-                <stringProp name="1607612103">Stopped: canceled manually or by error.</stringProp>
+                <stringProp name="1607612103">Stopped: canceled manually.</stringProp>
               </collectionProp>
               <stringProp name="Assertion.custom_message"></stringProp>
               <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>