Quellcode durchsuchen

feat(memory): memory source data to include the topic in metadata

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang vor 3 Jahren
Ursprung
Commit
68d8cd9bd1

+ 11 - 13
internal/topo/memory/manager.go

@@ -25,12 +25,12 @@ const IdProperty = "topic"
 
 type pubConsumers struct {
 	count     int
-	consumers map[string]chan map[string]interface{} // The consumer channel list [sourceId]chan
+	consumers map[string]chan api.SourceTuple // The consumer channel list [sourceId]chan
 }
 
 type subChan struct {
 	regex *regexp.Regexp
-	ch    chan map[string]interface{}
+	ch    chan api.SourceTuple
 }
 
 var (
@@ -57,7 +57,7 @@ func createPub(topic string) {
 	}
 	c := &pubConsumers{
 		count:     1,
-		consumers: make(map[string]chan map[string]interface{}),
+		consumers: make(map[string]chan api.SourceTuple),
 	}
 	pubTopics[topic] = c
 	for sourceId, sc := range subExps {
@@ -67,10 +67,10 @@ func createPub(topic string) {
 	}
 }
 
-func createSub(wildcard string, regex *regexp.Regexp, sourceId string) chan map[string]interface{} {
+func createSub(wildcard string, regex *regexp.Regexp, sourceId string) chan api.SourceTuple {
 	mu.Lock()
 	defer mu.Unlock()
-	ch := make(chan map[string]interface{})
+	ch := make(chan api.SourceTuple)
 	if regex != nil {
 		subExps[sourceId] = &subChan{
 			regex: regex,
@@ -87,7 +87,7 @@ func createSub(wildcard string, regex *regexp.Regexp, sourceId string) chan map[
 	return ch
 }
 
-func closeSourceConsumerChannel(topic string, sourceId string) error {
+func closeSourceConsumerChannel(topic string, sourceId string) {
 	mu.Lock()
 	defer mu.Unlock()
 
@@ -102,10 +102,9 @@ func closeSourceConsumerChannel(topic string, sourceId string) error {
 			removePubConsumer(topic, sourceId, sinkConsumerChannels)
 		}
 	}
-	return nil
 }
 
-func closeSink(topic string) error {
+func closeSink(topic string) {
 	mu.Lock()
 	defer mu.Unlock()
 
@@ -115,7 +114,6 @@ func closeSink(topic string) error {
 			delete(pubTopics, topic)
 		}
 	}
-	return nil
 }
 
 func produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
@@ -129,9 +127,9 @@ func produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
 	// blocking to retain the sequence, expect the source to consume the data immediately
 	wg.Add(len(c.consumers))
 	for n, out := range c.consumers {
-		go func(name string, output chan<- map[string]interface{}) {
+		go func(name string, output chan<- api.SourceTuple) {
 			select {
-			case output <- data:
+			case output <- api.NewDefaultSourceTuple(data, map[string]interface{}{"topic": topic}):
 				logger.Debugf("broadcast from topic %s to %s done", topic, name)
 			case <-ctx.Done():
 				// rule stop so stop waiting
@@ -143,13 +141,13 @@ func produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
 	wg.Wait()
 }
 
-func addPubConsumer(topic string, sourceId string, ch chan map[string]interface{}) {
+func addPubConsumer(topic string, sourceId string, ch chan api.SourceTuple) {
 	var sinkConsumerChannels *pubConsumers
 	if c, exists := pubTopics[topic]; exists {
 		sinkConsumerChannels = c
 	} else {
 		sinkConsumerChannels = &pubConsumers{
-			consumers: make(map[string]chan map[string]interface{}),
+			consumers: make(map[string]chan api.SourceTuple),
 		}
 		pubTopics[topic] = sinkConsumerChannels
 	}

+ 73 - 36
internal/topo/memory/manager_test.go

@@ -15,7 +15,6 @@
 package memory
 
 import (
-	"encoding/json"
 	"fmt"
 	"github.com/gdexlab/go-render/render"
 	"github.com/lf-edge/ekuiper/internal/conf"
@@ -83,7 +82,7 @@ func TestSharedInmemoryNode(t *testing.T) {
 	for {
 		select {
 		case res := <-consumer:
-			expected := api.NewDefaultSourceTuple(data, make(map[string]interface{}))
+			expected := api.NewDefaultSourceTuple(data, map[string]interface{}{"topic": "test_id"})
 			if !reflect.DeepEqual(expected, res) {
 				t.Errorf("result %s should be equal to %s", res, expected)
 			}
@@ -98,7 +97,7 @@ func TestCreateAndClose(t *testing.T) {
 	var (
 		sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
 		sinkTopics   = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1", "h/d1/c1/s1"}
-		chans        []chan map[string]interface{}
+		chans        []chan api.SourceTuple
 	)
 	for i, topic := range sinkTopics {
 		createPub(topic)
@@ -120,27 +119,27 @@ func TestCreateAndClose(t *testing.T) {
 	expPub := map[string]*pubConsumers{
 		"h/d1/c1/s1": {
 			count: 2,
-			consumers: map[string]chan map[string]interface{}{
+			consumers: map[string]chan api.SourceTuple{
 				"1": chans[1],
 				"4": chans[4],
 			},
 		},
 		"h/d1/c1/s2": {
 			count: 1,
-			consumers: map[string]chan map[string]interface{}{
+			consumers: map[string]chan api.SourceTuple{
 				"0": chans[0],
 				"3": chans[3],
 			},
 		},
 		"h/d2/c2/s1": {
 			count: 1,
-			consumers: map[string]chan map[string]interface{}{
+			consumers: map[string]chan api.SourceTuple{
 				"1": chans[1],
 			},
 		},
 		"h/d3/c3/s1": {
 			count: 1,
-			consumers: map[string]chan map[string]interface{}{
+			consumers: map[string]chan api.SourceTuple{
 				"1": chans[1],
 				"2": chans[2],
 			},
@@ -159,19 +158,19 @@ func TestCreateAndClose(t *testing.T) {
 	expPub = map[string]*pubConsumers{
 		"h/d1/c1/s1": {
 			count: 1,
-			consumers: map[string]chan map[string]interface{}{
+			consumers: map[string]chan api.SourceTuple{
 				"4": chans[4],
 			},
 		},
 		"h/d1/c1/s2": {
 			count: 0,
-			consumers: map[string]chan map[string]interface{}{
+			consumers: map[string]chan api.SourceTuple{
 				"3": chans[3],
 			},
 		},
 		"h/d3/c3/s1": {
 			count:     1,
-			consumers: map[string]chan map[string]interface{}{},
+			consumers: map[string]chan api.SourceTuple{},
 		},
 	}
 	if !reflect.DeepEqual(expPub, pubTopics) {
@@ -245,147 +244,189 @@ func TestMultipleTopics(t *testing.T) {
 					"id":   1,
 					"temp": 23,
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d1/c1/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":   1,
 					"temp": 23,
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d1/c1/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":    4,
 					"color": "red",
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d1/c1/s2",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":    4,
 					"color": "red",
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d1/c1/s2",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":  7,
 					"hum": 67.5,
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d2/c2/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":     10,
 					"status": "on",
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d3/c3/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":     10,
 					"status": "on",
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d3/c3/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":   2,
 					"temp": 34,
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d1/c1/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":   2,
 					"temp": 34,
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d1/c1/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":    5,
 					"color": "red",
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d1/c1/s2",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":    5,
 					"color": "red",
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d1/c1/s2",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":  8,
 					"hum": 77.1,
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d2/c2/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":     11,
 					"status": "off",
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d3/c3/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":     11,
 					"status": "off",
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d3/c3/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":   3,
 					"temp": 28,
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d1/c1/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":   3,
 					"temp": 28,
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d1/c1/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":    6,
 					"color": "green",
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d1/c1/s2",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":    6,
 					"color": "green",
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d1/c1/s2",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":  9,
 					"hum": 90.3,
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d2/c2/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":     12,
 					"status": "on",
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d3/c3/s1",
+				},
 			},
 			&api.DefaultSourceTuple{
 				Mess: map[string]interface{}{
 					"id":     12,
 					"status": "on",
 				},
-				M: make(map[string]interface{}),
+				M: map[string]interface{}{
+					"topic": "h/d3/c3/s1",
+				},
 			},
 		}
 	)
@@ -452,10 +493,6 @@ func TestMultipleTopics(t *testing.T) {
 		results = append(results, res)
 	}
 	if !reflect.DeepEqual(expected, results) {
-		t.Errorf("Expect\t %v\n but got\t\t\t %v", render.AsCode(expected), render.AsCode(results))
+		t.Errorf("Expect\t %v\n but got\t\t\t\t %v", render.AsCode(expected), render.AsCode(results))
 	}
 }
-
-func asJsonBytes(m []map[string]interface{}) ([]byte, error) {
-	return json.Marshal(m)
-}

+ 2 - 1
internal/topo/memory/sink.go

@@ -70,5 +70,6 @@ func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 
 func (s *sink) Close(ctx api.StreamContext) error {
 	ctx.GetLogger().Debugf("closing memory sink")
-	return closeSink(s.topic)
+	closeSink(s.topic)
+	return nil
 }

+ 4 - 3
internal/topo/memory/source.go

@@ -24,7 +24,7 @@ import (
 type source struct {
 	topic      string
 	topicRegex *regexp.Regexp
-	input      <-chan map[string]interface{}
+	input      <-chan api.SourceTuple
 }
 
 func (s *source) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, _ chan<- error) {
@@ -36,7 +36,7 @@ func (s *source) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, _
 			if !opened {
 				return
 			}
-			consumer <- api.NewDefaultSourceTuple(v, make(map[string]interface{}))
+			consumer <- v
 		case <-ctx.Done():
 			return
 		}
@@ -72,5 +72,6 @@ func getRegexp(topic string) (*regexp.Regexp, error) {
 
 func (s *source) Close(ctx api.StreamContext) error {
 	ctx.GetLogger().Debugf("closing memory source")
-	return closeSourceConsumerChannel(s.topic, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
+	closeSourceConsumerChannel(s.topic, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
+	return nil
 }