Selaa lähdekoodia

fix(source): rest sink template support token data

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

tbm

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 vuotta sitten
vanhempi
commit
e19499e5d7
3 muutettua tiedostoa jossa 32 lisäystä ja 13 poistoa
  1. 6 1
      internal/io/http/client.go
  2. 23 11
      internal/io/http/rest_sink.go
  3. 3 1
      internal/pkg/httpx/http.go

+ 6 - 1
internal/io/http/client.go

@@ -260,7 +260,12 @@ func (cc *ClientConf) refresh(ctx api.StreamContext) error {
 		if ee != nil {
 		if ee != nil {
 			return fmt.Errorf("fail to get refresh token: %v", ee)
 			return fmt.Errorf("fail to get refresh token: %v", ee)
 		}
 		}
-		cc.tokens, _, err = cc.parseResponse(ctx, rr, true, nil)
+		nt, _, err := cc.parseResponse(ctx, rr, true, nil)
+		for k, v := range nt {
+			if v != nil {
+				cc.tokens[k] = v
+			}
+		}
 		if err != nil {
 		if err != nil {
 			return fmt.Errorf("Cannot parse refresh token response to json: %v", err)
 			return fmt.Errorf("Cannot parse refresh token response to json: %v", err)
 		}
 		}

+ 23 - 11
internal/io/http/rest_sink.go

@@ -56,16 +56,7 @@ func (me MultiErrors) Error() string {
 func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	logger := ctx.GetLogger()
 	logger.Debugf("rest sink receive %s", item)
 	logger.Debugf("rest sink receive %s", item)
-	output, transed, err := ctx.TransformOutput(item)
-	if err != nil {
-		logger.Warnf("rest sink decode data error: %v", err)
-		return nil
-	}
-	var d = item
-	if transed {
-		d = output
-	}
-	resp, err := ms.Send(ctx, d, logger)
+	resp, err := ms.Send(ctx, item, logger)
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("rest sink fails to send out the data: %s", err)
 		return fmt.Errorf("rest sink fails to send out the data: %s", err)
 	} else {
 	} else {
@@ -82,6 +73,27 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 }
 }
 
 
 func (ms *RestSink) Send(ctx api.StreamContext, v interface{}, logger api.Logger) (*http.Response, error) {
 func (ms *RestSink) Send(ctx api.StreamContext, v interface{}, logger api.Logger) (*http.Response, error) {
+	// Allow to use tokens in headers
+	// TODO optimization: only do this if tokens are used in template
+	if ms.tokens != nil {
+		switch dt := v.(type) {
+		case map[string]interface{}:
+			for k, vv := range ms.tokens {
+				dt[k] = vv
+			}
+		case []map[string]interface{}:
+			for m := range dt {
+				for k, vv := range ms.tokens {
+					dt[m][k] = vv
+				}
+			}
+		}
+	}
+	output, _, err := ctx.TransformOutput(v)
+	if err != nil {
+		logger.Warnf("rest sink decode data error: %v", err)
+		return nil, fmt.Errorf("rest sink decode data error: %v", err)
+	}
 	bodyType, err := ctx.ParseTemplate(ms.config.BodyType, v)
 	bodyType, err := ctx.ParseTemplate(ms.config.BodyType, v)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -98,7 +110,7 @@ func (ms *RestSink) Send(ctx api.StreamContext, v interface{}, logger api.Logger
 	if err != nil {
 	if err != nil {
 		return nil, fmt.Errorf("rest sink headers template decode error: %v", err)
 		return nil, fmt.Errorf("rest sink headers template decode error: %v", err)
 	}
 	}
-	return httpx.Send(logger, ms.client, bodyType, method, u, headers, ms.config.SendSingle, v)
+	return httpx.Send(logger, ms.client, bodyType, method, u, headers, ms.config.SendSingle, output)
 }
 }
 
 
 func (ms *RestSink) Close(ctx api.StreamContext) error {
 func (ms *RestSink) Close(ctx api.StreamContext) error {

+ 3 - 1
internal/pkg/httpx/http.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -47,6 +47,8 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string
 		switch t := v.(type) {
 		switch t := v.(type) {
 		case []byte:
 		case []byte:
 			body = bytes.NewBuffer(t)
 			body = bytes.NewBuffer(t)
+		case string:
+			body = bytes.NewBufferString(t)
 		default:
 		default:
 			vj, err := json.Marshal(v)
 			vj, err := json.Marshal(v)
 			if err != nil {
 			if err != nil {