Prechádzať zdrojové kódy

feat(metrics): print merics in order

ngjaying 5 rokov pred
rodič
commit
7deef9554a

+ 1 - 1
xstream/api/stream.go

@@ -80,7 +80,7 @@ type Operator interface {
 	Collector
 	Exec(StreamContext, chan<- error)
 	GetName() string
-	GetMetrics() map[string]interface{}
+	GetMetrics() [][]interface{}
 }
 
 type Function interface {

+ 2 - 5
xstream/nodes/sink_node.go

@@ -179,12 +179,9 @@ func (m *SinkNode) GetInput() (chan<- interface{}, string) {
 	return m.input, m.name
 }
 
-func (m *SinkNode) GetMetrics() map[string]interface{} {
-	result := make(map[string]interface{})
+func (m *SinkNode) GetMetrics() (result [][]interface{}) {
 	for _, stats := range m.statManagers{
-		for k, v := range stats.GetMetrics(){
-			result[k] = v
-		}
+		result = append(result, stats.GetMetrics())
 	}
 	return result
 }

+ 2 - 5
xstream/nodes/source_node.go

@@ -228,12 +228,9 @@ func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err erro
 	return nil
 }
 
-func (m *SourceNode) GetMetrics() map[string]interface{} {
-	result := make(map[string]interface{})
+func (m *SourceNode) GetMetrics() (result [][]interface{}) {
 	for _, stats := range m.statManagers{
-		for k, v := range stats.GetMetrics(){
-			result[k] = v
-		}
+		result = append(result, stats.GetMetrics())
 	}
 	return result
 }

+ 9 - 9
xstream/nodes/stats_manager.go

@@ -3,7 +3,6 @@ package nodes
 import (
 	"fmt"
 	"github.com/emqx/kuiper/xstream/api"
-	"strconv"
 	"time"
 )
 
@@ -31,6 +30,8 @@ const ProcessLatencyMs = "process_latency_ms"
 const LastInvocation = "last_invocation"
 const BufferLength   = "buffer_length"
 
+var MetricNames = []string{RecordsInTotal, RecordsOutTotal, ExceptionsTotal, ProcessLatencyMs, BufferLength, LastInvocation}
+
 func NewStatManager(opType string, ctx api.StreamContext) (*StatManager, error) {
 	var prefix string
 	switch opType {
@@ -81,15 +82,14 @@ func (sm *StatManager) SetBufferLength(l int64) {
 	sm.bufferLength = l
 }
 
-func (sm *StatManager) GetMetrics() map[string]interface{} {
-	result := make(map[string]interface{})
-	result[sm.prefix+sm.opId+"_"+strconv.Itoa(sm.instanceId)+"_"+RecordsInTotal] = sm.totalRecordsIn
-	result[sm.prefix+sm.opId+"_"+strconv.Itoa(sm.instanceId)+"_"+RecordsOutTotal] = sm.totalRecordsOut
-	result[sm.prefix+sm.opId+"_"+strconv.Itoa(sm.instanceId)+"_"+ExceptionsTotal] = sm.totalExceptions
+func (sm *StatManager) GetMetrics() []interface{} {
+	result := []interface{}{
+		sm.totalRecordsIn, sm.totalRecordsOut, sm.totalExceptions, sm.processLatency, sm.bufferLength,
+	}
+
 	if !sm.lastInvocation.IsZero(){
-		result[sm.prefix+sm.opId+"_"+strconv.Itoa(sm.instanceId)+"_"+LastInvocation] = sm.lastInvocation.Format("2006-01-02T15:04:05.999999")
+		result = append(result, sm.lastInvocation.Format("2006-01-02T15:04:05.999999"))
 	}
-	result[sm.prefix+sm.opId+"_"+strconv.Itoa(sm.instanceId)+"_"+ProcessLatencyMs] = sm.processLatency
-	result[sm.prefix+sm.opId+"_"+strconv.Itoa(sm.instanceId)+"_"+BufferLength] = sm.bufferLength
+
 	return result
 }

+ 3 - 6
xstream/operators/operations.go

@@ -158,12 +158,9 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 	}
 }
 
-func (o *UnaryOperator) GetMetrics() map[string]interface{} {
-	result := make(map[string]interface{})
-	for _, stats := range o.statManagers{
-		for k, v := range stats.GetMetrics(){
-			result[k] = v
-		}
+func (m *UnaryOperator) GetMetrics() (result [][]interface{}) {
+	for _, stats := range m.statManagers{
+		result = append(result, stats.GetMetrics())
 	}
 	return result
 }

+ 4 - 3
xstream/operators/window_op.go

@@ -268,11 +268,12 @@ func (o *WindowOperator) calDelta(triggerTime int64, delta int64, log api.Logger
 	return delta
 }
 
-func (o *WindowOperator) GetMetrics() map[string]interface{} {
+func (o *WindowOperator) GetMetrics() [][]interface{} {
 	if o.statManager != nil {
-		return o.statManager.GetMetrics()
+		return [][]interface{}{
+			o.statManager.GetMetrics(),
+		}
 	} else {
 		return nil
 	}
-
 }

+ 16 - 9
xstream/streams.go

@@ -7,6 +7,7 @@ import (
 	"github.com/emqx/kuiper/xstream/contexts"
 	"github.com/emqx/kuiper/xstream/nodes"
 	"github.com/emqx/kuiper/xstream/operators"
+	"strconv"
 )
 
 type TopologyNew struct {
@@ -104,21 +105,27 @@ func (s *TopologyNew) Open() <-chan error {
 
 func (s *TopologyNew) GetMetrics() (keys []string, values []interface{}) {
 	for _, node := range s.sources {
-		for k, v := range node.GetMetrics() {
-			keys = append(keys, k)
-			values = append(values, v)
+		for ins, metrics := range node.GetMetrics() {
+			for i, v := range metrics{
+				keys = append(keys, "source_" + node.GetName() + "_" + strconv.Itoa(ins) + "_" + nodes.MetricNames[i])
+				values = append(values, v)
+			}
 		}
 	}
 	for _, node := range s.ops {
-		for k, v := range node.GetMetrics() {
-			keys = append(keys, k)
-			values = append(values, v)
+		for ins, metrics := range node.GetMetrics() {
+			for i, v := range metrics{
+				keys = append(keys, "op_" + node.GetName() + "_" + strconv.Itoa(ins) + "_" + nodes.MetricNames[i])
+				values = append(values, v)
+			}
 		}
 	}
 	for _, node := range s.sinks {
-		for k, v := range node.GetMetrics() {
-			keys = append(keys, k)
-			values = append(values, v)
+		for ins, metrics := range node.GetMetrics() {
+			for i, v := range metrics{
+				keys = append(keys, "sink_" + node.GetName() + "_" + strconv.Itoa(ins) + "_" + nodes.MetricNames[i])
+				values = append(values, v)
+			}
 		}
 	}
 	return