|
@@ -18,7 +18,6 @@ import (
|
|
"bytes"
|
|
"bytes"
|
|
"compress/zlib"
|
|
"compress/zlib"
|
|
"fmt"
|
|
"fmt"
|
|
- "github.com/lf-edge/ekuiper/pkg/api"
|
|
|
|
"github.com/lf-edge/ekuiper/pkg/message"
|
|
"github.com/lf-edge/ekuiper/pkg/message"
|
|
)
|
|
)
|
|
|
|
|
|
@@ -38,14 +37,6 @@ type zlibCompressor struct {
|
|
buffer bytes.Buffer
|
|
buffer bytes.Buffer
|
|
}
|
|
}
|
|
|
|
|
|
-func (z *zlibCompressor) Close(ctx api.StreamContext) error {
|
|
|
|
- if z.writer != nil {
|
|
|
|
- ctx.GetLogger().Infof("closing zlib compressor")
|
|
|
|
- return z.writer.Close()
|
|
|
|
- }
|
|
|
|
- return nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
func (z *zlibCompressor) Compress(data []byte) ([]byte, error) {
|
|
func (z *zlibCompressor) Compress(data []byte) ([]byte, error) {
|
|
z.buffer.Reset()
|
|
z.buffer.Reset()
|
|
z.writer.Reset(&z.buffer)
|
|
z.writer.Reset(&z.buffer)
|