feat(sink): fix kafka sink plugin write delay, refactor to IOErr retry mechanism (#1783)

Signed-off-by: carlclone <906561974@qq.com>
carlclone 2 years ago
parent commit fa31061fd7
1 changed file with 22 additions and 69 deletions
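Background for the writer changes in the diff: the write delay presumably comes from kafka-go's internal batching, where a synchronous WriteMessages call can sit in the writer until the batch timeout expires if the batch is not full. The sketch below is not part of this commit (the broker address and topic are placeholders); it only shows a writer configured the same way the diff configures it, with comments on what the changed fields do.

```go
package main

import (
	"context"
	"fmt"
	"time"

	kafkago "github.com/segmentio/kafka-go"
)

// newLowLatencyWriter builds a writer matching the settings in the diff:
// synchronous, no internal retries, and a batch size of one so each message
// is flushed immediately instead of waiting for the batch timeout.
func newLowLatencyWriter() *kafkago.Writer {
	return &kafkago.Writer{
		Addr:         kafkago.TCP("localhost:9092"), // placeholder broker
		Topic:        "demo",                        // placeholder topic
		Balancer:     &kafkago.LeastBytes{},
		Async:        false,              // block until the broker acknowledges the write
		MaxAttempts:  1,                  // no retries inside the writer; the caller decides
		RequiredAcks: kafkago.RequireAll, // same as RequiredAcks: -1 in the diff
		BatchSize:    1,                  // flush every message right away
	}
}

func main() {
	w := newLowLatencyWriter()
	defer w.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := w.WriteMessages(ctx, kafkago.Message{Value: []byte("hello")}); err != nil {
		// With MaxAttempts at 1 the failure surfaces on the first attempt.
		fmt.Println("write failed:", err)
	}
}
```

With MaxAttempts dropped to 1, the writer itself no longer retries; failures surface to Collect immediately, which is what makes the IOErr convention used later in the diff meaningful.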
extensions/sinks/kafka/kafka.go  +22 -69

@@ -15,16 +15,15 @@
 package main
 
 import (
-	"context"
 	"fmt"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
 	kafkago "github.com/segmentio/kafka-go"
 	"github.com/segmentio/kafka-go/sasl"
 	"github.com/segmentio/kafka-go/sasl/plain"
 	"github.com/segmentio/kafka-go/sasl/scram"
 	"strings"
-	"time"
 )
 
 type kafkaSink struct {
@@ -100,8 +99,9 @@ func (m *kafkaSink) Open(ctx api.StreamContext) error {
 		Balancer:               &kafkago.LeastBytes{},
 		Async:                  false,
 		AllowAutoTopicCreation: true,
-		MaxAttempts:            10,
+		MaxAttempts:            1,
 		RequiredAcks:           -1,
+		BatchSize:              1,
 		Transport: &kafkago.Transport{
 			SASL: mechanism,
 		},
@@ -133,78 +133,31 @@ func (m *kafkaSink) Collect(ctx api.StreamContext, item interface{}) error {
 		return fmt.Errorf("unrecognized format of %s", item)
 	}
 
-	vctx, cancel := context.WithTimeout(ctx, 10*time.Second)
-	defer cancel()
-	err := m.kafkaWriteWithBackoff(vctx, ctx.GetLogger().Errorf, 100*time.Millisecond, time.Second, messages...)
-	if err != nil {
-		return err
-	}
-	logger.Debug("insert data into kafka success")
-	return nil
-}
-
-func (m *kafkaSink) kafkaWriteWithBackoff(ctx context.Context, log func(string, ...interface{}), interval, maxInterval time.Duration, messages ...kafkago.Message) error {
-	var berr error
-	tries := 0
-
-retry:
-	for {
-		tries++
-
-		err := m.writer.WriteMessages(ctx, messages...)
-		switch err := err.(type) {
-		case nil:
-			return nil
-
-		case kafkago.Error:
-			berr = err
-			if !err.Temporary() {
-				break retry
-			}
-
-		case kafkago.WriteErrors:
-			var remaining []kafkago.Message
-			for i, m := range messages {
-				switch err := err[i].(type) {
-				case nil:
+	err := m.writer.WriteMessages(ctx, messages...)
+	switch err := err.(type) {
+	case kafkago.Error:
+		if err.Temporary() {
+			return fmt.Errorf(`%s: kafka sink fails to send out the data . %v`, errorx.IOErr, err)
+		}
+	case kafkago.WriteErrors:
+		count := 0
+		for i := range messages {
+			switch err := err[i].(type) {
+			case nil:
+				continue
+
+			case kafkago.Error:
+				if err.Temporary() {
+					count++
 					continue
-
-				case kafkago.Error:
-					if err.Temporary() {
-						remaining = append(remaining, m)
-						continue
-					}
 				}
-
-				return fmt.Errorf("failed to deliver messages: %v", err)
-			}
-
-			messages = remaining
-			berr = err
-
-		default:
-			if berr == nil || err != context.DeadlineExceeded {
-				berr = err
 			}
-			break retry
 		}
-
-		log("temporary write error: %v", err)
-
-		interval *= 2
-		if interval > maxInterval {
-			interval = maxInterval
-		}
-		timer := time.NewTimer(interval)
-		select {
-		case <-timer.C:
-		case <-ctx.Done():
-			timer.Stop()
-			break retry
+		if count == len(messages) {
+			return fmt.Errorf(`%s: kafka sink fails to send out the data . %v`, errorx.IOErr, err)
 		}
 	}
-
-	return fmt.Errorf("failed to deliver messages after %d tries: %v", tries, berr)
+	return err
 }
 
 func (m *kafkaSink) Close(ctx api.StreamContext) error {
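
The retry half of the refactor relies on a convention rather than an in-sink loop: Collect now tags temporary failures with the errorx.IOErr prefix and leaves retrying to the surrounding runtime. The sketch below is illustrative only; ioErrPrefix, collectFn, retryCollect, and the retry parameters are stand-ins, not eKuiper's actual runtime code. It shows how a caller keyed on that prefix could retry recoverable errors and fail fast on everything else.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"time"
)

// ioErrPrefix stands in for errorx.IOErr from github.com/lf-edge/ekuiper/pkg/errorx;
// the actual constant value is assumed here.
const ioErrPrefix = "io error"

// collectFn mirrors the shape of the sink's Collect call for this sketch.
type collectFn func(ctx context.Context, item interface{}) error

// retryCollect retries only errors tagged with the IO-error prefix, waiting a
// fixed interval between attempts; any other error is returned immediately.
func retryCollect(ctx context.Context, collect collectFn, item interface{}, attempts int, interval time.Duration) error {
	var err error
	for i := 0; i < attempts; i++ {
		err = collect(ctx, item)
		if err == nil {
			return nil
		}
		if !strings.HasPrefix(err.Error(), ioErrPrefix) {
			return err // non-recoverable: surface immediately
		}
		select {
		case <-time.After(interval):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	// Fake collect: fail twice with an IO-tagged error, then succeed.
	fake := func(ctx context.Context, item interface{}) error {
		calls++
		if calls < 3 {
			return fmt.Errorf("%s: kafka sink fails to send out the data. simulated failure", ioErrPrefix)
		}
		return nil
	}
	err := retryCollect(context.Background(), fake, "payload", 5, 100*time.Millisecond)
	fmt.Println("result:", err, "calls:", calls)
}
```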