Browse Source

feat: introduce sendManager in sink (#1875)

* introduce manager in sink

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 1 year ago
parent
commit
433e2ceede
2 changed files with 123 additions and 25 deletions
  1. 73 25
      internal/topo/node/sink_node.go
  2. 50 0
      internal/topo/node/sink_node_test.go

+ 73 - 25
internal/topo/node/sink_node.go

@@ -21,6 +21,7 @@ import (
 
 
 	"github.com/lf-edge/ekuiper/internal/binder/io"
 	"github.com/lf-edge/ekuiper/internal/binder/io"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	sinkUtil "github.com/lf-edge/ekuiper/internal/io/sink"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/node/cache"
 	"github.com/lf-edge/ekuiper/internal/topo/node/cache"
 	nodeConf "github.com/lf-edge/ekuiper/internal/topo/node/conf"
 	nodeConf "github.com/lf-edge/ekuiper/internal/topo/node/conf"
@@ -35,19 +36,28 @@ import (
 )
 )
 
 
 type SinkConf struct {
 type SinkConf struct {
-	Concurrency  int      `json:"concurrency"`
-	RunAsync     bool     `json:"runAsync"` // deprecated, will remove in the next release
-	Omitempty    bool     `json:"omitIfEmpty"`
-	SendSingle   bool     `json:"sendSingle"`
-	DataTemplate string   `json:"dataTemplate"`
-	Format       string   `json:"format"`
-	SchemaId     string   `json:"schemaId"`
-	Delimiter    string   `json:"delimiter"`
-	BufferLength int      `json:"bufferLength"`
-	Fields       []string `json:"fields"`
+	Concurrency    int      `json:"concurrency"`
+	RunAsync       bool     `json:"runAsync"` // deprecated, will remove in the next release
+	Omitempty      bool     `json:"omitIfEmpty"`
+	SendSingle     bool     `json:"sendSingle"`
+	DataTemplate   string   `json:"dataTemplate"`
+	Format         string   `json:"format"`
+	SchemaId       string   `json:"schemaId"`
+	Delimiter      string   `json:"delimiter"`
+	BufferLength   int      `json:"bufferLength"`
+	Fields         []string `json:"fields"`
+	BatchSize      int      `json:"batchSize"`
+	LingerInterval int      `json:"lingerInterval"`
 	conf.SinkConf
 	conf.SinkConf
 }
 }
 
 
+func (sc *SinkConf) isBatchSinkEnabled() bool {
+	if sc.BatchSize > 0 || sc.LingerInterval > 0 {
+		return true
+	}
+	return false
+}
+
 type SinkNode struct {
 type SinkNode struct {
 	*defaultSinkNode
 	*defaultSinkNode
 	// static
 	// static
@@ -155,6 +165,16 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 						m.statManagers = append(m.statManagers, stats)
 						m.statManagers = append(m.statManagers, stats)
 						m.mutex.Unlock()
 						m.mutex.Unlock()
 
 
+						var sendManager *sinkUtil.SendManager
+						if sconf.isBatchSinkEnabled() {
+							sendManager, err = sinkUtil.NewSendManager(sconf.BatchSize, sconf.LingerInterval)
+							if err != nil {
+								return err
+							}
+							go sendManager.Run(ctx)
+							go doCollectDataInBatch(ctx, sink, sendManager, stats)
+						}
+
 						if !sconf.EnableCache {
 						if !sconf.EnableCache {
 							for {
 							for {
 								select {
 								select {
@@ -169,7 +189,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 									if sconf.RunAsync {
 									if sconf.RunAsync {
 										conf.Log.Warnf("RunAsync is deprecated and ignored.")
 										conf.Log.Warnf("RunAsync is deprecated and ignored.")
 									}
 									}
-									err := doCollect(ctx, sink, data, stats, sconf)
+									err := doCollect(ctx, sink, data, sendManager, stats, sconf)
 									if err != nil {
 									if err != nil {
 										logger.Warnf("sink collect error: %v", err)
 										logger.Warnf("sink collect error: %v", err)
 									}
 									}
@@ -210,7 +230,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 									case data := <-c.Out:
 									case data := <-c.Out:
 										stats.ProcessTimeStart()
 										stats.ProcessTimeStart()
 										ack := true
 										ack := true
-										err := doCollectMaps(ctx, sink, sconf, data, stats)
+										err := doCollectMaps(ctx, sink, sconf, data, sendManager, stats)
 										// Only recoverable error should be cached
 										// Only recoverable error should be cached
 										if err != nil {
 										if err != nil {
 											if strings.HasPrefix(err.Error(), errorx.IOErr) { // do not log to prevent a lot of logs!
 											if strings.HasPrefix(err.Error(), errorx.IOErr) { // do not log to prevent a lot of logs!
@@ -298,7 +318,7 @@ func (m *SinkNode) reset() {
 	m.statManagers = nil
 	m.statManagers = nil
 }
 }
 
 
-func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats metric.StatManager, sconf *SinkConf) error {
+func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, sendManager *sinkUtil.SendManager, stats metric.StatManager, sconf *SinkConf) error {
 	stats.ProcessTimeStart()
 	stats.ProcessTimeStart()
 	defer stats.ProcessTimeEnd()
 	defer stats.ProcessTimeEnd()
 	outs := itemToMap(item)
 	outs := itemToMap(item)
@@ -306,12 +326,12 @@ func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats met
 		ctx.GetLogger().Debugf("receive empty in sink")
 		ctx.GetLogger().Debugf("receive empty in sink")
 		return nil
 		return nil
 	}
 	}
-	return doCollectMaps(ctx, sink, sconf, outs, stats)
+	return doCollectMaps(ctx, sink, sconf, outs, sendManager, stats)
 }
 }
 
 
-func doCollectMaps(ctx api.StreamContext, sink api.Sink, sconf *SinkConf, outs []map[string]interface{}, stats metric.StatManager) error {
+func doCollectMaps(ctx api.StreamContext, sink api.Sink, sconf *SinkConf, outs []map[string]interface{}, sendManager *sinkUtil.SendManager, stats metric.StatManager) error {
 	if !sconf.SendSingle {
 	if !sconf.SendSingle {
-		return doCollectData(ctx, sink, outs, stats)
+		return doCollectData(ctx, sink, outs, sendManager, stats)
 	} else {
 	} else {
 		var err error
 		var err error
 		for _, d := range outs {
 		for _, d := range outs {
@@ -319,7 +339,7 @@ func doCollectMaps(ctx api.StreamContext, sink api.Sink, sconf *SinkConf, outs [
 				ctx.GetLogger().Debugf("receive empty in sink")
 				ctx.GetLogger().Debugf("receive empty in sink")
 				continue
 				continue
 			}
 			}
-			newErr := doCollectData(ctx, sink, d, stats)
+			newErr := doCollectData(ctx, sink, d, sendManager, stats)
 			if newErr != nil {
 			if newErr != nil {
 				err = newErr
 				err = newErr
 			}
 			}
@@ -356,23 +376,51 @@ func itemToMap(item interface{}) []map[string]interface{} {
 }
 }
 
 
 // doCollectData outData must be map or []map
 // doCollectData outData must be map or []map
-func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, stats metric.StatManager) error {
+func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, sendManager *sinkUtil.SendManager, stats metric.StatManager) error {
+	if sendManager != nil {
+		switch v := outData.(type) {
+		case map[string]interface{}:
+			sendManager.RecvData(v)
+		case []map[string]interface{}:
+			for _, d := range v {
+				sendManager.RecvData(d)
+			}
+		}
+		return nil
+	}
 	select {
 	select {
 	case <-ctx.Done():
 	case <-ctx.Done():
 		ctx.GetLogger().Infof("sink node %s instance %d stops data resending", ctx.GetOpId(), ctx.GetInstanceId())
 		ctx.GetLogger().Infof("sink node %s instance %d stops data resending", ctx.GetOpId(), ctx.GetInstanceId())
 		return nil
 		return nil
 	default:
 	default:
-		if err := sink.Collect(ctx, outData); err != nil {
-			stats.IncTotalExceptions(err.Error())
-			return err
-		} else {
-			ctx.GetLogger().Debugf("success")
-			stats.IncTotalRecordsOut()
-			return nil
+		return sendDataToSink(ctx, sink, outData, stats)
+	}
+}
+
+func doCollectDataInBatch(ctx api.StreamContext, sink api.Sink, sendManager *sinkUtil.SendManager, stats metric.StatManager) {
+	for {
+		select {
+		case <-ctx.Done():
+			ctx.GetLogger().Infof("sink node %s instance %d stops data batch collecting", ctx.GetOpId(), ctx.GetInstanceId())
+		case outData := <-sendManager.GetOutputChan():
+			if err := sendDataToSink(ctx, sink, outData, stats); err != nil {
+				ctx.GetLogger().Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outData, err)
+			}
 		}
 		}
 	}
 	}
 }
 }
 
 
+func sendDataToSink(ctx api.StreamContext, sink api.Sink, outData interface{}, stats metric.StatManager) error {
+	if err := sink.Collect(ctx, outData); err != nil {
+		stats.IncTotalExceptions(err.Error())
+		return err
+	} else {
+		ctx.GetLogger().Debugf("success")
+		stats.IncTotalRecordsOut()
+		return nil
+	}
+}
+
 func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 	var (
 	var (
 		s   api.Sink
 		s   api.Sink

+ 50 - 0
internal/topo/node/sink_node_test.go

@@ -26,6 +26,7 @@ import (
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
+	"github.com/benbjohnson/clock"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/schema"
 	"github.com/lf-edge/ekuiper/internal/schema"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/testx"
@@ -39,6 +40,55 @@ func init() {
 	testx.InitEnv()
 	testx.InitEnv()
 }
 }
 
 
+func TestBatchSink(t *testing.T) {
+	mc := conf.Clock.(*clock.Mock)
+	conf.InitConf()
+	transform.RegisterAdditionalFuncs()
+	var tests = []struct {
+		config map[string]interface{}
+		data   []map[string]interface{}
+		result [][]byte
+	}{
+		{
+			config: map[string]interface{}{
+				"batchSize": 2,
+			},
+			data:   []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}, {"ab": "hello3"}},
+			result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)},
+		},
+		{
+			config: map[string]interface{}{
+				"lingerInterval": 1000,
+			},
+			data:   []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}, {"ab": "hello3"}},
+			result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"},{"ab":"hello3"}]`)},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := conf.Log.WithField("rule", "TestBatchSink")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+
+	for i, tt := range tests {
+		mc.Set(mc.Now())
+		mockSink := mocknode.NewMockSink()
+		s := NewSinkNodeWithSink("mockSink", mockSink, tt.config)
+		s.Open(ctx, make(chan error))
+		s.input <- tt.data
+		for i := 0; i < 10; i++ {
+			mc.Add(1 * time.Second)
+			time.Sleep(1 * time.Second)
+			// wait until mockSink get results
+			if len(mockSink.GetResults()) > 0 {
+				break
+			}
+		}
+		results := mockSink.GetResults()
+		if !reflect.DeepEqual(tt.result, results) {
+			t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
+		}
+	}
+}
+
 func TestSinkTemplate_Apply(t *testing.T) {
 func TestSinkTemplate_Apply(t *testing.T) {
 	conf.InitConf()
 	conf.InitConf()
 	transform.RegisterAdditionalFuncs()
 	transform.RegisterAdditionalFuncs()