having_test.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package plans
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestHavingPlan_Apply(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data interface{}
  16. result interface{}
  17. }{
  18. {
  19. sql: `SELECT id1 FROM src1 HAVING avg(id1) > 1`,
  20. data: xsql.WindowTuplesSet{
  21. xsql.WindowTuples{
  22. Emitter: "src1",
  23. Tuples: []xsql.Tuple{
  24. {
  25. Emitter: "src1",
  26. Message: xsql.Message{"id1": 1, "f1": "v1"},
  27. }, {
  28. Emitter: "src1",
  29. Message: xsql.Message{"id1": 2, "f1": "v2"},
  30. }, {
  31. Emitter: "src1",
  32. Message: xsql.Message{"id1": 5, "f1": "v1"},
  33. },
  34. },
  35. },
  36. },
  37. result: xsql.WindowTuplesSet{
  38. xsql.WindowTuples{
  39. Emitter: "src1",
  40. Tuples: []xsql.Tuple{
  41. {
  42. Emitter: "src1",
  43. Message: xsql.Message{"id1": 1, "f1": "v1"},
  44. }, {
  45. Emitter: "src1",
  46. Message: xsql.Message{"id1": 2, "f1": "v2"},
  47. }, {
  48. Emitter: "src1",
  49. Message: xsql.Message{"id1": 5, "f1": "v1"},
  50. },
  51. },
  52. },
  53. },
  54. },
  55. {
  56. sql: `SELECT id1 FROM src1 HAVING sum(id1) > 1`,
  57. data: xsql.WindowTuplesSet{
  58. xsql.WindowTuples{
  59. Emitter: "src1",
  60. Tuples: []xsql.Tuple{
  61. {
  62. Emitter: "src1",
  63. Message: xsql.Message{"id1": 1, "f1": "v1"},
  64. },
  65. },
  66. },
  67. },
  68. result: nil,
  69. },
  70. {
  71. sql: `SELECT id1 FROM src1 HAVING sum(id1) = 1`,
  72. data: xsql.WindowTuplesSet{
  73. xsql.WindowTuples{
  74. Emitter: "src1",
  75. Tuples: []xsql.Tuple{
  76. {
  77. Emitter: "src1",
  78. Message: xsql.Message{"id1": 1, "f1": "v1"},
  79. },
  80. },
  81. },
  82. },
  83. result: xsql.WindowTuplesSet{
  84. xsql.WindowTuples{
  85. Emitter: "src1",
  86. Tuples: []xsql.Tuple{
  87. {
  88. Emitter: "src1",
  89. Message: xsql.Message{"id1": 1, "f1": "v1"},
  90. },
  91. },
  92. },
  93. },
  94. },
  95. {
  96. sql: `SELECT id1 FROM src1 HAVING max(id1) > 10`,
  97. data: xsql.WindowTuplesSet{
  98. xsql.WindowTuples{
  99. Emitter: "src1",
  100. Tuples: []xsql.Tuple{
  101. {
  102. Emitter: "src1",
  103. Message: xsql.Message{"id1": 1, "f1": "v1"},
  104. },
  105. },
  106. },
  107. },
  108. result: nil,
  109. },
  110. {
  111. sql: `SELECT id1 FROM src1 HAVING max(id1) = 1`,
  112. data: xsql.WindowTuplesSet{
  113. xsql.WindowTuples{
  114. Emitter: "src1",
  115. Tuples: []xsql.Tuple{
  116. {
  117. Emitter: "src1",
  118. Message: xsql.Message{"id1": 1, "f1": "v1"},
  119. },
  120. },
  121. },
  122. },
  123. result: xsql.WindowTuplesSet{
  124. xsql.WindowTuples{
  125. Emitter: "src1",
  126. Tuples: []xsql.Tuple{
  127. {
  128. Emitter: "src1",
  129. Message: xsql.Message{"id1": 1, "f1": "v1"},
  130. },
  131. },
  132. },
  133. },
  134. },
  135. }
  136. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  137. contextLogger := common.Log.WithField("rule", "TestHavingPlan_Apply")
  138. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  139. for i, tt := range tests {
  140. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  141. if err != nil {
  142. t.Errorf("statement parse error %s", err)
  143. break
  144. }
  145. pp := &HavingPlan{Condition: stmt.Having}
  146. result := pp.Apply(ctx, tt.data)
  147. if !reflect.DeepEqual(tt.result, result) {
  148. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  149. }
  150. }
  151. }
  152. func TestHavingPlanError(t *testing.T) {
  153. var tests = []struct {
  154. sql string
  155. data interface{}
  156. result interface{}
  157. }{
  158. {
  159. sql: `SELECT id1 FROM src1 HAVING avg(id1) > "str"`,
  160. data: xsql.WindowTuplesSet{
  161. xsql.WindowTuples{
  162. Emitter: "src1",
  163. Tuples: []xsql.Tuple{
  164. {
  165. Emitter: "src1",
  166. Message: xsql.Message{"id1": 1, "f1": "v1"},
  167. }, {
  168. Emitter: "src1",
  169. Message: xsql.Message{"id1": 2, "f1": "v2"},
  170. }, {
  171. Emitter: "src1",
  172. Message: xsql.Message{"id1": 5, "f1": "v1"},
  173. },
  174. },
  175. },
  176. },
  177. result: errors.New("invalid operation int64 > string"),
  178. }, {
  179. sql: `SELECT id1 FROM src1 HAVING avg(id1) > "str"`,
  180. data: errors.New("an error from upstream"),
  181. result: errors.New("an error from upstream"),
  182. },
  183. }
  184. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  185. contextLogger := common.Log.WithField("rule", "TestHavingPlan_Apply")
  186. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  187. for i, tt := range tests {
  188. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  189. if err != nil {
  190. t.Errorf("statement parse error %s", err)
  191. break
  192. }
  193. pp := &HavingPlan{Condition: stmt.Having}
  194. result := pp.Apply(ctx, tt.data)
  195. if !reflect.DeepEqual(tt.result, result) {
  196. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
  197. }
  198. }
  199. }