浏览代码

feat(source): httppull supports time range (#2055)

* feat(source): httppull supports time range

Allow user to specify dynamic time range by url or body.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* test(source): add test to cover errors

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* doc: add eng translation

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

---------

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 1 年之前
父节点
当前提交
bacfa6c6ff

+ 22 - 1
docs/en_US/guide/sources/builtin/http_pull.md

@@ -137,9 +137,30 @@ There are two parts to configure: access for access code fetch and refresh for t
 - headers: the request header to refresh the token. Usually put the tokens here for authorization.
 - body: the request body to refresh the token. May not need when using header to pass the refresh token.
 
+## Dynamic Properties
+
+Dynamic properties are properties that are updated at runtime.
+You can use dynamic attributes to specify the HTTP request's URL, body and header.
+The syntax is based on [data template](../../sinks/data_template.md) format for dynamic attributes.
+
+The attributes that can be used include:
+
+- PullTime: The timestamp of the current pull time in int64 format.
+- LastPullTime: The timestamp of the last pull time in int64 format.
+
+If the target HTTP service supports filtering the start and end times,
+these two properties can be used to implement incremental pulls.
+
+- If the target HTTP service passes the start and end times via the url parameter, you can configure the
+  URL, e.g. `http://localhost:9090/pull?start={{.LastPullTime}}&end={{.PullTime}}`.
+- If the target HTTP service passes the start and end times via the body parameter, you can configure the
+  body, e.g. `{"start": {{.LastPullTime}}, "end": {{.PullTime}}`.
+
 ## Override the default settings
 
-If you have a specific connection that need to overwrite the default settings, you can create a customized section. In the previous sample, we create a specific setting named with `application_conf`.  Then you can specify the configuration with option `CONF_KEY` when creating the stream definition (see [stream specs](../../../sqls/streams.md) for more info).
+If you have a specific connection that need to overwrite the default settings, you can create a customized section. In
+the previous sample, we create a specific setting named with `application_conf`. Then you can specify the configuration
+with option `CONF_KEY` when creating the stream definition (see [stream specs](../../../sqls/streams.md) for more info).
 
 **Sample**
 

+ 18 - 2
docs/zh_CN/guide/sources/builtin/http_pull.md

@@ -6,7 +6,7 @@
 eKuiper 为提取 HTTP 源流提供了内置支持,该支持可从 HTTP 服务器代理提取消息并输入 eKuiper 处理管道。HTTP 提取源的配置文件位于 `etc/sources/httppull.yaml` 中。 以下是文件格式。
 
 ```yaml
-#全局httppull配置
+#全局 httppull 配置
 default:
   # 请求服务器地址的URL
   url: http://localhost
@@ -137,9 +137,25 @@ OAuth 2.0 是一个授权协议,让 API 客户端有限度地访问网络服
 - headers:用于刷新令牌的请求头。通常把令牌放在这里,用于授权。
 - body:刷新令牌的请求主体。当使用头文件来传递刷新令牌时,可能不需要配置此选项。
 
+## 动态属性
+
+动态属性是指在运行时会动态更新的属性。 您可以使用动态属性来指定 HTTP 请求的
+URL、正文和标头。其语法基于[数据模板](../../sinks/data_template.md)格式的动态属性。可使用的动态属性包括:
+
+- PullTime: 本次拉取的 int64 格式时间戳。
+- LastPullTime: 上次拉取的 int64 格式时间戳。
+
+若目标 HTTP 服务支持过滤开始和结束时间,可以使用这两个属性来实现增量拉取。
+
+- 目标 HTTP 服务通过 url 参数传递开始和结束时间,则可配置
+  URL,例如 `http://localhost:9090/pull?start={{.LastPullTime}}&end={{.PullTime}}` 。
+- 目标 HTTP 服务通过 body 参数传递开始和结束时间,则可配置
+  body,例如 `{"start": {{.LastPullTime}}, "end": {{.PullTime}}}`。
+
 ## 重载默认设置
 
-如果您有特定的连接需要重载默认设置,则可以创建一个自定义部分。 在上一个示例中,我们创建了一个名为 `application_conf` 的特定设置。 然后,您可以在创建流定义时使用选项 `CONF_KEY` 指定配置(有关更多信息,请参见 [流规格](../../../sqls/streams.md))。
+如果您有特定的连接需要重载默认设置,则可以创建一个自定义部分。 在上一个示例中,我们创建了一个名为 `application_conf` 的特定设置。
+然后,您可以在创建流定义时使用选项 `CONF_KEY` 指定配置(有关更多信息,请参见 [流规格](../../../sqls/streams.md))。
 
 **样例**
 

+ 1 - 1
internal/conf/time.go

@@ -40,7 +40,7 @@ func GetLocalZone() int {
 	}
 }
 
-// Time related. For Mock
+// GetTicker Time related. For Mock
 func GetTicker(duration int64) *clock.Ticker {
 	return Clock.Ticker(time.Duration(duration) * time.Millisecond)
 }

+ 12 - 10
internal/io/http/client.go

@@ -154,14 +154,6 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro
 			}
 		case string:
 			c.HeadersTemplate = h
-		// TODO remove later, adapt to the wrong format in manager
-		case []interface{}:
-			c.HeadersMap = make(map[string]string, len(h))
-			for _, v := range h {
-				if mv, ok := v.(map[string]interface{}); ok && len(mv) == 3 {
-					c.HeadersMap[mv["name"].(string)] = mv["default"].(string)
-				}
-			}
 		default:
 			return fmt.Errorf("headers must be a map or a string")
 		}
@@ -329,7 +321,12 @@ func (cc *ClientConf) parseResponse(ctx api.StreamContext, resp *http.Response,
 			return nil, []byte("fail to read body"),
 				fmt.Errorf("http return code error: %d", resp.StatusCode)
 		}
-		defer resp.Body.Close()
+		defer func(Body io.ReadCloser) {
+			err := Body.Close()
+			if err != nil {
+				conf.Log.Errorf("fail to close the response body: %v", err)
+			}
+		}(resp.Body)
 		return nil, c, fmt.Errorf("http return code error: %d", resp.StatusCode)
 	} else if !returnBody { // For rest sink who only need to know if the request is successful
 		return nil, nil, nil
@@ -338,7 +335,12 @@ func (cc *ClientConf) parseResponse(ctx api.StreamContext, resp *http.Response,
 	if err != nil {
 		return nil, []byte("fail to read body"), err
 	}
-	defer resp.Body.Close()
+	defer func(Body io.ReadCloser) {
+		err := Body.Close()
+		if err != nil {
+			conf.Log.Errorf("fail to close the response body: %v", err)
+		}
+	}(resp.Body)
 	if returnBody && cc.config.Incremental {
 		nmd5 := getMD5Hash(c)
 		if *omd5 == nmd5 {

+ 91 - 39
internal/io/http/httppull_source.go

@@ -15,15 +15,25 @@
 package http
 
 import (
+	"fmt"
 	"time"
 
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/io"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
+	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
+type pullTimeMeta struct {
+	LastPullTime int64 `json:"lastPullTime"`
+	PullTime     int64 `json:"pullTime"`
+}
+
 type PullSource struct {
 	ClientConf
+
+	t *pullTimeMeta
 }
 
 func (hps *PullSource) Configure(device string, props map[string]interface{}) error {
@@ -45,51 +55,93 @@ func (hps *PullSource) Close(ctx api.StreamContext) error {
 func (hps *PullSource) initTimerPull(ctx api.StreamContext, consumer chan<- api.SourceTuple, _ chan<- error) {
 	logger := ctx.GetLogger()
 	logger.Infof("Starting HTTP pull source with interval %d", hps.config.Interval)
-	ticker := time.NewTicker(time.Millisecond * time.Duration(hps.config.Interval))
+	ticker := conf.GetTicker(int64(hps.config.Interval))
 	defer ticker.Stop()
 	omd5 := ""
 	for {
 		select {
-		case <-ticker.C:
-			rcvTime := conf.GetNow()
-			headers, err := hps.parseHeaders(ctx, hps.tokens)
-			if err != nil {
-				continue
-			}
-			// check oAuth token expiration
-			if hps.accessConf != nil && hps.accessConf.ExpireInSecond > 0 &&
-				int(time.Now().Sub(hps.tokenLastUpdateAt).Abs().Seconds()) >= hps.accessConf.ExpireInSecond {
-				ctx.GetLogger().Debugf("Refreshing token")
-				if err := hps.refresh(ctx); err != nil {
-					ctx.GetLogger().Warnf("Refresh token error: %v", err)
-				}
-			}
-			ctx.GetLogger().Debugf("rest sink sending request url: %s, headers: %v, body %s", hps.config.Url, headers, hps.config.Body)
-			if resp, e := httpx.Send(logger, hps.client, hps.config.BodyType, hps.config.Method, hps.config.Url, headers, true, hps.config.Body); e != nil {
-				logger.Warnf("Found error %s when trying to reach %v ", e, hps)
-			} else {
-				logger.Debugf("rest sink got response %v", resp)
-				results, _, e := hps.parseResponse(ctx, resp, true, &omd5)
-				if e != nil {
-					logger.Errorf("Parse response error %v", e)
-					continue
-				}
-				if results == nil {
-					logger.Debugf("no data to send for incremental")
-					continue
-				}
-				meta := make(map[string]interface{})
-				for _, result := range results {
-					select {
-					case consumer <- api.NewDefaultSourceTupleWithTime(result, meta, rcvTime):
-						logger.Debugf("send data to device node")
-					case <-ctx.Done():
-						return
-					}
-				}
-			}
+		case rcvTime := <-ticker.C:
+			logger.Debugf("Pulling data at %d", rcvTime.UnixMilli())
+			tuples := hps.doPull(ctx, rcvTime, &omd5)
+			io.ReceiveTuples(ctx, consumer, tuples)
 		case <-ctx.Done():
 			return
 		}
 	}
 }
+
+func (hps *PullSource) doPull(ctx api.StreamContext, rcvTime time.Time, omd5 *string) []api.SourceTuple {
+	if hps.t == nil {
+		hps.t = &pullTimeMeta{
+			LastPullTime: rcvTime.UnixMilli() - int64(hps.config.Interval),
+			PullTime:     rcvTime.UnixMilli(),
+		}
+	} else {
+		// only update last pull time when there is no error
+		hps.t.PullTime = rcvTime.UnixMilli()
+	}
+	// Parse url which may contain dynamic time range
+	url, err := ctx.ParseTemplate(hps.config.Url, hps.t)
+	if err != nil {
+		return []api.SourceTuple{
+			&xsql.ErrorSourceTuple{
+				Error: fmt.Errorf("parse url %s error %v", hps.config.Url, err),
+			},
+		}
+	}
+
+	headers, err := hps.parseHeaders(ctx, hps.tokens)
+	if err != nil {
+		return []api.SourceTuple{
+			&xsql.ErrorSourceTuple{
+				Error: fmt.Errorf("parse headers error %v", err),
+			},
+		}
+	}
+	body, err := ctx.ParseTemplate(hps.config.Body, hps.t)
+	if err != nil {
+		return []api.SourceTuple{
+			&xsql.ErrorSourceTuple{
+				Error: fmt.Errorf("parse body %s error %v", hps.config.Body, err),
+			},
+		}
+	}
+	// check oAuth token expiration
+	if hps.accessConf != nil && hps.accessConf.ExpireInSecond > 0 &&
+		int(time.Now().Sub(hps.tokenLastUpdateAt).Abs().Seconds()) >= hps.accessConf.ExpireInSecond {
+		ctx.GetLogger().Debugf("Refreshing token")
+		if err := hps.refresh(ctx); err != nil {
+			ctx.GetLogger().Warnf("Refresh token error: %v", err)
+		}
+	}
+	ctx.GetLogger().Debugf("httppull source sending request url: %s, headers: %v, body %s", url, headers, hps.config.Body)
+	if resp, e := httpx.Send(ctx.GetLogger(), hps.client, hps.config.BodyType, hps.config.Method, url, headers, true, body); e != nil {
+		ctx.GetLogger().Warnf("Found error %s when trying to reach %v ", e, hps)
+		return []api.SourceTuple{
+			&xsql.ErrorSourceTuple{
+				Error: fmt.Errorf("send request error %v", e),
+			},
+		}
+	} else {
+		ctx.GetLogger().Debugf("httppull source got response %v", resp)
+		results, _, e := hps.parseResponse(ctx, resp, true, omd5)
+		if e != nil {
+			return []api.SourceTuple{
+				&xsql.ErrorSourceTuple{
+					Error: fmt.Errorf("parse response error %v", e),
+				},
+			}
+		}
+		hps.t.LastPullTime = hps.t.PullTime
+		if results == nil {
+			ctx.GetLogger().Debugf("no data to send for incremental")
+			return nil
+		}
+		tuples := make([]api.SourceTuple, len(results))
+		meta := make(map[string]interface{})
+		for i, result := range results {
+			tuples[i] = api.NewDefaultSourceTupleWithTime(result, meta, rcvTime)
+		}
+		return tuples
+	}
+}

+ 216 - 5
internal/io/http/httppull_source_test.go

@@ -18,6 +18,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"net"
 	"net/http"
 	"net/http/httptest"
@@ -25,12 +26,15 @@ import (
 	"reflect"
 	"strconv"
 	"testing"
+	"time"
 
-	"github.com/benbjohnson/clock"
 	"github.com/gorilla/mux"
+	"github.com/stretchr/testify/assert"
 
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
+	"github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
+	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
@@ -178,9 +182,86 @@ func mockAuthServer() *httptest.Server {
 		}
 		jsonOut(w, out)
 	}).Methods(http.MethodGet)
+	// data4 receives time range in url
+	router.HandleFunc("/data4", func(w http.ResponseWriter, r *http.Request) {
+		device := r.URL.Query().Get("device")
+		s := r.URL.Query().Get("start")
+		e := r.URL.Query().Get("end")
+
+		start, _ := strconv.ParseInt(s, 10, 64)
+		end, _ := strconv.ParseInt(e, 10, 64)
+
+		out := &struct {
+			Code int `json:"code"`
+			Data struct {
+				DeviceId    string `json:"device_id"`
+				Temperature int64  `json:"temperature"`
+				Humidity    int64  `json:"humidity"`
+			} `json:"data"`
+		}{
+			Code: 200,
+			Data: struct {
+				DeviceId    string `json:"device_id"`
+				Temperature int64  `json:"temperature"`
+				Humidity    int64  `json:"humidity"`
+			}{
+				DeviceId:    device,
+				Temperature: start % 50,
+				Humidity:    end % 100,
+			},
+		}
+		jsonOut(w, out)
+	}).Methods(http.MethodGet)
+
+	// data5 receives time range in body
+	router.HandleFunc("/data5", func(w http.ResponseWriter, r *http.Request) {
+		body, err := io.ReadAll(r.Body)
+		if err != nil {
+			http.Error(w, "Failed to read request body", http.StatusBadRequest)
+			return
+		}
+
+		// Create a Person struct to hold the JSON data
+		var ddd struct {
+			Device string `json:"device"`
+			Start  int64  `json:"start"`
+			End    int64  `json:"end"`
+		}
+
+		// Unmarshal the JSON data into the Person struct
+		err = json.Unmarshal(body, &ddd)
+		if err != nil {
+			http.Error(w, "Failed to parse JSON", http.StatusBadRequest)
+			return
+		}
+
+		out := &struct {
+			Code int `json:"code"`
+			Data struct {
+				DeviceId    string `json:"device_id"`
+				Temperature int64  `json:"temperature"`
+				Humidity    int64  `json:"humidity"`
+			} `json:"data"`
+		}{
+			Code: 200,
+			Data: struct {
+				DeviceId    string `json:"device_id"`
+				Temperature int64  `json:"temperature"`
+				Humidity    int64  `json:"humidity"`
+			}{
+				DeviceId:    ddd.Device,
+				Temperature: ddd.Start % 50,
+				Humidity:    ddd.End % 100,
+			},
+		}
+		jsonOut(w, out)
+	}).Methods(http.MethodPost)
 
 	server := httptest.NewUnstartedServer(router)
-	server.Listener.Close()
+	err := server.Listener.Close()
+	if err != nil {
+		panic(err)
+	}
 	server.Listener = l
 	return server
 }
@@ -806,6 +887,8 @@ func TestConfigure(t *testing.T) {
 }
 
 func TestPullWithAuth(t *testing.T) {
+	conf.IsTesting = false
+	conf.InitClock()
 	r := &PullSource{}
 	server := mockAuthServer()
 	server.Start()
@@ -835,7 +918,7 @@ func TestPullWithAuth(t *testing.T) {
 		t.Errorf(err.Error())
 		return
 	}
-	mc := conf.Clock.(*clock.Mock)
+	mc := conf.Clock
 	exp := []api.SourceTuple{
 		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}, mc.Now()),
 	}
@@ -843,6 +926,8 @@ func TestPullWithAuth(t *testing.T) {
 }
 
 func TestPullIncremental(t *testing.T) {
+	conf.IsTesting = false
+	conf.InitClock()
 	r := &PullSource{}
 	server := mockAuthServer()
 	server.Start()
@@ -857,7 +942,7 @@ func TestPullIncremental(t *testing.T) {
 		t.Errorf(err.Error())
 		return
 	}
-	mc := conf.Clock.(*clock.Mock)
+	mc := conf.Clock
 	exp := []api.SourceTuple{
 		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device0", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
 		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
@@ -867,6 +952,8 @@ func TestPullIncremental(t *testing.T) {
 }
 
 func TestPullJsonList(t *testing.T) {
+	conf.IsTesting = false
+	conf.InitClock()
 	r := &PullSource{}
 	server := mockAuthServer()
 	server.Start()
@@ -880,10 +967,134 @@ func TestPullJsonList(t *testing.T) {
 		t.Errorf(err.Error())
 		return
 	}
-	mc := conf.Clock.(*clock.Mock)
+	mc := conf.Clock
 	exp := []api.SourceTuple{
 		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
 		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
 	}
 	mock.TestSourceOpen(r, exp, t)
 }
+
+func TestPullUrlTimeRange(t *testing.T) {
+	r := &PullSource{}
+	server := mockAuthServer()
+	server.Start()
+	defer server.Close()
+	err := r.Configure("", map[string]interface{}{
+		"url":          "http://localhost:52345/data4?device=d1&start={{.LastPullTime}}&end={{.PullTime}}",
+		"interval":     110,
+		"responseType": "body",
+	})
+	if err != nil {
+		t.Errorf(err.Error())
+		return
+	}
+	// Mock time
+	mockclock.ResetClock(143)
+	exp := []api.SourceTuple{
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(53), "temperature": float64(43)}}, map[string]interface{}{}, time.UnixMilli(253)),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(63), "temperature": float64(3)}}, map[string]interface{}{}, time.UnixMilli(363)),
+	}
+	c := mockclock.GetMockClock()
+	go func() {
+		time.Sleep(10 * time.Millisecond)
+		c.Add(350 * time.Millisecond)
+	}()
+	mock.TestSourceOpen(r, exp, t)
+}
+
+func TestPullBodyTimeRange(t *testing.T) {
+	r := &PullSource{}
+	server := mockAuthServer()
+	server.Start()
+	defer server.Close()
+	err := r.Configure("data5", map[string]interface{}{
+		"url":          "http://localhost:52345/",
+		"interval":     110,
+		"responseType": "body",
+		"method":       "POST",
+		"body":         `{"device": "d1", "start": {{.LastPullTime}}, "end": {{.PullTime}}}`,
+	})
+	if err != nil {
+		t.Errorf(err.Error())
+		return
+	}
+	// Mock time
+	mockclock.ResetClock(143)
+	exp := []api.SourceTuple{
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(53), "temperature": float64(43)}}, map[string]interface{}{}, time.UnixMilli(253)),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "d1", "humidity": float64(63), "temperature": float64(3)}}, map[string]interface{}{}, time.UnixMilli(363)),
+	}
+	c := mockclock.GetMockClock()
+	go func() {
+		time.Sleep(10 * time.Millisecond)
+		c.Add(350 * time.Millisecond)
+	}()
+	mock.TestSourceOpen(r, exp, t)
+}
+
+func TestPullErrorTest(t *testing.T) {
+	conf.IsTesting = false
+	conf.InitClock()
+
+	tests := []struct {
+		name string
+		conf map[string]interface{}
+		exp  []api.SourceTuple
+	}{
+		{
+			name: "wrong url template",
+			conf: map[string]interface{}{"url": "http://localhost:52345/data4?device=d1&start={{.lastPullTime}}&end={{.PullTime}", "interval": 10},
+			exp: []api.SourceTuple{
+				&xsql.ErrorSourceTuple{
+					Error: errors.New("parse url http://localhost:52345/data4?device=d1&start={{.lastPullTime}}&end={{.PullTime} error template: sink:1: bad character U+007D '}'"),
+				},
+			},
+		}, {
+			name: "wrong header template",
+			conf: map[string]interface{}{"url": "http://localhost:52345/data4", "interval": 10, "HeadersTemplate": "\"Authorization\": \"Bearer {{.aatoken}}"},
+			exp: []api.SourceTuple{
+				&xsql.ErrorSourceTuple{
+					Error: errors.New("parse headers error parsed header template is not json: \"Authorization\": \"Bearer <no value>"),
+				},
+			},
+		}, {
+			name: "wrong body template",
+			conf: map[string]interface{}{"url": "http://localhost:52345/data4", "interval": 10, "body": `{"device": "d1", "start": {{.LastPullTime}}, "end": {{.pullTime}}}`},
+			exp: []api.SourceTuple{
+				&xsql.ErrorSourceTuple{
+					Error: errors.New("parse body {\"device\": \"d1\", \"start\": {{.LastPullTime}}, \"end\": {{.pullTime}}} error template: sink:1:54: executing \"sink\" at <.pullTime>: can't evaluate field pullTime in type *http.pullTimeMeta"),
+				},
+			},
+		}, {
+			name: "wrong response",
+			conf: map[string]interface{}{"url": "http://localhost:52345/aa/data4", "interval": 10},
+			exp: []api.SourceTuple{
+				&xsql.ErrorSourceTuple{
+					Error: errors.New("parse response error http return code error: 404"),
+				},
+			},
+		}, {
+			name: "wrong request",
+			conf: map[string]interface{}{"url": "http://localhost:52345/aa/data4", "interval": 10, "bodyType": "form", "body": "ddd"},
+			exp: []api.SourceTuple{
+				&xsql.ErrorSourceTuple{
+					Error: errors.New("send request error invalid content: ddd"),
+				},
+			},
+		},
+	}
+
+	server := mockAuthServer()
+	server.Start()
+	defer server.Close()
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			r := &PullSource{}
+			err := r.Configure("", test.conf)
+			assert.NoError(t, err)
+			mock.TestSourceOpen(r, test.exp, t)
+		})
+	}
+}

+ 8 - 3
internal/io/mock/test_source.go

@@ -16,11 +16,12 @@ package mock
 
 import (
 	"fmt"
-	"reflect"
 	"sync/atomic"
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
+
 	"github.com/lf-edge/ekuiper/internal/converter"
 	mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
@@ -36,8 +37,12 @@ func TestSourceOpen(r api.Source, exp []api.SourceTuple, t *testing.T) {
 		t.Error(err)
 	}
 	for i, v := range result {
-		if !reflect.DeepEqual(exp[i].Message(), v.Message()) || !reflect.DeepEqual(exp[i].Meta(), v.Meta()) {
-			t.Errorf("result mismatch:\n  exp=%s\n  got=%s\n\n", exp[i], v)
+		switch v.(type) {
+		case *api.DefaultSourceTuple:
+			assert.Equal(t, exp[i].Message(), v.Message())
+			assert.Equal(t, exp[i].Meta(), v.Meta())
+		default:
+			assert.Equal(t, exp[i], v)
 		}
 	}
 }

+ 2 - 8
internal/io/mqtt/mqtt_source.go

@@ -23,6 +23,7 @@ import (
 
 	"github.com/lf-edge/ekuiper/internal/compressor"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/io"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/clients"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
@@ -141,14 +142,7 @@ func subscribe(ms *MQTTSource, ctx api.StreamContext, consumer chan<- api.Source
 				}
 				tuples = getTuples(ctx, ms, env)
 			}
-			for _, t := range tuples {
-				select {
-				case consumer <- t:
-					log.Debugf("send data to source node")
-				case <-ctx.Done():
-					return nil
-				}
-			}
+			io.ReceiveTuples(ctx, consumer, tuples)
 		}
 	}
 }

+ 28 - 0
internal/io/receiver.go

@@ -0,0 +1,28 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package io
+
+import "github.com/lf-edge/ekuiper/pkg/api"
+
+func ReceiveTuples(ctx api.StreamContext, consumer chan<- api.SourceTuple, tuples []api.SourceTuple) {
+	for _, t := range tuples {
+		select {
+		case consumer <- t:
+			ctx.GetLogger().Debugf("send data to source node")
+		case <-ctx.Done():
+			return
+		}
+	}
+}