Kaynağa Gözat

refactor(sink): refactor redis sink and add ut

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 yıl önce
ebeveyn
işleme
ad37711402

+ 5 - 1
internal/topo/redis/lookup_test.go

@@ -25,7 +25,10 @@ import (
 	"testing"
 )
 
-var addr string
+var (
+	addr string
+	mr   *miniredis.Miniredis
+)
 
 func init() {
 	s, err := miniredis.Run()
@@ -41,6 +44,7 @@ func init() {
 	s.Lpush("group1", `{"id":2,"name":"Susan"}`)
 	s.Lpush("group2", `{"id":3,"name":"Nancy"}`)
 	s.Lpush("group3", `{"id":4,"name":"Tom"}`)
+	mr = s
 }
 
 // TestSingle test lookup value of a single map

+ 93 - 137
internal/topo/redis/sink.go

@@ -17,6 +17,7 @@
 package redis
 
 import (
+	"encoding/json"
 	"errors"
 	"fmt"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
@@ -28,87 +29,42 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
-type RedisSink struct {
+type config struct {
 	// host:port address.
-	addr     string
-	username string
+	Addr     string `json:"addr,omitempty"`
+	Username string `json:"username,omitempty"`
 	// Optional password. Must match the password specified in the
-	password string
+	Password string `json:"password,omitempty"`
 	// Database to be selected after connecting to the server.
-	db int
-
+	Db int `json:"db,omitempty"`
 	// key of field
-	field string
-
+	Field string `json:"field,omitempty"`
 	// key define
-	key string
-
-	dataType string
-
-	expiration time.Duration
-
-	sendSingle bool
+	Key          string        `json:"key,omitempty"`
+	DataType     string        `json:"dataType,omitempty"`
+	Expiration   time.Duration `json:"expiration,omitempty"`
+	RowkindField string        `json:"rowkindField"`
+	DataTemplate string        `json:"dataTemplate"`
+}
 
+type RedisSink struct {
+	c   *config
 	cli *redis.Client
 }
 
 func (r *RedisSink) Configure(props map[string]interface{}) error {
-	if i, ok := props["addr"]; ok {
-		if i, ok := i.(string); ok {
-			r.addr = i
-		}
-	} else {
-		return errors.New("redis addr is null")
-	}
-
-	if i, ok := props["password"]; ok {
-		if i, ok := i.(string); ok {
-			r.password = i
-		}
-	}
-
-	r.db = 0
-	if i, ok := props["db"]; ok {
-		if t, err := cast.ToInt(i, cast.STRICT); err == nil {
-			r.db = t
-		}
-	}
-
-	if i, ok := props["key"]; ok {
-		if i, ok := i.(string); ok {
-			r.key = i
-		}
-	} else {
-		return errors.New("not config data key for redis")
-	}
-
-	if i, ok := props["field"]; ok {
-		if i, ok := i.(string); ok {
-			r.field = i
-		}
-	}
-
-	r.sendSingle = true
-	if i, ok := props["sendSingle"]; ok {
-		if i, ok := i.(bool); ok {
-			r.sendSingle = i
-		}
+	c := &config{DataType: "string", Expiration: -1}
+	err := cast.MapToStruct(props, c)
+	if err != nil {
+		return err
 	}
-
-	r.dataType = "string"
-	if i, ok := props["dataType"]; ok {
-		if i, ok := i.(string); ok {
-			r.dataType = i
-		}
+	if c.Key == "" && c.Field == "" {
+		return errors.New("redis sink must have key or field")
 	}
-
-	r.expiration = -1
-	if i, ok := props["expiration"]; ok {
-		if t, err := cast.ToInt(i, cast.STRICT); err == nil {
-			r.expiration = time.Duration(t)
-		}
+	if c.DataType != "string" && c.DataType != "list" {
+		return errors.New("redis sink only support string or list data type")
 	}
-
+	r.c = c
 	return nil
 }
 
@@ -117,10 +73,10 @@ func (r *RedisSink) Open(ctx api.StreamContext) (err error) {
 	logger.Debug("Opening redis sink")
 
 	r.cli = redis.NewClient(&redis.Options{
-		Addr:     r.addr,
-		Username: r.username,
-		Password: r.password,
-		DB:       r.db, // use default DB
+		Addr:     r.c.Addr,
+		Username: r.c.Username,
+		Password: r.c.Password,
+		DB:       r.c.Db, // use default DB
 	})
 
 	return nil
@@ -128,86 +84,86 @@ func (r *RedisSink) Open(ctx api.StreamContext) (err error) {
 
 func (r *RedisSink) Collect(ctx api.StreamContext, data interface{}) error {
 	logger := ctx.GetLogger()
-	v, _, err := ctx.TransformOutput(data)
-	if err != nil {
-		logger.Error(err)
-		return err
+	var val string
+	if r.c.DataTemplate != "" { // The result is a string
+		v, _, err := ctx.TransformOutput(data)
+		if err != nil {
+			logger.Error(err)
+			return err
+		}
+		m := make(map[string]interface{})
+		err = json.Unmarshal(v, &m)
+		if err != nil {
+			return fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(v), err)
+		}
+		data = m
+		val = string(v)
 	}
-	if r.field != "" {
-		switch out := data.(type) {
-		case []map[string]interface{}:
-			for _, m := range out {
-				key := r.field
-				k, err := cast.ToString(m[key], cast.CONVERT_ALL)
-				if err != nil {
-					return fmt.Errorf("key must be string or convertible to string, but got %v", m[key])
-				}
-
-				if r.dataType == "list" {
-					err := r.cli.LPush(k, v).Err()
-					if err != nil {
-						logger.Error(err)
-						return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
-					}
-					logger.Debugf("send redis list success, key:%s data: %s", k, string(v))
-				} else {
-					err := r.cli.Set(k, v, r.expiration*time.Second).Err()
-					if err != nil {
-						logger.Error(err)
-						return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
-					}
-					logger.Debugf("send redis string success, key:%s data: %s", k, string(v))
-				}
-			}
-		case map[string]interface{}:
-			key := r.field
-			k, err := cast.ToString(out[key], cast.CONVERT_ALL)
+	switch d := data.(type) {
+	case []map[string]interface{}:
+		for _, el := range d {
+			err := r.save(ctx, el, val)
 			if err != nil {
-				return fmt.Errorf("key must be string or convertible to string, but got %v", out[key])
-			}
-
-			if r.dataType == "list" {
-				err := r.cli.LPush(k, v).Err()
-				if err != nil {
-					logger.Error(err)
-					return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
-				}
-				logger.Debugf("send redis list success, key:%s data: %s", k, string(v))
-			} else {
-				err := r.cli.Set(k, v, r.expiration*time.Second).Err()
-				if err != nil {
-					logger.Error(err)
-					return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
-				}
-				logger.Debugf("send redis string success, key:%s data: %s", k, string(v))
+				return err
 			}
 		}
-	} else if r.key != "" {
-		if r.dataType == "list" {
-			err := r.cli.LPush(r.key, v).Err()
-			if err != nil {
-				logger.Error(err)
-				return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
-			}
-			logger.Debugf("send redis list success, key:%s data: %s", r.key, string(v))
-		} else {
-			err := r.cli.Set(r.key, v, r.expiration*time.Second).Err()
-			if err != nil {
-				logger.Error(err)
-				return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
-			}
-			logger.Debugf("send redis string success, key:%s data: %s", r.key, string(v))
+	case map[string]interface{}:
+		err := r.save(ctx, d, val)
+		if err != nil {
+			return err
 		}
+	default:
+		return fmt.Errorf("unrecognized format of %s", data)
 	}
 	logger.Debug("insert success %v", data)
 	return nil
 }
 
 func (r *RedisSink) Close(ctx api.StreamContext) error {
+	ctx.GetLogger().Infof("Closing redis sink")
 	err := r.cli.Close()
 	return err
 }
 
+func (r *RedisSink) save(ctx api.StreamContext, data map[string]interface{}, val string) error {
+	logger := ctx.GetLogger()
+	if val == "" {
+		jsonBytes, err := json.Marshal(data)
+		if err != nil {
+			return err
+		}
+		val = string(jsonBytes)
+	}
+	key := r.c.Key
+	var err error
+	if r.c.Field != "" {
+		keyval, ok := data[r.c.Field]
+		if !ok {
+			return fmt.Errorf("field %s does not exist in data %v", r.c.Field, data)
+		}
+		key, err = cast.ToString(keyval, cast.CONVERT_ALL)
+		if err != nil {
+			return fmt.Errorf("key must be string or convertible to string, but got %v", keyval)
+		}
+	}
+	if r.c.DataType == "list" {
+		err = r.cli.LPush(key, val).Err()
+		if err != nil {
+			logger.Error(err)
+			return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
+		}
+		logger.Debugf("send redis list success, key:%s data: %v", key, val)
+	} else {
+		err = r.cli.Set(key, val, r.c.Expiration*time.Second).Err()
+		if err != nil {
+			logger.Error(err)
+			return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
+		}
+		logger.Debugf("send redis string success, key:%s data: %s", key, val)
+	}
+	return nil
+}
+
 func GetSink() api.Sink {
 	return &RedisSink{}
 }

+ 112 - 0
internal/topo/redis/sink_test.go

@@ -0,0 +1,112 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package redis
+
+import (
+	econf "github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+	"reflect"
+	"testing"
+)
+
+func TestSink(t *testing.T) {
+	s := &RedisSink{}
+	err := s.Configure(map[string]interface{}{
+		"addr": addr,
+		"key":  "test",
+	})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	contextLogger := econf.Log.WithField("rule", "test")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+	err = s.Open(ctx)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	var tests = []struct {
+		c map[string]interface{}
+		d interface{}
+		k string
+		v interface{}
+	}{
+		{
+			c: map[string]interface{}{"key": "1"},
+			d: map[string]interface{}{"id": 1, "name": "John", "address": 34, "mobile": "334433"},
+			k: "1",
+			v: `{"address":34,"id":1,"mobile":"334433","name":"John"}`,
+		},
+		{
+			c: map[string]interface{}{"field": "id"},
+			d: map[string]interface{}{"id": 2, "name": "Susan", "address": 34, "mobile": "334433"},
+			k: "2",
+			v: `{"address":34,"id":2,"mobile":"334433","name":"Susan"}`,
+		},
+		{
+			c: map[string]interface{}{"field": "name", "datatype": "list"},
+			d: map[string]interface{}{"id": 3, "name": "Susan"},
+			k: "Susan",
+			v: `{"id":3,"name":"Susan"}`,
+		},
+		{
+			c: map[string]interface{}{"field": "id", "datatype": "list"},
+			d: []map[string]interface{}{
+				{"id": 4, "name": "Susan"},
+				{"id": 4, "name": "Bob"},
+				{"id": 4, "name": "John"},
+			},
+			k: "4",
+			v: `{"id":4,"name":"John"}`,
+		},
+		{
+			c: map[string]interface{}{"field": "id", "datatype": "string"},
+			d: []map[string]interface{}{
+				{"id": 25, "name": "Susan"},
+				{"id": 25, "name": "Bob"},
+				{"id": 25, "name": "John"},
+			},
+			k: "25",
+			v: `{"id":25,"name":"John"}`,
+		},
+	}
+	for i, tt := range tests {
+		cast.MapToStruct(tt.c, s.c)
+		err = s.Collect(ctx, tt.d)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		var (
+			r   string
+			err error
+		)
+		switch tt.c["datatype"] {
+		case "list":
+			r, err = mr.Lpop(tt.k)
+		default:
+			r, err = mr.Get(tt.k)
+		}
+		if err != nil {
+			t.Errorf("case %d err %v", i, err)
+			return
+		}
+		if !reflect.DeepEqual(r, tt.v) {
+			t.Errorf("case %d expect %v, but got %v", i, tt.v, r)
+		}
+	}
+}