123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- // Copyright 2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package mqtt
- import (
- "bytes"
- "compress/zlib"
- "reflect"
- "testing"
- "github.com/lf-edge/ekuiper/internal/compressor"
- "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/internal/converter"
- "github.com/lf-edge/ekuiper/internal/topo/context"
- "github.com/lf-edge/ekuiper/pkg/api"
- "github.com/lf-edge/ekuiper/pkg/ast"
- )
- // TestGetTupleWithZlibCompressor is a unit test for getTuple with zlib compressor
- func TestGetTupleWithZlibCompressor(t *testing.T) {
- contextLogger := conf.Log.WithField("rule", "TestTupleZlib_Apply")
- ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
- cv, _ := converter.GetOrCreateConverter(&ast.Options{FORMAT: "json"})
- ctx = context.WithValue(ctx, context.DecodeKey, cv)
- dc, _ := compressor.GetDecompressor("zlib")
- ms := &MQTTSource{
- decompressor: dc,
- }
- // Prepare a compressed payload
- originalPayload := []byte(`{"key": "value"}`)
- var buf bytes.Buffer
- zlibWriter := zlib.NewWriter(&buf)
- _, _ = zlibWriter.Write(originalPayload)
- _ = zlibWriter.Close()
- compressedPayload := buf.Bytes()
- // Create a mock MQTT message with the compressed payload
- msg := MockMessage{
- payload: compressedPayload,
- topic: "test/topic",
- }
- // Call getTuple with the mock MQTT message
- results := getTuples(ctx, ms, msg)
- for _, result := range results {
- // Check if the result is a valid SourceTuple and has the correct content
- if st, ok := result.(api.SourceTuple); ok {
- if !reflect.DeepEqual(st.Message(), map[string]interface{}{"key": "value"}) {
- t.Errorf("Expected message to be %v, but got %v", map[string]interface{}{"key": "value"}, st.Message())
- }
- if !reflect.DeepEqual(st.Meta(), map[string]interface{}{"topic": "test/topic", "messageid": "1"}) {
- t.Errorf("Expected metadata to be %v, but got %v", map[string]interface{}{"topic": "test/topic", "messageid": "1"}, st.Meta())
- }
- } else {
- t.Errorf("Expected result to be a SourceTuple, but got %T", result)
- }
- }
- }
- type MockMessage struct {
- payload []byte
- topic string
- }
- func (mm MockMessage) Payload() []byte {
- return mm.payload
- }
- func (MockMessage) Duplicate() bool {
- panic("function not expected to be invoked")
- }
- func (MockMessage) Qos() byte {
- panic("function not expected to be invoked")
- }
- func (MockMessage) Retained() bool {
- panic("function not expected to be invoked")
- }
- func (mm MockMessage) Topic() string {
- return mm.topic
- }
- func (MockMessage) MessageID() uint16 {
- return 1
- }
- func (MockMessage) Ack() {
- panic("function not expected to be invoked")
- }
|