瀏覽代碼

opt(buffer): Optimize dynamic buffer and add unit test (#1416)

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 2 年之前
父節點
當前提交
299f4be2bb
共有 2 個文件被更改,包括 53 次插入1 次删除
  1. 4 1
      internal/topo/node/dynamic_channel_buffer.go
  2. 49 0
      internal/topo/node/dynamic_channel_buffer_test.go

+ 4 - 1
internal/topo/node/dynamic_channel_buffer.go

@@ -29,7 +29,7 @@ type DynamicChannelBuffer struct {
 
 
 func NewDynamicChannelBuffer() *DynamicChannelBuffer {
 func NewDynamicChannelBuffer() *DynamicChannelBuffer {
 	buffer := &DynamicChannelBuffer{
 	buffer := &DynamicChannelBuffer{
-		In:     make(chan api.SourceTuple, 10),
+		In:     make(chan api.SourceTuple, 1024),
 		Out:    make(chan api.SourceTuple),
 		Out:    make(chan api.SourceTuple),
 		buffer: make([]api.SourceTuple, 0),
 		buffer: make([]api.SourceTuple, 0),
 		limit:  102400,
 		limit:  102400,
@@ -58,8 +58,10 @@ func (b *DynamicChannelBuffer) run() {
 		} else if l > 0 {
 		} else if l > 0 {
 			select {
 			select {
 			case b.Out <- b.buffer[0]:
 			case b.Out <- b.buffer[0]:
+				//fmt.Println("out loud")
 				b.buffer = b.buffer[1:]
 				b.buffer = b.buffer[1:]
 			case value := <-b.In:
 			case value := <-b.In:
+				//fmt.Printf("in loud with length %d\n", len(b.In))
 				b.buffer = append(b.buffer, value)
 				b.buffer = append(b.buffer, value)
 			case <-b.done:
 			case <-b.done:
 				return
 				return
@@ -67,6 +69,7 @@ func (b *DynamicChannelBuffer) run() {
 		} else {
 		} else {
 			select {
 			select {
 			case value := <-b.In:
 			case value := <-b.In:
+				//fmt.Printf("in quiet with length %d \n", len(b.In))
 				b.buffer = append(b.buffer, value)
 				b.buffer = append(b.buffer, value)
 			case <-b.done:
 			case <-b.done:
 				return
 				return

+ 49 - 0
internal/topo/node/dynamic_channel_buffer_test.go

@@ -0,0 +1,49 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package node
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"testing"
+	"time"
+)
+
+func TestBuffer(t *testing.T) {
+	b := NewDynamicChannelBuffer()
+	b.SetLimit(100)
+	stopSign := make(chan struct{})
+	go func(done chan struct{}) {
+		for i := 0; i < 100; i++ {
+			select {
+			case b.In <- api.NewDefaultSourceTuple(map[string]interface{}{"a": 5}, nil):
+				fmt.Printf("feed in %d\n", i)
+			default:
+				t.Errorf("message %d dropped, should not drop message", i)
+			}
+		}
+		close(done)
+	}(stopSign)
+	for i := 0; i < 50; i++ {
+		_ = <-b.Out
+		fmt.Printf("eaten %d\n", i)
+		time.Sleep(10 * time.Millisecond)
+	}
+	<-stopSign
+	l := b.GetLength()
+	if l != 50 {
+		t.Errorf("Expect buffer length 50, but got %d", l)
+	}
+}