瀏覽代碼

fix(source): handle mqtt subscription errors gracefully

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran 2 年之前
父節點
當前提交
8701f5e39a
共有 1 個文件被更改,包括 6 次插入1 次删除
  1. 6 1
      internal/topo/connection/clients/mqtt/mqtt_wrapper.go

+ 6 - 1
internal/topo/connection/clients/mqtt/mqtt_wrapper.go

@@ -88,7 +88,12 @@ func (mc *mqttClientWrapper) onConnectHandler(_ pahoMqtt.Client) {
 		token := mc.cli.conn.Subscribe(topic, subscription.qos, subscription.topicHandler)
 		if token.Error() != nil {
 			for _, con := range subscription.topicConsumers {
-				con.SubErrors <- token.Error()
+				select {
+				case con.SubErrors <- token.Error():
+					break
+				default:
+					conf.Log.Warnf("consumer SubErrors channel full for request id %s", con.ConsumerId)
+				}
 			}
 		}
 	}