浏览代码

Merge pull request #83 from emqx/zmq

api change for source
jinfahua 5 年之前
父节点
当前提交
7c2aad2c4d
共有 6 个文件被更改,包括 88 次插入85 次删除
  1. 21 22
      plugins/sources/random.go
  2. 28 21
      plugins/sources/zmq.go
  3. 2 1
      xstream/api/stream.go
  4. 10 12
      xstream/extensions/mqtt_source.go
  5. 15 16
      xstream/nodes/source_node.go
  6. 12 13
      xstream/test/mock_source.go

+ 21 - 22
plugins/sources/random.go

@@ -10,63 +10,62 @@ import (
 //Emit data randomly with only a string field
 type randomSource struct {
 	interval int
-	seed int
-	pattern map[string]interface{}
-	cancel context.CancelFunc
+	seed     int
+	pattern  map[string]interface{}
+	cancel   context.CancelFunc
 }
 
-func (s *randomSource) Configure(topic string, props map[string]interface{}) error{
-	if i, ok := props["interval"].(float64); ok{
+func (s *randomSource) Configure(topic string, props map[string]interface{}) error {
+	if i, ok := props["interval"].(float64); ok {
 		s.interval = int(i)
-	}else{
+	} else {
 		s.interval = 1000
 	}
-	if p, ok := props["pattern"].(map[string]interface{}); ok{
+	if p, ok := props["pattern"].(map[string]interface{}); ok {
 		s.pattern = p
-	}else{
+	} else {
 		s.pattern = make(map[string]interface{})
 		s.pattern["count"] = 50
 	}
-	if i, ok := props["seed"].(float64); ok{
+	if i, ok := props["seed"].(float64); ok {
 		s.seed = int(i)
-	}else{
+	} else {
 		s.seed = 1
 	}
 	return nil
 }
 
-func (s *randomSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) {
+func (s *randomSource) Open(ctx api.StreamContext, consume api.ConsumeFunc, onError api.ErrorFunc) {
 	t := time.NewTicker(time.Duration(s.interval) * time.Millisecond)
 	exeCtx, cancel := ctx.WithCancel()
 	s.cancel = cancel
-	go func(exeCtx api.StreamContext){
+	go func(exeCtx api.StreamContext) {
 		defer t.Stop()
-		for{
-			select{
-			case <- t.C:
+		for {
+			select {
+			case <-t.C:
 				consume(randomize(s.pattern, s.seed), nil)
-			case <- exeCtx.Done():
+			case <-exeCtx.Done():
 				return
 			}
 		}
 	}(exeCtx)
-	return nil
 }
 
-func randomize(p map[string]interface{}, seed int) map[string]interface{}{
+func randomize(p map[string]interface{}, seed int) map[string]interface{} {
 	r := make(map[string]interface{})
-	for k, v := range p{
+	for k, v := range p {
 		vi := v.(int)
 		r[k] = vi + rand.Intn(seed)
 	}
 	return r
 }
 
-func (s *randomSource) Close(ctx api.StreamContext) error{
-	if s.cancel != nil{
+func (s *randomSource) Close(ctx api.StreamContext) error {
+	if s.cancel != nil {
 		s.cancel()
 	}
 	return nil
 }
 
-var Random randomSource
+var Random randomSource

+ 28 - 21
plugins/sources/zmq.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"github.com/emqx/kuiper/xstream/api"
@@ -9,11 +10,12 @@ import (
 
 type zmqSource struct {
 	subscriber *zmq.Socket
-	srv string
-	topic string
+	srv        string
+	topic      string
+	cancel     context.CancelFunc
 }
 
-func (s *zmqSource) Configure(topic string, props map[string]interface{}) error{
+func (s *zmqSource) Configure(topic string, props map[string]interface{}) error {
 	s.topic = topic
 	srv, ok := props["server"]
 	if !ok {
@@ -23,60 +25,65 @@ func (s *zmqSource) Configure(topic string, props map[string]interface{}) error{
 	return nil
 }
 
-func (s *zmqSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) {
+func (s *zmqSource) Open(ctx api.StreamContext, consume api.ConsumeFunc, onError api.ErrorFunc) {
 	logger := ctx.GetLogger()
+	var err error
 	s.subscriber, err = zmq.NewSocket(zmq.SUB)
-	if err != nil{
-		return fmt.Errorf("zmq source fails to create socket: %v", err)
+	if err != nil {
+		onError(fmt.Errorf("zmq source fails to create socket: %v", err))
 	}
 	err = s.subscriber.Connect(s.srv)
-	if err != nil{
-		return fmt.Errorf("zmq source fails to connect to %s: %v", s.srv, err)
+	if err != nil {
+		onError(fmt.Errorf("zmq source fails to connect to %s: %v", s.srv, err))
 	}
 	s.subscriber.SetSubscribe(s.topic)
 	logger.Debugf("zmq source subscribe to topic %s", s.topic)
-	go func(){
+	exeCtx, cancel := ctx.WithCancel()
+	s.cancel = cancel
+	go func(exeCtx api.StreamContext) {
 		logger.Debugf("start to listen")
 		for {
 			msgs, err := s.subscriber.RecvMessage(0)
 			if err != nil {
 				id, err := s.subscriber.GetIdentity()
-				logger.Warnf("zmq source getting message %s error: %v", id, err)
+				onError(fmt.Errorf("zmq source getting message %s error: %v", id, err))
 			} else {
 				logger.Debugf("zmq source receive %v", msgs)
 				var m string
-				for i, msg := range msgs{
-					if i == 0 && s.topic != ""{
+				for i, msg := range msgs {
+					if i == 0 && s.topic != "" {
 						continue
 					}
 					m += msg
 				}
 				meta := make(map[string]interface{})
-				if s.topic != ""{
+				if s.topic != "" {
 					meta["topic"] = msgs[0]
 				}
 				result := make(map[string]interface{})
 				if e := json.Unmarshal([]byte(m), &result); e != nil {
 					logger.Warnf("zmq source message %s is not json", m)
-				}else{
+				} else {
 					consume(result, meta)
 				}
 			}
-			select{
-			case <- ctx.Done():
+			select {
+			case <-exeCtx.Done():
 				logger.Infof("zmq source done")
+				if s.subscriber != nil {
+					s.subscriber.Close()
+				}
 				return
 			default:
 				//do nothing
 			}
 		}
-	}()
-	return nil
+	}(exeCtx)
 }
 
-func (s *zmqSource) Close(ctx api.StreamContext) error{
-	if s.subscriber != nil{
-		return s.subscriber.Close()
+func (s *zmqSource) Close(ctx api.StreamContext) error {
+	if s.cancel != nil {
+		s.cancel()
 	}
 	return nil
 }

+ 2 - 1
xstream/api/stream.go

@@ -6,6 +6,7 @@ import (
 
 //The function to call when data is emitted by the source.
 type ConsumeFunc func(message map[string]interface{}, metadata map[string]interface{})
+type ErrorFunc func(err error)
 type Logger interface {
 	Debug(args ...interface{})
 	Info(args ...interface{})
@@ -27,7 +28,7 @@ type Closable interface {
 
 type Source interface {
 	//Should be sync function for normal case. The container will run it in go func
-	Open(ctx StreamContext, consume ConsumeFunc) error
+	Open(ctx StreamContext, consume ConsumeFunc, onError ErrorFunc)
 	//Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
 	//read from the yaml
 	Configure(datasource string, props map[string]interface{}) error

+ 10 - 12
xstream/extensions/mqtt_source.go

@@ -70,13 +70,13 @@ func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) erro
 	return nil
 }
 
-func (ms *MQTTSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) error {
+func (ms *MQTTSource) Open(ctx api.StreamContext, consume api.ConsumeFunc, onError api.ErrorFunc) {
 	log := ctx.GetLogger()
 
 	opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
 	if ms.clientid == "" {
 		if uuid, err := uuid.NewUUID(); err != nil {
-			return fmt.Errorf("failed to get uuid, the error is %s", err)
+			onError(fmt.Errorf("failed to get uuid, the error is %s", err))
 		} else {
 			ms.clientid = uuid.String()
 			opts.SetClientID(uuid.String())
@@ -92,15 +92,15 @@ func (ms *MQTTSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) error
 			if kp, err1 := common.ProcessPath(ms.pkeyPath); err1 == nil {
 				log.Infof("The private key file is %s.", kp)
 				if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
-					return err2
+					onError(err2)
 				} else {
 					opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
 				}
 			} else {
-				return err1
+				onError(err1)
 			}
 		} else {
-			return err
+			onError(err)
 		}
 	} else {
 		log.Infof("Connect MQTT broker with username and password.")
@@ -119,27 +119,25 @@ func (ms *MQTTSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) error
 	opts.SetAutoReconnect(true)
 	var reconn = false
 	opts.SetConnectionLostHandler(func(client MQTT.Client, e error) {
-		log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv + ": " + ms.clientid, e)
+		log.Errorf("The connection %s is disconnected due to error %s, will try to re-connect later.", ms.srv+": "+ms.clientid, e)
 		reconn = true
 		subscribe(ms.tpc, client, ctx, consume)
 	})
-	
+
 	opts.SetOnConnectHandler(func(client MQTT.Client) {
 		if reconn {
-			log.Infof("The connection is %s re-established successfully.", ms.srv + ": " + ms.clientid)
+			log.Infof("The connection is %s re-established successfully.", ms.srv+": "+ms.clientid)
 		}
 	})
 
 	c := MQTT.NewClient(opts)
 	if token := c.Connect(); token.Wait() && token.Error() != nil {
-		return fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error())
+		onError(fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error()))
 	}
 	log.Infof("The connection to server %s was established successfully", ms.srv)
 	ms.conn = c
 	subscribe(ms.tpc, c, ctx, consume)
-	log.Infof("Successfully subscribe to topic %s", ms.srv + ": " + ms.clientid)
-
-	return nil
+	log.Infof("Successfully subscribe to topic %s", ms.srv+": "+ms.clientid)
 }
 
 func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consume api.ConsumeFunc) {

+ 15 - 16
xstream/nodes/source_node.go

@@ -20,10 +20,10 @@ type SourceNode struct {
 	options     map[string]string
 	concurrency int
 
-	mutex   	sync.RWMutex
-	sources 	[]api.Source
+	mutex        sync.RWMutex
+	sources      []api.Source
 	statManagers []*StatManager
-	buffer      *utils.DynamicChannelBuffer
+	buffer       *utils.DynamicChannelBuffer
 }
 
 func NewSourceNode(name string, options map[string]string) *SourceNode {
@@ -45,11 +45,11 @@ func NewSourceNode(name string, options map[string]string) *SourceNode {
 //Only for mock source, do not use it in production
 func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
 	return &SourceNode{
-		sources: 	 []api.Source{  source},
-		outs:    make(map[string]chan<- interface{}),
-		name:    name,
-		options: options,
-		ctx:     nil,
+		sources:     []api.Source{source},
+		outs:        make(map[string]chan<- interface{}),
+		name:        name,
+		options:     options,
+		ctx:         nil,
 		concurrency: 1,
 		buffer:      utils.NewDynamicChannelBuffer(),
 	}
@@ -82,7 +82,7 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 				//Do open source instances
 				var source api.Source
 				var err error
-				if createSource{
+				if createSource {
 					source, err = getSource(m.sourceType)
 					if err != nil {
 						m.drainError(errCh, err, ctx, logger)
@@ -96,7 +96,7 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 					m.mutex.Lock()
 					m.sources = append(m.sources, source)
 					m.mutex.Unlock()
-				}else{
+				} else {
 					source = m.sources[instance]
 				}
 				stats, err := NewStatManager("source", ctx)
@@ -108,7 +108,7 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 				m.statManagers = append(m.statManagers, stats)
 				m.mutex.Unlock()
 
-				if err := source.Open(ctx.WithInstance(instance), func(message map[string]interface{}, meta map[string]interface{}) {
+				source.Open(ctx.WithInstance(instance), func(message map[string]interface{}, meta map[string]interface{}) {
 					stats.IncTotalRecordsIn()
 					stats.ProcessTimeStart()
 					tuple := &xsql.Tuple{Emitter: m.name, Message: message, Timestamp: common.GetNowInMilli(), Metadata: meta}
@@ -117,10 +117,9 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 					stats.IncTotalRecordsOut()
 					stats.SetBufferLength(int64(m.getBufferLength()))
 					logger.Debugf("%s consume data %v complete", m.name, tuple)
-				}); err != nil {
+				}, func(err error) {
 					m.drainError(errCh, err, ctx, logger)
-					return
-				}
+				})
 				logger.Infof("Start source %s instance %d successfully", m.name, instance)
 			}(i)
 		}
@@ -131,7 +130,7 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 				logger.Infof("source %s done", m.name)
 				m.close(ctx, logger)
 				return
-			case data := <- m.buffer.Out:
+			case data := <-m.buffer.Out:
 				//blocking
 				Broadcast(m.outs, data, ctx)
 			}
@@ -232,7 +231,7 @@ func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err erro
 }
 
 func (m *SourceNode) GetMetrics() (result [][]interface{}) {
-	for _, stats := range m.statManagers{
+	for _, stats := range m.statManagers {
 		result = append(result, stats.GetMetrics())
 	}
 	return result

+ 12 - 13
xstream/test/mock_source.go

@@ -8,29 +8,29 @@ import (
 )
 
 type MockSource struct {
-	data []*xsql.Tuple
-	done chan<- struct{}
+	data        []*xsql.Tuple
+	done        chan<- struct{}
 	isEventTime bool
 }
 
 // New creates a new CsvSource
 func NewMockSource(data []*xsql.Tuple, done chan<- struct{}, isEventTime bool) *MockSource {
 	mock := &MockSource{
-		data: data,
-		done: done,
+		data:        data,
+		done:        done,
 		isEventTime: isEventTime,
 	}
 	return mock
 }
 
-func (m *MockSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) {
+func (m *MockSource) Open(ctx api.StreamContext, consume api.ConsumeFunc, onError api.ErrorFunc) {
 	log := ctx.GetLogger()
 
 	log.Debugln("mock source starts")
-	go func(){
-		for _, d := range m.data{
+	go func() {
+		for _, d := range m.data {
 			log.Debugf("mock source is sending data %s", d)
-			if !m.isEventTime{
+			if !m.isEventTime {
 				common.SetMockNow(d.Timestamp)
 				ticker := common.GetMockTicker()
 				timer := common.GetMockTimer()
@@ -42,9 +42,9 @@ func (m *MockSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err e
 				}
 			}
 			consume(d.Message, nil)
-			if m.isEventTime{
+			if m.isEventTime {
 				time.Sleep(1000 * time.Millisecond) //Let window run to make sure timers are set
-			}else{
+			} else {
 				time.Sleep(50 * time.Millisecond) //Let window run to make sure timers are set
 			}
 
@@ -55,13 +55,12 @@ func (m *MockSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err e
 		}
 		m.done <- struct{}{}
 	}()
-	return nil
 }
 
-func (m *MockSource) Close(ctx api.StreamContext) error{
+func (m *MockSource) Close(ctx api.StreamContext) error {
 	return nil
 }
 
 func (m *MockSource) Configure(topic string, props map[string]interface{}) error {
 	return nil
-}
+}