dynamic_channel_buffer.go 994 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package nodes
  2. import "github.com/emqx/kuiper/xstream/api"
  3. type DynamicChannelBuffer struct {
  4. In chan api.SourceTuple
  5. Out chan api.SourceTuple
  6. buffer []api.SourceTuple
  7. limit int
  8. }
  9. func NewDynamicChannelBuffer() *DynamicChannelBuffer {
  10. buffer := &DynamicChannelBuffer{
  11. In: make(chan api.SourceTuple),
  12. Out: make(chan api.SourceTuple),
  13. buffer: make([]api.SourceTuple, 0),
  14. limit: 102400,
  15. }
  16. go buffer.run()
  17. return buffer
  18. }
  19. func (b *DynamicChannelBuffer) SetLimit(limit int) {
  20. if limit > 0 {
  21. b.limit = limit
  22. }
  23. }
  24. func (b *DynamicChannelBuffer) run() {
  25. for {
  26. l := len(b.buffer)
  27. if l >= b.limit {
  28. b.Out <- b.buffer[0]
  29. b.buffer = b.buffer[1:]
  30. } else if l > 0 {
  31. select {
  32. case b.Out <- b.buffer[0]:
  33. b.buffer = b.buffer[1:]
  34. case value := <-b.In:
  35. b.buffer = append(b.buffer, value)
  36. }
  37. } else {
  38. value := <-b.In
  39. b.buffer = append(b.buffer, value)
  40. }
  41. }
  42. }
  43. func (b *DynamicChannelBuffer) GetLength() int {
  44. return len(b.buffer)
  45. }