浏览代码

feat(metrics): add metric for channel length

ngjaying 5 年之前
父节点
当前提交
cec5e64095

+ 4 - 0
common/utils/dynamic_channel_buffer.go

@@ -43,3 +43,7 @@ func (b *DynamicChannelBuffer) run() {
 		}
 	}
 }
+
+func (b *DynamicChannelBuffer) GetLength() int {
+	return len(b.buffer)
+}

+ 1 - 1
xstream/nodes/sink_node.go

@@ -113,7 +113,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 						} else {
 							doCollect(sink, item, stats, ctx)
 						}
-
+						stats.SetBufferLength(int64(len(m.input)))
 					case <-ctx.Done():
 						logger.Infof("sink node %s instance %d done", m.name, instance)
 						if err := sink.Close(ctx); err != nil {

+ 5 - 1
xstream/nodes/source_node.go

@@ -113,9 +113,9 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 					stats.ProcessTimeStart()
 					tuple := &xsql.Tuple{Emitter: m.name, Message: message, Timestamp: common.GetNowInMilli(), Metadata: meta}
 					stats.ProcessTimeEnd()
-					//blocking
 					m.Broadcast(tuple)
 					stats.IncTotalRecordsOut()
+					stats.SetBufferLength(int64(m.getBufferLength()))
 					logger.Debugf("%s consume data %v complete", m.name, tuple)
 				}); err != nil {
 					m.drainError(errCh, err, ctx, logger)
@@ -211,6 +211,10 @@ func (m *SourceNode) Broadcast(data interface{}) {
 	m.buffer.In <- data
 }
 
+func (m *SourceNode) getBufferLength() int {
+	return m.buffer.GetLength()
+}
+
 func (m *SourceNode) GetName() string {
 	return m.name
 }

+ 7 - 1
xstream/nodes/stats_manager.go

@@ -15,6 +15,7 @@ type StatManager struct {
 	totalExceptions int64
 	processLatency  int64
 	lastInvocation  time.Time
+	bufferLength    int64
 	//configs
 	opType           string //"source", "op", "sink"
 	prefix           string
@@ -28,6 +29,7 @@ const RecordsOutTotal = "records_out_total"
 const ExceptionsTotal = "exceptions_total"
 const ProcessLatencyMs = "process_latency_ms"
 const LastInvocation = "last_invocation"
+const BufferLength   = "buffer_length"
 
 func NewStatManager(opType string, ctx api.StreamContext) (*StatManager, error) {
 	var prefix string
@@ -75,6 +77,10 @@ func (sm *StatManager) ProcessTimeEnd() {
 	}
 }
 
+func (sm *StatManager) SetBufferLength(l int64) {
+	sm.bufferLength = l
+}
+
 func (sm *StatManager) GetMetrics() map[string]interface{} {
 	result := make(map[string]interface{})
 	result[sm.prefix+sm.opId+"_"+strconv.Itoa(sm.instanceId)+"_"+RecordsInTotal] = sm.totalRecordsIn
@@ -84,6 +90,6 @@ func (sm *StatManager) GetMetrics() map[string]interface{} {
 		result[sm.prefix+sm.opId+"_"+strconv.Itoa(sm.instanceId)+"_"+LastInvocation] = sm.lastInvocation.Format("2006-01-02T15:04:05.999999")
 	}
 	result[sm.prefix+sm.opId+"_"+strconv.Itoa(sm.instanceId)+"_"+ProcessLatencyMs] = sm.processLatency
-
+	result[sm.prefix+sm.opId+"_"+strconv.Itoa(sm.instanceId)+"_"+BufferLength] = sm.bufferLength
 	return result
 }

+ 1 - 0
xstream/operators/operations.go

@@ -144,6 +144,7 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 				stats.ProcessTimeEnd()
 				nodes.Broadcast(o.outputs, val, ctx)
 				stats.IncTotalRecordsOut()
+				stats.SetBufferLength(int64(len(o.input)))
 			}
 		// is cancelling
 		case <-ctx.Done():

+ 1 - 0
xstream/operators/window_op.go

@@ -163,6 +163,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 				}
 			}
 			o.statManager.ProcessTimeEnd()
+			o.statManager.SetBufferLength(int64(len(o.input)))
 		case now := <-c:
 			if len(inputs) > 0 {
 				o.statManager.ProcessTimeStart()