connection.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package neuron
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. kctx "github.com/lf-edge/ekuiper/internal/topo/context"
  19. "github.com/lf-edge/ekuiper/internal/topo/memory"
  20. "github.com/lf-edge/ekuiper/internal/topo/state"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/message"
  23. "go.nanomsg.org/mangos/v3"
  24. "go.nanomsg.org/mangos/v3/protocol/pair"
  25. _ "go.nanomsg.org/mangos/v3/transport/ipc"
  26. "sync"
  27. "time"
  28. )
  29. const (
  30. NeuronTopic = "$$neuron"
  31. NeuronUrl = "ipc:///tmp/neuron-ekuiper.ipc"
  32. )
  33. var (
  34. m sync.RWMutex
  35. connectionCount int
  36. sock mangos.Socket
  37. sendTimeout int
  38. )
  39. // createOrGetNeuronConnection creates a new neuron connection or returns an existing one
  40. // This is the entry function for creating a neuron connection singleton
  41. // The context is from a rule, but the singleton will server for multiple rules
  42. func createOrGetConnection(sc api.StreamContext, url string) error {
  43. m.Lock()
  44. defer m.Unlock()
  45. sc.GetLogger().Infof("createOrGetConnection count: %d", connectionCount)
  46. if connectionCount == 0 {
  47. sc.GetLogger().Infof("Creating neuron connection")
  48. err := connect(url)
  49. if err != nil {
  50. return err
  51. }
  52. sc.GetLogger().Infof("Neuron connected")
  53. contextLogger := conf.Log.WithField("neuron_connection", 0)
  54. ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
  55. ruleId := "$$neuron_connection"
  56. opId := "$$neuron_connection"
  57. store, err := state.CreateStore(ruleId, 0)
  58. if err != nil {
  59. ctx.GetLogger().Errorf("neuron connection create store error %v", err)
  60. return err
  61. }
  62. sctx := ctx.WithMeta(ruleId, opId, store)
  63. memory.CreatePub(NeuronTopic)
  64. go run(sctx)
  65. connectionCount++
  66. }
  67. return nil
  68. }
  69. func closeConnection(ctx api.StreamContext, url string) error {
  70. m.Lock()
  71. defer m.Unlock()
  72. ctx.GetLogger().Infof("closeConnection count: %d", connectionCount)
  73. memory.RemovePub(NeuronTopic)
  74. connectionCount--
  75. if connectionCount == 0 {
  76. err := disconnect(url)
  77. if err != nil {
  78. return err
  79. }
  80. }
  81. return nil
  82. }
  83. // nng connections
  84. // connect to nng
  85. func connect(url string) error {
  86. var err error
  87. sock, err = pair.NewSocket()
  88. if err != nil {
  89. return err
  90. }
  91. // options consider to export
  92. err = sock.SetOption(mangos.OptionSendDeadline, time.Duration(sendTimeout)*time.Millisecond)
  93. if err != nil {
  94. return err
  95. }
  96. //sock.SetOption(mangos.OptionWriteQLen, 100)
  97. //sock.SetOption(mangos.OptionReadQLen, 100)
  98. //sock.SetOption(mangos.OptionBestEffort, false)
  99. if err = sock.DialOptions(url, map[string]interface{}{
  100. mangos.OptionDialAsynch: false, // will reports error after max reconnect time
  101. mangos.OptionMaxReconnectTime: 5 * time.Second,
  102. mangos.OptionReconnectTime: 100 * time.Millisecond,
  103. }); err != nil {
  104. return fmt.Errorf("can't dial to neuron: %s", err.Error())
  105. }
  106. return nil
  107. }
  108. // run the loop to receive message from the nng connection singleton
  109. // exit when connection is closed
  110. func run(ctx api.StreamContext) {
  111. ctx.GetLogger().Infof("neuron source receiving loop started")
  112. for {
  113. // no receiving deadline, will wait until the socket closed
  114. if msg, err := sock.Recv(); err == nil {
  115. ctx.GetLogger().Debugf("neuron received message %s", string(msg))
  116. result, err := message.Decode(msg, message.FormatJson)
  117. if err != nil {
  118. ctx.GetLogger().Errorf("neuron decode message error %v", err)
  119. continue
  120. }
  121. memory.Produce(ctx, NeuronTopic, result)
  122. } else if err == mangos.ErrClosed {
  123. ctx.GetLogger().Infof("neuron connection closed, exit receiving loop")
  124. return
  125. } else {
  126. ctx.GetLogger().Errorf("neuron receiving error %v", err)
  127. }
  128. }
  129. }
  130. func publish(ctx api.StreamContext, data []byte) error {
  131. ctx.GetLogger().Debugf("publish to neuron: %s", string(data))
  132. if sock != nil {
  133. return sock.Send(data)
  134. }
  135. return fmt.Errorf("neuron connection is not established")
  136. }
  137. func disconnect(_ string) error {
  138. defer func() {
  139. sock = nil
  140. }()
  141. if sock != nil {
  142. err := sock.Close()
  143. if err != nil {
  144. return err
  145. }
  146. }
  147. return nil
  148. }