Parcourir la source

fix(source): should exit only when receiving fatal error

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang il y a 2 ans
Parent
commit
08977f106f
1 fichiers modifiés avec 3 ajouts et 2 suppressions
  1. 3 2
      internal/topo/source/mqtt_source.go

+ 3 - 2
internal/topo/source/mqtt_source.go

@@ -93,6 +93,7 @@ func (ms *MQTTSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTupl
 	}
 	}
 }
 }
 
 
+// should only return fatal error
 func subscribe(ms *MQTTSource, ctx api.StreamContext, consumer chan<- api.SourceTuple) error {
 func subscribe(ms *MQTTSource, ctx api.StreamContext, consumer chan<- api.SourceTuple) error {
 	log := ctx.GetLogger()
 	log := ctx.GetLogger()
 
 
@@ -123,13 +124,13 @@ func subscribe(ms *MQTTSource, ctx api.StreamContext, consumer chan<- api.Source
 				msg, ok := env.(pahoMqtt.Message)
 				msg, ok := env.(pahoMqtt.Message)
 				if !ok {
 				if !ok {
 					log.Errorf("can not convert interface data to mqtt message %s.", ms.tpc)
 					log.Errorf("can not convert interface data to mqtt message %s.", ms.tpc)
-					return nil
+					continue
 				}
 				}
 				result, e := ctx.Decode(msg.Payload())
 				result, e := ctx.Decode(msg.Payload())
 				//The unmarshal type can only be bool, float64, string, []interface{}, map[string]interface{}, nil
 				//The unmarshal type can only be bool, float64, string, []interface{}, map[string]interface{}, nil
 				if e != nil {
 				if e != nil {
 					log.Errorf("Invalid data format, cannot decode %s to %s format with error %s", string(msg.Payload()), ms.format, e)
 					log.Errorf("Invalid data format, cannot decode %s to %s format with error %s", string(msg.Payload()), ms.format, e)
-					return e
+					continue
 				}
 				}
 
 
 				meta := make(map[string]interface{})
 				meta := make(map[string]interface{})