فهرست منبع

merge code from master

RockyJin 4 سال پیش
والد
کامیت
cabce69d36
3فایلهای تغییر یافته به همراه28 افزوده شده و 10 حذف شده
  1. 1 1
      .github/workflows/run_fvt_tests.yaml
  2. 1 0
      docs/en_US/rules/sinks/rest.md
  3. 26 9
      xstream/sinks/rest_sink.go

+ 1 - 1
.github/workflows/run_fvt_tests.yaml

@@ -200,4 +200,4 @@ jobs:
               echo -e "---------------------------------------------\n"
               echo "FVT tests error"
               exit 1
-          fi
+          fi

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 1 - 0
docs/en_US/rules/sinks/rest.md


+ 26 - 9
xstream/sinks/rest_sink.go

@@ -1,6 +1,7 @@
 package sinks
 
 import (
+	"crypto/tls"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xstream/api"
@@ -12,13 +13,14 @@ import (
 )
 
 type RestSink struct {
-	method     string
-	url        string
-	headers    map[string]string
-	bodyType   string
-	timeout    int64
-	sendSingle bool
-	debugResp  bool
+	method             string
+	url                string
+	headers            map[string]string
+	bodyType           string
+	timeout            int64
+	sendSingle         bool
+	debugResp          bool
+	insecureSkipVerify bool
 
 	client *http.Client
 }
@@ -114,13 +116,28 @@ func (ms *RestSink) Configure(ps map[string]interface{}) error {
 			return fmt.Errorf("rest sink property debugResp %v is not a bool", temp)
 		}
 	}
+
+	temp, ok = ps["insecureSkipVerify"]
+	if !ok {
+		ms.insecureSkipVerify = true
+	} else {
+		ms.insecureSkipVerify, ok = temp.(bool)
+		if !ok {
+			return fmt.Errorf("rest sink property insecureSkipVerify %v is not a bool", temp)
+		}
+	}
 	return nil
 }
 
 func (ms *RestSink) Open(ctx api.StreamContext) error {
 	logger := ctx.GetLogger()
-	ms.client = &http.Client{Timeout: time.Duration(ms.timeout) * time.Millisecond}
-	logger.Infof("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle)
+	tr := &http.Transport{
+		TLSClientConfig: &tls.Config{InsecureSkipVerify: ms.insecureSkipVerify},
+	}
+	ms.client = &http.Client{
+		Transport: tr,
+		Timeout:   time.Duration(ms.timeout) * time.Millisecond}
+	logger.Infof("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v, insecureSkipVerify: %v", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle, ms.insecureSkipVerify)
 
 	if _, err := url.Parse(ms.url); err != nil {
 		return err