connection.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package connection
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/sdk/go/api"
  18. "github.com/lf-edge/ekuiper/sdk/go/context"
  19. "go.nanomsg.org/mangos/v3"
  20. "go.nanomsg.org/mangos/v3/protocol/pull"
  21. "go.nanomsg.org/mangos/v3/protocol/push"
  22. "go.nanomsg.org/mangos/v3/protocol/req"
  23. _ "go.nanomsg.org/mangos/v3/transport/ipc"
  24. "time"
  25. )
  26. // Options Initialized in plugin.go Start according to the config
  27. var (
  28. dialOptions = map[string]interface{}{
  29. mangos.OptionDialAsynch: false,
  30. mangos.OptionMaxReconnectTime: 5 * time.Second,
  31. mangos.OptionReconnectTime: 100 * time.Millisecond,
  32. }
  33. )
  34. type Closable interface {
  35. Close() error
  36. }
  37. type ReplyFunc func([]byte) []byte
  38. type ControlChannel interface {
  39. // reply with string message
  40. Run(ReplyFunc) error
  41. Closable
  42. }
  43. type DataInChannel interface {
  44. Recv() ([]byte, error)
  45. Closable
  46. }
  47. type DataOutChannel interface {
  48. Send([]byte) error
  49. Closable
  50. }
  51. type DataInOutChannel interface {
  52. Run(ReplyFunc) error
  53. Closable
  54. }
  55. type NanomsgRepChannel struct {
  56. sock mangos.Socket
  57. }
  58. // Run until process end
  59. func (r *NanomsgRepChannel) Run(f ReplyFunc) error {
  60. err := r.sock.Send([]byte("handshake"))
  61. if err != nil {
  62. return fmt.Errorf("can't send handshake: %s", err.Error())
  63. }
  64. for {
  65. msg, err := r.sock.Recv()
  66. if err != nil {
  67. return fmt.Errorf("cannot receive on rep socket: %s", err.Error())
  68. }
  69. reply := f(msg)
  70. err = r.sock.Send(reply)
  71. if err != nil {
  72. return fmt.Errorf("can't send reply: %s", err.Error())
  73. }
  74. }
  75. }
  76. func (r *NanomsgRepChannel) Close() error {
  77. return r.sock.Close()
  78. }
  79. func CreateControlChannel(pluginName string) (ControlChannel, error) {
  80. var (
  81. sock mangos.Socket
  82. err error
  83. )
  84. if sock, err = req.NewSocket(); err != nil {
  85. return nil, fmt.Errorf("can't get new req socket: %s", err)
  86. }
  87. setSockOptions(sock, map[string]interface{}{
  88. mangos.OptionRetryTime: 0,
  89. })
  90. url := fmt.Sprintf("ipc:///tmp/plugin_%s.ipc", pluginName)
  91. if err = sock.DialOptions(url, dialOptions); err != nil {
  92. return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
  93. }
  94. return &NanomsgRepChannel{sock: sock}, nil
  95. }
  96. func CreateSourceChannel(ctx api.StreamContext) (DataOutChannel, error) {
  97. var (
  98. sock mangos.Socket
  99. err error
  100. )
  101. if sock, err = push.NewSocket(); err != nil {
  102. return nil, fmt.Errorf("can't get new push socket: %s", err)
  103. }
  104. setSockOptions(sock, map[string]interface{}{
  105. mangos.OptionSendDeadline: 1000 * time.Millisecond,
  106. })
  107. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  108. if err = sock.DialOptions(url, dialOptions); err != nil {
  109. return nil, fmt.Errorf("can't dial on push socket: %s", err.Error())
  110. }
  111. return sock, nil
  112. }
  113. func CreateFuncChannel(symbolName string) (DataInOutChannel, error) {
  114. var (
  115. sock mangos.Socket
  116. err error
  117. )
  118. if sock, err = req.NewSocket(); err != nil {
  119. return nil, fmt.Errorf("can't get new req socket: %s", err)
  120. }
  121. // The recv should not have timeout because it is event driven
  122. setSockOptions(sock, map[string]interface{}{
  123. mangos.OptionSendDeadline: 1000 * time.Millisecond,
  124. mangos.OptionRetryTime: 0,
  125. })
  126. url := fmt.Sprintf("ipc:///tmp/func_%s.ipc", symbolName)
  127. if err = sock.DialOptions(url, dialOptions); err != nil {
  128. return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
  129. }
  130. return &NanomsgRepChannel{sock: sock}, nil
  131. }
  132. func CreateSinkChannel(ctx api.StreamContext) (DataInChannel, error) {
  133. var (
  134. sock mangos.Socket
  135. err error
  136. )
  137. if sock, err = pull.NewSocket(); err != nil {
  138. return nil, fmt.Errorf("can't get new pull socket: %s", err)
  139. }
  140. setSockOptions(sock, map[string]interface{}{
  141. mangos.OptionRecvDeadline: 500 * time.Millisecond,
  142. })
  143. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  144. if err = listenWithRetry(sock, url); err != nil {
  145. return nil, fmt.Errorf("can't listen on pull socket for %s: %s", url, err.Error())
  146. }
  147. return sock, nil
  148. }
  149. func setSockOptions(sock mangos.Socket, sockOptions map[string]interface{}) {
  150. for k, v := range sockOptions {
  151. err := sock.SetOption(k, v)
  152. if err != nil && err != mangos.ErrBadOption {
  153. context.Log.Errorf("can't set socket option %s: %s", k, err.Error())
  154. }
  155. }
  156. }
  157. func listenWithRetry(sock mangos.Socket, url string) error {
  158. var (
  159. retryCount = 300
  160. retryInterval = 10
  161. )
  162. for {
  163. err := sock.Listen(url)
  164. if err == nil {
  165. context.Log.Infof("plugin start to listen after %d tries", retryCount)
  166. return err
  167. }
  168. retryCount--
  169. if retryCount < 0 {
  170. return err
  171. }
  172. time.Sleep(time.Duration(retryInterval) * time.Millisecond)
  173. }
  174. }