Browse Source

feat(sink): support resend destination (#2119)

- Add http, rest sink support
- Add docs

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 1 year ago
parent
commit
e9c9dfd796

+ 26 - 5
docs/en_US/extension/native/develop/sink.md

@@ -69,7 +69,7 @@ As the sink itself is a plugin, it must be in the main package. Given the sink s
 
 ```go
 func MySink() api.Sink {
-  return &mySink{}
+return &mySink{}
 }
 ```
 
@@ -77,15 +77,36 @@ The [Memory Sink](https://github.com/lf-edge/ekuiper/blob/master/extensions/sink
 
 #### Updatable Sink
 
-If your sink is updatable, you'll need to deal with the `rowkindField` property. Some sink may also need a `keyField` property to specify which field is the primary key to update.
+If your sink is updatable, you'll need to deal with the `rowkindField` property. Some sink may also need a `keyField`
+property to specify which field is the primary key to update.
 
-So in the _Configure_ method, parse the `rowkindField` to know which field in the data is the update action. Then in the _Collect_ method, retrieve the rowkind by the `rowkindField` and perform the proper action. The rowkind value could be `insert`, `update`, `upsert` and `delete`. For example, in SQL sink, each rowkind value will generate different SQL statement to execute.
+So in the _Configure_ method, parse the `rowkindField` to know which field in the data is the update action. Then in the
+_Collect_ method, retrieve the rowkind by the `rowkindField` and perform the proper action. The rowkind value could
+be `insert`, `update`, `upsert` and `delete`. For example, in SQL sink, each rowkind value will generate different SQL
+statement to execute.
+
+#### Customize Resend Strategy
+
+Sink can set the [cache and resend strategy](../../../guide/sinks/overview.md#caching) to ensure data delivery.
+By default, resending data will invoke the `Collect` method again.
+If you want to customize the resend strategy, you can implement the `CollectResend` method in the sink.
+In that method, you can do some format conversion or other operations on the data.
+You can also parse the common sink property `resendDestination` and make it the destination of the resend data.
+For example, you can resend the data to another topic defined in that property.
+
+```go
+// CollectResend Called when the sink cache resend is triggered
+CollectResend(ctx StreamContext, data interface{}) error
+```  
 
 #### Parse dynamic properties
 
-For customized sink plugins, users may still want to support [dynamic properties](../../../guide/sinks/overview.md#dynamic-properties) like the built-in ones.
+For customized sink plugins, users may still want to
+support [dynamic properties](../../../guide/sinks/overview.md#dynamic-properties) like the built-in ones.
 
-In the context object, a function `ParseTemplate` is provided to support the parsing of the dynamic property with the go template syntax. In the customized sink, developers can specify some properties to be dynamic according to the business logic. And in the plugin code, use this function to parse the user input in the collect function or elsewhere.
+In the context object, a function `ParseTemplate` is provided to support the parsing of the dynamic property with the go
+template syntax. In the customized sink, developers can specify some properties to be dynamic according to the business
+logic. And in the plugin code, use this function to parse the user input in the collect function or elsewhere.
 
 ```go
 // Parse the prop of go template syntax against the current data.

File diff suppressed because it is too large
+ 47 - 26
docs/en_US/guide/sinks/overview.md


+ 16 - 2
docs/zh_CN/extension/native/develop/sink.md

@@ -79,11 +79,25 @@ func MySink() api.Sink {
 
 如果你的 Sink 是可更新的,你将需要处理 `rowkindField` 属性。有些 sink 可能还需要一个 `keyField' 属性来指定哪个字段是要更新的主键。
 
-因此,在_Configure_方法中,需要解析 `rowkindField` 以知道数据中的哪个字段表示更新的动作。然后在_Collect_方法中,通过该字段获取动作类型,并执行适当的操作。rowkind 的值可以是 `insert`、`update`、`upsert` 和 `delete`。例如,在 SQL sink 中,每种 rowkind 值将产生不同的SQL语句来执行。
+因此,在_Configure_方法中,需要解析 `rowkindField` 以知道数据中的哪个字段表示更新的动作。然后在_Collect_方法中,通过该字段获取动作类型,并执行适当的操作。rowkind
+的值可以是 `insert`、`update`、`upsert` 和 `delete`。例如,在 SQL sink 中,每种 rowkind 值将产生不同的SQL语句来执行。
+
+#### 自定义重传策略
+
+Sink 可以配置[缓存和重发策略](../../../guide/sinks/overview.md#缓存)
+以保证数据不丢失。默认情况下,重发数据将再次调用 `Collect` 方法。
+如果您想自定义重发策略,可以在 sink 中实现 `CollectResend` 方法。 在该方法中,您可以对数据进行一些格式转换或其他操作。
+您还可以解析公共 sink 属性 `resendDestination` 并将其作为重发数据的目的地。例如,您可以将数据重新发送到该属性中定义的另一个主题。
+
+```go
+// CollectResend Called when the sink cache resend is triggered
+CollectResend(ctx StreamContext, data interface{}) error
+```  
 
 #### 解析动态属性
 
-在自定义的 sink 插件中,用户可能仍然想要像内置的 sink 一样支持[动态属性](../../../guide/sinks/overview.md#动态属性)。 我们在 context 对象中提供了 `ParseTemplate` 方法使得开发者可以方便地解析动态属性并应用于插件中。开发组应当根据业务逻辑,设计那些属性支持动态值。然后在代码编写时,使用此方法解析用户传入的属性值。
+在自定义的 sink 插件中,用户可能仍然想要像内置的 sink 一样支持[动态属性](../../../guide/sinks/overview.md#动态属性)。 我们在
+context 对象中提供了 `ParseTemplate` 方法使得开发者可以方便地解析动态属性并应用于插件中。开发组应当根据业务逻辑,设计那些属性支持动态值。然后在代码编写时,使用此方法解析用户传入的属性值。
 
 ```go
 // Parse the prop of go template syntax against the current data.

File diff suppressed because it is too large
+ 38 - 24
docs/zh_CN/guide/sinks/overview.md


+ 3 - 2
internal/io/http/client.go

@@ -60,8 +60,9 @@ type RawConf struct {
 	ResponseType string                            `json:"responseType"`
 	OAuth        map[string]map[string]interface{} `json:"oauth"`
 	// source specific properties
-	Interval    int  `json:"interval"`
-	Incremental bool `json:"incremental"`
+	Interval    int    `json:"interval"`
+	Incremental bool   `json:"incremental"`
+	ResendUrl   string `json:"resendDestination"`
 	// sink specific properties
 	SendSingle bool `json:"sendSingle"`
 	// inferred properties

+ 15 - 6
internal/io/http/rest_sink.go

@@ -40,16 +40,15 @@ func (ms *RestSink) Open(ctx api.StreamContext) error {
 	return nil
 }
 
-func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
+func (ms *RestSink) collectWithUrl(ctx api.StreamContext, item interface{}, desUrl string) error {
 	logger := ctx.GetLogger()
-	logger.Debugf("rest sink receive %s", item)
 	decodedData, _, err := ctx.TransformOutput(item)
 	if err != nil {
 		logger.Warnf("rest sink decode data error: %v", err)
 		return fmt.Errorf("rest sink decode data error: %v", err)
 	}
 
-	resp, err := ms.Send(ctx, decodedData, item, logger)
+	resp, err := ms.sendWithUrl(ctx, decodedData, item, desUrl)
 	if err != nil {
 		e := err.Error()
 		if urlErr, ok := err.(*url.Error); ok {
@@ -84,7 +83,17 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 	return nil
 }
 
-func (ms *RestSink) Send(ctx api.StreamContext, decodedData []byte, v interface{}, logger api.Logger) (*http.Response, error) {
+func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
+	ctx.GetLogger().Debugf("rest sink receive %s", item)
+	return ms.collectWithUrl(ctx, item, ms.config.Url)
+}
+
+func (ms *RestSink) CollectResend(ctx api.StreamContext, item interface{}) error {
+	ctx.GetLogger().Debugf("rest sink resend %s", item)
+	return ms.collectWithUrl(ctx, item, ms.config.ResendUrl)
+}
+
+func (ms *RestSink) sendWithUrl(ctx api.StreamContext, decodedData []byte, v interface{}, desUrl string) (*http.Response, error) {
 	// Allow to use tokens in headers and check oAuth token expiration
 	if ms.accessConf != nil && ms.accessConf.ExpireInSecond > 0 &&
 		int(time.Now().Sub(ms.tokenLastUpdateAt).Abs().Seconds()) >= ms.accessConf.ExpireInSecond {
@@ -115,7 +124,7 @@ func (ms *RestSink) Send(ctx api.StreamContext, decodedData []byte, v interface{
 	if err != nil {
 		return nil, err
 	}
-	u, err := ctx.ParseTemplate(ms.config.Url, v)
+	u, err := ctx.ParseTemplate(desUrl, v)
 	if err != nil {
 		return nil, err
 	}
@@ -123,7 +132,7 @@ func (ms *RestSink) Send(ctx api.StreamContext, decodedData []byte, v interface{
 	if err != nil {
 		return nil, fmt.Errorf("rest sink headers template decode error: %v", err)
 	}
-	return httpx.Send(logger, ms.client, bodyType, method, u, headers, ms.config.SendSingle, decodedData)
+	return httpx.Send(ctx.GetLogger(), ms.client, bodyType, method, u, headers, ms.config.SendSingle, decodedData)
 }
 
 func (ms *RestSink) Close(ctx api.StreamContext) error {

+ 15 - 3
internal/io/memory/sink.go

@@ -33,6 +33,7 @@ type config struct {
 	KeyField     string   `json:"keyField"`
 	Fields       []string `json:"fields"`
 	DataField    string   `json:"dataField"`
+	ResendTopic  string   `json:"resendDestination"`
 }
 
 type sink struct {
@@ -42,6 +43,7 @@ type sink struct {
 	rowkindField string
 	fields       []string
 	dataField    string
+	resendTopic  string
 }
 
 func (s *sink) Open(ctx api.StreamContext) error {
@@ -70,12 +72,12 @@ func (s *sink) Configure(props map[string]interface{}) error {
 	if s.rowkindField != "" && s.keyField == "" {
 		return fmt.Errorf("keyField is required when rowkindField is set")
 	}
+	s.resendTopic = cfg.ResendTopic
 	return nil
 }
 
-func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
-	ctx.GetLogger().Debugf("receive %+v", data)
-	topic, err := ctx.ParseTemplate(s.topic, data)
+func (s *sink) collectWithTopic(ctx api.StreamContext, data interface{}, t string) error {
+	topic, err := ctx.ParseTemplate(t, data)
 	if err != nil {
 		return err
 	}
@@ -116,6 +118,16 @@ func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 	return nil
 }
 
+func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
+	ctx.GetLogger().Debugf("receive %+v", data)
+	return s.collectWithTopic(ctx, data, s.topic)
+}
+
+func (s *sink) CollectResend(ctx api.StreamContext, data interface{}) error {
+	ctx.GetLogger().Debugf("resend %+v", data)
+	return s.collectWithTopic(ctx, data, s.resendTopic)
+}
+
 func (s *sink) Close(ctx api.StreamContext) error {
 	ctx.GetLogger().Debugf("closing memory sink")
 	pubsub.RemovePub(s.topic)