gzip.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package compressor
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "github.com/lf-edge/ekuiper/internal/conf"
  7. "io"
  8. )
  9. func newGzipCompressor() (*gzipCompressor, error) {
  10. return &gzipCompressor{
  11. writer: gzip.NewWriter(nil),
  12. }, nil
  13. }
  14. type gzipCompressor struct {
  15. writer *gzip.Writer
  16. buffer bytes.Buffer
  17. }
  18. func (g *gzipCompressor) Compress(data []byte) ([]byte, error) {
  19. g.buffer.Reset()
  20. g.writer.Reset(&g.buffer)
  21. _, err := g.writer.Write(data)
  22. if err != nil {
  23. return nil, err
  24. }
  25. err = g.writer.Close()
  26. if err != nil {
  27. return nil, err
  28. }
  29. return g.buffer.Bytes(), nil
  30. }
  31. func newGzipDecompressor() (*gzipDecompressor, error) {
  32. return &gzipDecompressor{}, nil
  33. }
  34. type gzipDecompressor struct {
  35. reader *gzip.Reader
  36. }
  37. func (z *gzipDecompressor) Decompress(data []byte) ([]byte, error) {
  38. if z.reader == nil {
  39. r, err := gzip.NewReader(bytes.NewReader(data))
  40. if err != nil {
  41. return nil, fmt.Errorf("failed to decompress: %v", err)
  42. }
  43. z.reader = r
  44. } else {
  45. err := z.reader.Reset(bytes.NewReader(data))
  46. if err != nil {
  47. return nil, fmt.Errorf("failed to decompress: %v", err)
  48. }
  49. }
  50. defer func() {
  51. err := z.reader.Close()
  52. if err != nil {
  53. conf.Log.Warnf("failed to close gzip decompressor: %v", err)
  54. }
  55. }()
  56. return io.ReadAll(z.reader)
  57. }