dynamic_channel_buffer.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package node
  2. import (
  3. "github.com/lf-edge/ekuiper/pkg/api"
  4. "sync/atomic"
  5. )
  6. type DynamicChannelBuffer struct {
  7. limit int64
  8. In chan api.SourceTuple
  9. Out chan api.SourceTuple
  10. buffer []api.SourceTuple
  11. done chan bool
  12. }
  13. func NewDynamicChannelBuffer() *DynamicChannelBuffer {
  14. buffer := &DynamicChannelBuffer{
  15. In: make(chan api.SourceTuple),
  16. Out: make(chan api.SourceTuple),
  17. buffer: make([]api.SourceTuple, 0),
  18. limit: 102400,
  19. done: make(chan bool, 1),
  20. }
  21. go buffer.run()
  22. return buffer
  23. }
  24. func (b *DynamicChannelBuffer) SetLimit(limit int) {
  25. if limit > 0 {
  26. atomic.StoreInt64(&b.limit, int64(limit))
  27. }
  28. }
  29. func (b *DynamicChannelBuffer) run() {
  30. for {
  31. l := len(b.buffer)
  32. if int64(l) >= atomic.LoadInt64(&b.limit) {
  33. select {
  34. case b.Out <- b.buffer[0]:
  35. b.buffer = b.buffer[1:]
  36. case <-b.done:
  37. return
  38. }
  39. } else if l > 0 {
  40. select {
  41. case b.Out <- b.buffer[0]:
  42. b.buffer = b.buffer[1:]
  43. case value := <-b.In:
  44. b.buffer = append(b.buffer, value)
  45. case <-b.done:
  46. return
  47. }
  48. } else {
  49. select {
  50. case value := <-b.In:
  51. b.buffer = append(b.buffer, value)
  52. case <-b.done:
  53. return
  54. }
  55. }
  56. }
  57. }
  58. func (b *DynamicChannelBuffer) GetLength() int {
  59. return len(b.buffer)
  60. }
  61. func (b *DynamicChannelBuffer) Close() {
  62. b.done <- true
  63. }