zlib.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package compressor
  2. import (
  3. "bytes"
  4. "compress/zlib"
  5. "fmt"
  6. "github.com/lf-edge/ekuiper/internal/conf"
  7. "io"
  8. )
  9. func newZlibCompressor() (*zlibCompressor, error) {
  10. return &zlibCompressor{
  11. writer: zlib.NewWriter(nil),
  12. }, nil
  13. }
  14. type zlibCompressor struct {
  15. writer *zlib.Writer
  16. buffer bytes.Buffer
  17. }
  18. func (z *zlibCompressor) Compress(data []byte) ([]byte, error) {
  19. z.buffer.Reset()
  20. z.writer.Reset(&z.buffer)
  21. _, err := z.writer.Write(data)
  22. if err != nil {
  23. return nil, err
  24. }
  25. err = z.writer.Close()
  26. if err != nil {
  27. return nil, err
  28. }
  29. return z.buffer.Bytes(), nil
  30. }
  31. func newZlibDecompressor() (*zlibDecompressor, error) {
  32. return &zlibDecompressor{}, nil
  33. }
  34. type zlibDecompressor struct {
  35. reader io.ReadCloser
  36. }
  37. func (z *zlibDecompressor) Decompress(data []byte) ([]byte, error) {
  38. if z.reader == nil {
  39. r, err := zlib.NewReader(bytes.NewReader(data))
  40. if err != nil {
  41. return nil, fmt.Errorf("failed to decompress: %v", err)
  42. }
  43. z.reader = r
  44. } else {
  45. err := z.reader.(zlib.Resetter).Reset(bytes.NewReader(data), nil)
  46. if err != nil {
  47. return nil, fmt.Errorf("failed to decompress: %v", err)
  48. }
  49. }
  50. defer func() {
  51. err := z.reader.Close()
  52. if err != nil {
  53. conf.Log.Warnf("failed to close zlib decompressor: %v", err)
  54. }
  55. }()
  56. return io.ReadAll(z.reader)
  57. }