having_test.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package plans
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/contexts"
  7. "reflect"
  8. "strings"
  9. "testing"
  10. )
  11. func TestHavingPlan_Apply(t *testing.T) {
  12. var tests = []struct {
  13. sql string
  14. data interface{}
  15. result interface{}
  16. }{
  17. {
  18. sql: `SELECT id1 FROM src1 HAVING avg(id1) > 1`,
  19. data: xsql.WindowTuplesSet{
  20. xsql.WindowTuples{
  21. Emitter: "src1",
  22. Tuples: []xsql.Tuple{
  23. {
  24. Emitter: "src1",
  25. Message: xsql.Message{"id1": 1, "f1": "v1"},
  26. }, {
  27. Emitter: "src1",
  28. Message: xsql.Message{"id1": 2, "f1": "v2"},
  29. }, {
  30. Emitter: "src1",
  31. Message: xsql.Message{"id1": 5, "f1": "v1"},
  32. },
  33. },
  34. },
  35. },
  36. result: xsql.WindowTuplesSet{
  37. xsql.WindowTuples{
  38. Emitter: "src1",
  39. Tuples: []xsql.Tuple{
  40. {
  41. Emitter: "src1",
  42. Message: xsql.Message{"id1": 1, "f1": "v1"},
  43. }, {
  44. Emitter: "src1",
  45. Message: xsql.Message{"id1": 2, "f1": "v2"},
  46. }, {
  47. Emitter: "src1",
  48. Message: xsql.Message{"id1": 5, "f1": "v1"},
  49. },
  50. },
  51. },
  52. },
  53. },
  54. {
  55. sql: `SELECT id1 FROM src1 HAVING sum(id1) > 1`,
  56. data: xsql.WindowTuplesSet{
  57. xsql.WindowTuples{
  58. Emitter: "src1",
  59. Tuples: []xsql.Tuple{
  60. {
  61. Emitter: "src1",
  62. Message: xsql.Message{"id1": 1, "f1": "v1"},
  63. },
  64. },
  65. },
  66. },
  67. result: nil,
  68. },
  69. {
  70. sql: `SELECT id1 FROM src1 HAVING sum(id1) = 1`,
  71. data: xsql.WindowTuplesSet{
  72. xsql.WindowTuples{
  73. Emitter: "src1",
  74. Tuples: []xsql.Tuple{
  75. {
  76. Emitter: "src1",
  77. Message: xsql.Message{"id1": 1, "f1": "v1"},
  78. },
  79. },
  80. },
  81. },
  82. result: xsql.WindowTuplesSet{
  83. xsql.WindowTuples{
  84. Emitter: "src1",
  85. Tuples: []xsql.Tuple{
  86. {
  87. Emitter: "src1",
  88. Message: xsql.Message{"id1": 1, "f1": "v1"},
  89. },
  90. },
  91. },
  92. },
  93. },
  94. {
  95. sql: `SELECT id1 FROM src1 HAVING max(id1) > 10`,
  96. data: xsql.WindowTuplesSet{
  97. xsql.WindowTuples{
  98. Emitter: "src1",
  99. Tuples: []xsql.Tuple{
  100. {
  101. Emitter: "src1",
  102. Message: xsql.Message{"id1": 1, "f1": "v1"},
  103. },
  104. },
  105. },
  106. },
  107. result: nil,
  108. },
  109. {
  110. sql: `SELECT id1 FROM src1 HAVING max(id1) = 1`,
  111. data: xsql.WindowTuplesSet{
  112. xsql.WindowTuples{
  113. Emitter: "src1",
  114. Tuples: []xsql.Tuple{
  115. {
  116. Emitter: "src1",
  117. Message: xsql.Message{"id1": 1, "f1": "v1"},
  118. },
  119. },
  120. },
  121. },
  122. result: xsql.WindowTuplesSet{
  123. xsql.WindowTuples{
  124. Emitter: "src1",
  125. Tuples: []xsql.Tuple{
  126. {
  127. Emitter: "src1",
  128. Message: xsql.Message{"id1": 1, "f1": "v1"},
  129. },
  130. },
  131. },
  132. },
  133. },
  134. }
  135. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  136. contextLogger := common.Log.WithField("rule", "TestHavingPlan_Apply")
  137. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  138. for i, tt := range tests {
  139. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  140. if err != nil {
  141. t.Errorf("statement parse error %s", err)
  142. break
  143. }
  144. pp := &HavingPlan{Condition: stmt.Having}
  145. result := pp.Apply(ctx, tt.data)
  146. if !reflect.DeepEqual(tt.result, result) {
  147. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  148. }
  149. }
  150. }