window_op_test.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "reflect"
  18. "testing"
  19. "time"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/ast"
  22. )
  23. var fivet = []*xsql.Tuple{
  24. {
  25. Message: map[string]interface{}{
  26. "f1": "v1",
  27. },
  28. },
  29. {
  30. Message: map[string]interface{}{
  31. "f2": "v2",
  32. },
  33. },
  34. {
  35. Message: map[string]interface{}{
  36. "f3": "v3",
  37. },
  38. },
  39. {
  40. Message: map[string]interface{}{
  41. "f4": "v4",
  42. },
  43. },
  44. {
  45. Message: map[string]interface{}{
  46. "f5": "v5",
  47. },
  48. },
  49. }
  50. func TestTime(t *testing.T) {
  51. tests := []struct {
  52. interval int
  53. unit ast.Token
  54. end time.Time
  55. }{
  56. {
  57. interval: 10,
  58. unit: ast.MS,
  59. end: time.UnixMilli(1658218371340),
  60. }, {
  61. interval: 500,
  62. unit: ast.MS,
  63. end: time.UnixMilli(1658218371500),
  64. }, {
  65. interval: 1,
  66. unit: ast.SS,
  67. end: time.UnixMilli(1658218372000),
  68. }, {
  69. interval: 40, // 40 seconds
  70. unit: ast.SS,
  71. end: time.UnixMilli(1658218400000),
  72. }, {
  73. interval: 1,
  74. unit: ast.MI,
  75. end: time.UnixMilli(1658218380000),
  76. }, {
  77. interval: 3,
  78. unit: ast.MI,
  79. end: time.UnixMilli(1658218500000),
  80. }, {
  81. interval: 1,
  82. unit: ast.HH,
  83. end: time.UnixMilli(1658221200000),
  84. }, {
  85. interval: 2,
  86. unit: ast.HH,
  87. end: time.UnixMilli(1658224800000),
  88. }, {
  89. interval: 5, // 5 hours
  90. unit: ast.HH,
  91. end: time.UnixMilli(1658232000000),
  92. }, {
  93. interval: 1, // 1 day
  94. unit: ast.DD,
  95. end: time.UnixMilli(1658246400000),
  96. }, {
  97. interval: 7, // 1 week
  98. unit: ast.DD,
  99. end: time.UnixMilli(1658764800000),
  100. },
  101. }
  102. // Set the global timezone
  103. location, err := time.LoadLocation("Asia/Shanghai")
  104. if err != nil {
  105. fmt.Println("Error loading location:", err)
  106. return
  107. }
  108. time.Local = location
  109. fmt.Println(time.UnixMilli(1658218371337).String())
  110. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  111. for i, tt := range tests {
  112. ae := getAlignedWindowEndTime(time.UnixMilli(1658218371337), tt.interval, tt.unit)
  113. if tt.end.UnixMilli() != ae.UnixMilli() {
  114. t.Errorf("%d for interval %d. error mismatch:\n exp=%s(%d)\n got=%s(%d)\n\n", i, tt.interval, tt.end, tt.end.UnixMilli(), ae, ae.UnixMilli())
  115. }
  116. }
  117. }
  118. func TestNewTupleList(t *testing.T) {
  119. _, e := NewTupleList(nil, 0)
  120. es1 := "Window size should not be less than zero."
  121. if !reflect.DeepEqual(es1, e.Error()) {
  122. t.Errorf("error mismatch:\n exp=%s\n got=%s\n\n", es1, e)
  123. }
  124. _, e = NewTupleList(nil, 2)
  125. es1 = "The tuples should not be nil or empty."
  126. if !reflect.DeepEqual(es1, e.Error()) {
  127. t.Errorf("error mismatch:\n exp=%s\n got=%s\n\n", es1, e)
  128. }
  129. }
  130. func TestCountWindow(t *testing.T) {
  131. tests := []struct {
  132. tuplelist TupleList
  133. expWinCount int
  134. winTupleSets []xsql.WindowTuples
  135. expRestTuples []*xsql.Tuple
  136. }{
  137. {
  138. tuplelist: TupleList{
  139. tuples: fivet,
  140. size: 5,
  141. },
  142. expWinCount: 1,
  143. winTupleSets: []xsql.WindowTuples{
  144. {
  145. Content: []xsql.TupleRow{
  146. &xsql.Tuple{
  147. Message: map[string]interface{}{
  148. "f1": "v1",
  149. },
  150. },
  151. &xsql.Tuple{
  152. Message: map[string]interface{}{
  153. "f2": "v2",
  154. },
  155. },
  156. &xsql.Tuple{
  157. Message: map[string]interface{}{
  158. "f3": "v3",
  159. },
  160. },
  161. &xsql.Tuple{
  162. Message: map[string]interface{}{
  163. "f4": "v4",
  164. },
  165. },
  166. &xsql.Tuple{
  167. Message: map[string]interface{}{
  168. "f5": "v5",
  169. },
  170. },
  171. },
  172. },
  173. },
  174. expRestTuples: []*xsql.Tuple{
  175. {
  176. Message: map[string]interface{}{
  177. "f2": "v2",
  178. },
  179. },
  180. {
  181. Message: map[string]interface{}{
  182. "f3": "v3",
  183. },
  184. },
  185. {
  186. Message: map[string]interface{}{
  187. "f4": "v4",
  188. },
  189. },
  190. {
  191. Message: map[string]interface{}{
  192. "f5": "v5",
  193. },
  194. },
  195. },
  196. },
  197. {
  198. tuplelist: TupleList{
  199. tuples: fivet,
  200. size: 3,
  201. },
  202. expWinCount: 1,
  203. winTupleSets: []xsql.WindowTuples{
  204. {
  205. Content: []xsql.TupleRow{
  206. &xsql.Tuple{
  207. Message: map[string]interface{}{
  208. "f3": "v3",
  209. },
  210. },
  211. &xsql.Tuple{
  212. Message: map[string]interface{}{
  213. "f4": "v4",
  214. },
  215. },
  216. &xsql.Tuple{
  217. Message: map[string]interface{}{
  218. "f5": "v5",
  219. },
  220. },
  221. },
  222. },
  223. },
  224. expRestTuples: []*xsql.Tuple{
  225. {
  226. Message: map[string]interface{}{
  227. "f4": "v4",
  228. },
  229. },
  230. {
  231. Message: map[string]interface{}{
  232. "f5": "v5",
  233. },
  234. },
  235. },
  236. },
  237. {
  238. tuplelist: TupleList{
  239. tuples: fivet,
  240. size: 2,
  241. },
  242. expWinCount: 1,
  243. winTupleSets: []xsql.WindowTuples{
  244. {
  245. Content: []xsql.TupleRow{
  246. &xsql.Tuple{
  247. Message: map[string]interface{}{
  248. "f4": "v4",
  249. },
  250. },
  251. &xsql.Tuple{
  252. Message: map[string]interface{}{
  253. "f5": "v5",
  254. },
  255. },
  256. },
  257. },
  258. },
  259. expRestTuples: []*xsql.Tuple{
  260. {
  261. Message: map[string]interface{}{
  262. "f5": "v5",
  263. },
  264. },
  265. },
  266. },
  267. {
  268. tuplelist: TupleList{
  269. tuples: fivet,
  270. size: 6,
  271. },
  272. expWinCount: 0,
  273. winTupleSets: nil,
  274. expRestTuples: []*xsql.Tuple{
  275. {
  276. Message: map[string]interface{}{
  277. "f1": "v1",
  278. },
  279. },
  280. {
  281. Message: map[string]interface{}{
  282. "f2": "v2",
  283. },
  284. },
  285. {
  286. Message: map[string]interface{}{
  287. "f3": "v3",
  288. },
  289. },
  290. {
  291. Message: map[string]interface{}{
  292. "f4": "v4",
  293. },
  294. },
  295. {
  296. Message: map[string]interface{}{
  297. "f5": "v5",
  298. },
  299. },
  300. },
  301. },
  302. }
  303. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  304. for i, tt := range tests {
  305. if tt.expWinCount == 0 {
  306. if tt.tuplelist.hasMoreCountWindow() {
  307. t.Errorf("%d \n Should not have more count window.", i)
  308. }
  309. } else {
  310. for j := 0; j < tt.expWinCount; j++ {
  311. if !tt.tuplelist.hasMoreCountWindow() {
  312. t.Errorf("%d \n Expect more element, but cannot find more element.", i)
  313. }
  314. cw := tt.tuplelist.nextCountWindow()
  315. if !reflect.DeepEqual(tt.winTupleSets[j].Content, cw.Content) {
  316. t.Errorf("%d. \nresult mismatch:\n\nexp=%#v\n\ngot=%#v", i, tt.winTupleSets[j], cw) //nolint:govet
  317. }
  318. }
  319. rest := tt.tuplelist.getRestTuples()
  320. if !reflect.DeepEqual(tt.expRestTuples, rest) {
  321. t.Errorf("%d. \nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.expRestTuples, rest)
  322. }
  323. }
  324. }
  325. }