Переглянути джерело

fix metric

Signed-off-by: Rui-Gan <1171530954@qq.com>
Rui-Gan 1 рік тому
батько
коміт
c237adaec6
1 змінених файлів з 10 додано та 0 видалено
  1. 10 0
      internal/io/edgex/edgex_source.go

+ 10 - 0
internal/io/edgex/edgex_source.go

@@ -31,6 +31,7 @@ import (
 
 
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/clients"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/clients"
+	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 )
@@ -101,7 +102,16 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 				log.Infof("Exit subscription to edgex messagebus topic %s.", es.topic)
 				log.Infof("Exit subscription to edgex messagebus topic %s.", es.topic)
 				return
 				return
 			case e1 := <-subErrs:
 			case e1 := <-subErrs:
+				errTuple := &xsql.ErrorSourceTuple{
+					Error: fmt.Errorf("the subscription to edgex topic %s have error %s.\n", es.topic, e1.Error()),
+				}
 				log.Errorf("Subscription to edgex messagebus received error %v.\n", e1)
 				log.Errorf("Subscription to edgex messagebus received error %v.\n", e1)
+				select {
+				case consumer <- errTuple:
+					log.Debugf("send data to device node")
+				case <-ctx.Done():
+					return
+				}
 			case msg, ok := <-messages:
 			case msg, ok := <-messages:
 				rcvTime := conf.GetNow()
 				rcvTime := conf.GetNow()
 				if !ok { // the source is closed
 				if !ok { // the source is closed