Quellcode durchsuchen

feat(lookup): lookup memory source

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang vor 2 Jahren
Ursprung
Commit
0da4f55519

+ 3 - 1
internal/binder/io/builtin.go

@@ -44,7 +44,9 @@ var (
 		"memory":      func() api.Sink { return memory.GetSink() },
 		"memory":      func() api.Sink { return memory.GetSink() },
 		"neuron":      func() api.Sink { return neuron.GetSink() },
 		"neuron":      func() api.Sink { return neuron.GetSink() },
 	}
 	}
-	lookupSources = map[string]NewLookupSourceFunc{}
+	lookupSources = map[string]NewLookupSourceFunc{
+		"memory": func() api.LookupSource { return memory.GetLookupSource() },
+	}
 )
 )
 
 
 type Manager struct{}
 type Manager struct{}

+ 5 - 1
internal/binder/mock/mock_factory.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -35,6 +35,10 @@ func (f *MockFactory) Source(name string) (api.Source, error) {
 	}
 	}
 }
 }
 
 
+func (f *MockFactory) LookupSource(name string) (api.LookupSource, error) {
+	return nil, errorx.NotFoundErr
+}
+
 func (f *MockFactory) Sink(name string) (api.Sink, error) {
 func (f *MockFactory) Sink(name string) (api.Sink, error) {
 	if strings.HasPrefix(name, "mock") {
 	if strings.HasPrefix(name, "mock") {
 		return &mockSink{}, nil
 		return &mockSink{}, nil

+ 19 - 1
internal/plugin/native/manager.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -543,6 +543,24 @@ func (rr *Manager) Source(name string) (api.Source, error) {
 	}
 	}
 }
 }
 
 
+func (rr *Manager) LookupSource(name string) (api.LookupSource, error) {
+	nf, err := rr.loadRuntime(plugin2.SOURCE, name+"Lookup", "")
+	if err != nil {
+		return nil, err
+	}
+	if nf == nil {
+		return nil, nil
+	}
+	switch t := nf.(type) {
+	case api.LookupSource:
+		return t, nil
+	case func() api.LookupSource:
+		return t(), nil
+	default:
+		return nil, fmt.Errorf("exported symbol %s is not type of api.LookupSource or function that return api.LookupSource", t)
+	}
+}
+
 func (rr *Manager) Sink(name string) (api.Sink, error) {
 func (rr *Manager) Sink(name string) (api.Sink, error) {
 	nf, err := rr.loadRuntime(plugin2.SINK, name, "")
 	nf, err := rr.loadRuntime(plugin2.SINK, name, "")
 	if err != nil {
 	if err != nil {

+ 6 - 1
internal/plugin/portable/factory.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -30,6 +30,11 @@ func (m *Manager) Source(name string) (api.Source, error) {
 	return runtime.NewPortableSource(name, meta), nil
 	return runtime.NewPortableSource(name, meta), nil
 }
 }
 
 
+func (m *Manager) LookupSource(_ string) (api.LookupSource, error) {
+	// TODO add support
+	return nil, nil
+}
+
 func (m *Manager) Sink(name string) (api.Sink, error) {
 func (m *Manager) Sink(name string) (api.Sink, error) {
 	meta, ok := m.GetPluginMeta(plugin.SINK, name)
 	meta, ok := m.GetPluginMeta(plugin.SINK, name)
 	if !ok {
 	if !ok {

+ 27 - 0
internal/topo/memory/export.go

@@ -0,0 +1,27 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memory
+
+func GetSink() *sink {
+	return &sink{}
+}
+
+func GetSource() *source {
+	return &source{}
+}
+
+func GetLookupSource() *lookupsource {
+	return &lookupsource{}
+}

+ 63 - 0
internal/topo/memory/lookupsource.go

@@ -0,0 +1,63 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memory
+
+import (
+	"github.com/lf-edge/ekuiper/internal/topo/memory/store"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"regexp"
+	"strings"
+)
+
+// lookupsource is a lookup source that reads data from memory
+// The memory lookup table reads a global memory store for data
+type lookupsource struct {
+	topic      string
+	topicRegex *regexp.Regexp
+	key        string
+	keys       []string
+	table      *store.Table
+}
+
+func (s *lookupsource) Open(ctx api.StreamContext) error {
+	ctx.GetLogger().Infof("lookup source %s is opened with keys %v", s.topic, s.keys)
+	var err error
+	s.table, err = store.Reg(s.topic, s.topicRegex, s.key, s.keys)
+	return err
+}
+
+func (s *lookupsource) Configure(datasource string, _ map[string]interface{}, keys []string) error {
+	s.topic = datasource
+	if strings.ContainsAny(datasource, "+#") {
+		r, err := getRegexp(datasource)
+		if err != nil {
+			return err
+		}
+		s.topicRegex = r
+	}
+	s.keys = keys
+	s.key = strings.Join(keys, ",")
+	return nil
+}
+
+func (s *lookupsource) Lookup(ctx api.StreamContext, values []interface{}) ([]api.SourceTuple, error) {
+	ctx.GetLogger().Debugf("lookup source %s_%s is looking up %v", s.topic, s.key, values)
+	return s.table.Read(values)
+}
+
+func (s *lookupsource) Close(ctx api.StreamContext) error {
+	ctx.GetLogger().Infof("lookup source %s_%s is closing", s.topic, s.key)
+	return store.Unreg(s.topic, s.key)
+}

+ 74 - 0
internal/topo/memory/lookupsource_test.go

@@ -0,0 +1,74 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memory
+
+import (
+	gocontext "context"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"reflect"
+	"testing"
+	"time"
+)
+
+func TestSingleKeyLookup(t *testing.T) {
+	contextLogger := conf.Log.WithField("rule", "test")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+	ls := GetLookupSource()
+	err := ls.Configure("test", map[string]interface{}{"option": "value"}, []string{"ff"})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	err = ls.Open(ctx)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	// wait for the source to be ready
+	time.Sleep(100 * time.Millisecond)
+	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value1", "gg": "value2"})
+	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value2", "gg": "value2"})
+	pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value1", "gg": "value4"})
+	// wait for table accumulation
+	time.Sleep(100 * time.Millisecond)
+	canctx, cancel := gocontext.WithCancel(gocontext.Background())
+	defer cancel()
+	go func() {
+		for {
+			select {
+			case <-canctx.Done():
+				return
+			case <-time.After(10 * time.Millisecond):
+				pubsub.Produce(ctx, "test", map[string]interface{}{"ff": "value4", "gg": "value2"})
+			}
+		}
+	}()
+	expected := []api.SourceTuple{
+		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test"}),
+		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value4"}, map[string]interface{}{"topic": "test"}),
+	}
+	result, err := ls.Lookup(ctx, []interface{}{"value1"})
+	if !reflect.DeepEqual(result, expected) {
+		t.Errorf("expect %v but got %v", expected, result)
+	}
+	err = ls.Close(ctx)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+}

+ 10 - 102
internal/topo/memory/manager_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -19,25 +19,19 @@ import (
 	"github.com/gdexlab/go-render/render"
 	"github.com/gdexlab/go-render/render"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"reflect"
 	"reflect"
-	"regexp"
-	"strings"
 	"testing"
 	"testing"
 	"time"
 	"time"
 )
 )
 
 
-func reset() {
-	pubTopics = make(map[string]*pubConsumers)
-	subExps = make(map[string]*subChan)
-}
-
 func TestSharedInmemoryNode(t *testing.T) {
 func TestSharedInmemoryNode(t *testing.T) {
-	reset()
+	pubsub.Reset()
 	id := "test_id"
 	id := "test_id"
 	sinkProps := make(map[string]interface{})
 	sinkProps := make(map[string]interface{})
-	sinkProps[IdProperty] = id
+	sinkProps[pubsub.IdProperty] = id
 	src := GetSource()
 	src := GetSource()
 	snk := GetSink()
 	snk := GetSink()
 	contextLogger := conf.Log.WithField("rule", "test")
 	contextLogger := conf.Log.WithField("rule", "test")
@@ -56,7 +50,7 @@ func TestSharedInmemoryNode(t *testing.T) {
 		t.Error(err)
 		t.Error(err)
 		return
 		return
 	}
 	}
-	srcProps[IdProperty] = id
+	srcProps[pubsub.IdProperty] = id
 	err = src.Configure(id, srcProps)
 	err = src.Configure(id, srcProps)
 	if err != nil {
 	if err != nil {
 		t.Error(err)
 		t.Error(err)
@@ -65,9 +59,9 @@ func TestSharedInmemoryNode(t *testing.T) {
 		src.Open(ctx, consumer, errorChannel)
 		src.Open(ctx, consumer, errorChannel)
 	}()
 	}()
 
 
-	if _, contains := pubTopics[id]; !contains {
-		t.Errorf("there should be memory node for topic")
-	}
+	//if _, contains := pubTopics[id]; !contains {
+	//	t.Errorf("there should be memory node for topic")
+	//}
 
 
 	data := make(map[string]interface{})
 	data := make(map[string]interface{})
 	data["temperature"] = 33.0
 	data["temperature"] = 33.0
@@ -92,94 +86,8 @@ func TestSharedInmemoryNode(t *testing.T) {
 	}
 	}
 }
 }
 
 
-func TestCreateAndClose(t *testing.T) {
-	reset()
-	var (
-		sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
-		sinkTopics   = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1", "h/d1/c1/s1"}
-		chans        []chan api.SourceTuple
-	)
-	for i, topic := range sinkTopics {
-		CreatePub(topic)
-		var (
-			r   *regexp.Regexp
-			err error
-		)
-		if strings.ContainsAny(sourceTopics[i], "+#") {
-			r, err = getRegexp(sourceTopics[i])
-			if err != nil {
-				t.Error(err)
-				return
-			}
-		}
-		c := CreateSub(sourceTopics[i], r, fmt.Sprintf("%d", i), 100)
-		chans = append(chans, c)
-	}
-
-	expPub := map[string]*pubConsumers{
-		"h/d1/c1/s1": {
-			count: 2,
-			consumers: map[string]chan api.SourceTuple{
-				"1": chans[1],
-				"4": chans[4],
-			},
-		},
-		"h/d1/c1/s2": {
-			count: 1,
-			consumers: map[string]chan api.SourceTuple{
-				"0": chans[0],
-				"3": chans[3],
-			},
-		},
-		"h/d2/c2/s1": {
-			count: 1,
-			consumers: map[string]chan api.SourceTuple{
-				"1": chans[1],
-			},
-		},
-		"h/d3/c3/s1": {
-			count: 1,
-			consumers: map[string]chan api.SourceTuple{
-				"1": chans[1],
-				"2": chans[2],
-			},
-		},
-	}
-	if !reflect.DeepEqual(expPub, pubTopics) {
-		t.Errorf("Error adding: Expect\n\t%v\nbut got\n\t%v", render.AsCode(expPub), render.AsCode(pubTopics))
-		return
-	}
-	i := 0
-	for i < 3 {
-		CloseSourceConsumerChannel(sourceTopics[i], fmt.Sprintf("%d", i))
-		RemovePub(sinkTopics[i])
-		i++
-	}
-	expPub = map[string]*pubConsumers{
-		"h/d1/c1/s1": {
-			count: 1,
-			consumers: map[string]chan api.SourceTuple{
-				"4": chans[4],
-			},
-		},
-		"h/d1/c1/s2": {
-			count: 0,
-			consumers: map[string]chan api.SourceTuple{
-				"3": chans[3],
-			},
-		},
-		"h/d3/c3/s1": {
-			count:     1,
-			consumers: map[string]chan api.SourceTuple{},
-		},
-	}
-	if !reflect.DeepEqual(expPub, pubTopics) {
-		t.Errorf("Error closing: Expect\n\t%v\nbut got\n\t %v", render.AsCode(expPub), render.AsCode(pubTopics))
-	}
-}
-
 func TestMultipleTopics(t *testing.T) {
 func TestMultipleTopics(t *testing.T) {
-	reset()
+	pubsub.Reset()
 	var (
 	var (
 		sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
 		sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
 		sinkTopics   = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1"}
 		sinkTopics   = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1"}
@@ -480,7 +388,7 @@ func TestMultipleTopics(t *testing.T) {
 		for c < 3 {
 		for c < 3 {
 			for i, v := range sinkData {
 			for i, v := range sinkData {
 				time.Sleep(10 * time.Millisecond)
 				time.Sleep(10 * time.Millisecond)
-				Produce(ctx, sinkTopics[i], v[c])
+				pubsub.Produce(ctx, sinkTopics[i], v[c])
 			}
 			}
 			c++
 			c++
 		}
 		}

+ 7 - 9
internal/topo/memory/manager.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // See the License for the specific language governing permissions and
 // limitations under the License.
 // limitations under the License.
 
 
-package memory
+package pubsub
 
 
 import (
 import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
@@ -40,14 +40,6 @@ var (
 	mu        = sync.RWMutex{}
 	mu        = sync.RWMutex{}
 )
 )
 
 
-func GetSink() *sink {
-	return &sink{}
-}
-
-func GetSource() *source {
-	return &source{}
-}
-
 func CreatePub(topic string) {
 func CreatePub(topic string) {
 	mu.Lock()
 	mu.Lock()
 	defer mu.Unlock()
 	defer mu.Unlock()
@@ -186,3 +178,9 @@ func removePubConsumer(topic string, sourceId string, c *pubConsumers) {
 		delete(pubTopics, topic)
 		delete(pubTopics, topic)
 	}
 	}
 }
 }
+
+// Reset For testing only
+func Reset() {
+	pubTopics = make(map[string]*pubConsumers)
+	subExps = make(map[string]*subChan)
+}

+ 126 - 0
internal/topo/memory/pubsub/manager_test.go

@@ -0,0 +1,126 @@
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pubsub
+
+import (
+	"fmt"
+	"github.com/gdexlab/go-render/render"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"reflect"
+	"regexp"
+	"strings"
+	"testing"
+)
+
+func TestCreateAndClose(t *testing.T) {
+	Reset()
+	var (
+		sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
+		sinkTopics   = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1", "h/d1/c1/s1"}
+		chans        []chan api.SourceTuple
+	)
+	for i, topic := range sinkTopics {
+		CreatePub(topic)
+		var (
+			r   *regexp.Regexp
+			err error
+		)
+		if strings.ContainsAny(sourceTopics[i], "+#") {
+			r, err = getRegexp(sourceTopics[i])
+			if err != nil {
+				t.Error(err)
+				return
+			}
+		}
+		c := CreateSub(sourceTopics[i], r, fmt.Sprintf("%d", i), 100)
+		chans = append(chans, c)
+	}
+
+	expPub := map[string]*pubConsumers{
+		"h/d1/c1/s1": {
+			count: 2,
+			consumers: map[string]chan api.SourceTuple{
+				"1": chans[1],
+				"4": chans[4],
+			},
+		},
+		"h/d1/c1/s2": {
+			count: 1,
+			consumers: map[string]chan api.SourceTuple{
+				"0": chans[0],
+				"3": chans[3],
+			},
+		},
+		"h/d2/c2/s1": {
+			count: 1,
+			consumers: map[string]chan api.SourceTuple{
+				"1": chans[1],
+			},
+		},
+		"h/d3/c3/s1": {
+			count: 1,
+			consumers: map[string]chan api.SourceTuple{
+				"1": chans[1],
+				"2": chans[2],
+			},
+		},
+	}
+	if !reflect.DeepEqual(expPub, pubTopics) {
+		t.Errorf("Error adding: Expect\n\t%v\nbut got\n\t%v", render.AsCode(expPub), render.AsCode(pubTopics))
+		return
+	}
+	i := 0
+	for i < 3 {
+		CloseSourceConsumerChannel(sourceTopics[i], fmt.Sprintf("%d", i))
+		RemovePub(sinkTopics[i])
+		i++
+	}
+	expPub = map[string]*pubConsumers{
+		"h/d1/c1/s1": {
+			count: 1,
+			consumers: map[string]chan api.SourceTuple{
+				"4": chans[4],
+			},
+		},
+		"h/d1/c1/s2": {
+			count: 0,
+			consumers: map[string]chan api.SourceTuple{
+				"3": chans[3],
+			},
+		},
+		"h/d3/c3/s1": {
+			count:     1,
+			consumers: map[string]chan api.SourceTuple{},
+		},
+	}
+	if !reflect.DeepEqual(expPub, pubTopics) {
+		t.Errorf("Error closing: Expect\n\t%v\nbut got\n\t %v", render.AsCode(expPub), render.AsCode(pubTopics))
+	}
+}
+
+func getRegexp(topic string) (*regexp.Regexp, error) {
+	if len(topic) == 0 {
+		return nil, fmt.Errorf("invalid empty topic")
+	}
+
+	levels := strings.Split(topic, "/")
+	for i, level := range levels {
+		if level == "#" && i != len(levels)-1 {
+			return nil, fmt.Errorf("invalid topic %s: # must at the last level", topic)
+		}
+	}
+	regstr := strings.Replace(strings.ReplaceAll(topic, "+", "([^/]+)"), "#", ".", 1)
+	return regexp.Compile(regstr)
+}

+ 6 - 5
internal/topo/memory/sink.go

@@ -17,6 +17,7 @@ package memory
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"strings"
 	"strings"
 )
 )
@@ -28,12 +29,12 @@ type sink struct {
 
 
 func (s *sink) Open(ctx api.StreamContext) error {
 func (s *sink) Open(ctx api.StreamContext) error {
 	ctx.GetLogger().Debugf("Opening memory sink: %v", s.topic)
 	ctx.GetLogger().Debugf("Opening memory sink: %v", s.topic)
-	CreatePub(s.topic)
+	pubsub.CreatePub(s.topic)
 	return nil
 	return nil
 }
 }
 
 
 func (s *sink) Configure(props map[string]interface{}) error {
 func (s *sink) Configure(props map[string]interface{}) error {
-	if t, ok := props[IdProperty]; ok {
+	if t, ok := props[pubsub.IdProperty]; ok {
 		if id, casted := t.(string); casted {
 		if id, casted := t.(string); casted {
 			if strings.ContainsAny(id, "#+") {
 			if strings.ContainsAny(id, "#+") {
 				return fmt.Errorf("invalid memory topic %s: wildcard found", id)
 				return fmt.Errorf("invalid memory topic %s: wildcard found", id)
@@ -71,10 +72,10 @@ func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 	switch d := data.(type) {
 	switch d := data.(type) {
 	case []map[string]interface{}:
 	case []map[string]interface{}:
 		for _, el := range d {
 		for _, el := range d {
-			Produce(ctx, topic, el)
+			pubsub.Produce(ctx, topic, el)
 		}
 		}
 	case map[string]interface{}:
 	case map[string]interface{}:
-		Produce(ctx, topic, d)
+		pubsub.Produce(ctx, topic, d)
 	default:
 	default:
 		return fmt.Errorf("unrecognized format of %s", data)
 		return fmt.Errorf("unrecognized format of %s", data)
 	}
 	}
@@ -83,6 +84,6 @@ func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 
 
 func (s *sink) Close(ctx api.StreamContext) error {
 func (s *sink) Close(ctx api.StreamContext) error {
 	ctx.GetLogger().Debugf("closing memory sink")
 	ctx.GetLogger().Debugf("closing memory sink")
-	RemovePub(s.topic)
+	pubsub.RemovePub(s.topic)
 	return nil
 	return nil
 }
 }

+ 3 - 2
internal/topo/memory/source.go

@@ -16,6 +16,7 @@ package memory
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"regexp"
 	"regexp"
@@ -29,7 +30,7 @@ type source struct {
 }
 }
 
 
 func (s *source) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, _ chan<- error) {
 func (s *source) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, _ chan<- error) {
-	ch := CreateSub(s.topic, s.topicRegex, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), s.bufferLength)
+	ch := pubsub.CreateSub(s.topic, s.topicRegex, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), s.bufferLength)
 	for {
 	for {
 		select {
 		select {
 		case v, opened := <-ch:
 		case v, opened := <-ch:
@@ -78,6 +79,6 @@ func getRegexp(topic string) (*regexp.Regexp, error) {
 
 
 func (s *source) Close(ctx api.StreamContext) error {
 func (s *source) Close(ctx api.StreamContext) error {
 	ctx.GetLogger().Debugf("closing memory source")
 	ctx.GetLogger().Debugf("closing memory source")
-	CloseSourceConsumerChannel(s.topic, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
+	pubsub.CloseSourceConsumerChannel(s.topic, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
 	return nil
 	return nil
 }
 }

+ 148 - 0
internal/topo/memory/store/db.go

@@ -0,0 +1,148 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package store
+
+import (
+	"context"
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"sync"
+)
+
+type tableCount struct {
+	sync.RWMutex
+	count int
+	t     *Table
+}
+
+func (tc *tableCount) Increase() int {
+	tc.Lock()
+	defer tc.Unlock()
+	tc.count++
+	return tc.count
+}
+
+func (tc *tableCount) Decrease() int {
+	tc.Lock()
+	defer tc.Unlock()
+	tc.count--
+	if tc.count < 0 {
+		fmt.Errorf("Table count is less than 0: %d", tc.count)
+	}
+	return tc.count
+}
+
+type database struct {
+	sync.RWMutex
+	tables map[string]map[string]*tableCount // topic: index: []value
+}
+
+// getTable return the table of the topic/values.
+// The second bool indicates if the topic exists
+func (db *database) getTable(topic string, key string) (*Table, bool) {
+	db.RLock()
+	defer db.RUnlock()
+	r, ok := db.tables[topic]
+	if !ok {
+		return nil, false
+	}
+	tc, ok := r[key]
+	if ok {
+		return tc.t, true
+	} else {
+		return nil, false
+	}
+}
+
+// addTable add a table to the database
+// If the table already exists, return the existing table;
+// otherwise, create a new table and return it.
+// The second argument is to indicate if the table is newly created
+func (db *database) addTable(topic string, key string) (*Table, bool) {
+	db.Lock()
+	defer db.Unlock()
+	r, ok := db.tables[topic]
+	if !ok {
+		r = make(map[string]*tableCount)
+		db.tables[topic] = r
+	}
+	tc, ok := r[key]
+	if ok {
+		tc.Increase()
+	} else {
+		t := createTable()
+		tc = &tableCount{
+			count: 1,
+			t:     t,
+		}
+		r[key] = tc
+	}
+	return tc.t, !ok
+}
+
+// dropTable drop the table of the topic/values
+// stops to accumulate job
+// deletes the cache data
+func (db *database) dropTable(topic string, key string) error {
+	db.Lock()
+	defer db.Unlock()
+	if r, ok := db.tables[topic]; ok {
+		if tc, tok := r[key]; tok {
+			if tc.Decrease() == 0 {
+				if tc.t != nil && tc.t.cancel != nil {
+					tc.t.cancel()
+				}
+				delete(r, key)
+			}
+			return nil
+		}
+	}
+	return fmt.Errorf("Table %s/%s not found", topic, key)
+}
+
+// Table has one writer and multiple reader
+type Table struct {
+	sync.RWMutex
+	datamap map[string][]api.SourceTuple
+	cancel  context.CancelFunc
+}
+
+func createTable() *Table {
+	return &Table{
+		datamap: make(map[string][]api.SourceTuple),
+	}
+}
+
+func (t *Table) add(key string, value api.SourceTuple) {
+	t.Lock()
+	defer t.Unlock()
+	t.datamap[key] = append(t.datamap[key], value)
+}
+
+func (t *Table) Read(values []interface{}) ([]api.SourceTuple, error) {
+	t.RLock()
+	defer t.RUnlock()
+	mapkey := ""
+	for _, k := range values {
+		mapkey += fmt.Sprintf("%v,", k)
+	}
+	return t.datamap[mapkey], nil
+}
+
+var (
+	db = &database{
+		tables: make(map[string]map[string]*tableCount),
+	}
+)

+ 113 - 0
internal/topo/memory/store/db_test.go

@@ -0,0 +1,113 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package store
+
+import (
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"reflect"
+	"testing"
+)
+
+func TestTable(t *testing.T) {
+	tb := createTable()
+	tb.add("1,", api.NewDefaultSourceTuple(map[string]interface{}{"a": 1}, nil))
+	tb.add("2,", api.NewDefaultSourceTuple(map[string]interface{}{"a": 2}, nil))
+	tb.add("3,", api.NewDefaultSourceTuple(map[string]interface{}{"a": 3}, nil))
+	tb.add("1,", api.NewDefaultSourceTuple(map[string]interface{}{"a": 4}, nil))
+	v, _ := tb.Read([]interface{}{"1"})
+	exp := []api.SourceTuple{
+		api.NewDefaultSourceTuple(map[string]interface{}{"a": 1}, nil),
+		api.NewDefaultSourceTuple(map[string]interface{}{"a": 4}, nil),
+	}
+	if !reflect.DeepEqual(v, exp) {
+		t.Errorf("read 1 expect %v, but got %v", exp, v)
+		return
+	}
+	v, _ = tb.Read([]interface{}{"3"})
+	exp = []api.SourceTuple{
+		api.NewDefaultSourceTuple(map[string]interface{}{"a": 3}, nil),
+	}
+	if !reflect.DeepEqual(v, exp) {
+		t.Errorf("read 3 expect %v, but got %v", exp, v)
+		return
+	}
+	tb.add("1,3,", api.NewDefaultSourceTuple(map[string]interface{}{"a": 5}, nil))
+	tb.add("1,", api.NewDefaultSourceTuple(map[string]interface{}{"a": 6}, nil))
+	v, _ = tb.Read([]interface{}{"1"})
+	exp = []api.SourceTuple{
+		api.NewDefaultSourceTuple(map[string]interface{}{"a": 1}, nil),
+		api.NewDefaultSourceTuple(map[string]interface{}{"a": 4}, nil),
+		api.NewDefaultSourceTuple(map[string]interface{}{"a": 6}, nil),
+	}
+	if !reflect.DeepEqual(v, exp) {
+		t.Errorf("read 1 again expect %v, but got %v", exp, v)
+		return
+	}
+	v, _ = tb.Read([]interface{}{"1", "3"})
+	exp = []api.SourceTuple{
+		api.NewDefaultSourceTuple(map[string]interface{}{"a": 5}, nil),
+	}
+	if !reflect.DeepEqual(v, exp) {
+		t.Errorf("read 1,3 expect %v, but got %v", exp, v)
+		return
+	}
+}
+
+func TestDb(t *testing.T) {
+	db = &database{
+		tables: make(map[string]map[string]*tableCount),
+	}
+	db.addTable("t1", "a")
+	db.addTable("t1", "b")
+	db.addTable("t2", "a")
+	db.addTable("t1", "a")
+	_, ok := db.getTable("t1", "a")
+	if !ok {
+		t.Errorf("table t1 a should exist")
+		return
+	}
+	_, ok = db.getTable("t1", "b")
+	if !ok {
+		t.Errorf("table t1 b should exist")
+		return
+	}
+	_, ok = db.getTable("t1", "c")
+	if ok {
+		t.Errorf("table t1 c should not exist")
+		return
+	}
+	tc := db.tables["t1"]["a"]
+	if tc.count != 2 {
+		t.Errorf("table t1 a should have 2 instances")
+		return
+	}
+	tc = db.tables["t2"]["a"]
+	if tc.count != 1 {
+		t.Errorf("table t1 a should have 1 instances")
+		return
+	}
+	db.dropTable("t1", "a")
+	db.dropTable("t2", "a")
+	_, ok = db.getTable("t2", "a")
+	if ok {
+		t.Errorf("table ta a should not exist")
+		return
+	}
+	tc = db.tables["t1"]["a"]
+	if tc.count != 1 {
+		t.Errorf("table t1 a should have 2 instances")
+		return
+	}
+}

+ 65 - 0
internal/topo/memory/store/store.go

@@ -0,0 +1,65 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package store
+
+import (
+	"context"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
+	"regexp"
+)
+
+// Reg registers a topic to save it to memory store
+// Create a new go routine to listen to the topic and save the data to memory
+func Reg(topic string, topicRegex *regexp.Regexp, key string, keys []string) (*Table, error) {
+	t, isNew := db.addTable(topic, key)
+	if isNew {
+		go runTable(topic, topicRegex, key, keys, t)
+	}
+	return t, nil
+}
+
+// runTable should only run in a single instance.
+// This go routine is used to accumulate data in memory
+// If the go routine close, the go routine exits but the data will be kept until table dropped
+func runTable(topic string, topicRegex *regexp.Regexp, key string, keys []string, t *Table) {
+	conf.Log.Infof("runTable %s_%s", topic, key)
+	ch := pubsub.CreateSub(topic, topicRegex, fmt.Sprintf("store_%s_%s", topic, key), 1024)
+	ctx, cancel := context.WithCancel(context.Background())
+	t.cancel = cancel
+	for {
+		select {
+		case v, opened := <-ch:
+			if !opened { // exit go routine is not sync with drop table
+				return
+			}
+			mapkey := ""
+			for _, k := range keys {
+				mapkey += fmt.Sprintf("%v,", v.Message()[k])
+			}
+			t.add(mapkey, v)
+			conf.Log.Debugf("receive data %v for %s_%s", v, topic, key)
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// Unreg unregisters a topic to remove it from memory store
+func Unreg(topic string, key string) error {
+	// Must be an atomic operation
+	return db.dropTable(topic, key)
+}

+ 65 - 0
internal/topo/memory/store/store_test.go

@@ -0,0 +1,65 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package store
+
+import (
+	"reflect"
+	"testing"
+)
+
+func TestReg(t *testing.T) {
+	db = &database{
+		tables: make(map[string]map[string]*tableCount),
+	}
+	reg1, err := Reg("test", nil, "a", []string{"a"})
+	if err != nil {
+		t.Errorf("register test error: %v", err)
+		return
+	}
+	reg2, err2 := Reg("test", nil, "a,b", []string{"a", "b"})
+	if err2 != nil {
+		t.Errorf("register test error: %v", err2)
+		return
+	}
+	exp := map[string]map[string]*tableCount{
+		"test": {
+			"a": &tableCount{
+				count: 1,
+				t:     reg1,
+			},
+			"a,b": &tableCount{
+				count: 1,
+				t:     reg2,
+			},
+		},
+	}
+	if !reflect.DeepEqual(exp, db.tables) {
+		t.Errorf("register expect %v, but got %v", exp, db.tables)
+		return
+	}
+	Unreg("test", "a,b")
+	exp = map[string]map[string]*tableCount{
+		"test": {
+			"a": &tableCount{
+				count: 1,
+				t:     reg1,
+			},
+		},
+	}
+	if !reflect.DeepEqual(exp, db.tables) {
+		t.Errorf("unregister expect %v, but got %v", exp, db.tables)
+		return
+	}
+}

+ 5 - 5
internal/topo/neuron/connection.go

@@ -19,7 +19,7 @@ import (
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
-	"github.com/lf-edge/ekuiper/internal/topo/memory"
+	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
@@ -68,7 +68,7 @@ func createOrGetConnection(sc api.StreamContext, url string) error {
 			return err
 			return err
 		}
 		}
 		sc.GetLogger().Infof("Neuron connected")
 		sc.GetLogger().Infof("Neuron connected")
-		memory.CreatePub(NeuronTopic)
+		pubsub.CreatePub(NeuronTopic)
 		go run(sctx)
 		go run(sctx)
 	}
 	}
 	connectionCount++
 	connectionCount++
@@ -79,7 +79,7 @@ func closeConnection(ctx api.StreamContext, url string) error {
 	m.Lock()
 	m.Lock()
 	defer m.Unlock()
 	defer m.Unlock()
 	ctx.GetLogger().Infof("closeConnection count: %d", connectionCount)
 	ctx.GetLogger().Infof("closeConnection count: %d", connectionCount)
-	memory.RemovePub(NeuronTopic)
+	pubsub.RemovePub(NeuronTopic)
 	if connectionCount == 1 {
 	if connectionCount == 1 {
 		err := disconnect(url)
 		err := disconnect(url)
 		if err != nil {
 		if err != nil {
@@ -114,7 +114,7 @@ func connect(ctx api.StreamContext, url string) error {
 		case mangos.PipeEventDetached:
 		case mangos.PipeEventDetached:
 			atomic.StoreInt32(&opened, 0)
 			atomic.StoreInt32(&opened, 0)
 			conf.Log.Warnf("neuron connection detached")
 			conf.Log.Warnf("neuron connection detached")
-			memory.ProduceError(ctx, NeuronTopic, fmt.Errorf("neuron connection detached"))
+			pubsub.ProduceError(ctx, NeuronTopic, fmt.Errorf("neuron connection detached"))
 		}
 		}
 	})
 	})
 	//sock.SetOption(mangos.OptionWriteQLen, 100)
 	//sock.SetOption(mangos.OptionWriteQLen, 100)
@@ -145,7 +145,7 @@ func run(ctx api.StreamContext) {
 				ctx.GetLogger().Errorf("neuron decode message error %v", err)
 				ctx.GetLogger().Errorf("neuron decode message error %v", err)
 				continue
 				continue
 			}
 			}
-			memory.Produce(ctx, NeuronTopic, result)
+			pubsub.Produce(ctx, NeuronTopic, result)
 		} else if err == mangos.ErrClosed {
 		} else if err == mangos.ErrClosed {
 			ctx.GetLogger().Infof("neuron connection closed, exit receiving loop")
 			ctx.GetLogger().Infof("neuron connection closed, exit receiving loop")
 			return
 			return

+ 3 - 3
internal/topo/neuron/source.go

@@ -16,7 +16,7 @@ package neuron
 
 
 import (
 import (
 	"fmt"
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/topo/memory"
+	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 	"github.com/lf-edge/ekuiper/pkg/infra"
@@ -45,8 +45,8 @@ func (s *source) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, er
 		return
 		return
 	}
 	}
 	defer closeConnection(ctx, s.url)
 	defer closeConnection(ctx, s.url)
-	ch := memory.CreateSub(NeuronTopic, nil, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), s.bufferLength)
-	defer memory.CloseSourceConsumerChannel(NeuronTopic, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
+	ch := pubsub.CreateSub(NeuronTopic, nil, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), s.bufferLength)
+	defer pubsub.CloseSourceConsumerChannel(NeuronTopic, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
 	for {
 	for {
 		select {
 		select {
 		case v, opened := <-ch:
 		case v, opened := <-ch:

+ 2 - 1
internal/topo/node/lookup_node.go

@@ -193,7 +193,8 @@ func (n *LookupNode) lookup(ctx api.StreamContext, d xsql.TupleRow, fv *xsql.Fun
 			merged.AddTuple(d)
 			merged.AddTuple(d)
 			t := &xsql.Tuple{
 			t := &xsql.Tuple{
 				Emitter:   n.name,
 				Emitter:   n.name,
-				Message:   v,
+				Message:   v.Message(),
+				Metadata:  v.Meta(),
 				Timestamp: conf.GetNowInMilli(),
 				Timestamp: conf.GetNowInMilli(),
 			}
 			}
 			merged.AddTuple(t)
 			merged.AddTuple(t)

+ 14 - 14
internal/topo/node/lookup_node_test.go

@@ -40,50 +40,50 @@ func (m *mockLookupSrc) Configure(_ string, _ map[string]interface{}, _ []string
 }
 }
 
 
 // Lookup accept int value as the first array value
 // Lookup accept int value as the first array value
-func (m *mockLookupSrc) Lookup(_ api.StreamContext, values []interface{}) ([]map[string]interface{}, error) {
+func (m *mockLookupSrc) Lookup(_ api.StreamContext, values []interface{}) ([]api.SourceTuple, error) {
 	a1, ok := values[0].(int)
 	a1, ok := values[0].(int)
 	if ok {
 	if ok {
-		var result []map[string]interface{}
+		var result []api.SourceTuple
 		c := a1 % 2
 		c := a1 % 2
 		if c != 0 {
 		if c != 0 {
-			result = append(result, map[string]interface{}{
+			result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
 				"newA": c,
 				"newA": c,
 				"newB": c * 2,
 				"newB": c * 2,
-			})
+			}, nil))
 		}
 		}
 		c = a1 % 3
 		c = a1 % 3
 		if c != 0 {
 		if c != 0 {
-			result = append(result, map[string]interface{}{
+			result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
 				"newA": c,
 				"newA": c,
 				"newB": c * 2,
 				"newB": c * 2,
-			})
+			}, nil))
 		}
 		}
 		c = a1 % 5
 		c = a1 % 5
 		if c != 0 {
 		if c != 0 {
-			result = append(result, map[string]interface{}{
+			result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
 				"newA": c,
 				"newA": c,
 				"newB": c * 2,
 				"newB": c * 2,
-			})
+			}, nil))
 		}
 		}
 		c = a1 % 7
 		c = a1 % 7
 		if c != 0 {
 		if c != 0 {
-			result = append(result, map[string]interface{}{
+			result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
 				"newA": c,
 				"newA": c,
 				"newB": c * 2,
 				"newB": c * 2,
-			})
+			}, nil))
 		}
 		}
 		return result, nil
 		return result, nil
 	} else {
 	} else {
-		return []map[string]interface{}{
-			{
+		return []api.SourceTuple{
+			api.NewDefaultSourceTuple(map[string]interface{}{
 				"newA": 0,
 				"newA": 0,
 				"newB": 0,
 				"newB": 0,
-			},
+			}, nil),
 		}, nil
 		}, nil
 	}
 	}
 }
 }
 
 
-func (m *mockLookupSrc) Close(ctx api.StreamContext) error {
+func (m *mockLookupSrc) Close(_ api.StreamContext) error {
 	// do nothing
 	// do nothing
 	return nil
 	return nil
 }
 }

+ 3 - 3
internal/topo/source/httppush_source.go

@@ -17,7 +17,7 @@ package source
 import (
 import (
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/topo/memory"
+	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
 	"github.com/lf-edge/ekuiper/internal/topo/source/httpserver"
 	"github.com/lf-edge/ekuiper/internal/topo/source/httpserver"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
@@ -70,8 +70,8 @@ func (hps *HTTPPushSource) Open(ctx api.StreamContext, consumer chan<- api.Sourc
 		return
 		return
 	}
 	}
 	defer httpserver.UnregisterEndpoint(hps.conf.Endpoint)
 	defer httpserver.UnregisterEndpoint(hps.conf.Endpoint)
-	ch := memory.CreateSub(t, nil, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), hps.conf.BufferLength)
-	defer memory.CloseSourceConsumerChannel(t, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
+	ch := pubsub.CreateSub(t, nil, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), hps.conf.BufferLength)
+	defer pubsub.CloseSourceConsumerChannel(t, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
 	for {
 	for {
 		select {
 		select {
 		case <-done: // http data server error
 		case <-done: // http data server error

+ 5 - 5
internal/topo/source/httpserver/data_server.go

@@ -21,7 +21,7 @@ import (
 	"github.com/gorilla/mux"
 	"github.com/gorilla/mux"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
-	"github.com/lf-edge/ekuiper/internal/topo/memory"
+	"github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"net/http"
 	"net/http"
@@ -75,7 +75,7 @@ func RegisterEndpoint(endpoint string, method string, _ string) (string, chan st
 		return "", nil, err
 		return "", nil, err
 	}
 	}
 	topic := TopicPrefix + endpoint
 	topic := TopicPrefix + endpoint
-	memory.CreatePub(topic)
+	pubsub.CreatePub(topic)
 	router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
 	router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
 		sctx.GetLogger().Debugf("receive http request: %s", r.URL.String())
 		sctx.GetLogger().Debugf("receive http request: %s", r.URL.String())
 		defer r.Body.Close()
 		defer r.Body.Close()
@@ -83,11 +83,11 @@ func RegisterEndpoint(endpoint string, method string, _ string) (string, chan st
 		err := json.NewDecoder(r.Body).Decode(&m)
 		err := json.NewDecoder(r.Body).Decode(&m)
 		if err != nil {
 		if err != nil {
 			handleError(w, err, "Fail to decode data")
 			handleError(w, err, "Fail to decode data")
-			memory.ProduceError(sctx, topic, fmt.Errorf("fail to decode data %s: %v", r.Body, err))
+			pubsub.ProduceError(sctx, topic, fmt.Errorf("fail to decode data %s: %v", r.Body, err))
 			return
 			return
 		}
 		}
 		sctx.GetLogger().Debugf("httppush received message %s", m)
 		sctx.GetLogger().Debugf("httppush received message %s", m)
-		memory.Produce(sctx, topic, m)
+		pubsub.Produce(sctx, topic, m)
 		w.WriteHeader(http.StatusOK)
 		w.WriteHeader(http.StatusOK)
 		_, _ = w.Write([]byte("ok"))
 		_, _ = w.Write([]byte("ok"))
 	}).Methods(method)
 	}).Methods(method)
@@ -97,7 +97,7 @@ func RegisterEndpoint(endpoint string, method string, _ string) (string, chan st
 func UnregisterEndpoint(endpoint string) {
 func UnregisterEndpoint(endpoint string) {
 	lock.Lock()
 	lock.Lock()
 	defer lock.Unlock()
 	defer lock.Unlock()
-	memory.RemovePub(TopicPrefix + endpoint)
+	pubsub.RemovePub(TopicPrefix + endpoint)
 	refCount--
 	refCount--
 	// TODO async close server
 	// TODO async close server
 	if refCount == 0 {
 	if refCount == 0 {

+ 1 - 1
pkg/api/stream.go

@@ -85,7 +85,7 @@ type LookupSource interface {
 	//read from the yaml
 	//read from the yaml
 	Configure(datasource string, props map[string]interface{}, lookupKeys []string) error
 	Configure(datasource string, props map[string]interface{}, lookupKeys []string) error
 	// Lookup receive lookup values to construct the query and return query results
 	// Lookup receive lookup values to construct the query and return query results
-	Lookup(ctx StreamContext, values []interface{}) ([]map[string]interface{}, error)
+	Lookup(ctx StreamContext, values []interface{}) ([]SourceTuple, error)
 	Closable
 	Closable
 }
 }