executors.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package services
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/golang/protobuf/proto"
  9. "github.com/jhump/protoreflect/dynamic"
  10. "github.com/jhump/protoreflect/dynamic/grpcdynamic"
  11. "github.com/ugorji/go/codec"
  12. "google.golang.org/grpc"
  13. "io/ioutil"
  14. "net"
  15. "net/http"
  16. "net/rpc"
  17. "net/url"
  18. "reflect"
  19. "strings"
  20. "sync"
  21. "time"
  22. )
  23. // NewExecutor
  24. // Each interface definition maps to one executor instance. It is suppose to have only one thread running.
  25. func NewExecutor(i *interfaceInfo) (executor, error) {
  26. // No validation here, suppose the validation has been done in json parsing
  27. descriptor, err := parse(i.Schema.SchemaType, i.Schema.SchemaFile)
  28. if err != nil {
  29. return nil, err
  30. }
  31. u, err := url.Parse(i.Addr)
  32. if err != nil {
  33. return nil, fmt.Errorf("invalid url %s", i.Addr)
  34. }
  35. opt := &interfaceOpt{
  36. addr: u,
  37. timeout: 5000,
  38. }
  39. switch i.Protocol {
  40. case GRPC:
  41. d, ok := descriptor.(protoDescriptor)
  42. if !ok {
  43. return nil, fmt.Errorf("invalid descriptor type for grpc")
  44. }
  45. exe := &grpcExecutor{
  46. descriptor: d,
  47. interfaceOpt: opt,
  48. }
  49. return exe, nil
  50. case REST:
  51. d, ok := descriptor.(multiplexDescriptor)
  52. if !ok {
  53. return nil, fmt.Errorf("invalid descriptor type for rest")
  54. }
  55. o := &restOption{}
  56. e := common.MapToStruct(i.Options, o)
  57. if e != nil {
  58. return nil, fmt.Errorf("incorrect rest option: %v", e)
  59. }
  60. exe := &httpExecutor{
  61. descriptor: d,
  62. interfaceOpt: opt,
  63. restOpt: o,
  64. }
  65. return exe, nil
  66. case MSGPACK:
  67. d, ok := descriptor.(interfaceDescriptor)
  68. if !ok {
  69. return nil, fmt.Errorf("invalid descriptor type for msgpack-rpc")
  70. }
  71. exe := &msgpackExecutor{
  72. descriptor: d,
  73. interfaceOpt: opt,
  74. }
  75. return exe, nil
  76. default:
  77. return nil, fmt.Errorf("unsupported protocol %s", i.Protocol)
  78. }
  79. }
  80. type executor interface {
  81. InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error)
  82. }
  83. type interfaceOpt struct {
  84. addr *url.URL
  85. timeout int64
  86. }
  87. type grpcExecutor struct {
  88. descriptor protoDescriptor
  89. *interfaceOpt
  90. conn *grpc.ClientConn
  91. }
  92. func (d *grpcExecutor) InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  93. if d.conn == nil {
  94. dialCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  95. var (
  96. conn *grpc.ClientConn
  97. e error
  98. )
  99. go func() {
  100. defer cancel()
  101. conn, e = grpc.DialContext(dialCtx, d.addr.Host, grpc.WithInsecure(), grpc.WithBlock())
  102. }()
  103. select {
  104. case <-dialCtx.Done():
  105. err := dialCtx.Err()
  106. switch err {
  107. case context.Canceled:
  108. // connect successfully, do nothing
  109. case context.DeadlineExceeded:
  110. return nil, fmt.Errorf("connect to %s timeout", d.addr.String())
  111. default:
  112. return nil, fmt.Errorf("connect to %s error: %v", d.addr.String(), err)
  113. }
  114. }
  115. if e != nil {
  116. return nil, e
  117. }
  118. d.conn = conn
  119. }
  120. // TODO reconnect if fail and error handling
  121. stub := grpcdynamic.NewStubWithMessageFactory(d.conn, d.descriptor.MessageFactory())
  122. message, err := d.descriptor.ConvertParamsToMessage(name, params)
  123. if err != nil {
  124. return nil, err
  125. }
  126. timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  127. var (
  128. o proto.Message
  129. e error
  130. )
  131. go func() {
  132. defer cancel()
  133. o, e = stub.InvokeRpc(timeoutCtx, d.descriptor.MethodDescriptor(name), message)
  134. }()
  135. select {
  136. case <-timeoutCtx.Done():
  137. err := timeoutCtx.Err()
  138. switch err {
  139. case context.Canceled:
  140. // connect successfully, do nothing
  141. case context.DeadlineExceeded:
  142. return nil, fmt.Errorf("invoke %s timeout", name)
  143. default:
  144. return nil, fmt.Errorf("invoke %s error: %v", name, err)
  145. }
  146. }
  147. if e != nil {
  148. return nil, fmt.Errorf("error invoking method %s in proto: %v", name, err)
  149. }
  150. odm, err := dynamic.AsDynamicMessage(o)
  151. if err != nil {
  152. return nil, fmt.Errorf("error parsing method %s result: %v", name, err)
  153. }
  154. return d.descriptor.ConvertReturnMessage(name, odm)
  155. }
  156. type httpExecutor struct {
  157. descriptor multiplexDescriptor
  158. *interfaceOpt
  159. restOpt *restOption
  160. conn *http.Client
  161. }
  162. func (h *httpExecutor) InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  163. if h.conn == nil {
  164. tr := &http.Transport{
  165. TLSClientConfig: &tls.Config{InsecureSkipVerify: h.restOpt.InsecureSkipVerify},
  166. }
  167. h.conn = &http.Client{
  168. Transport: tr,
  169. Timeout: time.Duration(h.timeout) * time.Millisecond}
  170. }
  171. hm, err := h.descriptor.ConvertHttpMapping(name, params)
  172. if err != nil {
  173. return nil, err
  174. }
  175. u := h.addr.String() + hm.Uri
  176. _, err = url.Parse(u)
  177. if err != nil {
  178. return nil, err
  179. }
  180. resp, err := common.Send(ctx.GetLogger(), h.conn, "json", hm.Method, u, h.restOpt.Headers, false, hm.Body)
  181. if err != nil {
  182. return nil, err
  183. }
  184. defer resp.Body.Close()
  185. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  186. buf, _ := ioutil.ReadAll(resp.Body)
  187. ctx.GetLogger().Debugf("%s\n", string(buf))
  188. return nil, fmt.Errorf("http executor fails to err http return code: %d and error message %s", resp.StatusCode, string(buf))
  189. } else {
  190. buf, bodyErr := ioutil.ReadAll(resp.Body)
  191. if bodyErr != nil {
  192. return nil, fmt.Errorf("http executor read response body error: %v", bodyErr)
  193. }
  194. contentType := resp.Header.Get("Content-Type")
  195. if strings.HasPrefix(contentType, "application/json") {
  196. return h.descriptor.ConvertReturnJson(name, buf)
  197. } else if strings.HasPrefix(contentType, "text/plain") {
  198. return h.descriptor.ConvertReturnText(name, buf)
  199. } else {
  200. return nil, fmt.Errorf("unsupported resposne content type %s", contentType)
  201. }
  202. }
  203. }
  204. type msgpackExecutor struct {
  205. descriptor interfaceDescriptor
  206. *interfaceOpt
  207. sync.Mutex
  208. connected bool
  209. conn *rpc.Client
  210. }
  211. // InvokeFunction flat the params and result
  212. func (m *msgpackExecutor) InvokeFunction(_ api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  213. if !m.connected {
  214. m.Lock()
  215. if !m.connected {
  216. h := &codec.MsgpackHandle{}
  217. h.MapType = reflect.TypeOf(map[string]interface{}(nil))
  218. conn, err := net.Dial(m.addr.Scheme, m.addr.Host)
  219. if err != nil {
  220. return nil, err
  221. }
  222. rpcCodec := codec.MsgpackSpecRpc.ClientCodec(conn, h)
  223. m.conn = rpc.NewClientWithCodec(rpcCodec)
  224. }
  225. m.connected = true
  226. m.Unlock()
  227. }
  228. ps, err := m.descriptor.ConvertParams(name, params)
  229. if err != nil {
  230. return nil, err
  231. }
  232. var (
  233. reply interface{}
  234. args interface{}
  235. )
  236. // TODO argument flat
  237. switch len(ps) {
  238. case 0:
  239. // do nothing
  240. case 1:
  241. args = ps[0]
  242. default:
  243. args = codec.MsgpackSpecRpcMultiArgs(ps)
  244. }
  245. err = m.conn.Call(name, args, &reply)
  246. if err != nil {
  247. if err == rpc.ErrShutdown {
  248. m.connected = false
  249. }
  250. return nil, err
  251. }
  252. return m.descriptor.ConvertReturn(name, reply)
  253. }