window_op_test.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/xsql"
  18. "reflect"
  19. "testing"
  20. )
  21. var fivet = []*xsql.Tuple{
  22. {
  23. Message: map[string]interface{}{
  24. "f1": "v1",
  25. },
  26. },
  27. {
  28. Message: map[string]interface{}{
  29. "f2": "v2",
  30. },
  31. },
  32. {
  33. Message: map[string]interface{}{
  34. "f3": "v3",
  35. },
  36. },
  37. {
  38. Message: map[string]interface{}{
  39. "f4": "v4",
  40. },
  41. },
  42. {
  43. Message: map[string]interface{}{
  44. "f5": "v5",
  45. },
  46. },
  47. }
  48. func TestNewTupleList(t *testing.T) {
  49. _, e := NewTupleList(nil, 0)
  50. es1 := "Window size should not be less than zero."
  51. if !reflect.DeepEqual(es1, e.Error()) {
  52. t.Errorf("error mismatch:\n exp=%s\n got=%s\n\n", es1, e)
  53. }
  54. _, e = NewTupleList(nil, 2)
  55. es1 = "The tuples should not be nil or empty."
  56. if !reflect.DeepEqual(es1, e.Error()) {
  57. t.Errorf("error mismatch:\n exp=%s\n got=%s\n\n", es1, e)
  58. }
  59. }
  60. func TestCountWindow(t *testing.T) {
  61. var tests = []struct {
  62. tuplelist TupleList
  63. expWinCount int
  64. winTupleSets []xsql.WindowTuplesSet
  65. expRestTuples []*xsql.Tuple
  66. }{
  67. {
  68. tuplelist: TupleList{
  69. tuples: fivet,
  70. size: 5,
  71. },
  72. expWinCount: 1,
  73. winTupleSets: []xsql.WindowTuplesSet{
  74. {
  75. Content: []xsql.WindowTuples{
  76. {
  77. Emitter: "",
  78. Tuples: []xsql.Tuple{
  79. {
  80. Message: map[string]interface{}{
  81. "f1": "v1",
  82. },
  83. },
  84. {
  85. Message: map[string]interface{}{
  86. "f2": "v2",
  87. },
  88. },
  89. {
  90. Message: map[string]interface{}{
  91. "f3": "v3",
  92. },
  93. },
  94. {
  95. Message: map[string]interface{}{
  96. "f4": "v4",
  97. },
  98. },
  99. {
  100. Message: map[string]interface{}{
  101. "f5": "v5",
  102. },
  103. },
  104. },
  105. },
  106. },
  107. },
  108. },
  109. expRestTuples: []*xsql.Tuple{
  110. {
  111. Message: map[string]interface{}{
  112. "f2": "v2",
  113. },
  114. },
  115. {
  116. Message: map[string]interface{}{
  117. "f3": "v3",
  118. },
  119. },
  120. {
  121. Message: map[string]interface{}{
  122. "f4": "v4",
  123. },
  124. },
  125. {
  126. Message: map[string]interface{}{
  127. "f5": "v5",
  128. },
  129. },
  130. },
  131. },
  132. {
  133. tuplelist: TupleList{
  134. tuples: fivet,
  135. size: 3,
  136. },
  137. expWinCount: 1,
  138. winTupleSets: []xsql.WindowTuplesSet{
  139. {
  140. Content: []xsql.WindowTuples{
  141. {
  142. Emitter: "",
  143. Tuples: []xsql.Tuple{
  144. {
  145. Message: map[string]interface{}{
  146. "f3": "v3",
  147. },
  148. },
  149. {
  150. Message: map[string]interface{}{
  151. "f4": "v4",
  152. },
  153. },
  154. {
  155. Message: map[string]interface{}{
  156. "f5": "v5",
  157. },
  158. },
  159. },
  160. },
  161. },
  162. },
  163. },
  164. expRestTuples: []*xsql.Tuple{
  165. {
  166. Message: map[string]interface{}{
  167. "f4": "v4",
  168. },
  169. },
  170. {
  171. Message: map[string]interface{}{
  172. "f5": "v5",
  173. },
  174. },
  175. },
  176. },
  177. {
  178. tuplelist: TupleList{
  179. tuples: fivet,
  180. size: 2,
  181. },
  182. expWinCount: 1,
  183. winTupleSets: []xsql.WindowTuplesSet{
  184. {
  185. Content: []xsql.WindowTuples{
  186. {
  187. Emitter: "",
  188. Tuples: []xsql.Tuple{
  189. {
  190. Message: map[string]interface{}{
  191. "f4": "v4",
  192. },
  193. },
  194. {
  195. Message: map[string]interface{}{
  196. "f5": "v5",
  197. },
  198. },
  199. },
  200. },
  201. },
  202. },
  203. },
  204. expRestTuples: []*xsql.Tuple{
  205. {
  206. Message: map[string]interface{}{
  207. "f5": "v5",
  208. },
  209. },
  210. },
  211. },
  212. {
  213. tuplelist: TupleList{
  214. tuples: fivet,
  215. size: 6,
  216. },
  217. expWinCount: 0,
  218. winTupleSets: nil,
  219. expRestTuples: []*xsql.Tuple{
  220. {
  221. Message: map[string]interface{}{
  222. "f1": "v1",
  223. },
  224. },
  225. {
  226. Message: map[string]interface{}{
  227. "f2": "v2",
  228. },
  229. },
  230. {
  231. Message: map[string]interface{}{
  232. "f3": "v3",
  233. },
  234. },
  235. {
  236. Message: map[string]interface{}{
  237. "f4": "v4",
  238. },
  239. },
  240. {
  241. Message: map[string]interface{}{
  242. "f5": "v5",
  243. },
  244. },
  245. },
  246. },
  247. }
  248. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  249. for i, tt := range tests {
  250. if tt.expWinCount == 0 {
  251. if tt.tuplelist.hasMoreCountWindow() {
  252. t.Errorf("%d \n Should not have more count window.", i)
  253. }
  254. } else {
  255. for j := 0; j < tt.expWinCount; j++ {
  256. if !tt.tuplelist.hasMoreCountWindow() {
  257. t.Errorf("%d \n Expect more element, but cannot find more element.", i)
  258. }
  259. cw := tt.tuplelist.nextCountWindow()
  260. if cw.WindowRange == nil {
  261. t.Errorf("%d. got nil window range", i)
  262. }
  263. if !reflect.DeepEqual(tt.winTupleSets[j].Content, cw.Content) {
  264. t.Errorf("%d. \nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.winTupleSets[j], cw)
  265. }
  266. }
  267. rest := tt.tuplelist.getRestTuples()
  268. if !reflect.DeepEqual(tt.expRestTuples, rest) {
  269. t.Errorf("%d. \nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.expRestTuples, rest)
  270. }
  271. }
  272. }
  273. }