Преглед на файлове

fix(source): do not exit the rule when connecting to message broker fail

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran преди 2 години
родител
ревизия
b746823dbc
променени са 2 файла, в които са добавени 12 реда и са изтрити 5 реда
  1. 11 4
      internal/topo/connection/clients/edgex/edgex_wrapper.go
  2. 1 1
      internal/topo/sink/edgex_sink.go

+ 11 - 4
internal/topo/connection/clients/edgex/edgex_wrapper.go

@@ -34,6 +34,7 @@ type edgexSubscriptionInfo struct {
 	handler        messageHandler
 	stop           chan struct{}
 	topicConsumers []*clients.ConsumerInfo
+	hasError       bool
 }
 
 type edgexClientWrapper struct {
@@ -94,7 +95,7 @@ func (mc *edgexClientWrapper) Publish(c api.StreamContext, topic string, message
 	return nil
 }
 
-func (mc *edgexClientWrapper) messageHandler(topic string, sub *edgexSubscriptionInfo, messageErrors chan error) func(stopChan chan struct{}, msgChan chan types.MessageEnvelope) {
+func (mc *edgexClientWrapper) newMessageHandler(topic string, sub *edgexSubscriptionInfo, messageErrors chan error) func(stopChan chan struct{}, msgChan chan types.MessageEnvelope) {
 	return func(stopChan chan struct{}, msgChan chan types.MessageEnvelope) {
 		for {
 			select {
@@ -102,8 +103,8 @@ func (mc *edgexClientWrapper) messageHandler(topic string, sub *edgexSubscriptio
 				conf.Log.Infof("message handler for topic %s stopped", topic)
 				return
 			case msgErr := <-messageErrors:
-				//broadcast to all topic subscribers
-				if sub != nil {
+				//broadcast to all topic subscribers only one time
+				if sub != nil && !sub.hasError {
 					for _, consumer := range sub.topicConsumers {
 						select {
 						case consumer.SubErrors <- msgErr:
@@ -112,6 +113,7 @@ func (mc *edgexClientWrapper) messageHandler(topic string, sub *edgexSubscriptio
 							conf.Log.Warnf("consumer SubErrors channel full for request id %s", consumer.ConsumerId)
 						}
 					}
+					sub.hasError = true
 				}
 			case msg, ok := <-msgChan:
 				if !ok {
@@ -123,6 +125,10 @@ func (mc *edgexClientWrapper) messageHandler(topic string, sub *edgexSubscriptio
 				}
 				//broadcast to all topic subscribers
 				if sub != nil {
+					if sub.hasError == true {
+						sub.hasError = false
+						conf.Log.Infof("Subscription to edgex messagebus topic %s recoverd", topic)
+					}
 					for _, consumer := range sub.topicConsumers {
 						select {
 						case consumer.ConsumerChan <- &msg:
@@ -173,6 +179,7 @@ func (mc *edgexClientWrapper) Subscribe(c api.StreamContext, subChan []api.Topic
 						SubErrors:    messageErrors,
 					},
 				},
+				hasError: false,
 			}
 			log.Infof("new subscription for topic %s, reqId is %s", tpc, subId)
 			message := make(chan types.MessageEnvelope)
@@ -181,7 +188,7 @@ func (mc *edgexClientWrapper) Subscribe(c api.StreamContext, subChan []api.Topic
 			if err := mc.cli.Subscribe(message, tpc, errChan); err != nil {
 				return err
 			}
-			sub.handler = mc.messageHandler(tpc, sub, errChan)
+			sub.handler = mc.newMessageHandler(tpc, sub, errChan)
 			go sub.handler(sub.stop, message)
 
 			mc.topicSubscriptions[tpc] = sub

+ 1 - 1
internal/topo/sink/edgex_sink.go

@@ -510,7 +510,7 @@ func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) err
 		"contentType": ems.c.ContentType,
 	}
 	if e := ems.cli.Publish(ctx, topic, data, para); e != nil {
-		logger.Errorf("%s: found error %s when publish to EdgeX message bus.\n", e)
+		logger.Errorf("%s: found error %s when publish to EdgeX message bus.\n", e.Error(), e.Error())
 		return fmt.Errorf("%s:%s", errorx.IOErr, e.Error())
 	}
 	logger.Debugf("Published %+v to EdgeX message bus topic %s", evt, topic)