Quellcode durchsuchen

feat(sink): add http request/response body to rest sink log when exception occurred

Signed-off-by: carlclone <906561974@qq.com>
(cherry picked from commit 0397195b9bff7d89474a02643265a29f395326e6)
carlclone vor 2 Jahren
Ursprung
Commit
f2147b7b9a
2 geänderte Dateien mit 88 neuen und 14 gelöschten Zeilen
  1. 23 10
      internal/io/http/rest_sink.go
  2. 65 4
      internal/io/http/rest_sink_test.go

+ 23 - 10
internal/io/http/rest_sink.go

@@ -56,14 +56,32 @@ func (me MultiErrors) Error() string {
 func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	logger := ctx.GetLogger()
 	logger.Debugf("rest sink receive %s", item)
 	logger.Debugf("rest sink receive %s", item)
-	resp, err := ms.Send(ctx, item, logger)
+	decodedData, _, err := ctx.TransformOutput(item)
 	if err != nil {
 	if err != nil {
-		return fmt.Errorf("rest sink fails to send out the data: %s", err)
+		logger.Warnf("rest sink decode data error: %v", err)
+		return fmt.Errorf("rest sink decode data error: %v", err)
+	}
+
+	resp, err := ms.Send(ctx, decodedData, item, logger)
+	if err != nil {
+		return fmt.Errorf(`rest sink fails to send out the data: %s | method=%s path="%s" request_body="%s"`,
+			err,
+			ms.config.Method,
+			ms.config.Url,
+			decodedData,
+		)
 	} else {
 	} else {
 		logger.Debugf("rest sink got response %v", resp)
 		logger.Debugf("rest sink got response %v", resp)
 		_, b, err := ms.parseResponse(ctx, resp, ms.config.DebugResp, nil)
 		_, b, err := ms.parseResponse(ctx, resp, ms.config.DebugResp, nil)
 		if err != nil {
 		if err != nil {
-			return fmt.Errorf("%s: http error %s", errorx.IOErr, string(b))
+			return fmt.Errorf(`%s: http error. | method=%s path="%s" status=%d request_body="%s" response_body="%s"`,
+				errorx.IOErr,
+				ms.config.Method,
+				ms.config.Url,
+				resp.StatusCode,
+				decodedData,
+				b,
+			)
 		}
 		}
 		if ms.config.DebugResp {
 		if ms.config.DebugResp {
 			logger.Infof("Response raw content: %s\n", string(b))
 			logger.Infof("Response raw content: %s\n", string(b))
@@ -72,7 +90,7 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 	return nil
 	return nil
 }
 }
 
 
-func (ms *RestSink) Send(ctx api.StreamContext, v interface{}, logger api.Logger) (*http.Response, error) {
+func (ms *RestSink) Send(ctx api.StreamContext, decodedData []byte, v interface{}, logger api.Logger) (*http.Response, error) {
 	// Allow to use tokens in headers
 	// Allow to use tokens in headers
 	// TODO optimization: only do this if tokens are used in template
 	// TODO optimization: only do this if tokens are used in template
 	if ms.tokens != nil {
 	if ms.tokens != nil {
@@ -89,11 +107,6 @@ func (ms *RestSink) Send(ctx api.StreamContext, v interface{}, logger api.Logger
 			}
 			}
 		}
 		}
 	}
 	}
-	output, _, err := ctx.TransformOutput(v)
-	if err != nil {
-		logger.Warnf("rest sink decode data error: %v", err)
-		return nil, fmt.Errorf("rest sink decode data error: %v", err)
-	}
 	bodyType, err := ctx.ParseTemplate(ms.config.BodyType, v)
 	bodyType, err := ctx.ParseTemplate(ms.config.BodyType, v)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -110,7 +123,7 @@ func (ms *RestSink) Send(ctx api.StreamContext, v interface{}, logger api.Logger
 	if err != nil {
 	if err != nil {
 		return nil, fmt.Errorf("rest sink headers template decode error: %v", err)
 		return nil, fmt.Errorf("rest sink headers template decode error: %v", err)
 	}
 	}
-	return httpx.Send(logger, ms.client, bodyType, method, u, headers, ms.config.SendSingle, output)
+	return httpx.Send(logger, ms.client, bodyType, method, u, headers, ms.config.SendSingle, decodedData)
 }
 }
 
 
 func (ms *RestSink) Close(ctx api.StreamContext) error {
 func (ms *RestSink) Close(ctx api.StreamContext) error {

+ 65 - 4
internal/io/http/rest_sink_test.go

@@ -16,15 +16,17 @@ package http
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"io"
 	"io"
 	"net/http"
 	"net/http"
 	"net/http/httptest"
 	"net/http/httptest"
 	"reflect"
 	"reflect"
+	"strings"
 	"testing"
 	"testing"
-
-	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/topo/context"
-	"github.com/lf-edge/ekuiper/internal/topo/transform"
+	"time"
 )
 )
 
 
 type request struct {
 type request struct {
@@ -365,3 +367,62 @@ func TestRestSinkTemplate_Apply(t *testing.T) {
 		}
 		}
 	}
 	}
 }
 }
+
+func TestRestSinkErrorLog(t *testing.T) {
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		var result = `{"data":[],"extra":"Success","returncode":1,"returnmessage":""}`
+		time.Sleep(30 * time.Millisecond)
+		w.WriteHeader(http.StatusNotFound)
+		w.Write([]byte(result))
+	}))
+	defer ts.Close()
+
+	t.Run("Test rest sink timeout and return correct error info", func(t *testing.T) {
+		s := &RestSink{}
+		config := map[string]interface{}{
+			"url":     ts.URL,
+			"timeout": float64(10),
+		}
+		s.Configure(config)
+		s.Open(context.Background())
+
+		tf, _ := transform.GenTransform("", "json", "", "")
+		vCtx := context.WithValue(context.Background(), context.TransKey, tf)
+		reqBody := []map[string]interface{}{
+			{"ab": "hello1"},
+			{"ab": "hello2"},
+		}
+		err := s.Collect(vCtx, reqBody)
+
+		if !strings.Contains(err.Error(), "hello1") {
+			t.Errorf("should include request body, but got %s", err.Error())
+		}
+		fmt.Println(err.Error())
+		s.Close(context.Background())
+	})
+
+	t.Run("Test rest sink with io error prefix", func(t *testing.T) {
+		s := &RestSink{}
+		config := map[string]interface{}{
+			"url":          ts.URL,
+			"method":       "put",
+			"bodyType":     "text",
+			"responseType": "body",
+			"timeout":      float64(1000),
+		}
+		s.Configure(config)
+		s.Open(context.Background())
+		tf, _ := transform.GenTransform("", "json", "", "")
+		vCtx := context.WithValue(context.Background(), context.TransKey, tf)
+		err := s.Collect(vCtx, []map[string]interface{}{
+			{"ab": "hello1"},
+			{"ab": "hello2"},
+		})
+		fmt.Println(err.Error())
+		if !strings.HasPrefix(err.Error(), errorx.IOErr) {
+			t.Errorf("should start with io error, but got %s", err.Error())
+		}
+
+		s.Close(context.Background())
+	})
+}