Quellcode durchsuchen

fix(memory): fix problem when sub before pub

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang vor 3 Jahren
Ursprung
Commit
1e373f67b1
2 geänderte Dateien mit 13 neuen und 4 gelöschten Zeilen
  1. 1 0
      internal/topo/memory/manager.go
  2. 12 4
      internal/topo/memory/manager_test.go

+ 1 - 0
internal/topo/memory/manager.go

@@ -151,6 +151,7 @@ func addPubConsumer(topic string, sourceId string, ch chan map[string]interface{
 		sinkConsumerChannels = &pubConsumers{
 		sinkConsumerChannels = &pubConsumers{
 			consumers: make(map[string]chan map[string]interface{}),
 			consumers: make(map[string]chan map[string]interface{}),
 		}
 		}
+		pubTopics[topic] = sinkConsumerChannels
 	}
 	}
 	if _, exists := sinkConsumerChannels.consumers[sourceId]; exists {
 	if _, exists := sinkConsumerChannels.consumers[sourceId]; exists {
 		conf.Log.Warnf("create memory source consumer for %s which is already exists", sourceId)
 		conf.Log.Warnf("create memory source consumer for %s which is already exists", sourceId)

+ 12 - 4
internal/topo/memory/manager_test.go

@@ -23,6 +23,8 @@ import (
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"reflect"
 	"reflect"
+	"regexp"
+	"strings"
 	"testing"
 	"testing"
 	"time"
 	"time"
 )
 )
@@ -105,10 +107,16 @@ func TestCreateAndClose(t *testing.T) {
 	)
 	)
 	for i, topic := range sinkTopics {
 	for i, topic := range sinkTopics {
 		createPub(topic)
 		createPub(topic)
-		r, err := getRegexp(sourceTopics[i])
-		if err != nil {
-			t.Error(err)
-			return
+		var (
+			r   *regexp.Regexp
+			err error
+		)
+		if strings.ContainsAny(sourceTopics[i], "+#") {
+			r, err = getRegexp(sourceTopics[i])
+			if err != nil {
+				t.Error(err)
+				return
+			}
 		}
 		}
 		c := createSub(sourceTopics[i], r, fmt.Sprintf("%d", i))
 		c := createSub(sourceTopics[i], r, fmt.Sprintf("%d", i))
 		chans = append(chans, c)
 		chans = append(chans, c)