connection.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package runtime
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "go.nanomsg.org/mangos/v3"
  20. "go.nanomsg.org/mangos/v3/protocol/pull"
  21. "go.nanomsg.org/mangos/v3/protocol/push"
  22. "go.nanomsg.org/mangos/v3/protocol/rep"
  23. _ "go.nanomsg.org/mangos/v3/transport/ipc"
  24. "sync"
  25. "time"
  26. )
// Options holds the socket options, keyed by mangos option name, that
// setSockOptions applies to every socket created by the Create*Channel
// functions in this file.
//
// Options Initialized in config
var Options = map[string]interface{}{
	// NOTE(review): mangos documents deadline options as time.Duration values,
	// while this default is an untyped int. Presumably configuration code
	// outside this file overrides it - verify, since SetOption errors are
	// not checked by setSockOptions.
	mangos.OptionSendDeadline: 1000,
}
// Closable is implemented by every channel type in this file; it is the
// single-method contract for releasing the underlying resource.
type Closable interface {
	// Close releases the channel and reports any error from doing so.
	Close() error
}
// ControlChannel is the command channel returned by CreateControlChannel.
type ControlChannel interface {
	// Handshake waits for the first message from the peer.
	Handshake() error
	// SendCmd sends a command payload and reports whether it was acknowledged.
	SendCmd(arg []byte) error
	Closable
}
// NanomsgReqChannel shared by symbols
//
// It wraps a mangos socket. The embedded mutex is held by SendCmd for the
// whole send/receive exchange so that concurrent callers do not interleave
// their messages on the shared socket.
type NanomsgReqChannel struct {
	sync.Mutex
	sock mangos.Socket
}
  44. func (r *NanomsgReqChannel) Close() error {
  45. return r.sock.Close()
  46. }
  47. func (r *NanomsgReqChannel) SendCmd(arg []byte) error {
  48. r.Lock()
  49. defer r.Unlock()
  50. if err := r.sock.Send(arg); err != nil {
  51. return fmt.Errorf("can't send message on control rep socket: %s", err.Error())
  52. }
  53. if msg, err := r.sock.Recv(); err != nil {
  54. return fmt.Errorf("can't receive: %s", err.Error())
  55. } else {
  56. if string(msg) != "ok" {
  57. return fmt.Errorf("receive error: %s", string(msg))
  58. }
  59. }
  60. return nil
  61. }
  62. // Handshake should only be called once
  63. func (r *NanomsgReqChannel) Handshake() error {
  64. _, err := r.sock.Recv()
  65. return err
  66. }
// DataInChannel is the receive-only data channel returned by
// CreateSourceChannel.
type DataInChannel interface {
	// Recv returns the next message payload or an error.
	Recv() ([]byte, error)
	Closable
}
// DataOutChannel is the send-only data channel returned by
// CreateSinkChannel.
type DataOutChannel interface {
	// Send transmits one message payload.
	Send([]byte) error
	Closable
}
// DataReqChannel is the request/response data channel returned by
// CreateFunctionChannel.
type DataReqChannel interface {
	// Handshake waits for the first message from the peer.
	Handshake() error
	// Req sends a payload and returns the payload received in response.
	Req([]byte) ([]byte, error)
	Closable
}
// NanomsgReqRepChannel wraps a mangos socket for request/response exchanges.
// The embedded mutex is held by Req for the whole send/receive pair so that
// concurrent callers do not interleave their messages.
type NanomsgReqRepChannel struct {
	sync.Mutex
	sock mangos.Socket
}
  84. func (r *NanomsgReqRepChannel) Close() error {
  85. return r.sock.Close()
  86. }
  87. func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error) {
  88. r.Lock()
  89. defer r.Unlock()
  90. if err := r.sock.Send(arg); err != nil {
  91. return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error())
  92. }
  93. return r.sock.Recv()
  94. }
  95. // Handshake should only be called once
  96. func (r *NanomsgReqRepChannel) Handshake() error {
  97. _, err := r.sock.Recv()
  98. return err
  99. }
  100. func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error) {
  101. var (
  102. sock mangos.Socket
  103. err error
  104. )
  105. if sock, err = pull.NewSocket(); err != nil {
  106. return nil, fmt.Errorf("can't get new pull socket: %s", err)
  107. }
  108. setSockOptions(sock)
  109. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  110. if err = listenWithRetry(sock, url); err != nil {
  111. return nil, fmt.Errorf("can't listen on pull socket for %s: %s", url, err.Error())
  112. }
  113. return sock, nil
  114. }
  115. func CreateFunctionChannel(symbolName string) (DataReqChannel, error) {
  116. var (
  117. sock mangos.Socket
  118. err error
  119. )
  120. if sock, err = rep.NewSocket(); err != nil {
  121. return nil, fmt.Errorf("can't get new rep socket: %s", err)
  122. }
  123. setSockOptions(sock)
  124. sock.SetOption(mangos.OptionRecvDeadline, 1000)
  125. url := fmt.Sprintf("ipc:///tmp/func_%s.ipc", symbolName)
  126. if err = listenWithRetry(sock, url); err != nil {
  127. return nil, fmt.Errorf("can't listen on rep socket for %s: %s", url, err.Error())
  128. }
  129. return &NanomsgReqRepChannel{sock: sock}, nil
  130. }
  131. func CreateSinkChannel(ctx api.StreamContext) (DataOutChannel, error) {
  132. var (
  133. sock mangos.Socket
  134. err error
  135. )
  136. if sock, err = push.NewSocket(); err != nil {
  137. return nil, fmt.Errorf("can't get new push socket: %s", err)
  138. }
  139. setSockOptions(sock)
  140. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  141. if err = sock.Dial(url); err != nil {
  142. return nil, fmt.Errorf("can't dial on push socket: %s", err.Error())
  143. }
  144. return sock, nil
  145. }
  146. func CreateControlChannel(pluginName string) (ControlChannel, error) {
  147. var (
  148. sock mangos.Socket
  149. err error
  150. )
  151. if sock, err = rep.NewSocket(); err != nil {
  152. return nil, fmt.Errorf("can't get new rep socket: %s", err)
  153. }
  154. setSockOptions(sock)
  155. sock.SetOption(mangos.OptionRecvDeadline, 100)
  156. url := fmt.Sprintf("ipc:///tmp/plugin_%s.ipc", pluginName)
  157. if err = listenWithRetry(sock, url); err != nil {
  158. return nil, fmt.Errorf("can't listen on rep socket: %s", err.Error())
  159. }
  160. return &NanomsgReqChannel{sock: sock}, nil
  161. }
  162. func setSockOptions(sock mangos.Socket) {
  163. for k, v := range Options {
  164. sock.SetOption(k, v)
  165. }
  166. }
  167. func listenWithRetry(sock mangos.Socket, url string) error {
  168. var (
  169. retryCount = 300
  170. retryInterval = 100
  171. )
  172. for {
  173. err := sock.Listen(url)
  174. if err == nil {
  175. conf.Log.Infof("start to listen after %d tries", 301-retryCount)
  176. return err
  177. }
  178. retryCount--
  179. if retryCount < 0 {
  180. return err
  181. }
  182. time.Sleep(time.Duration(retryInterval) * time.Millisecond)
  183. }
  184. }