dynamic_channel_buffer.go 834 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. package utils
  2. type DynamicChannelBuffer struct {
  3. In chan interface{}
  4. Out chan interface{}
  5. buffer []interface{}
  6. limit int
  7. }
  8. func NewDynamicChannelBuffer() *DynamicChannelBuffer {
  9. buffer := &DynamicChannelBuffer{
  10. In: make(chan interface{}),
  11. Out: make(chan interface{}),
  12. buffer: make([]interface{}, 0),
  13. limit: 102400,
  14. }
  15. go buffer.run()
  16. return buffer
  17. }
  18. func (b *DynamicChannelBuffer) SetLimit(limit int){
  19. if limit > 0 {
  20. b.limit = limit
  21. }
  22. }
  23. func (b *DynamicChannelBuffer) run() {
  24. for {
  25. l := len(b.buffer)
  26. if l >= b.limit{
  27. b.Out <- b.buffer[0]
  28. b.buffer = b.buffer[1:]
  29. }else if l > 0 {
  30. select {
  31. case b.Out <- b.buffer[0]:
  32. b.buffer = b.buffer[1:]
  33. case value := <- b.In:
  34. b.buffer = append(b.buffer, value)
  35. }
  36. } else {
  37. value := <- b.In
  38. b.buffer = append(b.buffer, value)
  39. }
  40. }
  41. }