window_op_test.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/xsql"
  18. "reflect"
  19. "testing"
  20. "time"
  21. )
  22. var fivet = []*xsql.Tuple{
  23. {
  24. Message: map[string]interface{}{
  25. "f1": "v1",
  26. },
  27. },
  28. {
  29. Message: map[string]interface{}{
  30. "f2": "v2",
  31. },
  32. },
  33. {
  34. Message: map[string]interface{}{
  35. "f3": "v3",
  36. },
  37. },
  38. {
  39. Message: map[string]interface{}{
  40. "f4": "v4",
  41. },
  42. },
  43. {
  44. Message: map[string]interface{}{
  45. "f5": "v5",
  46. },
  47. },
  48. }
  49. func TestTime(t *testing.T) {
  50. var tests = []struct {
  51. interval int
  52. end time.Time
  53. }{
  54. {
  55. interval: 10,
  56. end: time.UnixMilli(1658218371340),
  57. }, {
  58. interval: 500,
  59. end: time.UnixMilli(1658218371500),
  60. }, {
  61. interval: 1000,
  62. end: time.UnixMilli(1658218372000),
  63. }, {
  64. interval: 40000, // 4oms
  65. end: time.UnixMilli(1658218400000),
  66. }, {
  67. interval: 60000,
  68. end: time.UnixMilli(1658218380000),
  69. }, {
  70. interval: 180000,
  71. end: time.UnixMilli(1658218500000),
  72. }, {
  73. interval: 3600000,
  74. end: time.UnixMilli(1658221200000),
  75. }, {
  76. interval: 7200000,
  77. end: time.UnixMilli(1658224800000),
  78. }, {
  79. interval: 18000000, // 5 hours
  80. end: time.UnixMilli(1658232000000),
  81. }, {
  82. interval: 3600000 * 24, // 1 day
  83. end: time.UnixMilli(1658246400000),
  84. }, {
  85. interval: 3600000 * 24 * 7, // 1 week
  86. end: time.UnixMilli(1658764800000),
  87. },
  88. }
  89. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  90. for i, tt := range tests {
  91. ae := getAlignedWindowEndTime(1658218371337, int64(tt.interval))
  92. if tt.end.UnixMilli() != ae.UnixMilli() {
  93. t.Errorf("%d for interval %d. error mismatch:\n exp=%s(%d)\n got=%s(%d)\n\n", i, tt.interval, tt.end, tt.end.UnixMilli(), ae, ae.UnixMilli())
  94. }
  95. }
  96. }
  97. func TestNewTupleList(t *testing.T) {
  98. _, e := NewTupleList(nil, 0)
  99. es1 := "Window size should not be less than zero."
  100. if !reflect.DeepEqual(es1, e.Error()) {
  101. t.Errorf("error mismatch:\n exp=%s\n got=%s\n\n", es1, e)
  102. }
  103. _, e = NewTupleList(nil, 2)
  104. es1 = "The tuples should not be nil or empty."
  105. if !reflect.DeepEqual(es1, e.Error()) {
  106. t.Errorf("error mismatch:\n exp=%s\n got=%s\n\n", es1, e)
  107. }
  108. }
  109. func TestCountWindow(t *testing.T) {
  110. var tests = []struct {
  111. tuplelist TupleList
  112. expWinCount int
  113. winTupleSets []xsql.WindowTuples
  114. expRestTuples []*xsql.Tuple
  115. }{
  116. {
  117. tuplelist: TupleList{
  118. tuples: fivet,
  119. size: 5,
  120. },
  121. expWinCount: 1,
  122. winTupleSets: []xsql.WindowTuples{
  123. {
  124. Content: []xsql.TupleRow{
  125. &xsql.Tuple{
  126. Message: map[string]interface{}{
  127. "f1": "v1",
  128. },
  129. },
  130. &xsql.Tuple{
  131. Message: map[string]interface{}{
  132. "f2": "v2",
  133. },
  134. },
  135. &xsql.Tuple{
  136. Message: map[string]interface{}{
  137. "f3": "v3",
  138. },
  139. },
  140. &xsql.Tuple{
  141. Message: map[string]interface{}{
  142. "f4": "v4",
  143. },
  144. },
  145. &xsql.Tuple{
  146. Message: map[string]interface{}{
  147. "f5": "v5",
  148. },
  149. },
  150. },
  151. },
  152. },
  153. expRestTuples: []*xsql.Tuple{
  154. {
  155. Message: map[string]interface{}{
  156. "f2": "v2",
  157. },
  158. },
  159. {
  160. Message: map[string]interface{}{
  161. "f3": "v3",
  162. },
  163. },
  164. {
  165. Message: map[string]interface{}{
  166. "f4": "v4",
  167. },
  168. },
  169. {
  170. Message: map[string]interface{}{
  171. "f5": "v5",
  172. },
  173. },
  174. },
  175. },
  176. {
  177. tuplelist: TupleList{
  178. tuples: fivet,
  179. size: 3,
  180. },
  181. expWinCount: 1,
  182. winTupleSets: []xsql.WindowTuples{
  183. {
  184. Content: []xsql.TupleRow{
  185. &xsql.Tuple{
  186. Message: map[string]interface{}{
  187. "f3": "v3",
  188. },
  189. },
  190. &xsql.Tuple{
  191. Message: map[string]interface{}{
  192. "f4": "v4",
  193. },
  194. },
  195. &xsql.Tuple{
  196. Message: map[string]interface{}{
  197. "f5": "v5",
  198. },
  199. },
  200. },
  201. },
  202. },
  203. expRestTuples: []*xsql.Tuple{
  204. {
  205. Message: map[string]interface{}{
  206. "f4": "v4",
  207. },
  208. },
  209. {
  210. Message: map[string]interface{}{
  211. "f5": "v5",
  212. },
  213. },
  214. },
  215. },
  216. {
  217. tuplelist: TupleList{
  218. tuples: fivet,
  219. size: 2,
  220. },
  221. expWinCount: 1,
  222. winTupleSets: []xsql.WindowTuples{
  223. {
  224. Content: []xsql.TupleRow{
  225. &xsql.Tuple{
  226. Message: map[string]interface{}{
  227. "f4": "v4",
  228. },
  229. },
  230. &xsql.Tuple{
  231. Message: map[string]interface{}{
  232. "f5": "v5",
  233. },
  234. },
  235. },
  236. },
  237. },
  238. expRestTuples: []*xsql.Tuple{
  239. {
  240. Message: map[string]interface{}{
  241. "f5": "v5",
  242. },
  243. },
  244. },
  245. },
  246. {
  247. tuplelist: TupleList{
  248. tuples: fivet,
  249. size: 6,
  250. },
  251. expWinCount: 0,
  252. winTupleSets: nil,
  253. expRestTuples: []*xsql.Tuple{
  254. {
  255. Message: map[string]interface{}{
  256. "f1": "v1",
  257. },
  258. },
  259. {
  260. Message: map[string]interface{}{
  261. "f2": "v2",
  262. },
  263. },
  264. {
  265. Message: map[string]interface{}{
  266. "f3": "v3",
  267. },
  268. },
  269. {
  270. Message: map[string]interface{}{
  271. "f4": "v4",
  272. },
  273. },
  274. {
  275. Message: map[string]interface{}{
  276. "f5": "v5",
  277. },
  278. },
  279. },
  280. },
  281. }
  282. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  283. for i, tt := range tests {
  284. if tt.expWinCount == 0 {
  285. if tt.tuplelist.hasMoreCountWindow() {
  286. t.Errorf("%d \n Should not have more count window.", i)
  287. }
  288. } else {
  289. for j := 0; j < tt.expWinCount; j++ {
  290. if !tt.tuplelist.hasMoreCountWindow() {
  291. t.Errorf("%d \n Expect more element, but cannot find more element.", i)
  292. }
  293. cw := tt.tuplelist.nextCountWindow()
  294. if !reflect.DeepEqual(tt.winTupleSets[j].Content, cw.Content) {
  295. t.Errorf("%d. \nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.winTupleSets[j], cw)
  296. }
  297. }
  298. rest := tt.tuplelist.getRestTuples()
  299. if !reflect.DeepEqual(tt.expRestTuples, rest) {
  300. t.Errorf("%d. \nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.expRestTuples, rest)
  301. }
  302. }
  303. }
  304. }