connection.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package connection
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/sdk/api"
  18. "github.com/lf-edge/ekuiper/sdk/context"
  19. "go.nanomsg.org/mangos/v3"
  20. "go.nanomsg.org/mangos/v3/protocol/pull"
  21. "go.nanomsg.org/mangos/v3/protocol/push"
  22. "go.nanomsg.org/mangos/v3/protocol/req"
  23. _ "go.nanomsg.org/mangos/v3/transport/ipc"
  24. "time"
  25. )
// Options holds the socket options that setSockOptions applies, key by key,
// to each socket created by the Create*Channel functions in this file.
// It is initialized in plugin.go Start according to the config.
var Options map[string]interface{}
// Closable is the Close method shared by every channel interface declared
// in this file.
type Closable interface {
	// Close closes the channel and reports any error encountered.
	Close() error
}
// ReplyFunc maps a received message to the reply that is sent back.
// NanomsgRepChannel.Run calls it once per received message.
type ReplyFunc func([]byte) []byte
// ControlChannel is the channel type returned by CreateControlChannel.
type ControlChannel interface {
	// Run answers each incoming message with the bytes produced by the
	// given ReplyFunc.
	Run(ReplyFunc) error
	Closable
}
// DataInChannel is the receive-only channel type returned by
// CreateSinkChannel.
type DataInChannel interface {
	// Recv returns the bytes of a received message, or an error.
	Recv() ([]byte, error)
	Closable
}
// DataOutChannel is the send-only channel type returned by
// CreateSourceChannel.
type DataOutChannel interface {
	// Send transmits the given bytes as one message, or returns an error.
	Send([]byte) error
	Closable
}
// DataInOutChannel is the request/reply channel type returned by
// CreateFuncChannel.
type DataInOutChannel interface {
	// Run answers each incoming message with the bytes produced by the
	// given ReplyFunc.
	Run(ReplyFunc) error
	Closable
}
  49. type NanomsgRepChannel struct {
  50. sock mangos.Socket
  51. }
  52. // Run until process end
  53. func (r *NanomsgRepChannel) Run(f ReplyFunc) error {
  54. err := r.sock.Send([]byte("handshake"))
  55. if err != nil {
  56. return fmt.Errorf("can't send handshake: %s", err.Error())
  57. }
  58. for {
  59. msg, err := r.sock.Recv()
  60. if err != nil {
  61. return fmt.Errorf("cannot receive on rep socket: %s", err.Error())
  62. }
  63. reply := f(msg)
  64. err = r.sock.Send(reply)
  65. if err != nil {
  66. return fmt.Errorf("can't send reply: %s", err.Error())
  67. }
  68. }
  69. return nil
  70. }
  71. func (r *NanomsgRepChannel) Close() error {
  72. return r.sock.Close()
  73. }
  74. func CreateControlChannel(pluginName string) (ControlChannel, error) {
  75. var (
  76. sock mangos.Socket
  77. err error
  78. )
  79. if sock, err = req.NewSocket(); err != nil {
  80. return nil, fmt.Errorf("can't get new req socket: %s", err)
  81. }
  82. setSockOptions(sock)
  83. url := fmt.Sprintf("ipc:///tmp/plugin_%s.ipc", pluginName)
  84. if err = sock.Dial(url); err != nil {
  85. return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
  86. }
  87. return &NanomsgRepChannel{sock: sock}, nil
  88. }
  89. func CreateSourceChannel(ctx api.StreamContext) (DataOutChannel, error) {
  90. var (
  91. sock mangos.Socket
  92. err error
  93. )
  94. if sock, err = push.NewSocket(); err != nil {
  95. return nil, fmt.Errorf("can't get new push socket: %s", err)
  96. }
  97. setSockOptions(sock)
  98. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  99. if err = sock.Dial(url); err != nil {
  100. return nil, fmt.Errorf("can't dial on push socket: %s", err.Error())
  101. }
  102. return sock, nil
  103. }
  104. func CreateFuncChannel(symbolName string) (DataInOutChannel, error) {
  105. var (
  106. sock mangos.Socket
  107. err error
  108. )
  109. if sock, err = req.NewSocket(); err != nil {
  110. return nil, fmt.Errorf("can't get new req socket: %s", err)
  111. }
  112. setSockOptions(sock)
  113. url := fmt.Sprintf("ipc:///tmp/func_%s.ipc", symbolName)
  114. if err = sock.Dial(url); err != nil {
  115. return nil, fmt.Errorf("can't dial on req socket: %s", err.Error())
  116. }
  117. return &NanomsgRepChannel{sock: sock}, nil
  118. }
  119. func CreateSinkChannel(ctx api.StreamContext) (DataInChannel, error) {
  120. var (
  121. sock mangos.Socket
  122. err error
  123. )
  124. if sock, err = pull.NewSocket(); err != nil {
  125. return nil, fmt.Errorf("can't get new pull socket: %s", err)
  126. }
  127. setSockOptions(sock)
  128. url := fmt.Sprintf("ipc:///tmp/%s_%s_%d.ipc", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId())
  129. if err = listenWithRetry(sock, url); err != nil {
  130. return nil, fmt.Errorf("can't listen on pull socket for %s: %s", url, err.Error())
  131. }
  132. return sock, nil
  133. }
  134. func setSockOptions(sock mangos.Socket) {
  135. for k, v := range Options {
  136. sock.SetOption(k, v)
  137. }
  138. }
  139. func listenWithRetry(sock mangos.Socket, url string) error {
  140. var (
  141. retryCount = 300
  142. retryInterval = 10
  143. )
  144. for {
  145. err := sock.Listen(url)
  146. if err == nil {
  147. context.Log.Infof("plugin start to listen after %d tries", retryCount)
  148. return err
  149. }
  150. retryCount--
  151. if retryCount < 0 {
  152. return err
  153. }
  154. time.Sleep(time.Duration(retryInterval) * time.Millisecond)
  155. }
  156. }