Переглянути джерело

fix(mqtt): add publish/sub timeout (#2243)

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 1 рік тому
батько
коміт
e0a31968a0
1 змінених файлів з 21 додано та 11 видалено
  1. 21 11
      internal/topo/connection/clients/mqtt/mqtt.go

+ 21 - 11
internal/topo/connection/clients/mqtt/mqtt.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -124,13 +124,10 @@ func (ms *MQTTClient) Connect(connHandler MQTT.OnConnectHandler, lostHandler MQT
 
 
 	c := MQTT.NewClient(opts)
 	c := MQTT.NewClient(opts)
 	token := c.Connect()
 	token := c.Connect()
-	// timeout
-	if !token.WaitTimeout(5 * time.Second) {
-		conf.Log.Errorf("The connection to mqtt broker %s failed: connection timeout", ms.srv)
-		return fmt.Errorf("found error when connecting for %s: timeout", ms.srv)
-	} else if token.Error() != nil {
-		conf.Log.Errorf("The connection to mqtt broker %s failed : %s ", ms.srv, token.Error())
-		return fmt.Errorf("found error when connecting for %s: %s", ms.srv, token.Error())
+	err := handleToken(token)
+	if err != nil {
+		conf.Log.Errorf("The connection to mqtt broker %s failed: %s", ms.srv, err)
+		return fmt.Errorf("found error when connecting for %s: %s", ms.srv, err)
 	}
 	}
 	conf.Log.Infof("The connection to mqtt broker is established successfully for %s.", ms.srv)
 	conf.Log.Infof("The connection to mqtt broker is established successfully for %s.", ms.srv)
 	ms.conn = c
 	ms.conn = c
@@ -138,14 +135,27 @@ func (ms *MQTTClient) Connect(connHandler MQTT.OnConnectHandler, lostHandler MQT
 }
 }
 
 
 func (ms *MQTTClient) Subscribe(topic string, qos byte, handler MQTT.MessageHandler) error {
 func (ms *MQTTClient) Subscribe(topic string, qos byte, handler MQTT.MessageHandler) error {
-	if token := ms.conn.Subscribe(topic, qos, handler); token.WaitTimeout(5*time.Second) && token.Error() != nil {
-		return fmt.Errorf("%s: %s", errorx.IOErr, token.Error())
+	token := ms.conn.Subscribe(topic, qos, handler)
+	err := handleToken(token)
+	if err != nil {
+		return fmt.Errorf("found error when subscribing to %s of topic %s: %s", ms.srv, topic, err)
 	}
 	}
 	return nil
 	return nil
 }
 }
 
 
 func (ms *MQTTClient) Publish(topic string, qos byte, retained bool, message []byte) error {
 func (ms *MQTTClient) Publish(topic string, qos byte, retained bool, message []byte) error {
-	if token := ms.conn.Publish(topic, qos, retained, message); token.WaitTimeout(5*time.Second) && token.Error() != nil {
+	token := ms.conn.Publish(topic, qos, retained, message)
+	err := handleToken(token)
+	if err != nil {
+		return fmt.Errorf("found error when publishing to %s of topic %s: %s", ms.srv, topic, err)
+	}
+	return nil
+}
+
+func handleToken(token MQTT.Token) error {
+	if !token.WaitTimeout(5 * time.Second) {
+		return fmt.Errorf("%s: timeout", errorx.IOErr)
+	} else if token.Error() != nil {
 		return fmt.Errorf("%s: %s", errorx.IOErr, token.Error())
 		return fmt.Errorf("%s: %s", errorx.IOErr, token.Error())
 	}
 	}
 	return nil
 	return nil