Procházet zdrojové kódy

chore(io): http oauth validation to tolerate manager default setting

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang před 2 roky
rodič
revize
2b765d3a14

+ 5 - 6
etc/sinks/rest.json

@@ -235,8 +235,8 @@
           "default": [
             {
               "name": "url",
-              "default": "http://127.0.0.1:5536/token",
-              "optional": false,
+              "default": "",
+              "optional": true,
               "control": "text",
               "type": "string",
               "hint": {
@@ -265,7 +265,7 @@
             },
             {
               "name": "expire",
-              "default": "3600",
+              "default": "",
               "optional": true,
               "control": "text",
               "type": "string",
@@ -296,8 +296,8 @@
           "default": [
             {
               "name": "url",
-              "default": "http://127.0.0.1:5536/refresh",
-              "optional": false,
+              "default": "",
+              "optional": true,
               "control": "text",
               "type": "string",
               "hint": {
@@ -311,7 +311,6 @@
             },
             {
               "name": "headers",
-              "default": [],
               "optional": true,
               "control": "list",
               "type": "list_object",

+ 4 - 7
etc/sources/httppull.json

@@ -264,8 +264,8 @@
 					"default": [
 						{
 							"name": "url",
-							"default": "http://127.0.0.1:5536/token",
-							"optional": false,
+							"default": "",
+							"optional": true,
 							"control": "text",
 							"type": "string",
 							"hint": {
@@ -294,7 +294,7 @@
 						},
 						{
 							"name": "expire",
-							"default": "3600",
+							"default": "",
 							"optional": true,
 							"control": "text",
 							"type": "string",
@@ -325,8 +325,7 @@
 					"default": [
 						{
 							"name": "url",
-							"default": "http://127.0.0.1:5536/refresh",
-							"optional": false,
+							"optional": true,
 							"control": "text",
 							"type": "string",
 							"hint": {
@@ -340,7 +339,6 @@
 						},
 						{
 							"name": "headers",
-							"default": [],
 							"optional": true,
 							"control": "list",
 							"type": "list_object",
@@ -355,7 +353,6 @@
 						},
 						{
 							"name": "body",
-							"default": "",
 							"optional": true,
 							"control": "textarea",
 							"type": "string",

+ 29 - 14
internal/io/http/client.go

@@ -143,16 +143,27 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro
 	if err != nil {
 		return err
 	}
-	switch h := c.Headers.(type) {
-	case map[string]interface{}:
-		c.HeadersMap = make(map[string]string, len(h))
-		for k, v := range h {
-			c.HeadersMap[k] = v.(string)
+	if c.Headers != nil {
+		switch h := c.Headers.(type) {
+		case map[string]interface{}:
+			c.HeadersMap = make(map[string]string, len(h))
+			for k, v := range h {
+				c.HeadersMap[k] = v.(string)
+			}
+		case string:
+			c.HeadersTemplate = h
+		// TODO remove later, adapt to the wrong format in manager
+		case []interface{}:
+			c.HeadersMap = make(map[string]string, len(h))
+			for _, v := range h {
+				if mv, ok := v.(map[string]interface{}); ok && len(mv) == 3 {
+					c.HeadersMap[mv["name"].(string)] = mv["default"].(string)
+				}
+			}
+		default:
+			return fmt.Errorf("headers must be a map or a string")
 		}
-	case string:
-		c.HeadersTemplate = h
 	}
-
 	tlsOpts := cert.TlsConfigurationOptions{
 		SkipCertVerify: c.InsecureSkipVerify,
 		CertFile:       c.CertificationPath,
@@ -164,7 +175,7 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro
 	if err != nil {
 		return err
 	}
-	// validate oAuth
+	// validate oAuth. In order to adapt to manager, the validation is closed to allow empty value
 	if c.OAuth != nil {
 		// validate access token
 		if ap, ok := c.OAuth["access"]; ok {
@@ -173,10 +184,12 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro
 				return fmt.Errorf("fail to parse the access properties of oAuth: %v", err)
 			}
 			if accessConf.Url == "" {
-				return fmt.Errorf("access token url is required")
+				conf.Log.Warnf("access token url is not set, so ignored the oauth setting")
+				c.OAuth = nil
+			} else {
+				// expire time will update every time when access token is refreshed if expired is set
+				cc.accessConf = accessConf
 			}
-			// expire time will update every time when access token is refreshed if expired is set
-			cc.accessConf = accessConf
 		} else {
 			return fmt.Errorf("if setting oAuth, `access` property is required")
 		}
@@ -187,9 +200,11 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro
 				return fmt.Errorf("fail to parse the refresh token properties: %v", err)
 			}
 			if refreshConf.Url == "" {
-				return fmt.Errorf("refresh token url is required")
+				conf.Log.Warnf("refresh token url is not set, so ignored the refresh setting")
+				delete(c.OAuth, "refresh")
+			} else {
+				cc.refreshConf = refreshConf
 			}
-			cc.refreshConf = refreshConf
 		}
 	}
 

+ 42 - 4
internal/io/http/httppull_source_test.go

@@ -448,7 +448,21 @@ func TestConfigure(t *testing.T) {
 					},
 				},
 			},
-			err: errors.New("access token url is required"),
+			config: &RawConf{
+				Url:                "http://localhost:52345/",
+				Method:             http.MethodGet,
+				Interval:           DefaultInterval,
+				Timeout:            DefaultTimeout,
+				BodyType:           "none",
+				ResponseType:       "code",
+				InsecureSkipVerify: true,
+				Headers: map[string]interface{}{
+					"Authorization": "Bearer {{.token}}",
+				},
+				HeadersMap: map[string]string{
+					"Authorization": "Bearer {{.token}}",
+				},
+			},
 		},
 		{
 			name: "oAuth miss access",
@@ -504,8 +518,27 @@ func TestConfigure(t *testing.T) {
 						"body":   "{\"username\": \"admin\",\"password\": \"0000\"}",
 						"expire": "3600",
 					},
-					"refresh": map[string]interface{}{
-						"url": "",
+				},
+			},
+			config: &RawConf{
+				Url:                "http://localhost:52345/",
+				Method:             http.MethodGet,
+				Interval:           DefaultInterval,
+				Timeout:            DefaultTimeout,
+				BodyType:           "none",
+				ResponseType:       "code",
+				InsecureSkipVerify: true,
+				Headers: map[string]interface{}{
+					"Authorization": "Bearer {{.token}}",
+				},
+				HeadersMap: map[string]string{
+					"Authorization": "Bearer {{.token}}",
+				},
+				OAuth: map[string]map[string]interface{}{
+					"access": {
+						"url":    "http://localhost:52345/token",
+						"body":   "{\"username\": \"admin\",\"password\": \"0000\"}",
+						"expire": "3600",
 					},
 				},
 			},
@@ -515,7 +548,12 @@ func TestConfigure(t *testing.T) {
 				Expire:         "3600",
 				ExpireInSecond: 3600,
 			},
-			err: errors.New("refresh token url is required"),
+			tokens: map[string]interface{}{
+				"token":         DefaultToken,
+				"refresh_token": RefreshToken,
+				"client_id":     "test",
+				"expires":       float64(36000),
+			},
 		},
 		// oAuth authentication flow errors
 		{