Bladeren bron

feat(sink): updatable redis sink

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 jaren geleden
bovenliggende
commit
014198d4e6
3 gewijzigde bestanden met toevoegingen van 217 en 14 verwijderingen
  1. 10 1
      internal/topo/memory/store/db_test.go
  2. 45 13
      internal/topo/redis/sink.go
  3. 162 0
      internal/topo/redis/sink_test.go

+ 10 - 1
internal/topo/memory/store/db_test.go

@@ -50,10 +50,19 @@ func TestTable(t *testing.T) {
 		api.NewDefaultSourceTuple(map[string]interface{}{"a": 2, "b": "0"}, nil),
 		api.NewDefaultSourceTuple(map[string]interface{}{"a": 2, "b": "0"}, nil),
 		api.NewDefaultSourceTuple(map[string]interface{}{"a": 5, "b": "0"}, nil),
 		api.NewDefaultSourceTuple(map[string]interface{}{"a": 5, "b": "0"}, nil),
 	}
 	}
-	if !reflect.DeepEqual(v, exp) {
+	if len(v) != 2 {
 		t.Errorf("read 1 again expect %v, but got %v", exp, v)
 		t.Errorf("read 1 again expect %v, but got %v", exp, v)
 		return
 		return
+	} else {
+		if v[0].Message()["a"] != 2 {
+			v[0], v[1] = v[1], v[0]
+		}
+		if !reflect.DeepEqual(v, exp) {
+			t.Errorf("read 1 again expect %v, but got %v", exp, v)
+			return
+		}
 	}
 	}
+
 	v, _ = tb.Read([]string{"a", "b"}, []interface{}{1, "1"})
 	v, _ = tb.Read([]string{"a", "b"}, []interface{}{1, "1"})
 	exp = []api.SourceTuple{
 	exp = []api.SourceTuple{
 		api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil),
 		api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil),

+ 45 - 13
internal/topo/redis/sink.go

@@ -20,7 +20,7 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/errorx"
+	"github.com/lf-edge/ekuiper/pkg/ast"
 	"time"
 	"time"
 
 
 	"github.com/go-redis/redis/v7"
 	"github.com/go-redis/redis/v7"
@@ -146,20 +146,52 @@ func (r *RedisSink) save(ctx api.StreamContext, data map[string]interface{}, val
 			return fmt.Errorf("key must be string or convertible to string, but got %v", keyval)
 			return fmt.Errorf("key must be string or convertible to string, but got %v", keyval)
 		}
 		}
 	}
 	}
-	if r.c.DataType == "list" {
-		err = r.cli.LPush(key, val).Err()
-		if err != nil {
-			logger.Error(err)
-			return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
+	rowkind := ast.RowkindUpsert
+	if r.c.RowkindField != "" {
+		c, ok := data[r.c.RowkindField]
+		if ok {
+			rowkind, ok = c.(string)
+			if !ok {
+				return fmt.Errorf("rowkind field %s is not a string in data %v", r.c.RowkindField, data)
+			}
+			if rowkind != ast.RowkindInsert && rowkind != ast.RowkindUpdate && rowkind != ast.RowkindDelete && rowkind != ast.RowkindUpsert {
+				return fmt.Errorf("invalid rowkind %s", rowkind)
+			}
 		}
 		}
-		logger.Debugf("send redis list success, key:%s data: %v", key, val)
-	} else {
-		err = r.cli.Set(key, val, r.c.Expiration*time.Second).Err()
-		if err != nil {
-			logger.Error(err)
-			return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
+	}
+	switch rowkind {
+	case ast.RowkindInsert, ast.RowkindUpdate, ast.RowkindUpsert:
+		if r.c.DataType == "list" {
+			err = r.cli.LPush(key, val).Err()
+			if err != nil {
+				return fmt.Errorf("lpush %s:%s error, %v", key, val, err)
+			}
+			logger.Debugf("push redis list success, key:%s data: %v", key, val)
+		} else {
+			err = r.cli.Set(key, val, r.c.Expiration*time.Second).Err()
+			if err != nil {
+				return fmt.Errorf("set %s:%s error, %v", key, val, err)
+			}
+			logger.Debugf("set redis string success, key:%s data: %s", key, val)
 		}
 		}
-		logger.Debugf("send redis string success, key:%s data: %s", key, val)
+	case ast.RowkindDelete:
+		if r.c.DataType == "list" {
+			err = r.cli.LPop(key).Err()
+			if err != nil {
+				return fmt.Errorf("lpop %s error, %v", key, err)
+			}
+			logger.Debugf("pop redis list success, key:%s data: %v", key, val)
+		} else {
+			err = r.cli.Del(key).Err()
+			if err != nil {
+				logger.Error(err)
+				return err
+			}
+			logger.Debugf("delete redis string success, key:%s data: %s", key, val)
+		}
+	default:
+		// never happen
+		logger.Errorf("unexpected rowkind %s", rowkind)
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 162 - 0
internal/topo/redis/sink_test.go

@@ -110,3 +110,165 @@ func TestSink(t *testing.T) {
 		}
 		}
 	}
 	}
 }
 }
+
+func TestUpdateString(t *testing.T) {
+	s := &RedisSink{}
+	err := s.Configure(map[string]interface{}{
+		"addr":         addr,
+		"field":        "id",
+		"rowkindField": "action",
+	})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	contextLogger := econf.Log.WithField("rule", "test")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+	err = s.Open(ctx)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	var tests = []struct {
+		d interface{}
+		k string
+		v interface{}
+	}{
+		{
+			d: map[string]interface{}{ // add without action
+				"id": "testUpdate1", "name": "Susan",
+			},
+			k: "testUpdate1",
+			v: `{"id":"testUpdate1","name":"Susan"}`,
+		},
+		{
+			d: map[string]interface{}{ // update with action
+				"action": "update", "id": "testUpdate1", "name": "John",
+			},
+			k: "testUpdate1",
+			v: `{"action":"update","id":"testUpdate1","name":"John"}`,
+		},
+		{
+			d: map[string]interface{}{ // delete
+				"action": "delete", "id": "testUpdate1",
+			},
+			k: "testUpdate1",
+			v: ``,
+		},
+		{
+			d: []map[string]interface{}{ // multiple actions
+				{"action": "delete", "id": "testUpdate1"},
+				{"action": "insert", "id": "testUpdate1", "name": "Susan"},
+			},
+			k: "testUpdate1",
+			v: `{"action":"insert","id":"testUpdate1","name":"Susan"}`,
+		},
+	}
+	for i, tt := range tests {
+		err = s.Collect(ctx, tt.d)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		r, err := mr.Get(tt.k)
+		if tt.v == "" {
+			if err == nil || err.Error() != "ERR no such key" {
+				t.Errorf("case %d err %v", i, err)
+				return
+			}
+		} else {
+			if err != nil {
+				t.Errorf("case %d err %v", i, err)
+				return
+			}
+			if !reflect.DeepEqual(r, tt.v) {
+				t.Errorf("case %d expect %v, but got %v", i, tt.v, r)
+			}
+		}
+	}
+}
+
+func TestUpdateList(t *testing.T) {
+	s := &RedisSink{}
+	err := s.Configure(map[string]interface{}{
+		"addr":         addr,
+		"field":        "id",
+		"datatype":     "list",
+		"rowkindField": "action",
+	})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	contextLogger := econf.Log.WithField("rule", "test")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+	err = s.Open(ctx)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	var tests = []struct {
+		d interface{}
+		k string
+		v []string
+	}{
+		{
+			d: map[string]interface{}{ // add without action
+				"id": "testUpdateList", "name": "Susan",
+			},
+			k: "testUpdateList",
+			v: []string{`{"id":"testUpdateList","name":"Susan"}`},
+		},
+		{
+			d: map[string]interface{}{ // update with action
+				"action": "update", "id": "testUpdateList", "name": "John",
+			},
+			k: "testUpdateList",
+			v: []string{`{"action":"update","id":"testUpdateList","name":"John"}`, `{"id":"testUpdateList","name":"Susan"}`},
+		},
+		{
+			d: map[string]interface{}{ // delete
+				"action": "delete", "id": "testUpdateList",
+			},
+			k: "testUpdateList",
+			v: []string{`{"id":"testUpdateList","name":"Susan"}`},
+		},
+		{
+			d: []map[string]interface{}{ // multiple actions
+				{"action": "delete", "id": "testUpdateList"},
+				{"action": "insert", "id": "testUpdateList", "name": "Susan"},
+			},
+			k: "testUpdateList",
+			v: []string{`{"action":"insert","id":"testUpdateList","name":"Susan"}`},
+		},
+		{
+			d: map[string]interface{}{ // delete
+				"action": "delete", "id": "testUpdateList",
+			},
+			k: "testUpdateList",
+			v: nil,
+		},
+	}
+	for i, tt := range tests {
+		err = s.Collect(ctx, tt.d)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		r, err := mr.List(tt.k)
+		if tt.v == nil {
+			if err == nil || err.Error() != "ERR no such key" {
+				t.Errorf("case %d err %v", i, err)
+				return
+			}
+		} else {
+			if err != nil {
+				t.Errorf("case %d err %v", i, err)
+				return
+			}
+			if !reflect.DeepEqual(r, tt.v) {
+				t.Errorf("case %d expect %v, but got %v", i, tt.v, r)
+			}
+		}
+	}
+}