فهرست منبع

fix(source): remedy redis source test connection support

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 سال پیش
والد
کامیت
806359fdad
3فایلهای تغییر یافته به همراه44 افزوده شده و 21 حذف شده
  1. 2 2
      internal/server/rest_test.go
  2. 26 8
      internal/topo/node/node.go
  3. 16 11
      internal/topo/redis/lookup.go

+ 2 - 2
internal/server/rest_test.go

@@ -85,7 +85,7 @@ func Test_sourcesManageHandler(t *testing.T) {
 	}
 
 	//create table
-	buf := bytes.NewBuffer([]byte(` {"sql":"CREATE TABLE alertTable() WITH (DATASOURCE=\"0\", TYPE=\"redis\", KIND=\"lookup\")"}`))
+	buf := bytes.NewBuffer([]byte(` {"sql":"CREATE TABLE alertTable() WITH (DATASOURCE=\"0\", TYPE=\"memory\", KEY=\"id\", KIND=\"lookup\")"}`))
 	req, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/streams?kind=lookup", buf)
 	w = httptest.NewRecorder()
 
@@ -132,7 +132,7 @@ func Test_sourcesManageHandler(t *testing.T) {
 	w = httptest.NewRecorder()
 	r.ServeHTTP(w, req)
 
-	expect = []byte(`{"Name":"alertTable","Options":{"datasource":"0","type":"redis","kind":"lookup"},"Statement":null,"StreamFields":null,"StreamType":1}`)
+	expect = []byte(`{"Name":"alertTable","Options":{"datasource":"0","type":"memory", "key":"id","kind":"lookup"},"Statement":null,"StreamFields":null,"StreamType":1}`)
 	exp = map[string]interface{}{}
 	_ = json.NewDecoder(bytes.NewBuffer(expect)).Decode(&exp)
 

+ 26 - 8
internal/topo/node/node.go

@@ -201,19 +201,37 @@ func SourceOpen(sourceType string, config map[string]interface{}) error {
 	if v, ok := config["DATASOURCE"]; ok {
 		dataSource = v.(string)
 	}
-
 	ns, err := io.Source(sourceType)
 	if err != nil {
 		return err
 	}
-	err = ns.Configure(dataSource, config)
-	if err != nil {
-		return err
-	}
+	if ns == nil {
+		lns, err := io.LookupSource(sourceType)
+		if err != nil {
+			return err
+		}
+		if lns == nil {
+			// should not happen
+			return fmt.Errorf("source %s not found", sourceType)
+		}
+		err = lns.Configure(dataSource, config)
+		if err != nil {
+			return err
+		}
 
-	contextLogger := conf.Log.WithField("rule", "TestSourceOpen"+"_"+sourceType)
-	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
-	defer ns.Close(ctx)
+		contextLogger := conf.Log.WithField("rule", "TestSourceOpen"+"_"+sourceType)
+		ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+		lns.Close(ctx)
+	} else {
+		err = ns.Configure(dataSource, config)
+		if err != nil {
+			return err
+		}
+
+		contextLogger := conf.Log.WithField("rule", "TestSourceOpen"+"_"+sourceType)
+		ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+		ns.Close(ctx)
+	}
 
 	return nil
 

+ 16 - 11
internal/topo/redis/lookup.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -41,13 +41,17 @@ type lookupSource struct {
 }
 
 func (s *lookupSource) Configure(datasource string, props map[string]interface{}) error {
-	db, err := cast.ToInt(datasource, cast.CONVERT_ALL)
-	if err != nil {
-		return fmt.Errorf("invalid datasource, it must be an integer but got %s", datasource)
+	if datasource != "/$$TEST_CONNECTION$$" {
+		db, err := cast.ToInt(datasource, cast.CONVERT_ALL)
+		if err != nil {
+			return fmt.Errorf("invalid datasource, it must be an integer but got %s", datasource)
+		}
+		s.db = db
+	} else {
+		s.db = 0
 	}
-	s.db = db
 	cfg := &conf{}
-	err = cast.MapToStruct(props, cfg)
+	err := cast.MapToStruct(props, cfg)
 	if err != nil {
 		return err
 	}
@@ -58,17 +62,18 @@ func (s *lookupSource) Configure(datasource string, props map[string]interface{}
 		return errors.New("redis dataType must be string or list")
 	}
 	s.c = cfg
-	return nil
-}
-
-func (s *lookupSource) Open(ctx api.StreamContext) error {
-	ctx.GetLogger().Infof("Opening redis lookup source with conf %v", s.c)
 	s.cli = redis.NewClient(&redis.Options{
 		Addr:     s.c.Addr,
 		Username: s.c.Username,
 		Password: s.c.Password,
 		DB:       s.db,
 	})
+	_, err = s.cli.Ping().Result()
+	return err
+}
+
+func (s *lookupSource) Open(ctx api.StreamContext) error {
+	ctx.GetLogger().Infof("Opening redis lookup source with conf %v", s.c)
 	return nil
 }