// Source file: stats_manager.go (3.8 KB)
  // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  //
  // Licensed under the Apache License, Version 2.0 (the "License");
  // you may not use this file except in compliance with the License.
  // You may obtain a copy of the License at
  //
  //     http://www.apache.org/licenses/LICENSE-2.0
  //
  // Unless required by applicable law or agreed to in writing, software
  // distributed under the License is distributed on an "AS IS" BASIS,
  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  // See the License for the specific language governing permissions and
  // limitations under the License.
  14. package metric
  15. import (
  16. "fmt"
  17. "time"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. )
  // Metric name constants used by the stat managers in this package.
  const (
  	RecordsInTotal       = "records_in_total"
  	RecordsOutTotal      = "records_out_total"
  	ProcessLatencyUs     = "process_latency_us"
  	ProcessLatencyUsHist = "process_latency_us_hist"
  	LastInvocation       = "last_invocation"
  	BufferLength         = "buffer_length"
  	ExceptionsTotal      = "exceptions_total"
  	LastException        = "last_exception"
  	LastExceptionTime    = "last_exception_time"
  )

  // MetricNames lists the metric names in the same order as the values
  // returned by DefaultStatManager.GetMetrics, so index i of one slice
  // corresponds to index i of the other. ProcessLatencyUsHist is deliberately
  // not part of this list; GetMetrics has no matching entry for it.
  var MetricNames = []string{
  	RecordsInTotal,
  	RecordsOutTotal,
  	ProcessLatencyUs,
  	BufferLength,
  	LastInvocation,
  	ExceptionsTotal,
  	LastException,
  	LastExceptionTime,
  }
  // StatManager collects runtime metrics for one operator instance.
  //
  // The per-method notes below describe what DefaultStatManager does; other
  // implementations are not visible from this file.
  type StatManager interface {
  	// IncTotalRecordsIn adds one to the count of incoming records.
  	IncTotalRecordsIn()
  	// IncTotalRecordsOut adds one to the count of outgoing records.
  	IncTotalRecordsOut()
  	// IncTotalExceptions adds one to the exception count and records err
  	// as the most recent exception message.
  	IncTotalExceptions(err string)
  	// ProcessTimeStart marks the current time as the start of processing.
  	ProcessTimeStart()
  	// ProcessTimeEnd closes the measurement opened by ProcessTimeStart or
  	// SetProcessTimeStart and updates the process latency.
  	ProcessTimeEnd()
  	// SetBufferLength records the current buffer length.
  	SetBufferLength(l int64)
  	// SetProcessTimeStart marks t, rather than the current time, as the
  	// start of processing.
  	SetProcessTimeStart(t time.Time)
  	// GetMetrics returns the current metric values. For
  	// DefaultStatManager they are ordered like MetricNames.
  	GetMetrics() []interface{}
  	// Clean removes all metrics history.
  	// NOTE(review): ruleId is presumably the owning rule's ID; confirm
  	// against the implementations that use it.
  	Clean(ruleId string)
  }
  // DefaultStatManager keeps the metrics of a single operator instance in
  // plain struct fields.
  //
  // The statManager is not thread safe. Make sure it is used in only one
  // instance.
  type DefaultStatManager struct {
  	// metrics
  	totalRecordsIn    int64     // incremented by IncTotalRecordsIn
  	totalRecordsOut   int64     // incremented by IncTotalRecordsOut
  	processLatency    int64     // last measured latency in microseconds, set by ProcessTimeEnd
  	lastInvocation    time.Time // set by ProcessTimeStart / SetProcessTimeStart
  	bufferLength      int64     // set by SetBufferLength
  	totalExceptions   int64     // incremented by IncTotalExceptions
  	lastException     string    // message passed to the latest IncTotalExceptions call
  	lastExceptionTime time.Time // time of the latest IncTotalExceptions call

  	// configs
  	opType string //"source", "op", "sink"
  	prefix string // "source_", "op_" or "sink_", chosen from opType by NewStatManager
  	// processTimeStart is the start of the measurement in progress; the zero
  	// value means none is running (IncTotalExceptions resets it to zero).
  	processTimeStart time.Time
  	opId             string // taken from ctx.GetOpId() in NewStatManager
  	instanceId       int    // taken from ctx.GetInstanceId() in NewStatManager
  }
  62. func NewStatManager(ctx api.StreamContext, opType string) (StatManager, error) {
  63. var prefix string
  64. switch opType {
  65. case "source":
  66. prefix = "source_"
  67. case "op":
  68. prefix = "op_"
  69. case "sink":
  70. prefix = "sink_"
  71. default:
  72. return nil, fmt.Errorf("invalid opType %s, must be \"source\", \"sink\" or \"op\"", opType)
  73. }
  74. ds := DefaultStatManager{
  75. opType: opType,
  76. prefix: prefix,
  77. opId: ctx.GetOpId(),
  78. instanceId: ctx.GetInstanceId(),
  79. }
  80. return getStatManager(ctx, ds)
  81. }
  82. func (sm *DefaultStatManager) IncTotalRecordsIn() {
  83. sm.totalRecordsIn++
  84. }
  85. func (sm *DefaultStatManager) IncTotalRecordsOut() {
  86. sm.totalRecordsOut++
  87. }
  88. func (sm *DefaultStatManager) IncTotalExceptions(err string) {
  89. sm.totalExceptions++
  90. var t time.Time
  91. sm.processTimeStart = t
  92. sm.lastException = err
  93. sm.lastExceptionTime = time.Now()
  94. }
  95. func (sm *DefaultStatManager) ProcessTimeStart() {
  96. sm.lastInvocation = time.Now()
  97. sm.processTimeStart = sm.lastInvocation
  98. }
  99. func (sm *DefaultStatManager) ProcessTimeEnd() {
  100. if !sm.processTimeStart.IsZero() {
  101. sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Microsecond)
  102. }
  103. }
  104. func (sm *DefaultStatManager) SetBufferLength(l int64) {
  105. sm.bufferLength = l
  106. }
  107. func (sm *DefaultStatManager) SetProcessTimeStart(t time.Time) {
  108. sm.processTimeStart = t
  109. sm.lastInvocation = t
  110. }
  111. func (sm *DefaultStatManager) GetMetrics() []interface{} {
  112. result := []interface{}{
  113. sm.totalRecordsIn,
  114. sm.totalRecordsOut,
  115. sm.processLatency,
  116. sm.bufferLength,
  117. 0,
  118. sm.totalExceptions,
  119. sm.lastException,
  120. 0,
  121. }
  122. if !sm.lastInvocation.IsZero() {
  123. result[4] = sm.lastInvocation.Format("2006-01-02T15:04:05.999999")
  124. }
  125. if !sm.lastExceptionTime.IsZero() {
  126. result[7] = sm.lastExceptionTime.Format("2006-01-02T15:04:05.999999")
  127. }
  128. return result
  129. }
  130. func (sm *DefaultStatManager) Clean(_ string) {
  131. // do nothing
  132. }