Jelajahi Sumber

fix(fea): support connection test for source/sink

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran 2 tahun lalu
induk
melakukan
6a26e3179f
2 mengubah file dengan 29 tambahan dan 15 penghapusan
  1. 14 9
      internal/meta/yamlConfigMeta.go
  2. 15 6
      internal/topo/node/node.go

+ 14 - 9
internal/meta/yamlConfigMeta.go

@@ -36,8 +36,11 @@ var ConfigManager = configManager{
 }
 }
 
 
 const SourceCfgOperatorKeyTemplate = "sources.%s"
 const SourceCfgOperatorKeyTemplate = "sources.%s"
+const SourceCfgOperatorKeyPrefix = "sources."
 const SinkCfgOperatorKeyTemplate = "sinks.%s"
 const SinkCfgOperatorKeyTemplate = "sinks.%s"
+const SinkCfgOperatorKeyPrefix = "sinks."
 const ConnectionCfgOperatorKeyTemplate = "connections.%s"
 const ConnectionCfgOperatorKeyTemplate = "connections.%s"
+const ConnectionCfgOperatorKeyPrefix = "connections."
 
 
 // loadConfigOperatorForSource
 // loadConfigOperatorForSource
 // Try to load ConfigOperator for plugin xxx from /etc/sources/xxx.yaml  /data/sources/xxx.yaml
 // Try to load ConfigOperator for plugin xxx from /etc/sources/xxx.yaml  /data/sources/xxx.yaml
@@ -235,20 +238,22 @@ func GetResources(language string) (b []byte, err error) {
 	var sinkResources []map[string]string
 	var sinkResources []map[string]string
 
 
 	for key, ops := range ConfigManager.cfgOperators {
 	for key, ops := range ConfigManager.cfgOperators {
-		if strings.HasSuffix(key, ConnectionCfgOperatorKeyTemplate) {
+		if strings.HasPrefix(key, ConnectionCfgOperatorKeyPrefix) {
 			continue
 			continue
 		}
 		}
-		if strings.HasSuffix(key, SourceCfgOperatorKeyTemplate) {
-			plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyTemplate)
+		if strings.HasPrefix(key, SourceCfgOperatorKeyPrefix) {
+			plugin := strings.TrimPrefix(key, SourceCfgOperatorKeyPrefix)
 			resourceIds := ops.GetUpdatableConfKeys()
 			resourceIds := ops.GetUpdatableConfKeys()
-			item := map[string]string{}
-			for _, v := range resourceIds {
-				item[plugin] = v
+			if len(resourceIds) > 0 {
+				item := map[string]string{}
+				for _, v := range resourceIds {
+					item[v] = plugin
+				}
+				srcResources = append(srcResources, item)
 			}
 			}
-			srcResources = append(srcResources, item)
 		}
 		}
-		if strings.HasSuffix(key, SinkCfgOperatorKeyTemplate) {
-			plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyTemplate)
+		if strings.HasSuffix(key, SinkCfgOperatorKeyPrefix) {
+			plugin := strings.TrimPrefix(key, SinkCfgOperatorKeyPrefix)
 			resourceIds := ops.GetUpdatableConfKeys()
 			resourceIds := ops.GetUpdatableConfKeys()
 			item := map[string]string{}
 			item := map[string]string{}
 			for _, v := range resourceIds {
 			for _, v := range resourceIds {

+ 15 - 6
internal/topo/node/node.go

@@ -26,6 +26,7 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"time"
 )
 )
 
 
 type OperatorNode interface {
 type OperatorNode interface {
@@ -183,6 +184,10 @@ func SinkOpen(sinkType string, config map[string]interface{}) error {
 	contextLogger := conf.Log.WithField("rule", "TestSinkOpen"+"_"+sinkType)
 	contextLogger := conf.Log.WithField("rule", "TestSinkOpen"+"_"+sinkType)
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 
 
+	defer func() {
+		_ = sink.Close(ctx)
+	}()
+
 	return sink.Open(ctx)
 	return sink.Open(ctx)
 }
 }
 
 
@@ -208,11 +213,15 @@ func SourceOpen(sourceType string, config map[string]interface{}) error {
 
 
 	contextLogger := conf.Log.WithField("rule", "TestSourceOpen"+"_"+sourceType)
 	contextLogger := conf.Log.WithField("rule", "TestSourceOpen"+"_"+sourceType)
 	ctx, cancel := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithCancel()
 	ctx, cancel := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithCancel()
+	defer func() {
+		cancel()
+		_ = ns.Close(ctx)
+	}()
 
 
-	defer cancel()
-
-	var sourceDataChannel chan api.SourceTuple
-	var errChannel chan error
+	TimeOut := 2000
+	ticker := time.NewTicker(time.Millisecond * time.Duration(TimeOut))
+	var sourceDataChannel = make(chan api.SourceTuple)
+	var errChannel = make(chan error)
 
 
 	go func() {
 	go func() {
 		ns.Open(ctx, sourceDataChannel, errChannel)
 		ns.Open(ctx, sourceDataChannel, errChannel)
@@ -225,7 +234,7 @@ func SourceOpen(sourceType string, config map[string]interface{}) error {
 		return nil
 		return nil
 	case err = <-errChannel:
 	case err = <-errChannel:
 		return err
 		return err
+	case <-ticker.C:
+		return nil
 	}
 	}
-
-	return nil
 }
 }