|
@@ -77,13 +77,13 @@ func closeConnection(ctx api.StreamContext, url string) error {
|
|
defer m.Unlock()
|
|
defer m.Unlock()
|
|
ctx.GetLogger().Infof("closeConnection count: %d", connectionCount)
|
|
ctx.GetLogger().Infof("closeConnection count: %d", connectionCount)
|
|
memory.RemovePub(NeuronTopic)
|
|
memory.RemovePub(NeuronTopic)
|
|
- connectionCount--
|
|
|
|
- if connectionCount == 0 {
|
|
|
|
|
|
+ if connectionCount == 1 {
|
|
err := disconnect(url)
|
|
err := disconnect(url)
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ connectionCount--
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
@@ -109,7 +109,7 @@ func connect(url string) error {
|
|
mangos.OptionMaxReconnectTime: 5 * time.Second,
|
|
mangos.OptionMaxReconnectTime: 5 * time.Second,
|
|
mangos.OptionReconnectTime: 100 * time.Millisecond,
|
|
mangos.OptionReconnectTime: 100 * time.Millisecond,
|
|
}); err != nil {
|
|
}); err != nil {
|
|
- return fmt.Errorf("can't dial to neuron: %s", err.Error())
|
|
|
|
|
|
+ return fmt.Errorf("please make sure neuron has started and configured, can't dial to neuron: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
|
|
return nil
|
|
return nil
|
|
@@ -147,9 +147,6 @@ func publish(ctx api.StreamContext, data []byte) error {
|
|
}
|
|
}
|
|
|
|
|
|
func disconnect(_ string) error {
|
|
func disconnect(_ string) error {
|
|
- defer func() {
|
|
|
|
- sock = nil
|
|
|
|
- }()
|
|
|
|
if sock != nil {
|
|
if sock != nil {
|
|
err := sock.Close()
|
|
err := sock.Close()
|
|
if err != nil {
|
|
if err != nil {
|