Browse Source

feat(neuron): support multiple url

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Signed-off-by: Jiyong <huangjy@emqx.io>
Jiyong Huang 2 years ago
parent
commit
2b6b9b2df9

+ 58 - 54
internal/topo/neuron/connection.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -26,101 +26,115 @@ import (
 	"go.nanomsg.org/mangos/v3"
 	"go.nanomsg.org/mangos/v3/protocol/pair"
 	_ "go.nanomsg.org/mangos/v3/transport/ipc"
+	_ "go.nanomsg.org/mangos/v3/transport/tcp"
 	"sync"
 	"sync/atomic"
 	"time"
 )
 
 const (
-	NeuronTopic = "$$neuron"
-	NeuronUrl   = "ipc:///tmp/neuron-ekuiper.ipc"
+	TopicPrefix      = "$$neuron_"
+	DefaultNeuronUrl = "ipc:///tmp/neuron-ekuiper.ipc"
 )
 
+type conninfo struct {
+	count  int
+	sock   mangos.Socket
+	opened int32
+}
+
 var (
-	m               sync.RWMutex
-	connectionCount int
-	sock            mangos.Socket
-	opened          int32
-	sendTimeout     = 100
+	m             sync.RWMutex
+	connectionReg = make(map[string]*conninfo)
+	sendTimeout   = 100
 )
 
 // createOrGetNeuronConnection creates a new neuron connection or returns an existing one
 // This is the entry function for creating a neuron connection singleton
 // The context is from a rule, but the singleton will server for multiple rules
-func createOrGetConnection(sc api.StreamContext, url string) error {
+func createOrGetConnection(sc api.StreamContext, url string) (*conninfo, error) {
 	m.Lock()
 	defer m.Unlock()
-	sc.GetLogger().Infof("createOrGetConnection count: %d", connectionCount)
-	if connectionCount == 0 {
-		sc.GetLogger().Infof("Creating neuron connection")
-		contextLogger := conf.Log.WithField("neuron_connection", 0)
+	sc.GetLogger().Infof("createOrGetConnection for %s", url)
+	info, ok := connectionReg[url]
+	if !ok || info.count <= 0 {
+		sc.GetLogger().Infof("Creating neuron connection for %s", url)
+		contextLogger := conf.Log.WithField("neuron_connection_url", url)
 		ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
-		ruleId := "$$neuron_connection"
-		opId := "$$neuron_connection"
+		ruleId := "$$neuron_connection_" + url
+		opId := "$$neuron_connection_" + url
 		store, err := state.CreateStore(ruleId, 0)
 		if err != nil {
 			ctx.GetLogger().Errorf("neuron connection create store error %v", err)
-			return err
+			return nil, err
 		}
 		sctx := ctx.WithMeta(ruleId, opId, store)
-		err = connect(sctx, url)
+		info = &conninfo{count: 0}
+		connectionReg[url] = info
+		err = connect(sctx, url, info)
 		if err != nil {
-			return err
+			return nil, err
 		}
-		sc.GetLogger().Infof("Neuron connected")
-		pubsub.CreatePub(NeuronTopic)
-		go run(sctx)
+		sc.GetLogger().Infof("Neuron %s connected", url)
+		pubsub.CreatePub(TopicPrefix + url)
+		go run(sctx, info, url)
 	}
-	connectionCount++
-	return nil
+	info.count++
+	return info, nil
 }
 
 func closeConnection(ctx api.StreamContext, url string) error {
 	m.Lock()
 	defer m.Unlock()
-	ctx.GetLogger().Infof("closeConnection count: %d", connectionCount)
-	pubsub.RemovePub(NeuronTopic)
-	if connectionCount == 1 {
-		err := disconnect(url)
-		if err != nil {
-			return err
+	ctx.GetLogger().Infof("closeConnection %s", url)
+	info, ok := connectionReg[url]
+	if !ok {
+		return fmt.Errorf("no connection for %s", url)
+	}
+	pubsub.RemovePub(TopicPrefix + url)
+	if info.count == 1 {
+		if info.sock != nil {
+			err := info.sock.Close()
+			if err != nil {
+				return err
+			}
 		}
 	}
-	connectionCount--
+	info.count--
 	return nil
 }
 
 // nng connections
 
 // connect to nng
-func connect(ctx api.StreamContext, url string) error {
+func connect(ctx api.StreamContext, url string, info *conninfo) error {
 	var err error
-	sock, err = pair.NewSocket()
+	info.sock, err = pair.NewSocket()
 	if err != nil {
 		return err
 	}
 	// options consider to export
-	err = sock.SetOption(mangos.OptionSendDeadline, time.Duration(sendTimeout)*time.Millisecond)
+	err = info.sock.SetOption(mangos.OptionSendDeadline, time.Duration(sendTimeout)*time.Millisecond)
 	if err != nil {
 		return err
 	}
-	sock.SetPipeEventHook(func(ev mangos.PipeEvent, p mangos.Pipe) {
+	info.sock.SetPipeEventHook(func(ev mangos.PipeEvent, p mangos.Pipe) {
 		switch ev {
 		case mangos.PipeEventAttached:
-			atomic.StoreInt32(&opened, 1)
+			atomic.StoreInt32(&info.opened, 1)
 			conf.Log.Infof("neuron connection attached")
 		case mangos.PipeEventAttaching:
 			conf.Log.Infof("neuron connection is attaching")
 		case mangos.PipeEventDetached:
-			atomic.StoreInt32(&opened, 0)
+			atomic.StoreInt32(&info.opened, 0)
 			conf.Log.Warnf("neuron connection detached")
-			pubsub.ProduceError(ctx, NeuronTopic, fmt.Errorf("neuron connection detached"))
+			pubsub.ProduceError(ctx, TopicPrefix+url, fmt.Errorf("neuron connection detached"))
 		}
 	})
 	//sock.SetOption(mangos.OptionWriteQLen, 100)
 	//sock.SetOption(mangos.OptionReadQLen, 100)
 	//sock.SetOption(mangos.OptionBestEffort, false)
-	if err = sock.DialOptions(url, map[string]interface{}{
+	if err = info.sock.DialOptions(url, map[string]interface{}{
 		mangos.OptionDialAsynch:       true, // will not report error and keep connecting
 		mangos.OptionMaxReconnectTime: 5 * time.Second,
 		mangos.OptionReconnectTime:    100 * time.Millisecond,
@@ -133,11 +147,11 @@ func connect(ctx api.StreamContext, url string) error {
 
 // run the loop to receive message from the nng connection singleton
 // exit when connection is closed
-func run(ctx api.StreamContext) {
+func run(ctx api.StreamContext, info *conninfo, url string) {
 	ctx.GetLogger().Infof("neuron source receiving loop started")
 	for {
 		// no receiving deadline, will wait until the socket closed
-		if msg, err := sock.Recv(); err == nil {
+		if msg, err := info.sock.Recv(); err == nil {
 			ctx.GetLogger().Debugf("neuron received message %s", string(msg))
 			result := make(map[string]interface{})
 			err := json.Unmarshal(msg, &result)
@@ -145,7 +159,7 @@ func run(ctx api.StreamContext) {
 				ctx.GetLogger().Errorf("neuron decode message error %v", err)
 				continue
 			}
-			pubsub.Produce(ctx, NeuronTopic, result)
+			pubsub.Produce(ctx, TopicPrefix+url, result)
 		} else if err == mangos.ErrClosed {
 			ctx.GetLogger().Infof("neuron connection closed, exit receiving loop")
 			return
@@ -155,20 +169,10 @@ func run(ctx api.StreamContext) {
 	}
 }
 
-func publish(ctx api.StreamContext, data []byte) error {
+func publish(ctx api.StreamContext, data []byte, info *conninfo) error {
 	ctx.GetLogger().Debugf("publish to neuron: %s", string(data))
-	if sock != nil && atomic.LoadInt32(&opened) == 1 {
-		return sock.Send(data)
+	if info.sock != nil && atomic.LoadInt32(&info.opened) == 1 {
+		return info.sock.Send(data)
 	}
 	return fmt.Errorf("%s: neuron connection is not established", errorx.IOErr)
 }
-
-func disconnect(_ string) error {
-	if sock != nil {
-		err := sock.Close()
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}

+ 4 - 4
internal/topo/neuron/neuron_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -81,9 +81,9 @@ func mockNeuron(send bool, recv bool) (mangos.Socket, chan []byte) {
 func TestMultiSourceSink(t *testing.T) {
 	// start and test 2 sources
 	exp := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron"}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron"}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron"}),
+		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}),
+		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}),
+		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}),
 	}
 	s1 := GetSource()
 	err := s1.Configure("new", nil)

+ 16 - 16
internal/topo/neuron/sink.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -25,9 +25,8 @@ import (
 )
 
 type sink struct {
-	url       string
-	c         *c
-	connected bool
+	c   *c
+	cli *conninfo
 }
 
 type c struct {
@@ -35,7 +34,8 @@ type c struct {
 	GroupName string   `json:"groupName"`
 	Tags      []string `json:"tags"`
 	// If sent with the raw converted string or let us range over the result map
-	Raw bool `json:"raw"`
+	Raw bool   `json:"raw"`
+	Url string `json:"url"`
 }
 
 type neuronTemplate struct {
@@ -46,9 +46,9 @@ type neuronTemplate struct {
 }
 
 func (s *sink) Configure(props map[string]interface{}) error {
-	s.url = NeuronUrl
 	cc := &c{
 		Raw: false,
+		Url: DefaultNeuronUrl,
 	}
 	err := cast.MapToStruct(props, cc)
 	if err != nil {
@@ -68,11 +68,11 @@ func (s *sink) Configure(props map[string]interface{}) error {
 
 func (s *sink) Open(ctx api.StreamContext) error {
 	ctx.GetLogger().Debugf("Opening neuron sink")
-	err := createOrGetConnection(ctx, s.url)
+	cli, err := createOrGetConnection(ctx, s.c.Url)
 	if err != nil {
 		return err
 	}
-	s.connected = true
+	s.cli = cli
 	return nil
 }
 
@@ -83,7 +83,7 @@ func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 		if err != nil {
 			return err
 		}
-		return publish(ctx, r)
+		return publish(ctx, r, s.cli)
 	} else {
 		switch d := data.(type) {
 		case []map[string]interface{}:
@@ -104,8 +104,8 @@ func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 
 func (s *sink) Close(ctx api.StreamContext) error {
 	ctx.GetLogger().Debugf("closing neuron sink")
-	if s.connected {
-		return closeConnection(ctx, s.url)
+	if s.cli != nil {
+		return closeConnection(ctx, s.c.Url)
 	}
 	return nil
 }
@@ -136,7 +136,7 @@ func (s *sink) SendMapToNeuron(ctx api.StreamContext, el map[string]interface{})
 			for _, k := range keys {
 				t.TagName = k
 				t.Value = el[k]
-				err := doPublish(ctx, t)
+				err := doPublish(ctx, t, s.cli)
 				if err != nil {
 					return err
 				}
@@ -145,7 +145,7 @@ func (s *sink) SendMapToNeuron(ctx api.StreamContext, el map[string]interface{})
 			for k, v := range el {
 				t.TagName = k
 				t.Value = v
-				err := doPublish(ctx, t)
+				err := doPublish(ctx, t, s.cli)
 				if err != nil {
 					return err
 				}
@@ -164,7 +164,7 @@ func (s *sink) SendMapToNeuron(ctx api.StreamContext, el map[string]interface{})
 				ctx.GetLogger().Errorf("Error get the value of tag %s: %v", t.TagName, err)
 				continue
 			}
-			err := doPublish(ctx, t)
+			err := doPublish(ctx, t, s.cli)
 			if err != nil {
 				return err
 			}
@@ -173,12 +173,12 @@ func (s *sink) SendMapToNeuron(ctx api.StreamContext, el map[string]interface{})
 	return nil
 }
 
-func doPublish(ctx api.StreamContext, t *neuronTemplate) error {
+func doPublish(ctx api.StreamContext, t *neuronTemplate, cli *conninfo) error {
 	r, err := json.Marshal(t)
 	if err != nil {
 		return fmt.Errorf("Error marshall the tag payload %v: %v", t, err)
 	}
-	err = publish(ctx, r)
+	err = publish(ctx, r, cli)
 	if err != nil {
 		return fmt.Errorf("%s: Error publish the tag payload %s: %v", errorx.IOErr, t.TagName, err)
 	}

+ 19 - 13
internal/topo/neuron/source.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -22,31 +22,37 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/infra"
 )
 
+type sc struct {
+	Url          string `json:"url,omitempty"`
+	BufferLength int    `json:"bufferLength,omitempty"`
+}
+
 type source struct {
-	url          string
-	bufferLength int
+	c *sc
 }
 
 func (s *source) Configure(_ string, props map[string]interface{}) error {
-	s.url = NeuronUrl
-	s.bufferLength = 1024
-	if c, ok := props["bufferLength"]; ok {
-		if bl, err := cast.ToInt(c, cast.STRICT); err != nil || bl > 0 {
-			s.bufferLength = bl
-		}
+	cc := &sc{
+		BufferLength: 1024,
+		Url:          DefaultNeuronUrl,
+	}
+	err := cast.MapToStruct(props, cc)
+	if err != nil {
+		return err
 	}
+	s.c = cc
 	return nil
 }
 
 func (s *source) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
-	err := createOrGetConnection(ctx, s.url)
+	_, err := createOrGetConnection(ctx, s.c.Url)
 	if err != nil {
 		infra.DrainError(ctx, err, errCh)
 		return
 	}
-	defer closeConnection(ctx, s.url)
-	ch := pubsub.CreateSub(NeuronTopic, nil, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), s.bufferLength)
-	defer pubsub.CloseSourceConsumerChannel(NeuronTopic, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
+	defer closeConnection(ctx, s.c.Url)
+	ch := pubsub.CreateSub(TopicPrefix+s.c.Url, nil, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), s.c.BufferLength)
+	defer pubsub.CloseSourceConsumerChannel(TopicPrefix+s.c.Url, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
 	for {
 		select {
 		case v, opened := <-ch:

+ 4 - 4
internal/topo/neuron/source_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -27,9 +27,9 @@ import (
 
 func TestRun(t *testing.T) {
 	exp := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron"}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron"}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron"}),
+		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}),
+		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}),
+		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}),
 	}
 	s := GetSource()
 	err := s.Configure("new", nil)