conf_test.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package conf
  15. import (
  16. "errors"
  17. "fmt"
  18. "reflect"
  19. "testing"
  20. "github.com/stretchr/testify/assert"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. )
  23. func TestSourceConfValidate(t *testing.T) {
  24. tests := []struct {
  25. s *SourceConf
  26. e *SourceConf
  27. err string
  28. }{
  29. {
  30. s: &SourceConf{},
  31. e: &SourceConf{
  32. HttpServerIp: "0.0.0.0",
  33. HttpServerPort: 10081,
  34. },
  35. err: "invalidHttpServerPort:httpServerPort must between 0 and 65535",
  36. }, {
  37. s: &SourceConf{
  38. HttpServerIp: "192.168.0.1",
  39. },
  40. e: &SourceConf{
  41. HttpServerIp: "192.168.0.1",
  42. HttpServerPort: 10081,
  43. },
  44. err: "invalidHttpServerPort:httpServerPort must between 0 and 65535",
  45. }, {
  46. s: &SourceConf{
  47. HttpServerPort: 99999,
  48. },
  49. e: &SourceConf{
  50. HttpServerIp: "0.0.0.0",
  51. HttpServerPort: 10081,
  52. },
  53. err: "invalidHttpServerPort:httpServerPort must between 0 and 65535",
  54. }, {
  55. s: &SourceConf{
  56. HttpServerPort: 9090,
  57. HttpServerTls: &tlsConf{
  58. Certfile: "certfile",
  59. Keyfile: "keyfile",
  60. },
  61. },
  62. e: &SourceConf{
  63. HttpServerIp: "0.0.0.0",
  64. HttpServerPort: 9090,
  65. HttpServerTls: &tlsConf{
  66. Certfile: "certfile",
  67. Keyfile: "keyfile",
  68. },
  69. },
  70. },
  71. }
  72. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  73. for i, tt := range tests {
  74. err := tt.s.Validate()
  75. if err != nil && tt.err != err.Error() {
  76. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.err, err)
  77. }
  78. if !reflect.DeepEqual(tt.s, tt.e) {
  79. t.Errorf("%d\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.e)
  80. }
  81. }
  82. }
  83. func TestRuleOptionValidate(t *testing.T) {
  84. tests := []struct {
  85. s *api.RuleOption
  86. e *api.RuleOption
  87. err string
  88. }{
  89. {
  90. s: &api.RuleOption{},
  91. e: &api.RuleOption{},
  92. },
  93. {
  94. s: &api.RuleOption{
  95. LateTol: 1000,
  96. Concurrency: 1,
  97. BufferLength: 1024,
  98. CheckpointInterval: 300000, // 5 minutes
  99. SendError: true,
  100. Restart: &api.RestartStrategy{
  101. Attempts: 0,
  102. Delay: 1000,
  103. Multiplier: 1,
  104. MaxDelay: 1000,
  105. JitterFactor: 0.1,
  106. },
  107. },
  108. e: &api.RuleOption{
  109. LateTol: 1000,
  110. Concurrency: 1,
  111. BufferLength: 1024,
  112. CheckpointInterval: 300000, // 5 minutes
  113. SendError: true,
  114. Restart: &api.RestartStrategy{
  115. Attempts: 0,
  116. Delay: 1000,
  117. Multiplier: 1,
  118. MaxDelay: 1000,
  119. JitterFactor: 0.1,
  120. },
  121. },
  122. },
  123. {
  124. s: &api.RuleOption{
  125. LateTol: 1000,
  126. Concurrency: 1,
  127. BufferLength: 1024,
  128. CheckpointInterval: 300000, // 5 minutes
  129. SendError: true,
  130. Restart: &api.RestartStrategy{
  131. Attempts: 3,
  132. Delay: 1000,
  133. Multiplier: 1,
  134. MaxDelay: 1000,
  135. JitterFactor: 0.1,
  136. },
  137. },
  138. e: &api.RuleOption{
  139. LateTol: 1000,
  140. Concurrency: 1,
  141. BufferLength: 1024,
  142. CheckpointInterval: 300000, // 5 minutes
  143. SendError: true,
  144. Restart: &api.RestartStrategy{
  145. Attempts: 3,
  146. Delay: 1000,
  147. Multiplier: 1,
  148. MaxDelay: 1000,
  149. JitterFactor: 0.1,
  150. },
  151. },
  152. },
  153. {
  154. s: &api.RuleOption{
  155. LateTol: 1000,
  156. Concurrency: 1,
  157. BufferLength: 1024,
  158. CheckpointInterval: 300000, // 5 minutes
  159. SendError: true,
  160. Restart: &api.RestartStrategy{
  161. Attempts: 3,
  162. Delay: 1000,
  163. Multiplier: 1.5,
  164. MaxDelay: 10000,
  165. JitterFactor: 0.1,
  166. },
  167. },
  168. e: &api.RuleOption{
  169. LateTol: 1000,
  170. Concurrency: 1,
  171. BufferLength: 1024,
  172. CheckpointInterval: 300000, // 5 minutes
  173. SendError: true,
  174. Restart: &api.RestartStrategy{
  175. Attempts: 3,
  176. Delay: 1000,
  177. Multiplier: 1.5,
  178. MaxDelay: 10000,
  179. JitterFactor: 0.1,
  180. },
  181. },
  182. },
  183. {
  184. s: &api.RuleOption{
  185. LateTol: 1000,
  186. Concurrency: 1,
  187. BufferLength: 1024,
  188. CheckpointInterval: 300000, // 5 minutes
  189. SendError: true,
  190. Restart: &api.RestartStrategy{
  191. Attempts: -2,
  192. Delay: 0,
  193. Multiplier: 0,
  194. MaxDelay: 0,
  195. JitterFactor: 1.1,
  196. },
  197. },
  198. e: &api.RuleOption{
  199. LateTol: 1000,
  200. Concurrency: 1,
  201. BufferLength: 1024,
  202. CheckpointInterval: 300000, // 5 minutes
  203. SendError: true,
  204. Restart: &api.RestartStrategy{
  205. Attempts: 0,
  206. Delay: 1000,
  207. Multiplier: 2,
  208. MaxDelay: 1000,
  209. JitterFactor: 0.1,
  210. },
  211. },
  212. err: "multiple errors",
  213. },
  214. }
  215. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  216. for i, tt := range tests {
  217. err := ValidateRuleOption(tt.s)
  218. if err != nil && tt.err == "" {
  219. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.err, err)
  220. }
  221. if !reflect.DeepEqual(tt.s, tt.e) {
  222. t.Errorf("%d\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.e)
  223. }
  224. }
  225. }
  226. func TestSinkConf_Validate(t *testing.T) {
  227. tests := []struct {
  228. name string
  229. sc SinkConf
  230. wantErr error
  231. }{
  232. {
  233. name: "valid config",
  234. sc: SinkConf{
  235. MemoryCacheThreshold: 1024,
  236. MaxDiskCache: 1024000,
  237. BufferPageSize: 256,
  238. EnableCache: true,
  239. ResendInterval: 0,
  240. CleanCacheAtStop: true,
  241. ResendAlterQueue: true,
  242. ResendPriority: 0,
  243. },
  244. wantErr: nil,
  245. },
  246. {
  247. name: "invalid memoryCacheThreshold",
  248. sc: SinkConf{
  249. MemoryCacheThreshold: -1,
  250. MaxDiskCache: 1024000,
  251. BufferPageSize: 256,
  252. EnableCache: true,
  253. ResendInterval: 0,
  254. CleanCacheAtStop: true,
  255. ResendAlterQueue: true,
  256. ResendPriority: 0,
  257. },
  258. wantErr: errors.Join(errors.New("memoryCacheThreshold:memoryCacheThreshold must be positive")),
  259. },
  260. {
  261. name: "invalid maxDiskCache",
  262. sc: SinkConf{
  263. MemoryCacheThreshold: 1024,
  264. MaxDiskCache: -1,
  265. BufferPageSize: 256,
  266. EnableCache: true,
  267. ResendInterval: 0,
  268. CleanCacheAtStop: true,
  269. ResendAlterQueue: true,
  270. ResendPriority: 0,
  271. },
  272. wantErr: errors.Join(errors.New("maxDiskCache:maxDiskCache must be positive")),
  273. },
  274. {
  275. name: "invalid bufferPageSize",
  276. sc: SinkConf{
  277. MemoryCacheThreshold: 1024,
  278. MaxDiskCache: 1024000,
  279. BufferPageSize: 0,
  280. EnableCache: true,
  281. ResendInterval: 0,
  282. CleanCacheAtStop: true,
  283. ResendAlterQueue: true,
  284. ResendPriority: 0,
  285. },
  286. wantErr: errors.Join(errors.New("bufferPageSize:bufferPageSize must be positive")),
  287. },
  288. {
  289. name: "invalid resendInterval",
  290. sc: SinkConf{
  291. MemoryCacheThreshold: 1024,
  292. MaxDiskCache: 1024000,
  293. BufferPageSize: 256,
  294. EnableCache: true,
  295. ResendInterval: -1,
  296. CleanCacheAtStop: true,
  297. ResendAlterQueue: true,
  298. ResendPriority: 0,
  299. },
  300. wantErr: errors.Join(errors.New("resendInterval:resendInterval must be positive")),
  301. },
  302. {
  303. name: "memoryCacheThresholdTooSmall",
  304. sc: SinkConf{
  305. MemoryCacheThreshold: 128,
  306. MaxDiskCache: 1024000,
  307. BufferPageSize: 256,
  308. EnableCache: true,
  309. ResendInterval: 0,
  310. CleanCacheAtStop: true,
  311. ResendAlterQueue: true,
  312. ResendPriority: 0,
  313. },
  314. wantErr: errors.Join(errors.New("memoryCacheThresholdTooSmall:memoryCacheThreshold must be greater than or equal to bufferPageSize")),
  315. },
  316. {
  317. name: "memoryCacheThresholdNotMultiple",
  318. sc: SinkConf{
  319. MemoryCacheThreshold: 300,
  320. MaxDiskCache: 1024000,
  321. BufferPageSize: 256,
  322. EnableCache: true,
  323. ResendInterval: 0,
  324. CleanCacheAtStop: true,
  325. ResendAlterQueue: true,
  326. ResendPriority: 0,
  327. },
  328. wantErr: errors.Join(errors.New("memoryCacheThresholdNotMultiple:memoryCacheThreshold must be a multiple of bufferPageSize")),
  329. },
  330. {
  331. name: "maxDiskCacheTooSmall",
  332. sc: SinkConf{
  333. MemoryCacheThreshold: 1024,
  334. MaxDiskCache: 128,
  335. BufferPageSize: 256,
  336. EnableCache: true,
  337. ResendInterval: 0,
  338. CleanCacheAtStop: true,
  339. ResendAlterQueue: true,
  340. ResendPriority: 0,
  341. },
  342. wantErr: errors.Join(errors.New("maxDiskCacheTooSmall:maxDiskCache must be greater than bufferPageSize")),
  343. },
  344. {
  345. name: "maxDiskCacheNotMultiple",
  346. sc: SinkConf{
  347. MemoryCacheThreshold: 1024,
  348. MaxDiskCache: 300,
  349. BufferPageSize: 256,
  350. EnableCache: true,
  351. ResendInterval: 0,
  352. CleanCacheAtStop: true,
  353. ResendAlterQueue: true,
  354. ResendPriority: 0,
  355. },
  356. wantErr: errors.Join(errors.New("maxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize")),
  357. },
  358. {
  359. name: "invalid resendPriority",
  360. sc: SinkConf{
  361. MemoryCacheThreshold: 1024,
  362. MaxDiskCache: 1024000,
  363. BufferPageSize: 256,
  364. EnableCache: true,
  365. ResendInterval: 0,
  366. CleanCacheAtStop: true,
  367. ResendAlterQueue: true,
  368. ResendPriority: 2,
  369. },
  370. wantErr: errors.Join(errors.New("resendPriority:resendPriority must be -1, 0 or 1")),
  371. },
  372. }
  373. for _, tt := range tests {
  374. t.Run(tt.name, func(t *testing.T) {
  375. err := tt.sc.Validate()
  376. assert.Equal(t, tt.wantErr, err)
  377. })
  378. }
  379. }