checkpoint_test.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package topotest
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/xstream/api"
  5. "testing"
  6. )
  7. // Full lifecycle test: Run window rule; trigger checkpoints by mock timer; restart rule; make sure the result is right;
  8. func TestCheckpoint(t *testing.T) {
  9. common.IsTesting = true
  10. streamList := []string{"demo"}
  11. HandleStream(false, streamList, t)
  12. var tests = []RuleCheckpointTest{{
  13. RuleTest: RuleTest{
  14. Name: `TestCheckpointRule1`,
  15. Sql: `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
  16. R: [][]map[string]interface{}{
  17. {{
  18. "color": "red",
  19. "size": float64(3),
  20. "ts": float64(1541152486013),
  21. }, {
  22. "color": "blue",
  23. "size": float64(6),
  24. "ts": float64(1541152486822),
  25. }},
  26. {{
  27. "color": "red",
  28. "size": float64(3),
  29. "ts": float64(1541152486013),
  30. }, {
  31. "color": "blue",
  32. "size": float64(6),
  33. "ts": float64(1541152486822),
  34. }},
  35. {{
  36. "color": "blue",
  37. "size": float64(2),
  38. "ts": float64(1541152487632),
  39. }, {
  40. "color": "yellow",
  41. "size": float64(4),
  42. "ts": float64(1541152488442),
  43. }},
  44. {{
  45. "color": "blue",
  46. "size": float64(2),
  47. "ts": float64(1541152487632),
  48. }, {
  49. "color": "yellow",
  50. "size": float64(4),
  51. "ts": float64(1541152488442),
  52. }, {
  53. "color": "red",
  54. "size": float64(1),
  55. "ts": float64(1541152489252),
  56. }},
  57. },
  58. M: map[string]interface{}{
  59. "op_1_preprocessor_demo_0_records_in_total": int64(3),
  60. "op_1_preprocessor_demo_0_records_out_total": int64(3),
  61. "op_3_project_0_records_in_total": int64(3),
  62. "op_3_project_0_records_out_total": int64(3),
  63. "sink_mockSink_0_records_in_total": int64(3),
  64. "sink_mockSink_0_records_out_total": int64(3),
  65. "source_demo_0_records_in_total": int64(3),
  66. "source_demo_0_records_out_total": int64(3),
  67. "op_2_window_0_records_in_total": int64(3),
  68. "op_2_window_0_records_out_total": int64(3),
  69. },
  70. },
  71. PauseSize: 3,
  72. Cc: 2,
  73. PauseMetric: map[string]interface{}{
  74. "op_1_preprocessor_demo_0_records_in_total": int64(3),
  75. "op_1_preprocessor_demo_0_records_out_total": int64(3),
  76. "op_3_project_0_records_in_total": int64(1),
  77. "op_3_project_0_records_out_total": int64(1),
  78. "sink_mockSink_0_records_in_total": int64(1),
  79. "sink_mockSink_0_records_out_total": int64(1),
  80. "source_demo_0_records_in_total": int64(3),
  81. "source_demo_0_records_out_total": int64(3),
  82. "op_2_window_0_records_in_total": int64(3),
  83. "op_2_window_0_records_out_total": int64(1),
  84. }},
  85. }
  86. HandleStream(true, streamList, t)
  87. options := []*api.RuleOption{
  88. {
  89. BufferLength: 100,
  90. Qos: api.AtLeastOnce,
  91. CheckpointInterval: 600,
  92. SendError: true,
  93. }, {
  94. BufferLength: 100,
  95. Qos: api.ExactlyOnce,
  96. CheckpointInterval: 600,
  97. SendError: true,
  98. },
  99. }
  100. for j, opt := range options {
  101. DoCheckpointRuleTest(t, tests, j, opt)
  102. }
  103. }