executors.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. package service
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "github.com/golang/protobuf/proto"
  7. "github.com/jhump/protoreflect/dynamic"
  8. "github.com/jhump/protoreflect/dynamic/grpcdynamic"
  9. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  10. "github.com/lf-edge/ekuiper/pkg/api"
  11. "github.com/lf-edge/ekuiper/pkg/cast"
  12. "github.com/ugorji/go/codec"
  13. "google.golang.org/grpc"
  14. "io/ioutil"
  15. "net"
  16. "net/http"
  17. "net/rpc"
  18. "net/url"
  19. "reflect"
  20. "strings"
  21. "sync"
  22. "time"
  23. )
  24. // NewExecutor
  25. // Each interface definition maps to one executor instance. It is suppose to have only one thread running.
  26. func NewExecutor(i *interfaceInfo) (executor, error) {
  27. // No validation here, suppose the validation has been done in json parsing
  28. descriptor, err := parse(i.Schema.SchemaType, i.Schema.SchemaFile)
  29. if err != nil {
  30. return nil, err
  31. }
  32. u, err := url.Parse(i.Addr)
  33. if err != nil {
  34. return nil, fmt.Errorf("invalid url %s", i.Addr)
  35. }
  36. opt := &interfaceOpt{
  37. addr: u,
  38. timeout: 5000,
  39. }
  40. switch i.Protocol {
  41. case GRPC:
  42. d, ok := descriptor.(protoDescriptor)
  43. if !ok {
  44. return nil, fmt.Errorf("invalid descriptor type for grpc")
  45. }
  46. exe := &grpcExecutor{
  47. descriptor: d,
  48. interfaceOpt: opt,
  49. }
  50. return exe, nil
  51. case REST:
  52. d, ok := descriptor.(multiplexDescriptor)
  53. if !ok {
  54. return nil, fmt.Errorf("invalid descriptor type for rest")
  55. }
  56. o := &restOption{}
  57. e := cast.MapToStruct(i.Options, o)
  58. if e != nil {
  59. return nil, fmt.Errorf("incorrect rest option: %v", e)
  60. }
  61. exe := &httpExecutor{
  62. descriptor: d,
  63. interfaceOpt: opt,
  64. restOpt: o,
  65. }
  66. return exe, nil
  67. case MSGPACK:
  68. d, ok := descriptor.(interfaceDescriptor)
  69. if !ok {
  70. return nil, fmt.Errorf("invalid descriptor type for msgpack-rpc")
  71. }
  72. exe := &msgpackExecutor{
  73. descriptor: d,
  74. interfaceOpt: opt,
  75. }
  76. return exe, nil
  77. default:
  78. return nil, fmt.Errorf("unsupported protocol %s", i.Protocol)
  79. }
  80. }
  81. type executor interface {
  82. InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error)
  83. }
  84. type interfaceOpt struct {
  85. addr *url.URL
  86. timeout int64
  87. }
  88. type grpcExecutor struct {
  89. descriptor protoDescriptor
  90. *interfaceOpt
  91. conn *grpc.ClientConn
  92. }
  93. func (d *grpcExecutor) InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  94. if d.conn == nil {
  95. dialCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  96. var (
  97. conn *grpc.ClientConn
  98. e error
  99. )
  100. go func() {
  101. defer cancel()
  102. conn, e = grpc.DialContext(dialCtx, d.addr.Host, grpc.WithInsecure(), grpc.WithBlock())
  103. }()
  104. select {
  105. case <-dialCtx.Done():
  106. err := dialCtx.Err()
  107. switch err {
  108. case context.Canceled:
  109. // connect successfully, do nothing
  110. case context.DeadlineExceeded:
  111. return nil, fmt.Errorf("connect to %s timeout", d.addr.String())
  112. default:
  113. return nil, fmt.Errorf("connect to %s error: %v", d.addr.String(), err)
  114. }
  115. }
  116. if e != nil {
  117. return nil, e
  118. }
  119. d.conn = conn
  120. }
  121. // TODO reconnect if fail and error handling
  122. stub := grpcdynamic.NewStubWithMessageFactory(d.conn, d.descriptor.MessageFactory())
  123. message, err := d.descriptor.ConvertParamsToMessage(name, params)
  124. if err != nil {
  125. return nil, err
  126. }
  127. timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  128. var (
  129. o proto.Message
  130. e error
  131. )
  132. go func() {
  133. defer cancel()
  134. o, e = stub.InvokeRpc(timeoutCtx, d.descriptor.MethodDescriptor(name), message)
  135. }()
  136. select {
  137. case <-timeoutCtx.Done():
  138. err := timeoutCtx.Err()
  139. switch err {
  140. case context.Canceled:
  141. // connect successfully, do nothing
  142. case context.DeadlineExceeded:
  143. return nil, fmt.Errorf("invoke %s timeout", name)
  144. default:
  145. return nil, fmt.Errorf("invoke %s error: %v", name, err)
  146. }
  147. }
  148. if e != nil {
  149. return nil, fmt.Errorf("error invoking method %s in proto: %v", name, err)
  150. }
  151. odm, err := dynamic.AsDynamicMessage(o)
  152. if err != nil {
  153. return nil, fmt.Errorf("error parsing method %s result: %v", name, err)
  154. }
  155. return d.descriptor.ConvertReturnMessage(name, odm)
  156. }
  157. type httpExecutor struct {
  158. descriptor multiplexDescriptor
  159. *interfaceOpt
  160. restOpt *restOption
  161. conn *http.Client
  162. }
  163. func (h *httpExecutor) InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  164. if h.conn == nil {
  165. tr := &http.Transport{
  166. TLSClientConfig: &tls.Config{InsecureSkipVerify: h.restOpt.InsecureSkipVerify},
  167. }
  168. h.conn = &http.Client{
  169. Transport: tr,
  170. Timeout: time.Duration(h.timeout) * time.Millisecond}
  171. }
  172. hm, err := h.descriptor.ConvertHttpMapping(name, params)
  173. if err != nil {
  174. return nil, err
  175. }
  176. u := h.addr.String() + hm.Uri
  177. _, err = url.Parse(u)
  178. if err != nil {
  179. return nil, err
  180. }
  181. resp, err := httpx.Send(ctx.GetLogger(), h.conn, "json", hm.Method, u, h.restOpt.Headers, false, hm.Body)
  182. if err != nil {
  183. return nil, err
  184. }
  185. defer resp.Body.Close()
  186. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  187. buf, _ := ioutil.ReadAll(resp.Body)
  188. ctx.GetLogger().Debugf("%s\n", string(buf))
  189. return nil, fmt.Errorf("http executor fails to err http return code: %d and error message %s", resp.StatusCode, string(buf))
  190. } else {
  191. buf, bodyErr := ioutil.ReadAll(resp.Body)
  192. if bodyErr != nil {
  193. return nil, fmt.Errorf("http executor read response body error: %v", bodyErr)
  194. }
  195. contentType := resp.Header.Get("Content-Type")
  196. if strings.HasPrefix(contentType, "application/json") {
  197. return h.descriptor.ConvertReturnJson(name, buf)
  198. } else if strings.HasPrefix(contentType, "text/plain") {
  199. return h.descriptor.ConvertReturnText(name, buf)
  200. } else {
  201. return nil, fmt.Errorf("unsupported resposne content type %s", contentType)
  202. }
  203. }
  204. }
  205. type msgpackExecutor struct {
  206. descriptor interfaceDescriptor
  207. *interfaceOpt
  208. sync.Mutex
  209. connected bool
  210. conn *rpc.Client
  211. }
  212. // InvokeFunction flat the params and result
  213. func (m *msgpackExecutor) InvokeFunction(_ api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  214. if !m.connected {
  215. m.Lock()
  216. if !m.connected {
  217. h := &codec.MsgpackHandle{}
  218. h.MapType = reflect.TypeOf(map[string]interface{}(nil))
  219. conn, err := net.Dial(m.addr.Scheme, m.addr.Host)
  220. if err != nil {
  221. return nil, err
  222. }
  223. rpcCodec := codec.MsgpackSpecRpc.ClientCodec(conn, h)
  224. m.conn = rpc.NewClientWithCodec(rpcCodec)
  225. }
  226. m.connected = true
  227. m.Unlock()
  228. }
  229. ps, err := m.descriptor.ConvertParams(name, params)
  230. if err != nil {
  231. return nil, err
  232. }
  233. var (
  234. reply interface{}
  235. args interface{}
  236. )
  237. // TODO argument flat
  238. switch len(ps) {
  239. case 0:
  240. // do nothing
  241. case 1:
  242. args = ps[0]
  243. default:
  244. args = codec.MsgpackSpecRpcMultiArgs(ps)
  245. }
  246. err = m.conn.Call(name, args, &reply)
  247. if err != nil {
  248. if err == rpc.ErrShutdown {
  249. m.connected = false
  250. }
  251. return nil, err
  252. }
  253. return m.descriptor.ConvertReturn(name, reply)
  254. }