Browse Source

fix(sink): hang when cache priority set (#2185)

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 1 year ago
parent
commit
565e145cd4
1 changed files with 10 additions and 6 deletions
  1. 10 6
      internal/topo/node/sink_node.go

+ 10 - 6
internal/topo/node/sink_node.go

@@ -321,15 +321,17 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 										receiveQ(data)
 									case data := <-dataOutCh:
 										normalQ(data)
-									case <-ctx.Done():
-										doneQ()
-										return nil
 									default:
 										select {
+										case data := <-m.input:
+											receiveQ(data)
 										case data := <-dataOutCh:
 											normalQ(data)
 										case data := <-rq.Out:
 											resendQ(data)
+										case <-ctx.Done():
+											doneQ()
+											return nil
 										}
 									}
 								}
@@ -340,15 +342,17 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 										receiveQ(data)
 									case data := <-rq.Out:
 										resendQ(data)
-									case <-ctx.Done():
-										doneQ()
-										return nil
 									default:
 										select {
+										case data := <-m.input:
+											receiveQ(data)
 										case data := <-dataOutCh:
 											normalQ(data)
 										case data := <-rq.Out:
 											resendQ(data)
+										case <-ctx.Done():
+											doneQ()
+											return nil
 										}
 									}
 								}