connection.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package runtime
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "go.nanomsg.org/mangos/v3"
  20. "go.nanomsg.org/mangos/v3/protocol/pull"
  21. "go.nanomsg.org/mangos/v3/protocol/push"
  22. "go.nanomsg.org/mangos/v3/protocol/rep"
  23. _ "go.nanomsg.org/mangos/v3/transport/ipc"
  24. "sync"
  25. "time"
  26. )
  27. // TODO to design timeout strategy
  28. // sockOptions Initialized in config
  29. var (
  30. dialOptions = map[string]interface{}{
  31. mangos.OptionDialAsynch: false,
  32. mangos.OptionMaxReconnectTime: 5 * time.Second,
  33. mangos.OptionReconnectTime: 100 * time.Millisecond,
  34. }
  35. )
  // Closable is implemented by every channel in this file; it exposes the
  // single operation needed to release the channel.
  type Closable interface {
  	// Close releases the channel and reports any error from doing so.
  	Close() error
  }
  39. type ControlChannel interface {
  40. Handshake() error
  41. SendCmd(arg []byte) error
  42. Closable
  43. }
  44. // NanomsgReqChannel shared by symbols
  45. type NanomsgReqChannel struct {
  46. sync.Mutex
  47. sock mangos.Socket
  48. }
  49. func (r *NanomsgReqChannel) Close() error {
  50. return r.sock.Close()
  51. }
  52. func (r *NanomsgReqChannel) SendCmd(arg []byte) error {
  53. r.Lock()
  54. defer r.Unlock()
  55. if err := r.sock.Send(arg); err != nil {
  56. if err == mangos.ErrProtoState {
  57. _, err = r.sock.Recv()
  58. if err == nil {
  59. err = r.sock.Send(arg)
  60. if err == nil {
  61. return nil
  62. }
  63. }
  64. }
  65. return fmt.Errorf("can't send message on control rep socket: %s", err.Error())
  66. }
  67. if msg, err := r.sock.Recv(); err != nil {
  68. return fmt.Errorf("can't receive: %s", err.Error())
  69. } else {
  70. if string(msg) != "ok" {
  71. return fmt.Errorf("receive error: %s", string(msg))
  72. }
  73. }
  74. return nil
  75. }
  76. // Handshake should only be called once
  77. func (r *NanomsgReqChannel) Handshake() error {
  78. t, err := r.sock.GetOption(mangos.OptionRecvDeadline)
  79. if err != nil {
  80. return err
  81. }
  82. err = r.sock.SetOption(mangos.OptionRecvDeadline, time.Duration(conf.Config.Portable.InitTimeout)*time.Millisecond)
  83. if err != nil {
  84. return err
  85. }
  86. _, err = r.sock.Recv()
  87. if err != nil && err != mangos.ErrProtoState {
  88. return err
  89. }
  90. err = r.sock.SetOption(mangos.OptionRecvDeadline, t)
  91. if err != nil {
  92. return err
  93. }
  94. return nil
  95. }
  96. type DataInChannel interface {
  97. Recv() ([]byte, error)
  98. Closable
  99. }
  100. type DataOutChannel interface {
  101. Send([]byte) error
  102. Closable
  103. }
  104. type DataReqChannel interface {
  105. Req([]byte) ([]byte, error)
  106. Closable
  107. }
  108. type NanomsgReqRepChannel struct {
  109. sync.Mutex
  110. sock mangos.Socket
  111. }
  112. func (r *NanomsgReqRepChannel) Close() error {
  113. return r.sock.Close()
  114. }
  115. func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error) {
  116. r.Lock()
  117. defer r.Unlock()
  118. conf.Log.Debugf("send request: %s", string(arg))
  119. for {
  120. err := r.sock.Send(arg)
  121. // resend if protocol state wrong, because of plugin restart or other problems
  122. if err == mangos.ErrProtoState {
  123. conf.Log.Debugf("send request protestate error %s", err.Error())
  124. var prev []byte
  125. prev, err = r.sock.Recv()
  126. if err == nil {
  127. conf.Log.Warnf("discard previous response: %s", string(prev))
  128. conf.Log.Debugf("resend request: %s", string(arg))
  129. err = r.sock.Send(arg)
  130. }
  131. }
  132. if err != nil {
  133. return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error())
  134. }
  135. result, e := r.sock.Recv()
  136. if e != nil {
  137. conf.Log.Errorf("can't receive: %s", e.Error())
  138. } else {
  139. conf.Log.Debugf("receive response: %s", string(result))
  140. }
  141. if result[0] == 'h' {
  142. conf.Log.Debugf("receive handshake response: %s", string(result))
  143. continue
  144. }
  145. return result, e
  146. }
  147. }
  148. func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error) {
  149. var (
  150. sock mangos.Socket
  151. err error
  152. )
  153. if sock, err = pull.NewSocket(); err != nil {
  154. return nil, fmt.Errorf("can't get new pull socket: %s", err)
  155. }
  156. setSockOptions(sock, map[string]interface{}{
  157. mangos.OptionRecvDeadline: 500 * time.Millisecond,
  158. })
  159. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  160. if err = listenWithRetry(sock, url); err != nil {
  161. return nil, fmt.Errorf("can't listen on pull socket for %s: %s", url, err.Error())
  162. }
  163. conf.Log.Infof("source channel created: %s", url)
  164. return sock, nil
  165. }
  166. func CreateFunctionChannel(symbolName string) (DataReqChannel, error) {
  167. var (
  168. sock mangos.Socket
  169. err error
  170. )
  171. if sock, err = rep.NewSocket(); err != nil {
  172. return nil, fmt.Errorf("can't get new rep socket: %s", err)
  173. }
  174. // Function must send out data quickly and wait for the response with some buffer
  175. setSockOptions(sock, map[string]interface{}{
  176. mangos.OptionRecvDeadline: 5000 * time.Millisecond,
  177. mangos.OptionSendDeadline: 1000 * time.Millisecond,
  178. mangos.OptionRetryTime: 0,
  179. })
  180. url := fmt.Sprintf("ipc:///tmp/func_%s.ipc", symbolName)
  181. if err = listenWithRetry(sock, url); err != nil {
  182. return nil, fmt.Errorf("can't listen on rep socket for %s: %s", url, err.Error())
  183. }
  184. conf.Log.Infof("function channel created: %s", url)
  185. return &NanomsgReqRepChannel{sock: sock}, nil
  186. }
  187. func CreateSinkChannel(ctx api.StreamContext) (DataOutChannel, error) {
  188. var (
  189. sock mangos.Socket
  190. err error
  191. )
  192. if sock, err = push.NewSocket(); err != nil {
  193. return nil, fmt.Errorf("can't get new push socket: %s", err)
  194. }
  195. setSockOptions(sock, map[string]interface{}{
  196. mangos.OptionSendDeadline: 1000 * time.Millisecond,
  197. })
  198. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  199. if err = sock.DialOptions(url, dialOptions); err != nil {
  200. return nil, fmt.Errorf("can't dial on push socket: %s", err.Error())
  201. }
  202. conf.Log.Infof("sink channel created: %s", url)
  203. return sock, nil
  204. }
  205. func CreateControlChannel(pluginName string) (ControlChannel, error) {
  206. var (
  207. sock mangos.Socket
  208. err error
  209. )
  210. if sock, err = rep.NewSocket(); err != nil {
  211. return nil, fmt.Errorf("can't get new rep socket: %s", err)
  212. }
  213. // NO time out now for control channel
  214. // because the plugin instance liveness can be detected
  215. // thus, if the plugin exit, the control channel will be closed
  216. setSockOptions(sock, map[string]interface{}{
  217. mangos.OptionRecvDeadline: 1 * time.Hour,
  218. })
  219. url := fmt.Sprintf("ipc:///tmp/plugin_%s.ipc", pluginName)
  220. if err = listenWithRetry(sock, url); err != nil {
  221. return nil, fmt.Errorf("can't listen on rep socket: %s", err.Error())
  222. }
  223. conf.Log.Infof("control channel created: %s", url)
  224. return &NanomsgReqChannel{sock: sock}, nil
  225. }
  226. func setSockOptions(sock mangos.Socket, sockOptions map[string]interface{}) {
  227. for k, v := range sockOptions {
  228. err := sock.SetOption(k, v)
  229. if err != nil && err != mangos.ErrBadOption {
  230. conf.Log.Errorf("can't set socket option %s: %s", k, err.Error())
  231. }
  232. }
  233. }
  234. func listenWithRetry(sock mangos.Socket, url string) error {
  235. var (
  236. retryCount = 300
  237. retryInterval = 100
  238. )
  239. for {
  240. err := sock.Listen(url)
  241. if err == nil {
  242. conf.Log.Infof("start to listen at %s after %d tries", url, 301-retryCount)
  243. return err
  244. }
  245. retryCount--
  246. if retryCount < 0 {
  247. return err
  248. }
  249. time.Sleep(time.Duration(retryInterval) * time.Millisecond)
  250. }
  251. }