Explorar el Código

fix: send manager should be enlarged during runWithTicker (#1874)

* fix

Signed-off-by: yisaer <disxiaofei@163.com>

* add test

Signed-off-by: yisaer <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao hace 1 año
padre
commit
0a175d3a16
Se han modificado 2 ficheros con 46 adiciones y 12 borrados
  1. 16 12
      internal/io/sink/send_manager.go
  2. 30 0
      internal/io/sink/send_manager_test.go

+ 16 - 12
internal/io/sink/send_manager.go

@@ -72,8 +72,7 @@ func (sm *SendManager) runWithTicker(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case d := <-sm.bufferCh:
-			sm.buffer[sm.currIndex] = d
-			sm.currIndex++
+			sm.appendDataInBuffer(d, false)
 		case <-ticker.C:
 			sm.send()
 		}
@@ -86,11 +85,7 @@ func (sm *SendManager) runWithBatchSize(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case d := <-sm.bufferCh:
-			sm.buffer[sm.currIndex] = d
-			sm.currIndex++
-			if sm.currIndex >= sm.batchSize {
-				sm.send()
-			}
+			sm.appendDataInBuffer(d, true)
 		}
 	}
 }
@@ -103,11 +98,7 @@ func (sm *SendManager) runWithTickerAndBatchSize(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case d := <-sm.bufferCh:
-			sm.buffer[sm.currIndex] = d
-			sm.currIndex++
-			if sm.currIndex >= sm.batchSize {
-				sm.send()
-			}
+			sm.appendDataInBuffer(d, true)
 		case <-ticker.C:
 			sm.send()
 		}
@@ -126,6 +117,19 @@ func (sm *SendManager) send() {
 	sm.outputCh <- list
 }
 
+func (sm *SendManager) appendDataInBuffer(d map[string]interface{}, sendData bool) {
+	if sm.currIndex >= len(sm.buffer) {
+		// The buffer should be enlarged if the data length is larger than capacity during runWithTicker
+		sm.buffer = append(sm.buffer, d)
+	} else {
+		sm.buffer[sm.currIndex] = d
+	}
+	sm.currIndex++
+	if sendData && sm.currIndex >= sm.batchSize {
+		sm.send()
+	}
+}
+
 func (sm *SendManager) GetOutputChan() <-chan []map[string]interface{} {
 	return sm.outputCh
 }

+ 30 - 0
internal/io/sink/send_manager_test.go

@@ -56,6 +56,12 @@ func TestSendManager(t *testing.T) {
 			lingerInterval: 100,
 			expectItems:    4,
 		},
+		{
+			sendCount:      6,
+			batchSize:      3,
+			lingerInterval: 3000,
+			expectItems:    3,
+		},
 	}
 	mc := conf.Clock.(*clock.Mock)
 	for i, tc := range testcases {
@@ -133,3 +139,27 @@ func TestCancelRun(t *testing.T) {
 		}
 	}
 }
+
+func TestEnlargeSendManagerCap(t *testing.T) {
+	sm, err := NewSendManager(0, 1000)
+	if err != nil {
+		t.Fatal(err)
+	}
+	count := 1025
+	for i := 0; i < count; i++ {
+		go sm.RecvData(map[string]interface{}{})
+		sm.appendDataInBuffer(<-sm.bufferCh, false)
+	}
+	if len(sm.buffer) != count {
+		t.Fatal(fmt.Sprintf("sm buffer should be %v", count))
+	}
+	if sm.currIndex != count {
+		t.Fatal(fmt.Sprintf("sm index should be %v", count))
+	}
+	originCap := cap(sm.buffer)
+	originLen := len(sm.buffer)
+	sm.send()
+	if sm.currIndex != 0 || originCap != cap(sm.buffer) || originLen != len(sm.buffer) {
+		t.Fatal("sm buffer capacity shouldn't be changed after send")
+	}
+}