executors.go 6.7 KB


  1. package services
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/golang/protobuf/proto"
  9. "github.com/jhump/protoreflect/dynamic"
  10. "github.com/jhump/protoreflect/dynamic/grpcdynamic"
  11. "github.com/ugorji/go/codec"
  12. "google.golang.org/grpc"
  13. "io/ioutil"
  14. "net"
  15. "net/http"
  16. "net/rpc"
  17. "net/url"
  18. "path"
  19. "reflect"
  20. "strings"
  21. "sync"
  22. "time"
  23. )
  24. // NewExecutor
  25. // Each interface definition maps to one executor instance. It is suppose to have only one thread running.
  26. func NewExecutor(i *interfaceInfo) (executor, error) {
  27. // No validation here, suppose the validation has been done in json parsing
  28. descriptor, err := parse(i.Schema.SchemaType, i.Schema.SchemaFile)
  29. if err != nil {
  30. return nil, err
  31. }
  32. u, err := url.Parse(i.Addr)
  33. if err != nil {
  34. return nil, fmt.Errorf("invalid url %s", i.Addr)
  35. }
  36. opt := &interfaceOpt{
  37. addr: u,
  38. timeout: 5000,
  39. }
  40. switch i.Protocol {
  41. case GRPC:
  42. d, ok := descriptor.(protoDescriptor)
  43. if !ok {
  44. return nil, fmt.Errorf("invalid descriptor type for grpc")
  45. }
  46. exe := &grpcExecutor{
  47. descriptor: d,
  48. interfaceOpt: opt,
  49. }
  50. return exe, nil
  51. case REST:
  52. d, ok := descriptor.(multiplexDescriptor)
  53. if !ok {
  54. return nil, fmt.Errorf("invalid descriptor type for rest")
  55. }
  56. o := &restOption{}
  57. e := common.MapToStruct(i.Options, o)
  58. if e != nil {
  59. return nil, fmt.Errorf("incorrect rest option: %v", e)
  60. }
  61. exe := &httpExecutor{
  62. descriptor: d,
  63. interfaceOpt: opt,
  64. restOpt: o,
  65. }
  66. return exe, nil
  67. case MSGPACK:
  68. d, ok := descriptor.(interfaceDescriptor)
  69. if !ok {
  70. return nil, fmt.Errorf("invalid descriptor type for msgpack-rpc")
  71. }
  72. exe := &msgpackExecutor{
  73. descriptor: d,
  74. interfaceOpt: opt,
  75. }
  76. return exe, nil
  77. default:
  78. return nil, fmt.Errorf("unsupported protocol %s", i.Protocol)
  79. }
  80. }
  81. type executor interface {
  82. InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error)
  83. }
  84. type interfaceOpt struct {
  85. addr *url.URL
  86. timeout int64
  87. }
  88. type grpcExecutor struct {
  89. descriptor protoDescriptor
  90. *interfaceOpt
  91. conn *grpc.ClientConn
  92. }
  93. func (d *grpcExecutor) InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  94. if d.conn == nil {
  95. dialCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  96. var (
  97. conn *grpc.ClientConn
  98. e error
  99. )
  100. go func() {
  101. defer cancel()
  102. conn, e = grpc.DialContext(dialCtx, d.addr.Host, grpc.WithInsecure(), grpc.WithBlock())
  103. }()
  104. select {
  105. case <-dialCtx.Done():
  106. err := dialCtx.Err()
  107. switch err {
  108. case context.Canceled:
  109. // connect successfully, do nothing
  110. case context.DeadlineExceeded:
  111. return nil, fmt.Errorf("connect to %s timeout", d.addr.String())
  112. default:
  113. return nil, fmt.Errorf("connect to %s error: %v", d.addr.String(), err)
  114. }
  115. }
  116. if e != nil {
  117. return nil, e
  118. }
  119. d.conn = conn
  120. }
  121. // TODO reconnect if fail and error handling
  122. stub := grpcdynamic.NewStubWithMessageFactory(d.conn, d.descriptor.MessageFactory())
  123. message, err := d.descriptor.ConvertParamsToMessage(name, params)
  124. if err != nil {
  125. return nil, err
  126. }
  127. timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  128. var (
  129. o proto.Message
  130. e error
  131. )
  132. go func() {
  133. defer cancel()
  134. o, e = stub.InvokeRpc(timeoutCtx, d.descriptor.MethodDescriptor(name), message)
  135. }()
  136. select {
  137. case <-timeoutCtx.Done():
  138. err := timeoutCtx.Err()
  139. switch err {
  140. case context.Canceled:
  141. // connect successfully, do nothing
  142. case context.DeadlineExceeded:
  143. return nil, fmt.Errorf("invoke %s timeout", name)
  144. default:
  145. return nil, fmt.Errorf("invoke %s error: %v", name, err)
  146. }
  147. }
  148. if e != nil {
  149. return nil, fmt.Errorf("error invoking method %s in proto: %v", name, err)
  150. }
  151. odm, err := dynamic.AsDynamicMessage(o)
  152. if err != nil {
  153. return nil, fmt.Errorf("error parsing method %s result: %v", name, err)
  154. }
  155. return d.descriptor.ConvertReturnMessage(name, odm)
  156. }
  157. type httpExecutor struct {
  158. descriptor multiplexDescriptor
  159. *interfaceOpt
  160. restOpt *restOption
  161. conn *http.Client
  162. }
  163. func (h *httpExecutor) InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  164. if h.conn == nil {
  165. tr := &http.Transport{
  166. TLSClientConfig: &tls.Config{InsecureSkipVerify: h.restOpt.InsecureSkipVerify},
  167. }
  168. h.conn = &http.Client{
  169. Transport: tr,
  170. Timeout: time.Duration(h.timeout) * time.Millisecond}
  171. }
  172. json, err := h.descriptor.ConvertParamsToJson(name, params)
  173. if err != nil {
  174. return nil, err
  175. }
  176. u := *h.addr
  177. u.Path = path.Join(u.Path, name)
  178. resp, err := common.Send(ctx.GetLogger(), h.conn, "json", http.MethodPost, u.String(), h.restOpt.Headers, false, json)
  179. if err != nil {
  180. return nil, err
  181. }
  182. defer resp.Body.Close()
  183. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  184. buf, _ := ioutil.ReadAll(resp.Body)
  185. ctx.GetLogger().Debugf("%s\n", string(buf))
  186. return nil, fmt.Errorf("http executor fails to err http return code: %d and error message %s", resp.StatusCode, string(buf))
  187. } else {
  188. buf, bodyErr := ioutil.ReadAll(resp.Body)
  189. if bodyErr != nil {
  190. return nil, fmt.Errorf("http executor read response body error: %v", bodyErr)
  191. }
  192. contentType := resp.Header.Get("Content-Type")
  193. if strings.HasPrefix(contentType, "application/json") {
  194. return h.descriptor.ConvertReturnJson(name, buf)
  195. } else if strings.HasPrefix(contentType, "text/plain") {
  196. return h.descriptor.ConvertReturnText(name, buf)
  197. } else {
  198. return nil, fmt.Errorf("unsupported resposne content type %s", contentType)
  199. }
  200. }
  201. }
  202. type msgpackExecutor struct {
  203. descriptor interfaceDescriptor
  204. *interfaceOpt
  205. sync.Mutex
  206. connected bool
  207. conn *rpc.Client
  208. }
  209. // InvokeFunction flat the params and result
  210. func (m *msgpackExecutor) InvokeFunction(_ api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  211. if !m.connected {
  212. m.Lock()
  213. if !m.connected {
  214. h := &codec.MsgpackHandle{}
  215. h.MapType = reflect.TypeOf(map[string]interface{}(nil))
  216. conn, err := net.Dial(m.addr.Scheme, m.addr.Host)
  217. if err != nil {
  218. return nil, err
  219. }
  220. rpcCodec := codec.MsgpackSpecRpc.ClientCodec(conn, h)
  221. m.conn = rpc.NewClientWithCodec(rpcCodec)
  222. }
  223. m.connected = true
  224. m.Unlock()
  225. }
  226. ps, err := m.descriptor.ConvertParams(name, params)
  227. if err != nil {
  228. return nil, err
  229. }
  230. var (
  231. reply interface{}
  232. args interface{}
  233. )
  234. // TODO argument flat
  235. switch len(ps) {
  236. case 0:
  237. // do nothing
  238. case 1:
  239. args = ps[0]
  240. default:
  241. args = codec.MsgpackSpecRpcMultiArgs(ps)
  242. }
  243. err = m.conn.Call(name, args, &reply)
  244. if err != nil {
  245. if err == rpc.ErrShutdown {
  246. m.connected = false
  247. }
  248. return nil, err
  249. }
  250. return m.descriptor.ConvertReturn(name, reply)
  251. }