file_sink_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package file
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/topo/context"
  19. "github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
  20. "github.com/lf-edge/ekuiper/internal/topo/transform"
  21. "github.com/lf-edge/ekuiper/pkg/message"
  22. "os"
  23. "path/filepath"
  24. "reflect"
  25. "strconv"
  26. "testing"
  27. "time"
  28. )
  29. // Unit test for Configure function
  30. func TestConfigure(t *testing.T) {
  31. props := map[string]interface{}{
  32. "interval": 500,
  33. "path": "test",
  34. }
  35. m := File().(*fileSink)
  36. err := m.Configure(props)
  37. if err != nil {
  38. t.Errorf("Configure() error = %v, wantErr nil", err)
  39. }
  40. if *m.c.Interval != 500 {
  41. t.Errorf("Configure() Interval = %v, want 500", m.c.Interval)
  42. }
  43. if m.c.Path != "test" {
  44. t.Errorf("Configure() Path = %v, want test", m.c.Path)
  45. }
  46. err = m.Configure(map[string]interface{}{"interval": -1, "path": "test"})
  47. if err == nil {
  48. t.Errorf("Configure() error = %v, wantErr not nil", err)
  49. }
  50. err = m.Configure(map[string]interface{}{"interval": 500, "path": ""})
  51. if err == nil {
  52. t.Errorf("Configure() error = %v, wantErr not nil", err)
  53. }
  54. err = m.Configure(map[string]interface{}{"fileType": "csv2"})
  55. if err == nil {
  56. t.Errorf("Configure() error = %v, wantErr not nil", err)
  57. }
  58. err = m.Configure(map[string]interface{}{"interval": 500,
  59. "path": "test",
  60. "fileType": "csv"})
  61. if err == nil {
  62. t.Errorf("Configure() error = %v, wantErr not nil", err)
  63. }
  64. err = m.Configure(map[string]interface{}{"interval": 60, "path": "test", "checkInterval": -1})
  65. if err == nil {
  66. t.Errorf("Configure() error = %v, wantErr not nil", err)
  67. }
  68. err = m.Configure(map[string]interface{}{"rollingInterval": -1})
  69. if err == nil {
  70. t.Errorf("Configure() error = %v, wantErr not nil", err)
  71. }
  72. err = m.Configure(map[string]interface{}{"rollingCount": -1})
  73. if err == nil {
  74. t.Errorf("Configure() error = %v, wantErr not nil", err)
  75. }
  76. err = m.Configure(map[string]interface{}{"rollingCount": 0, "rollingInterval": 0})
  77. if err == nil {
  78. t.Errorf("Configure() error = %v, wantErr not nil", err)
  79. }
  80. err = m.Configure(map[string]interface{}{"RollingNamePattern": "test"})
  81. if err == nil {
  82. t.Errorf("Configure() error = %v, wantErr not nil", err)
  83. }
  84. err = m.Configure(map[string]interface{}{"RollingNamePattern": 0})
  85. if err == nil {
  86. t.Errorf("Configure() error = %v, wantErr not nil", err)
  87. }
  88. }
  89. func TestFileSink_Configure(t *testing.T) {
  90. var (
  91. defaultCheckInterval = (5 * time.Minute).Milliseconds()
  92. int500 = 500
  93. int64_500 = int64(int500)
  94. )
  95. tests := []struct {
  96. name string
  97. c *sinkConf
  98. p map[string]interface{}
  99. }{
  100. {
  101. name: "default configurations",
  102. c: &sinkConf{
  103. CheckInterval: &defaultCheckInterval,
  104. Path: "cache",
  105. FileType: LINES_TYPE,
  106. RollingCount: 1000000,
  107. },
  108. p: map[string]interface{}{},
  109. },
  110. {
  111. name: "previous setting",
  112. c: &sinkConf{
  113. Interval: &int500,
  114. CheckInterval: &int64_500,
  115. Path: "test",
  116. FileType: LINES_TYPE,
  117. RollingCount: 1000000,
  118. },
  119. p: map[string]interface{}{
  120. "interval": 500,
  121. "path": "test",
  122. },
  123. },
  124. {
  125. name: "new props",
  126. c: &sinkConf{
  127. CheckInterval: &int64_500,
  128. Path: "test",
  129. FileType: CSV_TYPE,
  130. Format: message.FormatDelimited,
  131. Delimiter: ",",
  132. RollingCount: 1000000,
  133. RollingNamePattern: "none",
  134. },
  135. p: map[string]interface{}{
  136. "checkInterval": 500,
  137. "path": "test",
  138. "fileType": "csv",
  139. "format": message.FormatDelimited,
  140. "rollingNamePattern": "none",
  141. },
  142. },
  143. { // only set rolling interval
  144. name: "rolling",
  145. c: &sinkConf{
  146. CheckInterval: &defaultCheckInterval,
  147. Path: "cache",
  148. FileType: LINES_TYPE,
  149. RollingInterval: 500,
  150. RollingCount: 0,
  151. },
  152. p: map[string]interface{}{
  153. "rollingInterval": 500,
  154. "rollingCount": 0,
  155. },
  156. },
  157. }
  158. for _, tt := range tests {
  159. t.Run(tt.name, func(t *testing.T) {
  160. m := &fileSink{}
  161. if err := m.Configure(tt.p); err != nil {
  162. t.Errorf("fileSink.Configure() error = %v", err)
  163. return
  164. }
  165. if !reflect.DeepEqual(m.c, tt.c) {
  166. t.Errorf("fileSink.Configure() = %v, want %v", m.c, tt.c)
  167. }
  168. })
  169. }
  170. }
  171. // Test single file writing and flush by close
  172. func TestFileSink_Collect(t *testing.T) {
  173. tests := []struct {
  174. name string
  175. ft FileType
  176. fname string
  177. content []byte
  178. }{
  179. {
  180. name: "lines",
  181. ft: LINES_TYPE,
  182. fname: "test_lines",
  183. content: []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n"),
  184. }, {
  185. name: "json",
  186. ft: JSON_TYPE,
  187. fname: "test_json",
  188. content: []byte(`[{"key":"value1"}{"key":"value2"}]`),
  189. }, {
  190. name: "csv",
  191. ft: CSV_TYPE,
  192. fname: "test_csv",
  193. content: []byte("key\n{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n"),
  194. },
  195. }
  196. // Create a stream context for testing
  197. contextLogger := conf.Log.WithField("rule", "test2")
  198. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  199. tf, _ := transform.GenTransform("", "json", "", "")
  200. vCtx := context.WithValue(ctx, context.TransKey, tf)
  201. for _, tt := range tests {
  202. t.Run(tt.name, func(t *testing.T) {
  203. // Create a temporary file for testing
  204. tmpfile, err := os.CreateTemp("", tt.fname)
  205. if err != nil {
  206. t.Fatal(err)
  207. }
  208. defer os.Remove(tmpfile.Name())
  209. // Create a file sink with the temporary file path
  210. sink := &fileSink{}
  211. f := message.FormatJson
  212. if tt.ft == CSV_TYPE {
  213. f = message.FormatDelimited
  214. }
  215. err = sink.Configure(map[string]interface{}{
  216. "path": tmpfile.Name(),
  217. "fileType": tt.ft,
  218. "hasHeader": true,
  219. "format": f,
  220. "rollingNamePattern": "none",
  221. })
  222. if err != nil {
  223. t.Fatal(err)
  224. }
  225. err = sink.Open(ctx)
  226. if err != nil {
  227. t.Fatal(err)
  228. }
  229. // Test collecting a map item
  230. m := map[string]interface{}{"key": "value1"}
  231. if err := sink.Collect(vCtx, m); err != nil {
  232. t.Errorf("unexpected error: %s", err)
  233. }
  234. // Test collecting another map item
  235. m = map[string]interface{}{"key": "value2"}
  236. if err := sink.Collect(ctx, m); err != nil {
  237. t.Errorf("unexpected error: %s", err)
  238. }
  239. if err = sink.Close(ctx); err != nil {
  240. t.Errorf("unexpected close error: %s", err)
  241. }
  242. // Read the contents of the temporary file and check if they match the collected items
  243. contents, err := os.ReadFile(tmpfile.Name())
  244. if err != nil {
  245. t.Fatal(err)
  246. }
  247. if !reflect.DeepEqual(contents, tt.content) {
  248. t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(contents))
  249. }
  250. })
  251. }
  252. }
  253. // Test file rolling by time
  254. func TestFileSinkRolling_Collect(t *testing.T) {
  255. // Remove existing files
  256. err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
  257. if err != nil {
  258. return err
  259. }
  260. if filepath.Ext(path) == ".log" {
  261. fmt.Println("Deleting file:", path)
  262. return os.Remove(path)
  263. }
  264. return nil
  265. })
  266. if err != nil {
  267. t.Fatal(err)
  268. }
  269. conf.IsTesting = true
  270. tests := []struct {
  271. name string
  272. ft FileType
  273. fname string
  274. contents [2][]byte
  275. }{
  276. {
  277. name: "lines",
  278. ft: LINES_TYPE,
  279. fname: "test_lines.log",
  280. contents: [2][]byte{
  281. []byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}\n"),
  282. []byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}\n"),
  283. },
  284. }, {
  285. name: "json",
  286. ft: JSON_TYPE,
  287. fname: "test_json.log",
  288. contents: [2][]byte{
  289. []byte("[{\"key\":\"value0\",\"ts\":460}{\"key\":\"value1\",\"ts\":910}{\"key\":\"value2\",\"ts\":1360}]"),
  290. []byte("[{\"key\":\"value3\",\"ts\":1810}{\"key\":\"value4\",\"ts\":2260}]"),
  291. },
  292. },
  293. }
  294. // Create a stream context for testing
  295. contextLogger := conf.Log.WithField("rule", "testRolling")
  296. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  297. tf, _ := transform.GenTransform("", "json", "", "")
  298. vCtx := context.WithValue(ctx, context.TransKey, tf)
  299. for _, tt := range tests {
  300. t.Run(tt.name, func(t *testing.T) {
  301. // Create a file sink with the temporary file path
  302. sink := &fileSink{}
  303. err := sink.Configure(map[string]interface{}{
  304. "path": tt.fname,
  305. "fileType": tt.ft,
  306. "rollingInterval": 1000,
  307. "checkInterval": 500,
  308. "rollingCount": 0,
  309. "rollingNamePattern": "suffix",
  310. })
  311. if err != nil {
  312. t.Fatal(err)
  313. }
  314. mockclock.ResetClock(10)
  315. err = sink.Open(ctx)
  316. if err != nil {
  317. t.Fatal(err)
  318. }
  319. c := mockclock.GetMockClock()
  320. for i := 0; i < 5; i++ {
  321. c.Add(450 * time.Millisecond)
  322. m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
  323. if err := sink.Collect(vCtx, m); err != nil {
  324. t.Errorf("unexpected error: %s", err)
  325. }
  326. }
  327. c.After(2000 * time.Millisecond)
  328. if err = sink.Close(ctx); err != nil {
  329. t.Errorf("unexpected close error: %s", err)
  330. }
  331. // Should write to 2 files
  332. for i := 0; i < 2; i++ {
  333. // Read the contents of the temporary file and check if they match the collected items
  334. fn := fmt.Sprintf("test_%s-%d.log", tt.ft, 460+1350*i)
  335. contents, err := os.ReadFile(fn)
  336. if err != nil {
  337. t.Fatal(err)
  338. }
  339. if !reflect.DeepEqual(contents, tt.contents[i]) {
  340. t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.contents[i], string(contents))
  341. }
  342. }
  343. })
  344. }
  345. }
  346. // Test file rolling by count
  347. func TestFileSinkRollingCount_Collect(t *testing.T) {
  348. // Remove existing files
  349. err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
  350. if err != nil {
  351. return err
  352. }
  353. if filepath.Ext(path) == ".dd" {
  354. fmt.Println("Deleting file:", path)
  355. return os.Remove(path)
  356. }
  357. return nil
  358. })
  359. if err != nil {
  360. t.Fatal(err)
  361. }
  362. conf.IsTesting = true
  363. tests := []struct {
  364. name string
  365. ft FileType
  366. fname string
  367. contents [3][]byte
  368. }{
  369. {
  370. name: "csv",
  371. ft: CSV_TYPE,
  372. fname: "test_csv_{{.ts}}.dd",
  373. contents: [3][]byte{
  374. []byte("key,ts\nvalue0,460\n"),
  375. []byte("key,ts\nvalue1,910\n"),
  376. []byte("key,ts\nvalue2,1360\n"),
  377. },
  378. },
  379. }
  380. // Create a stream context for testing
  381. contextLogger := conf.Log.WithField("rule", "testRollingCount")
  382. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  383. tf, _ := transform.GenTransform("", "delimited", "", ",")
  384. vCtx := context.WithValue(ctx, context.TransKey, tf)
  385. for _, tt := range tests {
  386. t.Run(tt.name, func(t *testing.T) {
  387. // Create a file sink with the temporary file path
  388. sink := &fileSink{}
  389. err := sink.Configure(map[string]interface{}{
  390. "path": tt.fname,
  391. "fileType": tt.ft,
  392. "rollingInterval": 0,
  393. "rollingCount": 1,
  394. "rollingNamePattern": "none",
  395. "hasHeader": true,
  396. "format": "delimited",
  397. })
  398. if err != nil {
  399. t.Fatal(err)
  400. }
  401. mockclock.ResetClock(10)
  402. err = sink.Open(ctx)
  403. if err != nil {
  404. t.Fatal(err)
  405. }
  406. c := mockclock.GetMockClock()
  407. for i := 0; i < 3; i++ {
  408. c.Add(450 * time.Millisecond)
  409. m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
  410. if err := sink.Collect(vCtx, m); err != nil {
  411. t.Errorf("unexpected error: %s", err)
  412. }
  413. }
  414. c.After(2000 * time.Millisecond)
  415. if err = sink.Close(ctx); err != nil {
  416. t.Errorf("unexpected close error: %s", err)
  417. }
  418. // Should write to 2 files
  419. for i := 0; i < 3; i++ {
  420. // Read the contents of the temporary file and check if they match the collected items
  421. fn := fmt.Sprintf("test_%s_%d.dd", tt.ft, 460+450*i)
  422. contents, err := os.ReadFile(fn)
  423. if err != nil {
  424. t.Fatal(err)
  425. }
  426. if !reflect.DeepEqual(contents, tt.contents[i]) {
  427. t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.contents[i], string(contents))
  428. }
  429. }
  430. })
  431. }
  432. }