Quellcode durchsuchen

feat(sink): updatable memory sink

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang vor 2 Jahren
Ursprung
Commit
d2dcc86ad4

+ 6 - 2
internal/topo/memory/lookupsource_test.go

@@ -42,8 +42,12 @@ func TestNoIndexLookup(t *testing.T) {
 	// wait for the source to be ready
 	// wait for the source to be ready
 	time.Sleep(100 * time.Millisecond)
 	time.Sleep(100 * time.Millisecond)
 	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value1", "gg": "value2"})
 	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value1", "gg": "value2"})
-	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value2", "gg": "value2"})
+	pubsub.ProduceUpdatable(ctx, "test", map[string]interface{}{"ff": "value1", "gg": "value2"}, "delete", "ff")
+	pubsub.ProduceUpdatable(ctx, "test", map[string]interface{}{"ff": "value2", "gg": "value2"}, "insert", "ff")
 	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value1", "gg": "value4"})
 	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value1", "gg": "value4"})
+	pubsub.ProduceUpdatable(ctx, "test", map[string]interface{}{"ff": "value2", "gg": "value2"}, "delete", "ff")
+	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value1", "gg": "value2"})
+	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value2", "gg": "value2"})
 	// wait for table accumulation
 	// wait for table accumulation
 	time.Sleep(100 * time.Millisecond)
 	time.Sleep(100 * time.Millisecond)
 	canctx, cancel := gocontext.WithCancel(gocontext.Background())
 	canctx, cancel := gocontext.WithCancel(gocontext.Background())
@@ -59,8 +63,8 @@ func TestNoIndexLookup(t *testing.T) {
 		}
 		}
 	}()
 	}()
 	expected := []api.SourceTuple{
 	expected := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test"}),
 		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value4"}, map[string]interface{}{"topic": "test"}),
 		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value4"}, map[string]interface{}{"topic": "test"}),
+		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test"}),
 	}
 	}
 	result, err := ls.Lookup(ctx, []string{}, []string{"ff"}, []interface{}{"value1"})
 	result, err := ls.Lookup(ctx, []string{}, []string{"ff"}, []interface{}{"value1"})
 	if !reflect.DeepEqual(result, expected) {
 	if !reflect.DeepEqual(result, expected) {

+ 13 - 2
internal/topo/memory/pubsub/manager.go

@@ -110,6 +110,18 @@ func RemovePub(topic string) {
 }
 }
 
 
 func Produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
 func Produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
+	doProduce(ctx, topic, api.NewDefaultSourceTuple(data, map[string]interface{}{"topic": topic}))
+}
+
+func ProduceUpdatable(ctx api.StreamContext, topic string, data map[string]interface{}, rowkind string, key string) {
+	doProduce(ctx, topic, &UpdatableTuple{
+		DefaultSourceTuple: api.NewDefaultSourceTuple(data, map[string]interface{}{"topic": topic}),
+		Rowkind:            rowkind,
+		Key:                key,
+	})
+}
+
+func doProduce(ctx api.StreamContext, topic string, data api.SourceTuple) {
 	c, exists := pubTopics[topic]
 	c, exists := pubTopics[topic]
 	if !exists {
 	if !exists {
 		return
 		return
@@ -120,7 +132,7 @@ func Produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
 	// broadcast to all consumers
 	// broadcast to all consumers
 	for name, out := range c.consumers {
 	for name, out := range c.consumers {
 		select {
 		select {
-		case out <- api.NewDefaultSourceTuple(data, map[string]interface{}{"topic": topic}):
+		case out <- data:
 			logger.Debugf("memory source broadcast from topic %s to %s done", topic, name)
 			logger.Debugf("memory source broadcast from topic %s to %s done", topic, name)
 		case <-ctx.Done():
 		case <-ctx.Done():
 			// rule stop so stop waiting
 			// rule stop so stop waiting
@@ -128,7 +140,6 @@ func Produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
 			logger.Errorf("memory source topic %s drop message to %s", topic, name)
 			logger.Errorf("memory source topic %s drop message to %s", topic, name)
 		}
 		}
 	}
 	}
-
 }
 }
 
 
 func ProduceError(ctx api.StreamContext, topic string, err error) {
 func ProduceError(ctx api.StreamContext, topic string, err error) {

+ 23 - 0
internal/topo/memory/pubsub/tuple.go

@@ -0,0 +1,23 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pubsub
+
+import "github.com/lf-edge/ekuiper/pkg/api"
+
+type UpdatableTuple struct {
+	*api.DefaultSourceTuple
+	Rowkind string
+	Key     string
+}

+ 57 - 13
internal/topo/memory/sink.go

@@ -19,12 +19,23 @@ import (
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
 	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"strings"
 	"strings"
 )
 )
 
 
+type config struct {
+	Topic        string `json:"topic"`
+	DataTemplate string `json:"dataTemplate"`
+	RowkindField string `json:"rowkindField"`
+	KeyField     string `json:"keyField"`
+}
+
 type sink struct {
 type sink struct {
 	topic        string
 	topic        string
 	hasTransform bool
 	hasTransform bool
+	keyField     string
+	rowkindField string
 }
 }
 
 
 func (s *sink) Open(ctx api.StreamContext) error {
 func (s *sink) Open(ctx api.StreamContext) error {
@@ -34,19 +45,23 @@ func (s *sink) Open(ctx api.StreamContext) error {
 }
 }
 
 
 func (s *sink) Configure(props map[string]interface{}) error {
 func (s *sink) Configure(props map[string]interface{}) error {
-	if t, ok := props[pubsub.IdProperty]; ok {
-		if id, casted := t.(string); casted {
-			if strings.ContainsAny(id, "#+") {
-				return fmt.Errorf("invalid memory topic %s: wildcard found", id)
-			}
-			s.topic = id
-		} else {
-			return fmt.Errorf("can't cast value %s to string", t)
-		}
+	cfg := &config{}
+	err := cast.MapToStruct(props, cfg)
+	if err != nil {
+		return err
+	}
+	if strings.ContainsAny(cfg.Topic, "#+") {
+		return fmt.Errorf("invalid memory topic %s: wildcard found", cfg.Topic)
 	}
 	}
-	if _, ok := props["dataTemplate"]; ok {
+	s.topic = cfg.Topic
+	if cfg.DataTemplate != "" {
 		s.hasTransform = true
 		s.hasTransform = true
 	}
 	}
+	s.rowkindField = cfg.RowkindField
+	s.keyField = cfg.KeyField
+	if s.rowkindField != "" && s.keyField == "" {
+		return fmt.Errorf("keyField is required when rowkindField is set")
+	}
 	return nil
 	return nil
 }
 }
 
 
@@ -68,14 +83,19 @@ func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 		}
 		}
 		data = m
 		data = m
 	}
 	}
-
 	switch d := data.(type) {
 	switch d := data.(type) {
 	case []map[string]interface{}:
 	case []map[string]interface{}:
 		for _, el := range d {
 		for _, el := range d {
-			pubsub.Produce(ctx, topic, el)
+			err := s.publish(ctx, topic, el)
+			if err != nil {
+				return fmt.Errorf("fail to publish data %v for error %v", d, err)
+			}
 		}
 		}
 	case map[string]interface{}:
 	case map[string]interface{}:
-		pubsub.Produce(ctx, topic, d)
+		err := s.publish(ctx, topic, d)
+		if err != nil {
+			return fmt.Errorf("fail to publish data %v for error %v", d, err)
+		}
 	default:
 	default:
 		return fmt.Errorf("unrecognized format of %s", data)
 		return fmt.Errorf("unrecognized format of %s", data)
 	}
 	}
@@ -87,3 +107,27 @@ func (s *sink) Close(ctx api.StreamContext) error {
 	pubsub.RemovePub(s.topic)
 	pubsub.RemovePub(s.topic)
 	return nil
 	return nil
 }
 }
+
+func (s *sink) publish(ctx api.StreamContext, topic string, el map[string]interface{}) error {
+	if s.rowkindField != "" {
+		c, ok := el[s.rowkindField]
+		if !ok {
+			return fmt.Errorf("rowkind field %s not found in data %v", s.rowkindField, el)
+		}
+		rowkind, ok := c.(string)
+		if !ok {
+			return fmt.Errorf("rowkind field %s is not a string in data %v", s.rowkindField, el)
+		}
+		if rowkind != ast.RowkindInsert && rowkind != ast.RowkindUpdate && rowkind != ast.RowkindDelete && rowkind != ast.RowkindUpsert {
+			return fmt.Errorf("invalid rowkind %s", rowkind)
+		}
+		c, ok = el[s.keyField]
+		if !ok {
+			return fmt.Errorf("key field %s not found in data %v", s.keyField, el)
+		}
+		pubsub.ProduceUpdatable(ctx, topic, el, rowkind, s.keyField)
+	} else {
+		pubsub.Produce(ctx, topic, el)
+	}
+	return nil
+}

+ 84 - 0
internal/topo/memory/sink_test.go

@@ -0,0 +1,84 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memory
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"reflect"
+	"testing"
+)
+
+func TestUpdate(t *testing.T) {
+	contextLogger := conf.Log.WithField("rule", "test2")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+	ms := GetSink()
+	err := ms.Configure(map[string]interface{}{"topic": "testupdate", "rowkindField": "verb", "keyField": "id"})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	err = ms.Open(ctx)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	var data = []map[string]interface{}{
+		{"id": "1", "verb": "insert", "name": "test1"},
+		{"id": "2", "verb": "insert", "name": "test2"},
+		{"id": "1", "verb": "update", "name": "test1"},
+		{"id": "2", "verb": "delete", "name": "test2"},
+	}
+	c := pubsub.CreateSub("testupdate", nil, "testSource", 100)
+	go func() {
+		for _, d := range data {
+			ms.Collect(ctx, d)
+		}
+	}()
+	var actual []api.SourceTuple
+	for i := 0; i < 4; i++ {
+		d := <-c
+		fmt.Println(d)
+		actual = append(actual, d)
+	}
+	expects := []api.SourceTuple{
+		&pubsub.UpdatableTuple{
+			DefaultSourceTuple: api.NewDefaultSourceTuple(map[string]interface{}{"id": "1", "verb": "insert", "name": "test1"}, map[string]interface{}{"topic": "testupdate"}),
+			Rowkind:            "insert",
+			Key:                "id",
+		},
+		&pubsub.UpdatableTuple{
+			DefaultSourceTuple: api.NewDefaultSourceTuple(map[string]interface{}{"id": "2", "verb": "insert", "name": "test2"}, map[string]interface{}{"topic": "testupdate"}),
+			Rowkind:            "insert",
+			Key:                "id",
+		},
+		&pubsub.UpdatableTuple{
+			DefaultSourceTuple: api.NewDefaultSourceTuple(map[string]interface{}{"id": "1", "verb": "update", "name": "test1"}, map[string]interface{}{"topic": "testupdate"}),
+			Rowkind:            "update",
+			Key:                "id",
+		},
+		&pubsub.UpdatableTuple{
+			DefaultSourceTuple: api.NewDefaultSourceTuple(map[string]interface{}{"id": "2", "verb": "delete", "name": "test2"}, map[string]interface{}{"topic": "testupdate"}),
+			Rowkind:            "delete",
+			Key:                "id",
+		},
+	}
+	if !reflect.DeepEqual(actual, expects) {
+		t.Errorf("expect %v but got %v", expects, actual)
+	}
+}

+ 39 - 0
internal/topo/memory/store/db.go

@@ -137,6 +137,45 @@ func (t *Table) add(value api.SourceTuple) {
 	}
 	}
 }
 }
 
 
+func (t *Table) delete(key string, value api.SourceTuple) error {
+	v, ok := value.Message()[key]
+	if !ok {
+		return fmt.Errorf("value not found for key %s", key)
+	}
+	t.Lock()
+	defer t.Unlock()
+	if d, ok := t.indexes[key]; ok {
+		if _, kok := d[v]; kok {
+			delete(d, v)
+		} else {
+			// the key is indexed but this value has no entry, so there is nothing to delete
+			return nil
+		}
+	}
+	// After removing the index entry, also drop the matching tuples from the data and from every other index
+	arr := make([]api.SourceTuple, 0, len(t.datamap))
+	for _, st := range t.datamap {
+		if val, ok := st.Message()[key]; ok && val == v {
+			for k, d := range t.indexes {
+				if kval, ok := st.Message()[k]; ok {
+					newarr := make([]api.SourceTuple, 0, len(d[kval]))
+					for _, tuple := range d[kval] {
+						if tv, ok := tuple.Message()[key]; ok && tv == v {
+							continue
+						}
+						newarr = append(newarr, tuple)
+					}
+					d[kval] = newarr
+				}
+			}
+			continue
+		}
+		arr = append(arr, st)
+	}
+	t.datamap = arr
+	return nil
+}
+
 func (t *Table) Read(keys []string, values []interface{}) ([]api.SourceTuple, error) {
 func (t *Table) Read(keys []string, values []interface{}) ([]api.SourceTuple, error) {
 	t.RLock()
 	t.RLock()
 	defer t.RUnlock()
 	defer t.RUnlock()

+ 13 - 2
internal/topo/memory/store/db_test.go

@@ -24,7 +24,7 @@ func TestTable(t *testing.T) {
 	tb := createTable([]string{"a"})
 	tb := createTable([]string{"a"})
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "0"}, nil))
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "0"}, nil))
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 2, "b": "0"}, nil))
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 2, "b": "0"}, nil))
-	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 3, "b": "0"}, nil))
+	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 3, "b": "4"}, nil))
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil))
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil))
 	v, _ := tb.Read([]string{"a"}, []interface{}{1})
 	v, _ := tb.Read([]string{"a"}, []interface{}{1})
 	exp := []api.SourceTuple{
 	exp := []api.SourceTuple{
@@ -37,13 +37,14 @@ func TestTable(t *testing.T) {
 	}
 	}
 	v, _ = tb.Read([]string{"a"}, []interface{}{3})
 	v, _ = tb.Read([]string{"a"}, []interface{}{3})
 	exp = []api.SourceTuple{
 	exp = []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"a": 3, "b": "0"}, nil),
+		api.NewDefaultSourceTuple(map[string]interface{}{"a": 3, "b": "4"}, nil),
 	}
 	}
 	if !reflect.DeepEqual(v, exp) {
 	if !reflect.DeepEqual(v, exp) {
 		t.Errorf("read 3 expect %v, but got %v", exp, v)
 		t.Errorf("read 3 expect %v, but got %v", exp, v)
 		return
 		return
 	}
 	}
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 5, "b": "0"}, nil))
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 5, "b": "0"}, nil))
+	tb.delete("b", api.NewDefaultSourceTuple(map[string]interface{}{"a": 3, "b": "4"}, nil))
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil))
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil))
 	v, _ = tb.Read([]string{"a"}, []interface{}{1})
 	v, _ = tb.Read([]string{"a"}, []interface{}{1})
 	exp = []api.SourceTuple{
 	exp = []api.SourceTuple{
@@ -64,6 +65,16 @@ func TestTable(t *testing.T) {
 		t.Errorf("read a,b expect %v, but got %v", exp, v)
 		t.Errorf("read a,b expect %v, but got %v", exp, v)
 		return
 		return
 	}
 	}
+	v, _ = tb.Read([]string{"a"}, []interface{}{3})
+	if v != nil {
+		t.Errorf("read a 3 expect nil, but got %v", v)
+		return
+	}
+	tb.delete("a", api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil))
+	v, _ = tb.Read([]string{"a"}, []interface{}{1})
+	if v != nil {
+		t.Errorf("read a 1 expect nil, but got %v", v)
+	}
 }
 }
 
 
 func TestDb(t *testing.T) {
 func TestDb(t *testing.T) {

+ 12 - 1
internal/topo/memory/store/store.go

@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
 	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
+	"github.com/lf-edge/ekuiper/pkg/ast"
 	"regexp"
 	"regexp"
 )
 )
 
 
@@ -46,7 +47,17 @@ func runTable(topic string, topicRegex *regexp.Regexp, t *Table) {
 			if !opened { // exit go routine is not sync with drop table
 			if !opened { // exit go routine is not sync with drop table
 				return
 				return
 			}
 			}
-			t.add(v)
+			switch vv := v.(type) {
+			case *pubsub.UpdatableTuple:
+				switch vv.Rowkind {
+				case ast.RowkindInsert, ast.RowkindUpdate, ast.RowkindUpsert:
+					t.add(vv.DefaultSourceTuple)
+				case ast.RowkindDelete:
+					t.delete(vv.Key, vv.DefaultSourceTuple)
+				}
+			default:
+				t.add(v)
+			}
 			conf.Log.Debugf("receive data %v for %s", v, topic)
 			conf.Log.Debugf("receive data %v for %s", v, topic)
 		case <-ctx.Done():
 		case <-ctx.Done():
 			return
 			return

+ 8 - 1
pkg/ast/statement.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -189,3 +189,10 @@ type SortField struct {
 type SortFields []SortField
 type SortFields []SortField
 
 
 func (d SortFields) node() {}
 func (d SortFields) node() {}
+
+const (
+	RowkindInsert = "insert"
+	RowkindUpdate = "update"
+	RowkindUpsert = "upsert"
+	RowkindDelete = "delete"
+)