Quellcode durchsuchen

fix(sink): reset sink to return io error if network problem happens

Only return io error can trigger the cache mechanism

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang vor 2 Jahren
Ursprung
Commit
01ca69a997
2 geänderte Dateien mit 14 neuen und 6 gelöschten Zeilen
  1. 11 3
      internal/io/http/rest_sink.go
  2. 3 3
      internal/io/http/rest_sink_test.go

+ 11 - 3
internal/io/http/rest_sink.go

@@ -21,6 +21,7 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"net/http"
 	"net/http"
+	"net/url"
 	"strings"
 	"strings"
 )
 )
 
 
@@ -64,8 +65,15 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 
 
 	resp, err := ms.Send(ctx, decodedData, item, logger)
 	resp, err := ms.Send(ctx, decodedData, item, logger)
 	if err != nil {
 	if err != nil {
-		return fmt.Errorf(`rest sink fails to send out the data: %s | method=%s path="%s" request_body="%s"`,
-			err,
+		e := err.Error()
+		if urlErr, ok := err.(*url.Error); ok {
+			// consider timeout and temporary error as recoverable
+			if urlErr.Timeout() || urlErr.Temporary() {
+				e = errorx.IOErr
+			}
+		}
+		return fmt.Errorf(`%s: rest sink fails to send out the data: method=%s path="%s" request_body="%s"`,
+			e,
 			ms.config.Method,
 			ms.config.Method,
 			ms.config.Url,
 			ms.config.Url,
 			decodedData,
 			decodedData,
@@ -75,7 +83,7 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 		_, b, err := ms.parseResponse(ctx, resp, ms.config.DebugResp, nil)
 		_, b, err := ms.parseResponse(ctx, resp, ms.config.DebugResp, nil)
 		if err != nil {
 		if err != nil {
 			return fmt.Errorf(`%s: http error. | method=%s path="%s" status=%d request_body="%s" response_body="%s"`,
 			return fmt.Errorf(`%s: http error. | method=%s path="%s" status=%d request_body="%s" response_body="%s"`,
-				errorx.IOErr,
+				err,
 				ms.config.Method,
 				ms.config.Method,
 				ms.config.Url,
 				ms.config.Url,
 				resp.StatusCode,
 				resp.StatusCode,

+ 3 - 3
internal/io/http/rest_sink_test.go

@@ -394,14 +394,14 @@ func TestRestSinkErrorLog(t *testing.T) {
 		}
 		}
 		err := s.Collect(vCtx, reqBody)
 		err := s.Collect(vCtx, reqBody)
 
 
-		if !strings.Contains(err.Error(), "hello1") {
+		if strings.HasPrefix(err.Error(), errorx.IOErr) && !strings.Contains(err.Error(), "hello1") {
 			t.Errorf("should include request body, but got %s", err.Error())
 			t.Errorf("should include request body, but got %s", err.Error())
 		}
 		}
 		fmt.Println(err.Error())
 		fmt.Println(err.Error())
 		s.Close(context.Background())
 		s.Close(context.Background())
 	})
 	})
 
 
-	t.Run("Test rest sink with io error prefix", func(t *testing.T) {
+	t.Run("Test  error info", func(t *testing.T) {
 		s := &RestSink{}
 		s := &RestSink{}
 		config := map[string]interface{}{
 		config := map[string]interface{}{
 			"url":          ts.URL,
 			"url":          ts.URL,
@@ -419,7 +419,7 @@ func TestRestSinkErrorLog(t *testing.T) {
 			{"ab": "hello2"},
 			{"ab": "hello2"},
 		})
 		})
 		fmt.Println(err.Error())
 		fmt.Println(err.Error())
-		if !strings.HasPrefix(err.Error(), errorx.IOErr) {
+		if strings.HasPrefix(err.Error(), errorx.IOErr) && !strings.Contains(err.Error(), "404") {
 			t.Errorf("should start with io error, but got %s", err.Error())
 			t.Errorf("should start with io error, but got %s", err.Error())
 		}
 		}