connection.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package connection
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/sdk/go/api"
  18. "github.com/lf-edge/ekuiper/sdk/go/context"
  19. "go.nanomsg.org/mangos/v3"
  20. "go.nanomsg.org/mangos/v3/protocol/pull"
  21. "go.nanomsg.org/mangos/v3/protocol/push"
  22. "go.nanomsg.org/mangos/v3/protocol/req"
  23. _ "go.nanomsg.org/mangos/v3/transport/ipc"
  24. "time"
  25. )
  26. // Options Initialized in plugin.go Start according to the config
  27. var Options map[string]interface{}
  28. type Closable interface {
  29. Close() error
  30. }
  31. type ReplyFunc func([]byte) []byte
  32. type ControlChannel interface {
  33. // reply with string message
  34. Run(ReplyFunc) error
  35. Closable
  36. }
  37. type DataInChannel interface {
  38. Recv() ([]byte, error)
  39. Closable
  40. }
  41. type DataOutChannel interface {
  42. Send([]byte) error
  43. Closable
  44. }
  45. type DataInOutChannel interface {
  46. Run(ReplyFunc) error
  47. Closable
  48. }
  49. type NanomsgRepChannel struct {
  50. sock mangos.Socket
  51. }
  52. // Run until process end
  53. func (r *NanomsgRepChannel) Run(f ReplyFunc) error {
  54. err := r.sock.Send([]byte("handshake"))
  55. if err != nil {
  56. return fmt.Errorf("can't send handshake: %s", err.Error())
  57. }
  58. for {
  59. msg, err := r.sock.Recv()
  60. if err != nil {
  61. return fmt.Errorf("cannot receive on rep socket: %s", err.Error())
  62. }
  63. reply := f(msg)
  64. err = r.sock.Send(reply)
  65. if err != nil {
  66. return fmt.Errorf("can't send reply: %s", err.Error())
  67. }
  68. }
  69. return nil
  70. }
  71. func (r *NanomsgRepChannel) Close() error {
  72. return r.sock.Close()
  73. }
  74. func CreateControlChannel(pluginName string) (ControlChannel, error) {
  75. var (
  76. sock mangos.Socket
  77. err error
  78. )
  79. if sock, err = req.NewSocket(); err != nil {
  80. return nil, fmt.Errorf("can't get new req socket: %s", err)
  81. }
  82. setSockOptions(sock)
  83. sock.SetOption(mangos.OptionRetryTime, 0)
  84. url := fmt.Sprintf("ipc:///tmp/plugin_%s.ipc", pluginName)
  85. if err = sock.Dial(url); err != nil {
  86. return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
  87. }
  88. return &NanomsgRepChannel{sock: sock}, nil
  89. }
  90. func CreateSourceChannel(ctx api.StreamContext) (DataOutChannel, error) {
  91. var (
  92. sock mangos.Socket
  93. err error
  94. )
  95. if sock, err = push.NewSocket(); err != nil {
  96. return nil, fmt.Errorf("can't get new push socket: %s", err)
  97. }
  98. setSockOptions(sock)
  99. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  100. if err = sock.Dial(url); err != nil {
  101. return nil, fmt.Errorf("can't dial on push socket: %s", err.Error())
  102. }
  103. return sock, nil
  104. }
  105. func CreateFuncChannel(symbolName string) (DataInOutChannel, error) {
  106. var (
  107. sock mangos.Socket
  108. err error
  109. )
  110. if sock, err = req.NewSocket(); err != nil {
  111. return nil, fmt.Errorf("can't get new req socket: %s", err)
  112. }
  113. setSockOptions(sock)
  114. sock.SetOption(mangos.OptionRetryTime, 0)
  115. url := fmt.Sprintf("ipc:///tmp/func_%s.ipc", symbolName)
  116. if err = sock.Dial(url); err != nil {
  117. return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
  118. }
  119. return &NanomsgRepChannel{sock: sock}, nil
  120. }
  121. func CreateSinkChannel(ctx api.StreamContext) (DataInChannel, error) {
  122. var (
  123. sock mangos.Socket
  124. err error
  125. )
  126. if sock, err = pull.NewSocket(); err != nil {
  127. return nil, fmt.Errorf("can't get new pull socket: %s", err)
  128. }
  129. setSockOptions(sock)
  130. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  131. if err = listenWithRetry(sock, url); err != nil {
  132. return nil, fmt.Errorf("can't listen on pull socket for %s: %s", url, err.Error())
  133. }
  134. return sock, nil
  135. }
  136. func setSockOptions(sock mangos.Socket) {
  137. for k, v := range Options {
  138. sock.SetOption(k, v)
  139. }
  140. }
  141. func listenWithRetry(sock mangos.Socket, url string) error {
  142. var (
  143. retryCount = 300
  144. retryInterval = 10
  145. )
  146. for {
  147. err := sock.Listen(url)
  148. if err == nil {
  149. context.Log.Infof("plugin start to listen after %d tries", retryCount)
  150. return err
  151. }
  152. retryCount--
  153. if retryCount < 0 {
  154. return err
  155. }
  156. time.Sleep(time.Duration(retryInterval) * time.Millisecond)
  157. }
  158. }