connection.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package connection
  15. import (
  16. "fmt"
  17. "time"
  18. "go.nanomsg.org/mangos/v3"
  19. "go.nanomsg.org/mangos/v3/protocol/pull"
  20. "go.nanomsg.org/mangos/v3/protocol/push"
  21. "go.nanomsg.org/mangos/v3/protocol/req"
  22. // introduce ipc
  23. _ "go.nanomsg.org/mangos/v3/transport/ipc"
  24. "github.com/lf-edge/ekuiper/sdk/go/api"
  25. "github.com/lf-edge/ekuiper/sdk/go/context"
  26. )
  27. // Options Initialized in plugin.go Start according to the config
  28. var (
  29. dialOptions = map[string]interface{}{
  30. mangos.OptionDialAsynch: false,
  31. mangos.OptionMaxReconnectTime: 5 * time.Second,
  32. mangos.OptionReconnectTime: 100 * time.Millisecond,
  33. }
  34. )
  // Closable is the minimal contract shared by every channel type in this
  // package: it can be shut down with Close.
  type Closable interface {
  	// Close shuts the value down and reports any error encountered.
  	Close() error
  }
  // ReplyFunc handles one received message and returns the bytes to send back.
  type ReplyFunc func(request []byte) (reply []byte)
  39. type ControlChannel interface {
  40. // reply with string message
  41. Run(ReplyFunc) error
  42. Closable
  43. }
  44. type DataInChannel interface {
  45. Recv() ([]byte, error)
  46. Closable
  47. }
  48. type DataOutChannel interface {
  49. Send([]byte) error
  50. Closable
  51. }
  52. type DataInOutChannel interface {
  53. Run(ReplyFunc) error
  54. Closable
  55. }
  56. type NanomsgRepChannel struct {
  57. sock mangos.Socket
  58. }
  59. // Run until process end
  60. func (r *NanomsgRepChannel) Run(f ReplyFunc) error {
  61. err := r.sock.Send([]byte("handshake"))
  62. if err != nil {
  63. return fmt.Errorf("can't send handshake: %s", err.Error())
  64. }
  65. for {
  66. msg, err := r.sock.Recv()
  67. if err != nil {
  68. return fmt.Errorf("cannot receive on rep socket: %s", err.Error())
  69. }
  70. reply := f(msg)
  71. err = r.sock.Send(reply)
  72. if err != nil {
  73. return fmt.Errorf("can't send reply: %s", err.Error())
  74. }
  75. }
  76. }
  77. func (r *NanomsgRepChannel) Close() error {
  78. return r.sock.Close()
  79. }
  80. func CreateControlChannel(pluginName string) (ControlChannel, error) {
  81. var (
  82. sock mangos.Socket
  83. err error
  84. )
  85. if sock, err = req.NewSocket(); err != nil {
  86. return nil, fmt.Errorf("can't get new req socket: %s", err)
  87. }
  88. setSockOptions(sock, map[string]interface{}{
  89. mangos.OptionRetryTime: 0,
  90. })
  91. url := fmt.Sprintf("ipc:///tmp/plugin_%s.ipc", pluginName)
  92. if err = sock.DialOptions(url, dialOptions); err != nil {
  93. return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
  94. }
  95. return &NanomsgRepChannel{sock: sock}, nil
  96. }
  97. func CreateSourceChannel(ctx api.StreamContext) (DataOutChannel, error) {
  98. var (
  99. sock mangos.Socket
  100. err error
  101. )
  102. if sock, err = push.NewSocket(); err != nil {
  103. return nil, fmt.Errorf("can't get new push socket: %s", err)
  104. }
  105. setSockOptions(sock, map[string]interface{}{
  106. mangos.OptionSendDeadline: 1000 * time.Millisecond,
  107. })
  108. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  109. if err = sock.DialOptions(url, dialOptions); err != nil {
  110. return nil, fmt.Errorf("can't dial on push socket: %s", err.Error())
  111. }
  112. return sock, nil
  113. }
  114. func CreateFuncChannel(symbolName string) (DataInOutChannel, error) {
  115. var (
  116. sock mangos.Socket
  117. err error
  118. )
  119. if sock, err = req.NewSocket(); err != nil {
  120. return nil, fmt.Errorf("can't get new req socket: %s", err)
  121. }
  122. // The recv should not have timeout because it is event driven
  123. setSockOptions(sock, map[string]interface{}{
  124. mangos.OptionSendDeadline: 1000 * time.Millisecond,
  125. mangos.OptionRetryTime: 0,
  126. })
  127. url := fmt.Sprintf("ipc:///tmp/func_%s.ipc", symbolName)
  128. if err = sock.DialOptions(url, dialOptions); err != nil {
  129. return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
  130. }
  131. return &NanomsgRepChannel{sock: sock}, nil
  132. }
  133. func CreateSinkChannel(ctx api.StreamContext) (DataInChannel, error) {
  134. var (
  135. sock mangos.Socket
  136. err error
  137. )
  138. if sock, err = pull.NewSocket(); err != nil {
  139. return nil, fmt.Errorf("can't get new pull socket: %s", err)
  140. }
  141. setSockOptions(sock, map[string]interface{}{
  142. mangos.OptionRecvDeadline: 500 * time.Millisecond,
  143. })
  144. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  145. if err = listenWithRetry(sock, url); err != nil {
  146. return nil, fmt.Errorf("can't listen on pull socket for %s: %s", url, err.Error())
  147. }
  148. return sock, nil
  149. }
  150. func setSockOptions(sock mangos.Socket, sockOptions map[string]interface{}) {
  151. for k, v := range sockOptions {
  152. err := sock.SetOption(k, v)
  153. if err != nil && err != mangos.ErrBadOption {
  154. context.Log.Errorf("can't set socket option %s: %s", k, err.Error())
  155. }
  156. }
  157. }
  158. func listenWithRetry(sock mangos.Socket, url string) error {
  159. var (
  160. retryCount = 300
  161. retryInterval = 10
  162. )
  163. for {
  164. err := sock.Listen(url)
  165. if err == nil {
  166. context.Log.Infof("plugin start to listen after %d tries", retryCount)
  167. return err
  168. }
  169. retryCount--
  170. if retryCount < 0 {
  171. return err
  172. }
  173. time.Sleep(time.Duration(retryInterval) * time.Millisecond)
  174. }
  175. }