stats_manager.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "github.com/prometheus/client_golang/prometheus"
  7. "strconv"
  8. "time"
  9. )
  10. type StatManager interface {
  11. IncTotalRecordsIn()
  12. IncTotalRecordsOut()
  13. IncTotalExceptions()
  14. ProcessTimeStart()
  15. ProcessTimeEnd()
  16. SetBufferLength(l int64)
  17. GetMetrics() []interface{}
  18. }
  19. //The statManager is not thread safe. Make sure it is used in only one instance
  20. type DefaultStatManager struct {
  21. //metrics
  22. totalRecordsIn int64
  23. totalRecordsOut int64
  24. totalExceptions int64
  25. processLatency int64
  26. lastInvocation time.Time
  27. bufferLength int64
  28. //configs
  29. opType string //"source", "op", "sink"
  30. prefix string
  31. processTimeStart time.Time
  32. opId string
  33. instanceId int
  34. }
  35. type PrometheusStatManager struct {
  36. DefaultStatManager
  37. //prometheus metrics
  38. pTotalRecordsIn prometheus.Counter
  39. pTotalRecordsOut prometheus.Counter
  40. pTotalExceptions prometheus.Counter
  41. pProcessLatency prometheus.Gauge
  42. pBufferLength prometheus.Gauge
  43. }
  44. func NewStatManager(opType string, ctx api.StreamContext) (StatManager, error) {
  45. var prefix string
  46. switch opType {
  47. case "source":
  48. prefix = "source_"
  49. case "op":
  50. prefix = "op_"
  51. case "sink":
  52. prefix = "sink_"
  53. default:
  54. return nil, fmt.Errorf("invalid opType %s, must be \"source\", \"sink\" or \"op\"", opType)
  55. }
  56. var sm StatManager
  57. if common.Config != nil && common.Config.Prometheus {
  58. ctx.GetLogger().Debugf("Create prometheus stat manager")
  59. psm := &PrometheusStatManager{
  60. DefaultStatManager: DefaultStatManager{
  61. opType: opType,
  62. prefix: prefix,
  63. opId: ctx.GetOpId(),
  64. instanceId: ctx.GetInstanceId(),
  65. },
  66. }
  67. //assign prometheus
  68. mg := GetPrometheusMetrics().GetMetricsGroup(opType)
  69. strInId := strconv.Itoa(ctx.GetInstanceId())
  70. psm.pTotalRecordsIn = mg.TotalRecordsIn.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
  71. psm.pTotalRecordsOut = mg.TotalRecordsOut.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
  72. psm.pTotalExceptions = mg.TotalExceptions.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
  73. psm.pProcessLatency = mg.ProcessLatency.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
  74. psm.pBufferLength = mg.BufferLength.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
  75. sm = psm
  76. } else {
  77. sm = &DefaultStatManager{
  78. opType: opType,
  79. prefix: prefix,
  80. opId: ctx.GetOpId(),
  81. instanceId: ctx.GetInstanceId(),
  82. }
  83. }
  84. return sm, nil
  85. }
  86. func (sm *DefaultStatManager) IncTotalRecordsIn() {
  87. sm.totalRecordsIn++
  88. }
  89. func (sm *DefaultStatManager) IncTotalRecordsOut() {
  90. sm.totalRecordsOut++
  91. }
  92. func (sm *DefaultStatManager) IncTotalExceptions() {
  93. sm.totalExceptions++
  94. var t time.Time
  95. sm.processTimeStart = t
  96. }
  97. func (sm *DefaultStatManager) ProcessTimeStart() {
  98. sm.lastInvocation = time.Now()
  99. sm.processTimeStart = sm.lastInvocation
  100. }
  101. func (sm *DefaultStatManager) ProcessTimeEnd() {
  102. if !sm.processTimeStart.IsZero() {
  103. sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Millisecond)
  104. }
  105. }
  106. func (sm *DefaultStatManager) SetBufferLength(l int64) {
  107. sm.bufferLength = l
  108. }
  109. func (sm *PrometheusStatManager) IncTotalRecordsIn() {
  110. sm.totalRecordsIn++
  111. sm.pTotalRecordsIn.Inc()
  112. }
  113. func (sm *PrometheusStatManager) IncTotalRecordsOut() {
  114. sm.totalRecordsOut++
  115. sm.pTotalRecordsOut.Inc()
  116. }
  117. func (sm *PrometheusStatManager) IncTotalExceptions() {
  118. sm.totalExceptions++
  119. sm.pTotalExceptions.Inc()
  120. var t time.Time
  121. sm.processTimeStart = t
  122. }
  123. func (sm *PrometheusStatManager) ProcessTimeEnd() {
  124. if !sm.processTimeStart.IsZero() {
  125. sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Millisecond)
  126. sm.pProcessLatency.Set(float64(sm.processLatency))
  127. }
  128. }
  129. func (sm *PrometheusStatManager) SetBufferLength(l int64) {
  130. sm.bufferLength = l
  131. sm.pBufferLength.Set(float64(l))
  132. }
  133. func (sm *DefaultStatManager) GetMetrics() []interface{} {
  134. result := []interface{}{
  135. sm.totalRecordsIn, sm.totalRecordsOut, sm.totalExceptions, sm.processLatency, sm.bufferLength,
  136. }
  137. if !sm.lastInvocation.IsZero() {
  138. result = append(result, sm.lastInvocation.Format("2006-01-02T15:04:05.999999"))
  139. } else {
  140. result = append(result, 0)
  141. }
  142. return result
  143. }