Explorar el Código

refactor(lookup): require key for memory table

All lookup tables must have a primary key.
Memory table supports upsert(insert/update) and delete only based on pk

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang hace 2 años
padre
commit
6db3c67e40

+ 0 - 2
etc/sources/memory.yaml

@@ -1,2 +0,0 @@
-default:
-  index: ["id"]

+ 2 - 2
internal/processor/stream_test.go

@@ -205,8 +205,8 @@ func TestTableProcessor(t *testing.T) {
 func TestTableList(t *testing.T) {
 	p := NewStreamProcessor()
 	p.ExecStmt(`CREATE TABLE tt1 () WITH (DATASOURCE="users", FORMAT="JSON", KIND="scan")`)
-	p.ExecStmt(`CREATE TABLE tt2 () WITH (DATASOURCE="users", TYPE="memory", FORMAT="JSON", KIND="lookup")`)
-	p.ExecStmt(`CREATE TABLE tt3 () WITH (DATASOURCE="users", TYPE="memory", FORMAT="JSON", KIND="lookup")`)
+	p.ExecStmt(`CREATE TABLE tt2 () WITH (DATASOURCE="users", TYPE="memory", FORMAT="JSON", KEY="id", KIND="lookup")`)
+	p.ExecStmt(`CREATE TABLE tt3 () WITH (DATASOURCE="users", TYPE="memory", FORMAT="JSON", KEY="id", KIND="lookup")`)
 	p.ExecStmt(`CREATE TABLE tt4 () WITH (DATASOURCE="users", FORMAT="JSON")`)
 	defer func() {
 		p.ExecStmt(`DROP TABLE tt1`)

+ 2 - 0
internal/topo/lookup/table_test.go

@@ -27,6 +27,7 @@ func TestTable(t *testing.T) {
 		DATASOURCE: "test",
 		TYPE:       "memory",
 		KIND:       "lookup",
+		KEY:        "id",
 	})
 	if err != nil {
 		t.Error(err)
@@ -36,6 +37,7 @@ func TestTable(t *testing.T) {
 		DATASOURCE: "test2",
 		TYPE:       "memory",
 		KIND:       "lookup",
+		KEY:        "id",
 	})
 	if err != nil {
 		t.Error(err)

+ 11 - 8
internal/topo/memory/lookupsource.go

@@ -15,9 +15,9 @@
 package memory
 
 import (
+	"fmt"
 	"github.com/lf-edge/ekuiper/internal/topo/memory/store"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/lf-edge/ekuiper/pkg/cast"
 	"regexp"
 	"strings"
 )
@@ -27,14 +27,14 @@ import (
 type lookupsource struct {
 	topic      string
 	topicRegex *regexp.Regexp
-	keys       []string
 	table      *store.Table
+	key        string
 }
 
 func (s *lookupsource) Open(ctx api.StreamContext) error {
-	ctx.GetLogger().Infof("lookup source %s is opened with keys %v", s.topic, s.keys)
+	ctx.GetLogger().Infof("lookup source %s is opened with key %v", s.topic, s.key)
 	var err error
-	s.table, err = store.Reg(s.topic, s.topicRegex, s.keys)
+	s.table, err = store.Reg(s.topic, s.topicRegex, s.key)
 	return err
 }
 
@@ -47,11 +47,14 @@ func (s *lookupsource) Configure(datasource string, props map[string]interface{}
 		}
 		s.topicRegex = r
 	}
-	if c, ok := props["index"]; ok {
-		if bl, err := cast.ToStringSlice(c, cast.CONVERT_SAMEKIND); err != nil {
-			s.keys = bl
+	if k, ok := props["key"]; ok {
+		if kk, ok := k.(string); ok {
+			s.key = kk
 		}
 	}
+	if s.key == "" {
+		return fmt.Errorf("key is required for lookup source")
+	}
 	return nil
 }
 
@@ -62,5 +65,5 @@ func (s *lookupsource) Lookup(ctx api.StreamContext, _ []string, keys []string,
 
 func (s *lookupsource) Close(ctx api.StreamContext) error {
 	ctx.GetLogger().Infof("lookup source %s is closing", s.topic)
-	return store.Unreg(s.topic)
+	return store.Unreg(s.topic, s.key)
 }

+ 16 - 10
internal/topo/memory/lookupsource_test.go

@@ -25,11 +25,11 @@ import (
 	"time"
 )
 
-func TestNoIndexLookup(t *testing.T) {
+func TestUpdateLookup(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "test")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	ls := GetLookupSource()
-	err := ls.Configure("test", map[string]interface{}{"option": "value"})
+	err := ls.Configure("test", map[string]interface{}{"key": "ff"})
 	if err != nil {
 		t.Error(err)
 		return
@@ -42,10 +42,10 @@ func TestNoIndexLookup(t *testing.T) {
 	// wait for the source to be ready
 	time.Sleep(100 * time.Millisecond)
 	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value1", "gg": "value2"})
-	pubsub.ProduceUpdatable(ctx, "test", map[string]interface{}{"ff": "value1", "gg": "value2"}, "delete", "ff")
-	pubsub.ProduceUpdatable(ctx, "test", map[string]interface{}{"ff": "value2", "gg": "value2"}, "insert", "ff")
+	pubsub.ProduceUpdatable(ctx, "test", map[string]interface{}{"ff": "value1", "gg": "value2"}, "delete", "value1")
+	pubsub.ProduceUpdatable(ctx, "test", map[string]interface{}{"ff": "value2", "gg": "value2"}, "insert", "value2")
 	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value1", "gg": "value4"})
-	pubsub.ProduceUpdatable(ctx, "test", map[string]interface{}{"ff": "value2", "gg": "value2"}, "delete", "ff")
+	pubsub.ProduceUpdatable(ctx, "test", map[string]interface{}{"ff": "value2", "gg": "value2"}, "delete", "value2")
 	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value1", "gg": "value2"})
 	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value2", "gg": "value2"})
 	// wait for table accumulation
@@ -63,7 +63,6 @@ func TestNoIndexLookup(t *testing.T) {
 		}
 	}()
 	expected := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value4"}, map[string]interface{}{"topic": "test"}),
 		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test"}),
 	}
 	result, err := ls.Lookup(ctx, []string{}, []string{"ff"}, []interface{}{"value1"})
@@ -77,11 +76,11 @@ func TestNoIndexLookup(t *testing.T) {
 	}
 }
 
-func TestSingleIndexLookup(t *testing.T) {
+func TestLookup(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "test2")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	ls := GetLookupSource()
-	err := ls.Configure("test2", map[string]interface{}{"index": []string{"ff"}})
+	err := ls.Configure("test2", map[string]interface{}{"key": "gg"})
 	if err != nil {
 		t.Error(err)
 		return
@@ -94,7 +93,7 @@ func TestSingleIndexLookup(t *testing.T) {
 	// wait for the source to be ready
 	time.Sleep(100 * time.Millisecond)
 	pubsub.Produce(ctx, "test2", map[string]interface{}{"ff": "value1", "gg": "value2"})
-	pubsub.Produce(ctx, "test2", map[string]interface{}{"ff": "value2", "gg": "value2"})
+	pubsub.Produce(ctx, "test2", map[string]interface{}{"ff": "value2", "gg": "value3"})
 	pubsub.Produce(ctx, "test2", map[string]interface{}{"ff": "value1", "gg": "value4"})
 	// wait for table accumulation
 	time.Sleep(100 * time.Millisecond)
@@ -106,7 +105,7 @@ func TestSingleIndexLookup(t *testing.T) {
 			case <-canctx.Done():
 				return
 			case <-time.After(10 * time.Millisecond):
-				pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value4", "gg": "value2"})
+				pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value4", "gg": "value5"})
 			}
 		}
 	}()
@@ -115,6 +114,13 @@ func TestSingleIndexLookup(t *testing.T) {
 		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value4"}, map[string]interface{}{"topic": "test2"}),
 	}
 	result, err := ls.Lookup(ctx, []string{}, []string{"ff"}, []interface{}{"value1"})
+	if len(result) != 2 {
+		t.Errorf("expect %v but got %v", expected, result)
+	} else {
+		if result[0].Message()["gg"] != "value2" {
+			result[0], result[1] = result[1], result[0]
+		}
+	}
 	if !reflect.DeepEqual(result, expected) {
 		t.Errorf("expect %v but got %v", expected, result)
 	}

+ 2 - 2
internal/topo/memory/pubsub/manager.go

@@ -113,11 +113,11 @@ func Produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
 	doProduce(ctx, topic, api.NewDefaultSourceTuple(data, map[string]interface{}{"topic": topic}))
 }
 
-func ProduceUpdatable(ctx api.StreamContext, topic string, data map[string]interface{}, rowkind string, key string) {
+func ProduceUpdatable(ctx api.StreamContext, topic string, data map[string]interface{}, rowkind string, keyval interface{}) {
 	doProduce(ctx, topic, &UpdatableTuple{
 		DefaultSourceTuple: api.NewDefaultSourceTuple(data, map[string]interface{}{"topic": topic}),
 		Rowkind:            rowkind,
-		Key:                key,
+		Keyval:             keyval,
 	})
 }
 

+ 1 - 1
internal/topo/memory/pubsub/tuple.go

@@ -19,5 +19,5 @@ import "github.com/lf-edge/ekuiper/pkg/api"
 type UpdatableTuple struct {
 	*api.DefaultSourceTuple
 	Rowkind string
-	Key     string
+	Keyval  interface{}
 }

+ 12 - 10
internal/topo/memory/sink.go

@@ -111,21 +111,23 @@ func (s *sink) Close(ctx api.StreamContext) error {
 func (s *sink) publish(ctx api.StreamContext, topic string, el map[string]interface{}) error {
 	if s.rowkindField != "" {
 		c, ok := el[s.rowkindField]
+		var rowkind string
 		if !ok {
-			return fmt.Errorf("rowkind field %s not found in data %v", s.rowkindField, el)
-		}
-		rowkind, ok := c.(string)
-		if !ok {
-			return fmt.Errorf("rowkind field %s is not a string in data %v", s.rowkindField, el)
-		}
-		if rowkind != ast.RowkindInsert && rowkind != ast.RowkindUpdate && rowkind != ast.RowkindDelete && rowkind != ast.RowkindUpsert {
-			return fmt.Errorf("invalid rowkind %s", rowkind)
+			rowkind = ast.RowkindUpsert
+		} else {
+			rowkind, ok = c.(string)
+			if !ok {
+				return fmt.Errorf("rowkind field %s is not a string in data %v", s.rowkindField, el)
+			}
+			if rowkind != ast.RowkindInsert && rowkind != ast.RowkindUpdate && rowkind != ast.RowkindDelete && rowkind != ast.RowkindUpsert {
+				return fmt.Errorf("invalid rowkind %s", rowkind)
+			}
 		}
-		c, ok = el[s.keyField]
+		key, ok := el[s.keyField]
 		if !ok {
 			return fmt.Errorf("key field %s not found in data %v", s.keyField, el)
 		}
-		pubsub.ProduceUpdatable(ctx, topic, el, rowkind, s.keyField)
+		pubsub.ProduceUpdatable(ctx, topic, el, rowkind, key)
 	} else {
 		pubsub.Produce(ctx, topic, el)
 	}

+ 4 - 4
internal/topo/memory/sink_test.go

@@ -60,22 +60,22 @@ func TestUpdate(t *testing.T) {
 		&pubsub.UpdatableTuple{
 			DefaultSourceTuple: api.NewDefaultSourceTuple(map[string]interface{}{"id": "1", "verb": "insert", "name": "test1"}, map[string]interface{}{"topic": "testupdate"}),
 			Rowkind:            "insert",
-			Key:                "id",
+			Keyval:             "1",
 		},
 		&pubsub.UpdatableTuple{
 			DefaultSourceTuple: api.NewDefaultSourceTuple(map[string]interface{}{"id": "2", "verb": "insert", "name": "test2"}, map[string]interface{}{"topic": "testupdate"}),
 			Rowkind:            "insert",
-			Key:                "id",
+			Keyval:             "2",
 		},
 		&pubsub.UpdatableTuple{
 			DefaultSourceTuple: api.NewDefaultSourceTuple(map[string]interface{}{"id": "1", "verb": "update", "name": "test1"}, map[string]interface{}{"topic": "testupdate"}),
 			Rowkind:            "update",
-			Key:                "id",
+			Keyval:             "1",
 		},
 		&pubsub.UpdatableTuple{
 			DefaultSourceTuple: api.NewDefaultSourceTuple(map[string]interface{}{"id": "2", "verb": "delete", "name": "test2"}, map[string]interface{}{"topic": "testupdate"}),
 			Rowkind:            "delete",
-			Key:                "id",
+			Keyval:             "2",
 		},
 	}
 	if !reflect.DeepEqual(actual, expects) {

+ 46 - 80
internal/topo/memory/store/db.go

@@ -17,6 +17,7 @@ package store
 import (
 	"context"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"sync"
 )
@@ -39,21 +40,22 @@ func (tc *tableCount) Decrease() int {
 	defer tc.Unlock()
 	tc.count--
 	if tc.count < 0 {
-		fmt.Errorf("Table count is less than 0: %d", tc.count)
+		conf.Log.Errorf("Table count is less than 0: %d", tc.count)
 	}
 	return tc.count
 }
 
 type database struct {
 	sync.RWMutex
-	tables map[string]*tableCount // topic: table
+	tables map[string]*tableCount // topic_key: table
 }
 
 // getTable return the table of the topic.
-func (db *database) getTable(topic string) (*Table, bool) {
+func (db *database) getTable(topic string, key string) (*Table, bool) {
 	db.RLock()
 	defer db.RUnlock()
-	tc, ok := db.tables[topic]
+	tableId := fmt.Sprintf("%s_%s", topic, key)
+	tc, ok := db.tables[tableId]
 	if ok {
 		return tc.t, true
 	} else {
@@ -65,19 +67,20 @@ func (db *database) getTable(topic string) (*Table, bool) {
 // If the table already exists, return the existing table;
 // otherwise, create a new table and return it.
 // The second argument is to indicate if the table is newly created
-func (db *database) addTable(topic string, keys []string) (*Table, bool) {
+func (db *database) addTable(topic string, key string) (*Table, bool) {
 	db.Lock()
 	defer db.Unlock()
-	tc, ok := db.tables[topic]
+	tableId := fmt.Sprintf("%s_%s", topic, key)
+	tc, ok := db.tables[tableId]
 	if ok {
 		tc.Increase()
 	} else {
-		t := createTable(keys)
+		t := createTable(topic, key)
 		tc = &tableCount{
 			count: 1,
 			t:     t,
 		}
-		db.tables[topic] = tc
+		db.tables[tableId] = tc
 	}
 	return tc.t, !ok
 }
@@ -85,118 +88,81 @@ func (db *database) addTable(topic string, keys []string) (*Table, bool) {
 // dropTable drop the table of the topic/values
 // stops to accumulate job
 // deletes the cache data
-func (db *database) dropTable(topic string) error {
+func (db *database) dropTable(topic string, key string) error {
+	tableId := fmt.Sprintf("%s_%s", topic, key)
 	db.Lock()
 	defer db.Unlock()
-	if tc, ok := db.tables[topic]; ok {
+	if tc, ok := db.tables[tableId]; ok {
 		if tc.Decrease() == 0 {
 			if tc.t != nil && tc.t.cancel != nil {
 				tc.t.cancel()
 			}
-			delete(db.tables, topic)
+			delete(db.tables, tableId)
 		}
 		return nil
 	}
-	return fmt.Errorf("Table %s not found", topic)
+	return fmt.Errorf("Table %s not found", tableId)
 }
 
 // Table has one writer and multiple reader
 type Table struct {
 	sync.RWMutex
-	// datamap is the overall data
-	datamap  []api.SourceTuple
-	hasIndex bool
-	// indexes is the indexed data
-	indexes map[string]map[interface{}][]api.SourceTuple
+	topic string
+	key   string
+	// datamap is the overall data indexed by primary key
+	datamap map[interface{}]api.SourceTuple
 	cancel  context.CancelFunc
 }
 
-func createTable(keys []string) *Table {
-	t := &Table{}
-	if len(keys) > 0 {
-		t.indexes = make(map[string]map[interface{}][]api.SourceTuple, len(keys))
-		for _, k := range keys {
-			t.indexes[k] = make(map[interface{}][]api.SourceTuple)
-		}
-		t.hasIndex = true
-	}
+func createTable(topic string, key string) *Table {
+	t := &Table{topic: topic, key: key, datamap: make(map[interface{}]api.SourceTuple)}
 	return t
 }
 
 func (t *Table) add(value api.SourceTuple) {
 	t.Lock()
 	defer t.Unlock()
-	t.datamap = append(t.datamap, value)
-	for k, v := range t.indexes {
-		if val, ok := value.Message()[k]; ok {
-			if _, ok := v[val]; !ok {
-				v[val] = make([]api.SourceTuple, 0)
-			}
-			v[val] = append(v[val], value)
-		}
+	keyval, ok := value.Message()[t.key]
+	if !ok {
+		conf.Log.Errorf("add to table %s omitted, value not found for key %s", t.topic, t.key)
 	}
+	t.datamap[keyval] = value
 }
 
-func (t *Table) delete(key string, value api.SourceTuple) error {
-	v, ok := value.Message()[key]
-	if !ok {
-		return fmt.Errorf("value not found for key %s", key)
-	}
+func (t *Table) delete(key interface{}) {
 	t.Lock()
 	defer t.Unlock()
-	if d, ok := t.indexes[key]; ok {
-		if _, kok := d[v]; kok {
-			delete(d, v)
-		} else {
-			// has index but not hit, so just return
-			return nil
-		}
-	}
-	// After delete index, also delete in the data
-	arr := make([]api.SourceTuple, 0, len(t.datamap))
-	for _, st := range t.datamap {
-		if val, ok := st.Message()[key]; ok && val == v {
-			for k, d := range t.indexes {
-				if kval, ok := st.Message()[k]; ok {
-					newarr := make([]api.SourceTuple, 0, len(d[kval]))
-					for _, tuple := range d[kval] {
-						if tv, ok := tuple.Message()[key]; ok && tv == v {
-							continue
-						}
-						newarr = append(newarr, tuple)
-					}
-					d[kval] = newarr
-				}
-			}
-			continue
-		}
-		arr = append(arr, st)
-	}
-	t.datamap = arr
-	return nil
+	delete(t.datamap, key)
 }
 
 func (t *Table) Read(keys []string, values []interface{}) ([]api.SourceTuple, error) {
 	t.RLock()
 	defer t.RUnlock()
-	data := t.datamap
-	excludeKey := -1
-	if t.hasIndex {
-		// Find the first indexed key
+	// Find the primary key
+	var matched api.SourceTuple
+	for i, k := range keys {
+		if k == t.key {
+			matched = t.datamap[values[i]]
+		}
+	}
+	if matched != nil {
+		match := true
 		for i, k := range keys {
-			if d, ok := t.indexes[k]; ok {
-				data = d[values[i]]
-				excludeKey = i
+			if val, ok := matched.Message()[k]; !ok || val != values[i] {
+				match = false
+				break
 			}
 		}
+		if match {
+			return []api.SourceTuple{matched}, nil
+		} else {
+			return nil, nil
+		}
 	}
 	var result []api.SourceTuple
-	for _, v := range data {
+	for _, v := range t.datamap {
 		match := true
 		for i, k := range keys {
-			if i == excludeKey {
-				continue
-			}
 			if val, ok := v.Message()[k]; !ok || val != values[i] {
 				match = false
 				break

+ 28 - 31
internal/topo/memory/store/db_test.go

@@ -21,36 +21,34 @@ import (
 )
 
 func TestTable(t *testing.T) {
-	tb := createTable([]string{"a"})
+	tb := createTable("topicT", "a")
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "0"}, nil))
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 2, "b": "0"}, nil))
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 3, "b": "4"}, nil))
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil))
 	v, _ := tb.Read([]string{"a"}, []interface{}{1})
 	exp := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "0"}, nil),
 		api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil),
 	}
 	if !reflect.DeepEqual(v, exp) {
-		t.Errorf("read 1 expect %v, but got %v", exp, v)
+		t.Errorf("read a 1 expect %v, but got %v", exp, v)
 		return
 	}
-	v, _ = tb.Read([]string{"a"}, []interface{}{3})
+	v, _ = tb.Read([]string{"b"}, []interface{}{"0"})
 	exp = []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"a": 3, "b": "4"}, nil),
+		api.NewDefaultSourceTuple(map[string]interface{}{"a": 2, "b": "0"}, nil),
 	}
 	if !reflect.DeepEqual(v, exp) {
-		t.Errorf("read 3 expect %v, but got %v", exp, v)
+		t.Errorf("read b 0 expect %v, but got %v", exp, v)
 		return
 	}
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 5, "b": "0"}, nil))
-	tb.delete("b", api.NewDefaultSourceTuple(map[string]interface{}{"a": 3, "b": "4"}, nil))
+	tb.delete(3)
 	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil))
-	v, _ = tb.Read([]string{"a"}, []interface{}{1})
+	v, _ = tb.Read([]string{"b"}, []interface{}{"0"})
 	exp = []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "0"}, nil),
-		api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil),
-		api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil),
+		api.NewDefaultSourceTuple(map[string]interface{}{"a": 2, "b": "0"}, nil),
+		api.NewDefaultSourceTuple(map[string]interface{}{"a": 5, "b": "0"}, nil),
 	}
 	if !reflect.DeepEqual(v, exp) {
 		t.Errorf("read 1 again expect %v, but got %v", exp, v)
@@ -59,7 +57,6 @@ func TestTable(t *testing.T) {
 	v, _ = tb.Read([]string{"a", "b"}, []interface{}{1, "1"})
 	exp = []api.SourceTuple{
 		api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil),
-		api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil),
 	}
 	if !reflect.DeepEqual(v, exp) {
 		t.Errorf("read a,b expect %v, but got %v", exp, v)
@@ -70,7 +67,7 @@ func TestTable(t *testing.T) {
 		t.Errorf("read a 3 expect nil, but got %v", v)
 		return
 	}
-	tb.delete("a", api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil))
+	tb.delete(1)
 	v, _ = tb.Read([]string{"a"}, []interface{}{1})
 	if v != nil {
 		t.Errorf("read a 1 expect nil, but got %v", v)
@@ -81,45 +78,45 @@ func TestDb(t *testing.T) {
 	db = &database{
 		tables: make(map[string]*tableCount),
 	}
-	db.addTable("t1", []string{"a"})
-	db.addTable("t1", []string{"a", "b"})
-	db.addTable("t2", []string{"a"})
-	db.addTable("t1", []string{"a"})
-	_, ok := db.getTable("t1")
+	db.addTable("t1", "a")
+	db.addTable("t1", "b")
+	db.addTable("t2", "a")
+	db.addTable("t1", "a")
+	_, ok := db.getTable("t1", "a")
 	if !ok {
 		t.Errorf("table t1 a should exist")
 		return
 	}
-	_, ok = db.getTable("t1")
+	_, ok = db.getTable("t1", "b")
 	if !ok {
 		t.Errorf("table t1 b should exist")
 		return
 	}
-	_, ok = db.getTable("t3")
+	_, ok = db.getTable("t3", "a")
 	if ok {
 		t.Errorf("table t1 c should not exist")
 		return
 	}
-	tc := db.tables["t1"]
-	if tc.count != 3 {
+	tc := db.tables["t1_a"]
+	if tc.count != 2 {
 		t.Errorf("table t1 a should have 2 instances but got %d", tc.count)
 		return
 	}
-	tc = db.tables["t2"]
+	tc = db.tables["t2_a"]
 	if tc.count != 1 {
-		t.Errorf("table t1 a should have 1 instances")
+		t.Errorf("table t2 a should have 1 instances")
 		return
 	}
-	db.dropTable("t1")
-	db.dropTable("t2")
-	_, ok = db.getTable("t2")
+	db.dropTable("t1", "a")
+	db.dropTable("t2", "a")
+	_, ok = db.getTable("t2", "a")
 	if ok {
-		t.Errorf("table ta a should not exist")
+		t.Errorf("table t2 a should not exist")
 		return
 	}
-	tc = db.tables["t1"]
-	if tc.count != 2 {
-		t.Errorf("table t1 a should have 2 instances but got %d", tc.count)
+	tc = db.tables["t1_a"]
+	if tc.count != 1 {
+		t.Errorf("table t1 a should have 1 instances but got %d", tc.count)
 		return
 	}
 }

+ 5 - 5
internal/topo/memory/store/store.go

@@ -25,8 +25,8 @@ import (
 
 // Reg registers a topic to save it to memory store
 // Create a new go routine to listen to the topic and save the data to memory
-func Reg(topic string, topicRegex *regexp.Regexp, keys []string) (*Table, error) {
-	t, isNew := db.addTable(topic, keys)
+func Reg(topic string, topicRegex *regexp.Regexp, key string) (*Table, error) {
+	t, isNew := db.addTable(topic, key)
 	if isNew {
 		go runTable(topic, topicRegex, t)
 	}
@@ -53,7 +53,7 @@ func runTable(topic string, topicRegex *regexp.Regexp, t *Table) {
 				case ast.RowkindInsert, ast.RowkindUpdate, ast.RowkindUpsert:
 					t.add(vv.DefaultSourceTuple)
 				case ast.RowkindDelete:
-					t.delete(vv.Key, vv.DefaultSourceTuple)
+					t.delete(vv.Keyval)
 				}
 			default:
 				t.add(v)
@@ -66,7 +66,7 @@ func runTable(topic string, topicRegex *regexp.Regexp, t *Table) {
 }
 
 // Unreg unregisters a topic to remove it from memory store
-func Unreg(topic string) error {
+func Unreg(topic string, key string) error {
 	// Must be an atomic operation
-	return db.dropTable(topic)
+	return db.dropTable(topic, key)
 }

+ 5 - 5
internal/topo/memory/store/store_test.go

@@ -23,18 +23,18 @@ func TestReg(t *testing.T) {
 	db = &database{
 		tables: make(map[string]*tableCount),
 	}
-	reg1, err := Reg("test", nil, []string{"a"})
+	reg1, err := Reg("test", nil, "a")
 	if err != nil {
 		t.Errorf("register test error: %v", err)
 		return
 	}
-	_, err2 := Reg("test", nil, []string{"a", "b"})
+	_, err2 := Reg("test", nil, "a")
 	if err2 != nil {
 		t.Errorf("register test error: %v", err2)
 		return
 	}
 	exp := map[string]*tableCount{
-		"test": {
+		"test_a": {
 			count: 2,
 			t:     reg1,
 		},
@@ -43,9 +43,9 @@ func TestReg(t *testing.T) {
 		t.Errorf("register expect %v, but got %v", exp, db.tables)
 		return
 	}
-	Unreg("test")
+	Unreg("test", "a")
 	exp = map[string]*tableCount{
-		"test": {
+		"test_a": {
 			count: 1,
 			t:     reg1,
 		},

+ 1 - 0
internal/topo/node/conf/source.go

@@ -54,6 +54,7 @@ func GetSourceConf(sourceType string, options *ast.Options) map[string]interface
 		f = "json"
 	}
 	props["format"] = strings.ToLower(f)
+	props["key"] = strings.ToLower(options.KEY)
 	conf.Log.Debugf("get conf for %s with conf key %s: %v", sourceType, confkey, printable(props))
 	return props
 }

+ 2 - 0
internal/topo/node/source_node_test.go

@@ -31,6 +31,7 @@ func TestGetConf_Apply(t *testing.T) {
 		"incremental":        false,
 		"body":               "{}",
 		"bodyType":           "json",
+		"key":                "",
 		"format":             "json",
 		"insecureSkipVerify": true,
 		"headers": map[string]interface{}{
@@ -56,6 +57,7 @@ func TestGetConfAndConvert_Apply(t *testing.T) {
 		"incremental":        true,
 		"body":               "{}",
 		"bodyType":           "json",
+		"key":                "",
 		"format":             "json",
 		"insecureSkipVerify": true,
 		"headers": map[string]interface{}{

+ 3 - 0
internal/xsql/parser.go

@@ -1461,6 +1461,9 @@ func (p *Parser) parseStreamOptions() (*ast.Options, error) {
 	} else {
 		return nil, fmt.Errorf("found %q, expect stream options.", lit)
 	}
+	if opts.KIND == ast.StreamKindLookup && opts.TYPE == "memory" && opts.KEY == "" {
+		return nil, fmt.Errorf("Option \"key\" is required for memory lookup table.")
+	}
 	return opts, nil
 }
 

+ 10 - 1
internal/xsql/parser_tree_test.go

@@ -112,7 +112,7 @@ func TestParser_ParseTree(t *testing.T) {
 					name STRING,
 					size BIGINT,
 					id BIGINT
-				) WITH (DATASOURCE="devices", KIND="LOOKUP", TYPE="sql");`,
+				) WITH (DATASOURCE="devices", KIND="LOOKUP", TYPE="sql", KEY="id");`,
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("table1"),
 				StreamFields: []ast.StreamField{
@@ -125,11 +125,20 @@ func TestParser_ParseTree(t *testing.T) {
 					STRICT_VALIDATION: true,
 					KIND:              ast.StreamKindLookup,
 					TYPE:              "sql",
+					KEY:               "id",
 				},
 				StreamType: ast.TypeTable,
 			},
 		},
 		{
+			s: `CREATE TABLE table1 (
+					name STRING,
+					size BIGINT,
+					id BIGINT
+				) WITH (DATASOURCE="devices", KIND="LOOKUP", TYPE="memory");`,
+			err: `Option "key" is required for memory lookup table.`,
+		},
+		{
 			s:    `SHOW STREAMS`,
 			stmt: &ast.ShowStreamsStatement{},
 		},