dynamic_channel_buffer.go 1.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. package nodes
  2. import (
  3. "sync/atomic"
  4. "github.com/emqx/kuiper/xstream/api"
  5. )
  6. type DynamicChannelBuffer struct {
  7. limit int64
  8. In chan api.SourceTuple
  9. Out chan api.SourceTuple
  10. buffer []api.SourceTuple
  11. }
  12. func NewDynamicChannelBuffer() *DynamicChannelBuffer {
  13. buffer := &DynamicChannelBuffer{
  14. In: make(chan api.SourceTuple),
  15. Out: make(chan api.SourceTuple),
  16. buffer: make([]api.SourceTuple, 0),
  17. limit: 102400,
  18. }
  19. go buffer.run()
  20. return buffer
  21. }
  22. func (b *DynamicChannelBuffer) SetLimit(limit int) {
  23. if limit > 0 {
  24. atomic.StoreInt64(&b.limit, int64(limit))
  25. }
  26. }
  27. func (b *DynamicChannelBuffer) run() {
  28. for {
  29. l := len(b.buffer)
  30. if int64(l) >= atomic.LoadInt64(&b.limit) {
  31. b.Out <- b.buffer[0]
  32. b.buffer = b.buffer[1:]
  33. } else if l > 0 {
  34. select {
  35. case b.Out <- b.buffer[0]:
  36. b.buffer = b.buffer[1:]
  37. case value := <-b.In:
  38. b.buffer = append(b.buffer, value)
  39. }
  40. } else {
  41. value := <-b.In
  42. b.buffer = append(b.buffer, value)
  43. }
  44. }
  45. }
  46. func (b *DynamicChannelBuffer) GetLength() int {
  47. return len(b.buffer)
  48. }