checkpoint_test.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package processors
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "testing"
  7. "time"
  8. )
  9. type ruleCheckpointTest struct {
  10. ruleTest
  11. pauseSize int // Stop stream after sending pauseSize source to test checkpoint resume
  12. cc int // checkpoint count when paused
  13. pauseMetric map[string]interface{} // The metric to check when paused
  14. }
  15. // Full lifecycle test: Run window rule; trigger checkpoints by mock timer; restart rule; make sure the result is right;
  16. func TestCheckpoint(t *testing.T) {
  17. common.IsTesting = true
  18. streamList := []string{"demo"}
  19. handleStream(false, streamList, t)
  20. var tests = []ruleCheckpointTest{{
  21. ruleTest: ruleTest{
  22. name: `TestCheckpointRule1`,
  23. sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  24. r: [][]map[string]interface{}{
  25. {{
  26. "color": "red",
  27. "size": float64(3),
  28. "ts": float64(1541152486013),
  29. }, {
  30. "color": "blue",
  31. "size": float64(6),
  32. "ts": float64(1541152486822),
  33. }},
  34. {{
  35. "color": "red",
  36. "size": float64(3),
  37. "ts": float64(1541152486013),
  38. }, {
  39. "color": "blue",
  40. "size": float64(6),
  41. "ts": float64(1541152486822),
  42. }},
  43. {{
  44. "color": "blue",
  45. "size": float64(2),
  46. "ts": float64(1541152487632),
  47. }, {
  48. "color": "yellow",
  49. "size": float64(4),
  50. "ts": float64(1541152488442),
  51. }},
  52. {{
  53. "color": "blue",
  54. "size": float64(2),
  55. "ts": float64(1541152487632),
  56. }, {
  57. "color": "yellow",
  58. "size": float64(4),
  59. "ts": float64(1541152488442),
  60. }, {
  61. "color": "red",
  62. "size": float64(1),
  63. "ts": float64(1541152489252),
  64. }},
  65. },
  66. m: map[string]interface{}{
  67. "op_1_preprocessor_demo_0_records_in_total": int64(3),
  68. "op_1_preprocessor_demo_0_records_out_total": int64(3),
  69. "op_3_project_0_records_in_total": int64(3),
  70. "op_3_project_0_records_out_total": int64(3),
  71. "sink_mockSink_0_records_in_total": int64(3),
  72. "sink_mockSink_0_records_out_total": int64(3),
  73. "source_demo_0_records_in_total": int64(3),
  74. "source_demo_0_records_out_total": int64(3),
  75. "op_2_window_0_records_in_total": int64(3),
  76. "op_2_window_0_records_out_total": int64(3),
  77. },
  78. },
  79. pauseSize: 3,
  80. cc: 2,
  81. pauseMetric: map[string]interface{}{
  82. "op_1_preprocessor_demo_0_records_in_total": int64(3),
  83. "op_1_preprocessor_demo_0_records_out_total": int64(3),
  84. "op_3_project_0_records_in_total": int64(1),
  85. "op_3_project_0_records_out_total": int64(1),
  86. "sink_mockSink_0_records_in_total": int64(1),
  87. "sink_mockSink_0_records_out_total": int64(1),
  88. "source_demo_0_records_in_total": int64(3),
  89. "source_demo_0_records_out_total": int64(3),
  90. "op_2_window_0_records_in_total": int64(3),
  91. "op_2_window_0_records_out_total": int64(1),
  92. }},
  93. }
  94. handleStream(true, streamList, t)
  95. options := []*api.RuleOption{
  96. {
  97. BufferLength: 100,
  98. Qos: api.AtLeastOnce,
  99. CheckpointInterval: 600,
  100. SendError: true,
  101. }, {
  102. BufferLength: 100,
  103. Qos: api.ExactlyOnce,
  104. CheckpointInterval: 600,
  105. SendError: true,
  106. },
  107. }
  108. for j, opt := range options {
  109. doCheckpointRuleTest(t, tests, j, opt)
  110. }
  111. }
  112. func doCheckpointRuleTest(t *testing.T, tests []ruleCheckpointTest, j int, opt *api.RuleOption) {
  113. fmt.Printf("The test bucket for option %d size is %d.\n\n", j, len(tests))
  114. for i, tt := range tests {
  115. datas, dataLength, tp, mockSink, errCh := createStream(t, tt.ruleTest, j, opt, nil)
  116. log.Debugf("Start sending first phase data done at %d", common.GetNowInMilli())
  117. if err := sendData(t, tt.pauseSize, tt.pauseMetric, datas, errCh, tp, 100, 1); err != nil {
  118. t.Errorf("first phase send data error %s", err)
  119. break
  120. }
  121. log.Debugf("Send first phase data done at %d", common.GetNowInMilli())
  122. // compare checkpoint count
  123. time.Sleep(10 * time.Millisecond)
  124. var retry int
  125. for retry = 3; retry > 0; retry-- {
  126. actual := tp.GetCoordinator().GetCompleteCount()
  127. if tt.cc == actual {
  128. break
  129. } else {
  130. common.Log.Debugf("check checkpointCount error at %d: %d", retry, actual)
  131. }
  132. time.Sleep(200 * time.Millisecond)
  133. }
  134. cc := tp.GetCoordinator().GetCompleteCount()
  135. tp.Cancel()
  136. if retry == 0 {
  137. t.Errorf("%d-%d. checkpoint count\n\nresult mismatch:\n\nexp=%#v\n\ngot=%d\n\n", i, j, tt.cc, cc)
  138. return
  139. } else if retry < 3 {
  140. fmt.Printf("try %d for checkpoint count\n", 4-retry)
  141. }
  142. tp.Cancel()
  143. time.Sleep(10 * time.Millisecond)
  144. // resume stream
  145. log.Debugf("Resume stream at %d", common.GetNowInMilli())
  146. errCh = tp.Open()
  147. log.Debugf("After open stream at %d", common.GetNowInMilli())
  148. if err := sendData(t, dataLength, tt.m, datas, errCh, tp, POSTLEAP, 10); err != nil {
  149. t.Errorf("second phase send data error %s", err)
  150. break
  151. }
  152. compareResult(t, mockSink, commonResultFunc, tt.ruleTest, i, tp)
  153. }
  154. }