123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- package nodes
- import (
- "fmt"
- "github.com/emqx/kuiper/common"
- "github.com/emqx/kuiper/xstream/api"
- "github.com/prometheus/client_golang/prometheus"
- "strconv"
- "time"
- )
- type StatManager interface {
- IncTotalRecordsIn()
- IncTotalRecordsOut()
- IncTotalExceptions()
- ProcessTimeStart()
- ProcessTimeEnd()
- SetBufferLength(l int64)
- GetMetrics() []interface{}
- }
- //The statManager is not thread safe. Make sure it is used in only one instance
- type DefaultStatManager struct {
- //metrics
- totalRecordsIn int64
- totalRecordsOut int64
- totalExceptions int64
- processLatency int64
- lastInvocation time.Time
- bufferLength int64
- //configs
- opType string //"source", "op", "sink"
- prefix string
- processTimeStart time.Time
- opId string
- instanceId int
- }
- type PrometheusStatManager struct {
- DefaultStatManager
- //prometheus metrics
- pTotalRecordsIn prometheus.Counter
- pTotalRecordsOut prometheus.Counter
- pTotalExceptions prometheus.Counter
- pProcessLatency prometheus.Gauge
- pBufferLength prometheus.Gauge
- }
- func NewStatManager(opType string, ctx api.StreamContext) (StatManager, error) {
- var prefix string
- switch opType {
- case "source":
- prefix = "source_"
- case "op":
- prefix = "op_"
- case "sink":
- prefix = "sink_"
- default:
- return nil, fmt.Errorf("invalid opType %s, must be \"source\", \"sink\" or \"op\"", opType)
- }
- var sm StatManager
- if common.Config != nil && common.Config.Prometheus {
- ctx.GetLogger().Debugf("Create prometheus stat manager")
- psm := &PrometheusStatManager{
- DefaultStatManager: DefaultStatManager{
- opType: opType,
- prefix: prefix,
- opId: ctx.GetOpId(),
- instanceId: ctx.GetInstanceId(),
- },
- }
- //assign prometheus
- mg := GetPrometheusMetrics().GetMetricsGroup(opType)
- strInId := strconv.Itoa(ctx.GetInstanceId())
- psm.pTotalRecordsIn = mg.TotalRecordsIn.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
- psm.pTotalRecordsOut = mg.TotalRecordsOut.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
- psm.pTotalExceptions = mg.TotalExceptions.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
- psm.pProcessLatency = mg.ProcessLatency.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
- psm.pBufferLength = mg.BufferLength.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
- sm = psm
- } else {
- sm = &DefaultStatManager{
- opType: opType,
- prefix: prefix,
- opId: ctx.GetOpId(),
- instanceId: ctx.GetInstanceId(),
- }
- }
- return sm, nil
- }
- func (sm *DefaultStatManager) IncTotalRecordsIn() {
- sm.totalRecordsIn++
- }
- func (sm *DefaultStatManager) IncTotalRecordsOut() {
- sm.totalRecordsOut++
- }
- func (sm *DefaultStatManager) IncTotalExceptions() {
- sm.totalExceptions++
- var t time.Time
- sm.processTimeStart = t
- }
- func (sm *DefaultStatManager) ProcessTimeStart() {
- sm.lastInvocation = time.Now()
- sm.processTimeStart = sm.lastInvocation
- }
- func (sm *DefaultStatManager) ProcessTimeEnd() {
- if !sm.processTimeStart.IsZero() {
- sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Millisecond)
- }
- }
- func (sm *DefaultStatManager) SetBufferLength(l int64) {
- sm.bufferLength = l
- }
- func (sm *PrometheusStatManager) IncTotalRecordsIn() {
- sm.totalRecordsIn++
- sm.pTotalRecordsIn.Inc()
- }
- func (sm *PrometheusStatManager) IncTotalRecordsOut() {
- sm.totalRecordsOut++
- sm.pTotalRecordsOut.Inc()
- }
- func (sm *PrometheusStatManager) IncTotalExceptions() {
- sm.totalExceptions++
- sm.pTotalExceptions.Inc()
- var t time.Time
- sm.processTimeStart = t
- }
- func (sm *PrometheusStatManager) ProcessTimeEnd() {
- if !sm.processTimeStart.IsZero() {
- sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Millisecond)
- sm.pProcessLatency.Set(float64(sm.processLatency))
- }
- }
- func (sm *PrometheusStatManager) SetBufferLength(l int64) {
- sm.bufferLength = l
- sm.pBufferLength.Set(float64(l))
- }
- func (sm *DefaultStatManager) GetMetrics() []interface{} {
- result := []interface{}{
- sm.totalRecordsIn, sm.totalRecordsOut, sm.totalExceptions, sm.processLatency, sm.bufferLength,
- }
- if !sm.lastInvocation.IsZero() {
- result = append(result, sm.lastInvocation.Format("2006-01-02T15:04:05.999999"))
- } else {
- result = append(result, 0)
- }
- return result
- }
|