Просмотр исходного кода

fix(source): mqtt source should stop executing after errors (#1008)

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 3 лет назад
Родитель
Сommit
031cd83a7f
1 измененных файлов с 9 добавлено и 4 удалено
  1. 9 4
      internal/topo/source/mqtt_source.go

+ 9 - 4
internal/topo/source/mqtt_source.go

@@ -58,7 +58,7 @@ type MQTTConfig struct {
 	KubeedgeVersion   string   `json:"kubeedgeVersion"`
 	KubeedgeVersion   string   `json:"kubeedgeVersion"`
 }
 }
 
 
-func (ms *MQTTSource) WithSchema(schema string) *MQTTSource {
+func (ms *MQTTSource) WithSchema(_ string) *MQTTSource {
 	return ms
 	return ms
 }
 }
 
 
@@ -104,11 +104,12 @@ func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTupl
 
 
 	opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
 	opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
 	if ms.clientid == "" {
 	if ms.clientid == "" {
-		if uuid, err := uuid.NewUUID(); err != nil {
+		if newUUID, err := uuid.NewUUID(); err != nil {
 			errCh <- fmt.Errorf("failed to get uuid, the error is %s", err)
 			errCh <- fmt.Errorf("failed to get uuid, the error is %s", err)
+			return
 		} else {
 		} else {
-			ms.clientid = uuid.String()
-			opts.SetClientID(uuid.String())
+			ms.clientid = newUUID.String()
+			opts.SetClientID(newUUID.String())
 		}
 		}
 	} else {
 	} else {
 		opts.SetClientID(ms.clientid)
 		opts.SetClientID(ms.clientid)
@@ -122,14 +123,17 @@ func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTupl
 				log.Infof("The private key file is %s.", kp)
 				log.Infof("The private key file is %s.", kp)
 				if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
 				if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
 					errCh <- err2
 					errCh <- err2
+					return
 				} else {
 				} else {
 					opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
 					opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
 				}
 				}
 			} else {
 			} else {
 				errCh <- err1
 				errCh <- err1
+				return
 			}
 			}
 		} else {
 		} else {
 			errCh <- err
 			errCh <- err
+			return
 		}
 		}
 	} else {
 	} else {
 		log.Infof("Connect MQTT broker with username and password.")
 		log.Infof("Connect MQTT broker with username and password.")
@@ -163,6 +167,7 @@ func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTupl
 	c := MQTT.NewClient(opts)
 	c := MQTT.NewClient(opts)
 	if token := c.Connect(); token.Wait() && token.Error() != nil {
 	if token := c.Connect(); token.Wait() && token.Error() != nil {
 		errCh <- fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error())
 		errCh <- fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error())
+		return
 	}
 	}
 	log.Infof("The connection to server %s was established successfully", ms.srv)
 	log.Infof("The connection to server %s was established successfully", ms.srv)
 	ms.conn = c
 	ms.conn = c