stats_manager.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/pkg/api"
  18. "time"
  19. )
  20. const RecordsInTotal = "records_in_total"
  21. const RecordsOutTotal = "records_out_total"
  22. const ExceptionsTotal = "exceptions_total"
  23. const ProcessLatencyUs = "process_latency_us"
  24. const LastInvocation = "last_invocation"
  25. const BufferLength = "buffer_length"
  26. var MetricNames = []string{RecordsInTotal, RecordsOutTotal, ExceptionsTotal, ProcessLatencyUs, BufferLength, LastInvocation}
  27. type StatManager interface {
  28. IncTotalRecordsIn()
  29. IncTotalRecordsOut()
  30. IncTotalExceptions()
  31. ProcessTimeStart()
  32. ProcessTimeEnd()
  33. SetBufferLength(l int64)
  34. GetMetrics() []interface{}
  35. }
  36. //The statManager is not thread safe. Make sure it is used in only one instance
  37. type DefaultStatManager struct {
  38. //metrics
  39. totalRecordsIn int64
  40. totalRecordsOut int64
  41. totalExceptions int64
  42. processLatency int64
  43. lastInvocation time.Time
  44. bufferLength int64
  45. //configs
  46. opType string //"source", "op", "sink"
  47. prefix string
  48. processTimeStart time.Time
  49. opId string
  50. instanceId int
  51. }
  52. func NewStatManager(ctx api.StreamContext, opType string) (StatManager, error) {
  53. var prefix string
  54. switch opType {
  55. case "source":
  56. prefix = "source_"
  57. case "op":
  58. prefix = "op_"
  59. case "sink":
  60. prefix = "sink_"
  61. default:
  62. return nil, fmt.Errorf("invalid opType %s, must be \"source\", \"sink\" or \"op\"", opType)
  63. }
  64. ds := DefaultStatManager{
  65. opType: opType,
  66. prefix: prefix,
  67. opId: ctx.GetOpId(),
  68. instanceId: ctx.GetInstanceId(),
  69. }
  70. return getStatManager(ctx, ds)
  71. }
  72. func (sm *DefaultStatManager) IncTotalRecordsIn() {
  73. sm.totalRecordsIn++
  74. }
  75. func (sm *DefaultStatManager) IncTotalRecordsOut() {
  76. sm.totalRecordsOut++
  77. }
  78. func (sm *DefaultStatManager) IncTotalExceptions() {
  79. sm.totalExceptions++
  80. var t time.Time
  81. sm.processTimeStart = t
  82. }
  83. func (sm *DefaultStatManager) ProcessTimeStart() {
  84. sm.lastInvocation = time.Now()
  85. sm.processTimeStart = sm.lastInvocation
  86. }
  87. func (sm *DefaultStatManager) ProcessTimeEnd() {
  88. if !sm.processTimeStart.IsZero() {
  89. sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Microsecond)
  90. }
  91. }
  92. func (sm *DefaultStatManager) SetBufferLength(l int64) {
  93. sm.bufferLength = l
  94. }
  95. func (sm *DefaultStatManager) GetMetrics() []interface{} {
  96. result := []interface{}{
  97. sm.totalRecordsIn, sm.totalRecordsOut, sm.totalExceptions, sm.processLatency, sm.bufferLength,
  98. }
  99. if !sm.lastInvocation.IsZero() {
  100. result = append(result, sm.lastInvocation.Format("2006-01-02T15:04:05.999999"))
  101. } else {
  102. result = append(result, 0)
  103. }
  104. return result
  105. }