浏览代码

feat(sink): Limit the sink error which will trigger retry attempts

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 年之前
父节点
当前提交
a6e0dda8c3

+ 2 - 0
docs/en_US/extension/native/sink.md

@@ -30,6 +30,8 @@ Most of the time, the map content will be the selective fields. But if `sendErro
 
 
 The developer can fetch the transformed result from the context method `ctx.TransformOutput()`. The return values are the transformed value of `[]byte` type. Currently, it will be transformed to the json byte array be default or formatted with the set [`dataTemlate` property](../../rules/overview.md#data-template). If the value is transformed by dataTemplate, the second return value will be true. 
 The developer can fetch the transformed result from the context method `ctx.TransformOutput()`. The return values are the transformed value of `[]byte` type. Currently, it will be transformed to the json byte array be default or formatted with the set [`dataTemlate` property](../../rules/overview.md#data-template). If the value is transformed by dataTemplate, the second return value will be true. 
 
 
+The developer can return any errors. However, to leverage the retry feature of eKuiper, the developer must return an error whose message starts with "io error".
+
 ```go
 ```go
 //Called when each row of data has transferred to this sink
 //Called when each row of data has transferred to this sink
 Collect(ctx StreamContext, data interface{}) error
 Collect(ctx StreamContext, data interface{}) error

+ 2 - 0
docs/zh_CN/extension/native/sink.md

@@ -33,6 +33,8 @@ Sink (目标)的主要任务是实现 _collect_ 方法。 当 eKuiper 将任
 
 
 需要注意的是,当 [`dataTemlate` 属性](../../rules/overview.md#数据模板) 设置时,开发者可通过 context 方法`ctx.TransformOutput()` 获取转换后的数据。若数据模板未设置,则该方法返回空值。
 需要注意的是,当 [`dataTemlate` 属性](../../rules/overview.md#数据模板) 设置时,开发者可通过 context 方法`ctx.TransformOutput()` 获取转换后的数据。若数据模板未设置,则该方法返回空值。
 
 
+该方法可以返回任何错误类型。但是,如果想要让自动重试机制生效,返回的错误消息必须以 "io error" 开头。大多数情况下,也只有 io 问题才有重试的需要。
+
 ```go
 ```go
 //Called when each row of data has transferred to this sink
 //Called when each row of data has transferred to this sink
 Collect(ctx StreamContext, data interface{}) error
 Collect(ctx StreamContext, data interface{}) error

+ 8 - 6
extensions/sinks/redis/redis.go

@@ -16,6 +16,8 @@ package main
 
 
 import (
 import (
 	"errors"
 	"errors"
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"time"
 	"time"
 
 
 	"github.com/go-redis/redis/v7"
 	"github.com/go-redis/redis/v7"
@@ -143,14 +145,14 @@ func (r *RedisSink) Collect(ctx api.StreamContext, data interface{}) error {
 					err := r.cli.LPush(key, v).Err()
 					err := r.cli.LPush(key, v).Err()
 					if err != nil {
 					if err != nil {
 						logger.Error(err)
 						logger.Error(err)
-						return err
+						return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
 					}
 					}
 					logger.Debugf("send redis list success, key:%s data: %s", key, string(v))
 					logger.Debugf("send redis list success, key:%s data: %s", key, string(v))
 				} else {
 				} else {
 					err := r.cli.Set(key, v, r.expiration*time.Second).Err()
 					err := r.cli.Set(key, v, r.expiration*time.Second).Err()
 					if err != nil {
 					if err != nil {
 						logger.Error(err)
 						logger.Error(err)
-						return err
+						return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
 					}
 					}
 					logger.Debugf("send redis string success, key:%s data: %s", key, string(v))
 					logger.Debugf("send redis string success, key:%s data: %s", key, string(v))
 				}
 				}
@@ -166,14 +168,14 @@ func (r *RedisSink) Collect(ctx api.StreamContext, data interface{}) error {
 				err := r.cli.LPush(key, v).Err()
 				err := r.cli.LPush(key, v).Err()
 				if err != nil {
 				if err != nil {
 					logger.Error(err)
 					logger.Error(err)
-					return err
+					return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
 				}
 				}
 				logger.Debugf("send redis list success, key:%s data: %s", key, string(v))
 				logger.Debugf("send redis list success, key:%s data: %s", key, string(v))
 			} else {
 			} else {
 				err := r.cli.Set(key, v, r.expiration*time.Second).Err()
 				err := r.cli.Set(key, v, r.expiration*time.Second).Err()
 				if err != nil {
 				if err != nil {
 					logger.Error(err)
 					logger.Error(err)
-					return err
+					return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
 				}
 				}
 				logger.Debugf("send redis string success, key:%s data: %s", key, string(v))
 				logger.Debugf("send redis string success, key:%s data: %s", key, string(v))
 			}
 			}
@@ -183,14 +185,14 @@ func (r *RedisSink) Collect(ctx api.StreamContext, data interface{}) error {
 			err := r.cli.LPush(r.key, v).Err()
 			err := r.cli.LPush(r.key, v).Err()
 			if err != nil {
 			if err != nil {
 				logger.Error(err)
 				logger.Error(err)
-				return err
+				return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
 			}
 			}
 			logger.Debugf("send redis list success, key:%s data: %s", r.key, string(v))
 			logger.Debugf("send redis list success, key:%s data: %s", r.key, string(v))
 		} else {
 		} else {
 			err := r.cli.Set(r.key, v, r.expiration*time.Second).Err()
 			err := r.cli.Set(r.key, v, r.expiration*time.Second).Err()
 			if err != nil {
 			if err != nil {
 				logger.Error(err)
 				logger.Error(err)
-				return err
+				return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
 			}
 			}
 			logger.Debugf("send redis string success, key:%s data: %s", r.key, string(v))
 			logger.Debugf("send redis string success, key:%s data: %s", r.key, string(v))
 		}
 		}

+ 2 - 1
extensions/sinks/tdengine/tdengine.go

@@ -22,6 +22,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
 	_ "github.com/taosdata/driver-go/v2/taosSql"
 	_ "github.com/taosdata/driver-go/v2/taosSql"
 	"reflect"
 	"reflect"
 	"strings"
 	"strings"
@@ -169,7 +170,7 @@ func (m *taosSink) Collect(ctx api.StreamContext, item interface{}) error {
 		logger.Debugf(sql)
 		logger.Debugf(sql)
 		rows, err := m.db.Query(sql)
 		rows, err := m.db.Query(sql)
 		if err != nil {
 		if err != nil {
-			return err
+			return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
 		}
 		}
 		rows.Close()
 		rows.Close()
 	}
 	}

+ 3 - 1
extensions/sinks/zmq/zmq.go

@@ -17,6 +17,7 @@ package main
 import (
 import (
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
 	zmq "github.com/pebbe/zmq4"
 	zmq "github.com/pebbe/zmq4"
 )
 )
 
 
@@ -81,7 +82,8 @@ func (m *zmqSink) Collect(ctx api.StreamContext, item interface{}) (err error) {
 		logger.Debug("zmq sink receive non byte data %v", item)
 		logger.Debug("zmq sink receive non byte data %v", item)
 	}
 	}
 	if err != nil {
 	if err != nil {
-		logger.Debugf("send to zmq error %v", err)
+		logger.Errorf("send to zmq error %v", err)
+		return fmt.Errorf("%s:%s", errorx.IOErr, err.Error())
 	}
 	}
 	return
 	return
 }
 }

+ 7 - 1
internal/plugin/portable/runtime/sink.go

@@ -15,7 +15,9 @@
 package runtime
 package runtime
 
 
 import (
 import (
+	"fmt"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
 )
 )
 
 
 type PortableSink struct {
 type PortableSink struct {
@@ -82,7 +84,11 @@ func (ps *PortableSink) Collect(ctx api.StreamContext, item interface{}) error {
 	ctx.GetLogger().Debugf("Receive %+v", item)
 	ctx.GetLogger().Debugf("Receive %+v", item)
 	if val, _, err := ctx.TransformOutput(); err == nil {
 	if val, _, err := ctx.TransformOutput(); err == nil {
 		ctx.GetLogger().Debugf("Send %s", val)
 		ctx.GetLogger().Debugf("Send %s", val)
-		return ps.dataCh.Send(val)
+		e := ps.dataCh.Send(val)
+		if e != nil {
+			return fmt.Errorf("%s:%s", errorx.IOErr, e)
+		}
+		return nil
 	} else {
 	} else {
 		ctx.GetLogger().Errorf("Found error %s", err.Error())
 		ctx.GetLogger().Errorf("Found error %s", err.Error())
 		return err
 		return err

+ 2 - 1
internal/topo/node/sink_node.go

@@ -22,6 +22,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"strings"
 	"strings"
 	"sync"
 	"sync"
 	"time"
 	"time"
@@ -296,7 +297,7 @@ func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, st
 			if err := sink.Collect(vCtx, outData); err != nil {
 			if err := sink.Collect(vCtx, outData); err != nil {
 				stats.IncTotalExceptions()
 				stats.IncTotalExceptions()
 				ctx.GetLogger().Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outData, err)
 				ctx.GetLogger().Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outData, err)
-				if sconf.RetryInterval > 0 && retries > 0 && strings.HasPrefix(err.Error(), "io error") {
+				if sconf.RetryInterval > 0 && retries > 0 && strings.HasPrefix(err.Error(), errorx.IOErr) {
 					retries--
 					retries--
 					time.Sleep(time.Duration(sconf.RetryInterval) * time.Millisecond)
 					time.Sleep(time.Duration(sconf.RetryInterval) * time.Millisecond)
 					ctx.GetLogger().Debugf("try again")
 					ctx.GetLogger().Debugf("try again")

+ 4 - 3
internal/topo/sink/edgex_sink.go

@@ -29,6 +29,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"reflect"
 	"reflect"
 )
 )
 
 
@@ -436,7 +437,7 @@ func (ems *EdgexMsgBusSink) getMeta(result []map[string]interface{}) *meta {
 	return newMetaFromMap(nil)
 	return newMetaFromMap(nil)
 }
 }
 
 
-func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) error {
+func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, _ interface{}) error {
 	logger := ctx.GetLogger()
 	logger := ctx.GetLogger()
 	if payload, _, err := ctx.TransformOutput(); err == nil {
 	if payload, _, err := ctx.TransformOutput(); err == nil {
 		logger.Debugf("EdgeX message bus sink: %s\n", payload)
 		logger.Debugf("EdgeX message bus sink: %s\n", payload)
@@ -470,8 +471,8 @@ func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) err
 		}
 		}
 
 
 		if e := ems.client.Publish(env, topic); e != nil {
 		if e := ems.client.Publish(env, topic); e != nil {
-			logger.Errorf("Found error %s when publish to EdgeX message bus.\n", e)
-			return e
+			logger.Errorf("%s: found error %s when publish to EdgeX message bus.\n", e)
+			return fmt.Errorf("%s:%s", errorx.IOErr, e.Error())
 		}
 		}
 		logger.Debugf("Published %+v to EdgeX message bus topic %s", evt, topic)
 		logger.Debugf("Published %+v to EdgeX message bus topic %s", evt, topic)
 	} else {
 	} else {

+ 2 - 1
internal/topo/sink/mqtt_sink.go

@@ -22,6 +22,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"strings"
 	"strings"
 )
 )
 
 
@@ -257,7 +258,7 @@ func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
 		return fmt.Errorf("the value %v of dynamic prop %s for topic is not a string", ms.tpc, tpc)
 		return fmt.Errorf("the value %v of dynamic prop %s for topic is not a string", ms.tpc, tpc)
 	}
 	}
 	if token := c.Publish(tpc.(string), ms.qos, ms.retained, jsonBytes); token.Wait() && token.Error() != nil {
 	if token := c.Publish(tpc.(string), ms.qos, ms.retained, jsonBytes); token.Wait() && token.Error() != nil {
-		return fmt.Errorf("publish error: %s", token.Error())
+		return fmt.Errorf("%s: %s", errorx.IOErr, token.Error())
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 3 - 2
internal/topo/sink/rest_sink.go

@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"io/ioutil"
 	"io/ioutil"
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
@@ -199,10 +200,10 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 		if resp.StatusCode < 200 || resp.StatusCode > 299 {
 		if resp.StatusCode < 200 || resp.StatusCode > 299 {
 			if buf, bodyErr := ioutil.ReadAll(resp.Body); bodyErr != nil {
 			if buf, bodyErr := ioutil.ReadAll(resp.Body); bodyErr != nil {
 				logger.Errorf("%s\n", bodyErr)
 				logger.Errorf("%s\n", bodyErr)
-				return fmt.Errorf("rest sink fails to err http return code: %d and error message %s.", resp.StatusCode, bodyErr)
+				return fmt.Errorf("%s: http return code: %d and error message %s", errorx.IOErr, resp.StatusCode, bodyErr)
 			} else {
 			} else {
 				logger.Errorf("%s\n", string(buf))
 				logger.Errorf("%s\n", string(buf))
-				return fmt.Errorf("rest sink fails to err http return code: %d and error message %s.", resp.StatusCode, string(buf))
+				return fmt.Errorf("%s: http return code: %d and error message %s", errorx.IOErr, resp.StatusCode, string(buf))
 			}
 			}
 		} else {
 		} else {
 			if ms.debugResp {
 			if ms.debugResp {

+ 2 - 0
pkg/errorx/errors.go

@@ -23,6 +23,8 @@ const (
 	NOT_FOUND
 	NOT_FOUND
 )
 )
 
 
+const IOErr = "io error"
+
 var NotFoundErr = NewWithCode(NOT_FOUND, "not found")
 var NotFoundErr = NewWithCode(NOT_FOUND, "not found")
 
 
 type Error struct {
 type Error struct {