checkpoint_test.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package processors
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "reflect"
  7. "testing"
  8. "time"
  9. )
  10. type ruleCheckpointTest struct {
  11. ruleTest
  12. pauseSize int // Stop stream after sending pauseSize source to test checkpoint resume
  13. cc int // checkpoint count when paused
  14. pauseMetric map[string]interface{} // The metric to check when paused
  15. }
  16. // Full lifecycle test: Run window rule; trigger checkpoints by mock timer; restart rule; make sure the result is right;
  17. func TestCheckpoint(t *testing.T) {
  18. common.IsTesting = true
  19. streamList := []string{"demo"}
  20. handleStream(false, streamList, t)
  21. var tests = []ruleCheckpointTest{{
  22. ruleTest: ruleTest{
  23. name: `TestCheckpointRule1`,
  24. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  25. r: [][]map[string]interface{}{
  26. {{
  27. "color": "red",
  28. "size": float64(3),
  29. "ts": float64(1541152486013),
  30. }, {
  31. "color": "blue",
  32. "size": float64(6),
  33. "ts": float64(1541152486822),
  34. }},
  35. {{
  36. "color": "red",
  37. "size": float64(3),
  38. "ts": float64(1541152486013),
  39. }, {
  40. "color": "blue",
  41. "size": float64(6),
  42. "ts": float64(1541152486822),
  43. }},
  44. {{
  45. "color": "blue",
  46. "size": float64(2),
  47. "ts": float64(1541152487632),
  48. }, {
  49. "color": "yellow",
  50. "size": float64(4),
  51. "ts": float64(1541152488442),
  52. }},
  53. {{
  54. "color": "blue",
  55. "size": float64(2),
  56. "ts": float64(1541152487632),
  57. }, {
  58. "color": "yellow",
  59. "size": float64(4),
  60. "ts": float64(1541152488442),
  61. }, {
  62. "color": "red",
  63. "size": float64(1),
  64. "ts": float64(1541152489252),
  65. }},
  66. },
  67. m: map[string]interface{}{
  68. "op_preprocessor_demo_0_records_in_total": int64(3),
  69. "op_preprocessor_demo_0_records_out_total": int64(3),
  70. "op_project_0_records_in_total": int64(3),
  71. "op_project_0_records_out_total": int64(3),
  72. "sink_mockSink_0_records_in_total": int64(3),
  73. "sink_mockSink_0_records_out_total": int64(3),
  74. "source_demo_0_records_in_total": int64(3),
  75. "source_demo_0_records_out_total": int64(3),
  76. "op_window_0_records_in_total": int64(3),
  77. "op_window_0_records_out_total": int64(3),
  78. },
  79. },
  80. pauseSize: 3,
  81. cc: 2,
  82. pauseMetric: map[string]interface{}{
  83. "op_preprocessor_demo_0_records_in_total": int64(3),
  84. "op_preprocessor_demo_0_records_out_total": int64(3),
  85. "op_project_0_records_in_total": int64(1),
  86. "op_project_0_records_out_total": int64(1),
  87. "sink_mockSink_0_records_in_total": int64(1),
  88. "sink_mockSink_0_records_out_total": int64(1),
  89. "source_demo_0_records_in_total": int64(3),
  90. "source_demo_0_records_out_total": int64(3),
  91. "op_window_0_records_in_total": int64(3),
  92. "op_window_0_records_out_total": int64(1),
  93. }},
  94. }
  95. handleStream(true, streamList, t)
  96. options := []*api.RuleOption{
  97. {
  98. BufferLength: 100,
  99. Qos: api.AtLeastOnce,
  100. CheckpointInterval: 600,
  101. }, {
  102. BufferLength: 100,
  103. Qos: api.ExactlyOnce,
  104. CheckpointInterval: 600,
  105. },
  106. }
  107. for j, opt := range options {
  108. doCheckpointRuleTest(t, tests, j, opt)
  109. }
  110. }
  111. func doCheckpointRuleTest(t *testing.T, tests []ruleCheckpointTest, j int, opt *api.RuleOption) {
  112. fmt.Printf("The test bucket for option %d size is %d.\n\n", j, len(tests))
  113. for i, tt := range tests {
  114. datas, dataLength, tp, mockSink, errCh := createStream(t, tt.ruleTest, j, opt, nil)
  115. log.Debugf("Start sending first phase data done at %d", common.GetNowInMilli())
  116. if err := sendData(t, tt.pauseSize, tt.pauseMetric, datas, errCh, tp, 100); err != nil {
  117. t.Errorf("first phase send data error %s", err)
  118. break
  119. }
  120. log.Debugf("Send first phase data done at %d", common.GetNowInMilli())
  121. // compare checkpoint count
  122. var retry int
  123. for retry = 100; retry > 0; retry-- {
  124. time.Sleep(time.Duration(retry) * time.Millisecond)
  125. actual := tp.GetCoordinator().GetCompleteCount()
  126. if reflect.DeepEqual(tt.cc, actual) {
  127. break
  128. } else {
  129. common.Log.Debugf("check checkpointCount error at %d: %d", retry, actual)
  130. }
  131. }
  132. tp.Cancel()
  133. if retry == 0 {
  134. t.Errorf("%d-%d. checkpoint count\n\nresult mismatch:\n\nexp=%#v\n\ngot=%d\n\n", i, j, tt.cc, tp.GetCoordinator().GetCompleteCount())
  135. return
  136. }
  137. time.Sleep(10 * time.Millisecond)
  138. // resume stream
  139. log.Debugf("Resume stream at %d", common.GetNowInMilli())
  140. errCh = tp.Open()
  141. log.Debugf("After open stream at %d", common.GetNowInMilli())
  142. if err := sendData(t, dataLength, tt.m, datas, errCh, tp, POSTLEAP); err != nil {
  143. t.Errorf("second phase send data error %s", err)
  144. break
  145. }
  146. compareResult(t, mockSink, commonResultFunc, tt.ruleTest, i, tp)
  147. }
  148. }