瀏覽代碼

fix(source): do not exit the rule when connecting to message broker fail

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran 2 年之前
父節點
當前提交
4d251b6dee

+ 7 - 7
go.mod

@@ -7,8 +7,8 @@ require (
 	github.com/alicebob/miniredis/v2 v2.15.1
 	github.com/benbjohnson/clock v1.0.0
 	github.com/eclipse/paho.mqtt.golang v1.3.5
-	github.com/edgexfoundry/go-mod-core-contracts/v2 v2.1.0
-	github.com/edgexfoundry/go-mod-messaging/v2 v2.1.0
+	github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0
+	github.com/edgexfoundry/go-mod-messaging/v2 v2.2.0
 	github.com/gdexlab/go-render v1.0.1
 	github.com/go-redis/redis/v7 v7.3.0
 	github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
@@ -46,10 +46,10 @@ require (
 	github.com/cespare/xxhash/v2 v2.1.1 // indirect
 	github.com/cpuguy83/go-md2man v1.0.10 // indirect
 	github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
-	github.com/fxamacker/cbor/v2 v2.3.0 // indirect
+	github.com/fxamacker/cbor/v2 v2.4.0 // indirect
 	github.com/go-playground/locales v0.14.0 // indirect
 	github.com/go-playground/universal-translator v0.18.0 // indirect
-	github.com/go-playground/validator/v10 v10.9.0 // indirect
+	github.com/go-playground/validator/v10 v10.10.1 // indirect
 	github.com/gorilla/websocket v1.4.2 // indirect
 	github.com/huandu/xstrings v1.3.2 // indirect
 	github.com/imdario/mergo v0.3.12 // indirect
@@ -71,10 +71,10 @@ require (
 	github.com/tebeka/strftime v0.1.5 // indirect
 	github.com/x448/float16 v0.8.4 // indirect
 	github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
-	golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
-	golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
+	golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 // indirect
+	golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
 	golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a // indirect
-	golang.org/x/text v0.3.6 // indirect
+	golang.org/x/text v0.3.7 // indirect
 )
 
 go 1.17

+ 15 - 3
go.sum

@@ -76,8 +76,10 @@ github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqn
 github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
 github.com/edgexfoundry/go-mod-core-contracts/v2 v2.1.0 h1:uphot3ZKOH0/aoo/Y5gr2NCRgGzy9RksWsXKtJRVEuQ=
 github.com/edgexfoundry/go-mod-core-contracts/v2 v2.1.0/go.mod h1:I6UhBPCREubcU0ouIGBdZlNG5Xx4NijUVN5rvEtD03k=
-github.com/edgexfoundry/go-mod-messaging/v2 v2.1.0 h1:vw2zYd7DF5eizT1B3X+lzpaKxajCft53jyl3B2QPGBs=
-github.com/edgexfoundry/go-mod-messaging/v2 v2.1.0/go.mod h1:bLKWB9yeOHLZoQtHLZlGwz8MjsMJIvHDFce7CcUb4fE=
+github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0 h1:Sfi9jAIgRXZaJQw8Ji6+8//47D+iOyGiXQSNZXhy3HE=
+github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0/go.mod h1:jyfVSx7mI3u/o/oo10COxBRBvJ8O/9I3z2xAwPmNt/Q=
+github.com/edgexfoundry/go-mod-messaging/v2 v2.2.0 h1:FdnA7hLq0U8PeMAIuJXt4KcXLAyGo7OjckzxTAwaoBc=
+github.com/edgexfoundry/go-mod-messaging/v2 v2.2.0/go.mod h1:+X6C0h8ZTJe+lLU2AGJfiAzCJK3zL+yM6cej9VC+79E=
 github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -93,6 +95,8 @@ github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/fxamacker/cbor/v2 v2.3.0 h1:aM45YGMctNakddNNAezPxDUpv38j44Abh+hifNuqXik=
 github.com/fxamacker/cbor/v2 v2.3.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
+github.com/fxamacker/cbor/v2 v2.4.0 h1:ri0ArlOR+5XunOP8CRUowT0pSJOwhW098ZCUyskZD88=
+github.com/fxamacker/cbor/v2 v2.4.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
 github.com/gdamore/optopia v0.2.0/go.mod h1:YKYEwo5C1Pa617H7NlPcmQXl+vG6YnSSNB44n8dNL0Q=
 github.com/gdexlab/go-render v1.0.1 h1:rxqB3vo5s4n1kF0ySmoNeSPRYkEsyHgln4jFIQY7v0U=
 github.com/gdexlab/go-render v1.0.1/go.mod h1:wRi5nW2qfjiGj4mPukH4UV0IknS1cHD4VgFTmJX5JzM=
@@ -112,6 +116,8 @@ github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/j
 github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA=
 github.com/go-playground/validator/v10 v10.9.0 h1:NgTtmN58D0m8+UuxtYmGztBJB7VnPgjj221I1QHci2A=
 github.com/go-playground/validator/v10 v10.9.0/go.mod h1:74x4gJWsvQexRdW8Pn3dXSGrTK4nAUsbPlLADvpJkos=
+github.com/go-playground/validator/v10 v10.10.1 h1:uA0+amWMiglNZKZ9FJRKUAe9U3RX91eVn1JYXMWt7ig=
+github.com/go-playground/validator/v10 v10.10.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU=
 github.com/go-redis/redis/v7 v7.3.0 h1:3oHqd0W7f/VLKBxeYTEpqdMUsmMectngjM9OtoRoIgg=
 github.com/go-redis/redis/v7 v7.3.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@@ -413,6 +419,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
 golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
 golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 h1:/UOmuWzQfxxo9UtlXMwuQU8CMgg1eZXqTRwkSQJWKOI=
 golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 h1:0es+/5331RGQPcXlMfP+WrnIIS6dNnNRe0WB02W0F4M=
+golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
 golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@@ -444,6 +452,8 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
+golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -486,10 +496,10 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 h1:siQdpVirKtzPhKl3lZWozZraCFObP8S1v6PRp0bLrtU=
 golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a h1:ppl5mZgokTT8uPkmYOyEUmPTr3ypaKkg5eFOGrAmxxE=
 golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -500,6 +510,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
 golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

+ 14 - 2
internal/topo/connection/clients/edgex/edgex_wrapper.go

@@ -94,13 +94,25 @@ func (mc *edgexClientWrapper) Publish(c api.StreamContext, topic string, message
 	return nil
 }
 
-func (mc *edgexClientWrapper) messageHandler(topic string, sub *edgexSubscriptionInfo) func(stopChan chan struct{}, msgChan chan types.MessageEnvelope) {
+func (mc *edgexClientWrapper) messageHandler(topic string, sub *edgexSubscriptionInfo, messageErrors chan error) func(stopChan chan struct{}, msgChan chan types.MessageEnvelope) {
 	return func(stopChan chan struct{}, msgChan chan types.MessageEnvelope) {
 		for {
 			select {
 			case <-stopChan:
 				conf.Log.Infof("message handler for topic %s stopped", topic)
 				return
+			case msgErr := <-messageErrors:
+				//broadcast to all topic subscribers
+				if sub != nil {
+					for _, consumer := range sub.topicConsumers {
+						select {
+						case consumer.SubErrors <- msgErr:
+							break
+						default:
+							conf.Log.Warnf("consumer SubErrors channel full for request id %s", consumer.ConsumerId)
+						}
+					}
+				}
 			case msg, ok := <-msgChan:
 				if !ok {
 					for _, consumer := range sub.topicConsumers {
@@ -169,7 +181,7 @@ func (mc *edgexClientWrapper) Subscribe(c api.StreamContext, subChan []api.Topic
 			if err := mc.cli.Subscribe(message, tpc, errChan); err != nil {
 				return err
 			}
-			sub.handler = mc.messageHandler(tpc, sub)
+			sub.handler = mc.messageHandler(tpc, sub, errChan)
 			go sub.handler(sub.stop, message)
 
 			mc.topicSubscriptions[tpc] = sub

+ 0 - 1
internal/topo/connection/clients/mqtt/mqtt_wrapper.go

@@ -201,7 +201,6 @@ func (mc *mqttClientWrapper) Subscribe(c api.StreamContext, subChan []api.TopicC
 			log.Infof("new subscription for topic %s, reqId is %s", tpc, subId)
 			token := mc.cli.conn.Subscribe(tpc, Qos, sub.topicHandler)
 			if token.Error() != nil {
-				messageErrors <- token.Error()
 				return token.Error()
 			}
 			mc.topicSubscriptions[tpc] = sub

+ 2 - 2
internal/topo/source/edgex_source.go

@@ -88,6 +88,7 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 	if e := es.cli.Subscribe(ctx, topics, subErrs, nil); e != nil {
 		log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
 		errCh <- e
+		return
 	} else {
 		log.Infof("Successfully subscribed to edgex messagebus topic %s.", es.topic)
 		for {
@@ -96,8 +97,7 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 				log.Infof("Exit subscription to edgex messagebus topic %s.", es.topic)
 				return
 			case e1 := <-subErrs:
-				errCh <- e1
-				return
+				log.Errorf("Subscription to edgex messagebus received error %v.\n", e1)
 			case msg, ok := <-messages:
 				if !ok { // the source is closed
 					log.Infof("Exit subscription to edgex messagebus topic %s.", es.topic)

+ 0 - 1
internal/topo/source/mqtt_source.go

@@ -115,7 +115,6 @@ func subscribe(ms *MQTTSource, ctx api.StreamContext, consumer chan<- api.Source
 				return nil
 			case e1 := <-err:
 				log.Errorf("the subscription to mqtt topic %s have error %s.\n", ms.tpc, e1.Error())
-				return e1
 			case env, ok := <-messages:
 				if !ok { // the source is closed
 					log.Infof("Exit subscription to mqtt messagebus topic %s.", ms.tpc)