connection.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package runtime
  15. import (
  16. "fmt"
  17. "sync"
  18. "time"
  19. "go.nanomsg.org/mangos/v3"
  20. "go.nanomsg.org/mangos/v3/protocol/pull"
  21. "go.nanomsg.org/mangos/v3/protocol/push"
  22. "go.nanomsg.org/mangos/v3/protocol/rep"
  23. // introduce ipc
  24. _ "go.nanomsg.org/mangos/v3/transport/ipc"
  25. "github.com/lf-edge/ekuiper/internal/conf"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. )
  28. // TODO to design timeout strategy
  29. // sockOptions Initialized in config
  30. var (
  31. dialOptions = map[string]interface{}{
  32. mangos.OptionDialAsynch: false,
  33. mangos.OptionMaxReconnectTime: 5 * time.Second,
  34. mangos.OptionReconnectTime: 100 * time.Millisecond,
  35. }
  36. )
  // Closable is anything that can be closed.
  type Closable interface {
  	// Close closes the value and reports any error encountered while doing so.
  	Close() error
  }
  40. type ControlChannel interface {
  41. Handshake() error
  42. SendCmd(arg []byte) error
  43. Closable
  44. }
  45. // NanomsgReqChannel shared by symbols
  46. type NanomsgReqChannel struct {
  47. sync.Mutex
  48. sock mangos.Socket
  49. }
  50. func (r *NanomsgReqChannel) Close() error {
  51. return r.sock.Close()
  52. }
  53. func (r *NanomsgReqChannel) SendCmd(arg []byte) error {
  54. r.Lock()
  55. defer r.Unlock()
  56. if err := r.sock.Send(arg); err != nil {
  57. if err == mangos.ErrProtoState {
  58. _, err = r.sock.Recv()
  59. if err == nil {
  60. err = r.sock.Send(arg)
  61. if err == nil {
  62. return nil
  63. }
  64. }
  65. }
  66. return fmt.Errorf("can't send message on control rep socket: %s", err.Error())
  67. }
  68. if msg, err := r.sock.Recv(); err != nil {
  69. return fmt.Errorf("can't receive: %s", err.Error())
  70. } else {
  71. if string(msg) != "ok" {
  72. return fmt.Errorf("receive error: %s", string(msg))
  73. }
  74. }
  75. return nil
  76. }
  77. // Handshake should only be called once
  78. func (r *NanomsgReqChannel) Handshake() error {
  79. t, err := r.sock.GetOption(mangos.OptionRecvDeadline)
  80. if err != nil {
  81. return err
  82. }
  83. err = r.sock.SetOption(mangos.OptionRecvDeadline, time.Duration(conf.Config.Portable.InitTimeout)*time.Millisecond)
  84. if err != nil {
  85. return err
  86. }
  87. _, err = r.sock.Recv()
  88. if err != nil && err != mangos.ErrProtoState {
  89. return err
  90. }
  91. err = r.sock.SetOption(mangos.OptionRecvDeadline, t)
  92. if err != nil {
  93. return err
  94. }
  95. return nil
  96. }
  97. type DataInChannel interface {
  98. Recv() ([]byte, error)
  99. Closable
  100. }
  101. type DataOutChannel interface {
  102. Send([]byte) error
  103. Closable
  104. }
  105. type DataReqChannel interface {
  106. Req([]byte) ([]byte, error)
  107. Closable
  108. }
  109. type NanomsgReqRepChannel struct {
  110. sync.Mutex
  111. sock mangos.Socket
  112. }
  113. func (r *NanomsgReqRepChannel) Close() error {
  114. return r.sock.Close()
  115. }
  116. func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error) {
  117. r.Lock()
  118. defer r.Unlock()
  119. conf.Log.Debugf("send request: %s", string(arg))
  120. for {
  121. err := r.sock.Send(arg)
  122. // resend if protocol state wrong, because of plugin restart or other problems
  123. if err == mangos.ErrProtoState {
  124. conf.Log.Debugf("send request protestate error %s", err.Error())
  125. var prev []byte
  126. prev, err = r.sock.Recv()
  127. if err == nil {
  128. conf.Log.Warnf("discard previous response: %s", string(prev))
  129. conf.Log.Debugf("resend request: %s", string(arg))
  130. err = r.sock.Send(arg)
  131. }
  132. }
  133. if err != nil {
  134. return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error())
  135. }
  136. result, e := r.sock.Recv()
  137. if e != nil {
  138. conf.Log.Errorf("can't receive: %s", e.Error())
  139. } else {
  140. conf.Log.Debugf("receive response: %s", string(result))
  141. }
  142. if len(result) > 0 && result[0] == 'h' {
  143. conf.Log.Debugf("receive handshake response: %s", string(result))
  144. continue
  145. }
  146. return result, e
  147. }
  148. }
  149. func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error) {
  150. var (
  151. sock mangos.Socket
  152. err error
  153. )
  154. if sock, err = pull.NewSocket(); err != nil {
  155. return nil, fmt.Errorf("can't get new pull socket: %s", err)
  156. }
  157. setSockOptions(sock, map[string]interface{}{
  158. mangos.OptionRecvDeadline: 500 * time.Millisecond,
  159. })
  160. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  161. if err = listenWithRetry(sock, url); err != nil {
  162. return nil, fmt.Errorf("can't listen on pull socket for %s: %s", url, err.Error())
  163. }
  164. conf.Log.Infof("source channel created: %s", url)
  165. return sock, nil
  166. }
  167. func CreateFunctionChannel(symbolName string) (DataReqChannel, error) {
  168. var (
  169. sock mangos.Socket
  170. err error
  171. )
  172. if sock, err = rep.NewSocket(); err != nil {
  173. return nil, fmt.Errorf("can't get new rep socket: %s", err)
  174. }
  175. // Function must send out data quickly and wait for the response with some buffer
  176. setSockOptions(sock, map[string]interface{}{
  177. mangos.OptionRecvDeadline: 5000 * time.Millisecond,
  178. mangos.OptionSendDeadline: 1000 * time.Millisecond,
  179. mangos.OptionRetryTime: 0,
  180. })
  181. url := fmt.Sprintf("ipc:///tmp/func_%s.ipc", symbolName)
  182. if err = listenWithRetry(sock, url); err != nil {
  183. return nil, fmt.Errorf("can't listen on rep socket for %s: %s", url, err.Error())
  184. }
  185. conf.Log.Infof("function channel created: %s", url)
  186. return &NanomsgReqRepChannel{sock: sock}, nil
  187. }
  188. func CreateSinkChannel(ctx api.StreamContext) (DataOutChannel, error) {
  189. var (
  190. sock mangos.Socket
  191. err error
  192. )
  193. if sock, err = push.NewSocket(); err != nil {
  194. return nil, fmt.Errorf("can't get new push socket: %s", err)
  195. }
  196. setSockOptions(sock, map[string]interface{}{
  197. mangos.OptionSendDeadline: 1000 * time.Millisecond,
  198. })
  199. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  200. if err = sock.DialOptions(url, dialOptions); err != nil {
  201. return nil, fmt.Errorf("can't dial on push socket: %s", err.Error())
  202. }
  203. conf.Log.Infof("sink channel created: %s", url)
  204. return sock, nil
  205. }
  206. func CreateControlChannel(pluginName string) (ControlChannel, error) {
  207. var (
  208. sock mangos.Socket
  209. err error
  210. )
  211. if sock, err = rep.NewSocket(); err != nil {
  212. return nil, fmt.Errorf("can't get new rep socket: %s", err)
  213. }
  214. // NO time out now for control channel
  215. // because the plugin instance liveness can be detected
  216. // thus, if the plugin exit, the control channel will be closed
  217. setSockOptions(sock, map[string]interface{}{
  218. mangos.OptionRecvDeadline: 1 * time.Hour,
  219. })
  220. url := fmt.Sprintf("ipc:///tmp/plugin_%s.ipc", pluginName)
  221. if err = listenWithRetry(sock, url); err != nil {
  222. return nil, fmt.Errorf("can't listen on rep socket: %s", err.Error())
  223. }
  224. conf.Log.Infof("control channel created: %s", url)
  225. return &NanomsgReqChannel{sock: sock}, nil
  226. }
  227. func setSockOptions(sock mangos.Socket, sockOptions map[string]interface{}) {
  228. for k, v := range sockOptions {
  229. err := sock.SetOption(k, v)
  230. if err != nil && err != mangos.ErrBadOption {
  231. conf.Log.Errorf("can't set socket option %s: %s", k, err.Error())
  232. }
  233. }
  234. }
  235. func listenWithRetry(sock mangos.Socket, url string) error {
  236. var (
  237. retryCount = 5
  238. retryInterval = 100
  239. )
  240. for {
  241. err := sock.Listen(url)
  242. if err == nil {
  243. conf.Log.Infof("start to listen at %s after %d tries", url, 5-retryCount)
  244. return err
  245. }
  246. retryCount--
  247. if retryCount < 0 {
  248. return err
  249. }
  250. time.Sleep(time.Duration(retryInterval) * time.Millisecond)
  251. }
  252. }