executors.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "io"
  20. "net"
  21. "net/http"
  22. "net/rpc"
  23. "net/url"
  24. "reflect"
  25. "strings"
  26. "sync"
  27. "time"
  28. "github.com/golang/protobuf/proto"
  29. "github.com/jhump/protoreflect/dynamic"
  30. "github.com/jhump/protoreflect/dynamic/grpcdynamic"
  31. "github.com/ugorji/go/codec"
  32. "google.golang.org/grpc"
  33. "google.golang.org/grpc/credentials/insecure"
  34. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  35. "github.com/lf-edge/ekuiper/pkg/api"
  36. "github.com/lf-edge/ekuiper/pkg/cast"
  37. "github.com/lf-edge/ekuiper/pkg/infra"
  38. )
  39. // NewExecutor
  40. // Each interface definition maps to one executor instance. It is suppose to have only one thread running.
  41. func NewExecutor(i *interfaceInfo) (executor, error) {
  42. // No validation here, suppose the validation has been done in json parsing
  43. descriptor, err := parse(i.Schema.SchemaType, i.Schema.SchemaFile)
  44. if err != nil {
  45. return nil, err
  46. }
  47. u, err := url.Parse(i.Addr)
  48. if err != nil {
  49. return nil, fmt.Errorf("invalid url %s", i.Addr)
  50. }
  51. opt := &interfaceOpt{
  52. addr: u,
  53. timeout: 5000,
  54. }
  55. switch i.Protocol {
  56. case GRPC:
  57. d, ok := descriptor.(protoDescriptor)
  58. if !ok {
  59. return nil, fmt.Errorf("invalid descriptor type for grpc")
  60. }
  61. exe := &grpcExecutor{
  62. descriptor: d,
  63. interfaceOpt: opt,
  64. }
  65. return exe, nil
  66. case REST:
  67. d, ok := descriptor.(multiplexDescriptor)
  68. if !ok {
  69. return nil, fmt.Errorf("invalid descriptor type for rest")
  70. }
  71. o := &restOption{}
  72. e := cast.MapToStruct(i.Options, o)
  73. if e != nil {
  74. return nil, fmt.Errorf("incorrect rest option: %v", e)
  75. }
  76. exe := &httpExecutor{
  77. descriptor: d,
  78. interfaceOpt: opt,
  79. restOpt: o,
  80. }
  81. return exe, nil
  82. case MSGPACK:
  83. d, ok := descriptor.(interfaceDescriptor)
  84. if !ok {
  85. return nil, fmt.Errorf("invalid descriptor type for msgpack-rpc")
  86. }
  87. exe := &msgpackExecutor{
  88. descriptor: d,
  89. interfaceOpt: opt,
  90. }
  91. return exe, nil
  92. default:
  93. return nil, fmt.Errorf("unsupported protocol %s", i.Protocol)
  94. }
  95. }
  96. type executor interface {
  97. InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error)
  98. }
  99. type interfaceOpt struct {
  100. addr *url.URL
  101. timeout int64
  102. }
  103. type grpcExecutor struct {
  104. descriptor protoDescriptor
  105. *interfaceOpt
  106. conn *grpc.ClientConn
  107. }
  108. func (d *grpcExecutor) InvokeFunction(_ api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  109. if d.conn == nil {
  110. dialCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  111. var (
  112. conn *grpc.ClientConn
  113. e error
  114. )
  115. go infra.SafeRun(func() error {
  116. defer cancel()
  117. conn, e = grpc.DialContext(dialCtx, d.addr.Host, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
  118. return e
  119. })
  120. select {
  121. case <-dialCtx.Done():
  122. err := dialCtx.Err()
  123. switch err {
  124. case context.Canceled:
  125. // connect successfully, do nothing
  126. case context.DeadlineExceeded:
  127. return nil, fmt.Errorf("connect to %s timeout", d.addr.String())
  128. default:
  129. return nil, fmt.Errorf("connect to %s error: %v", d.addr.String(), err)
  130. }
  131. }
  132. if e != nil {
  133. return nil, e
  134. }
  135. d.conn = conn
  136. }
  137. // TODO reconnect if fail and error handling
  138. stub := grpcdynamic.NewStubWithMessageFactory(d.conn, d.descriptor.MessageFactory())
  139. message, err := d.descriptor.ConvertParamsToMessage(name, params)
  140. if err != nil {
  141. return nil, err
  142. }
  143. timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  144. var (
  145. o proto.Message
  146. e error
  147. )
  148. go infra.SafeRun(func() error {
  149. defer cancel()
  150. o, e = stub.InvokeRpc(timeoutCtx, d.descriptor.MethodDescriptor(name), message)
  151. return e
  152. })
  153. select {
  154. case <-timeoutCtx.Done():
  155. err := timeoutCtx.Err()
  156. switch err {
  157. case context.Canceled:
  158. // connect successfully, do nothing
  159. case context.DeadlineExceeded:
  160. return nil, fmt.Errorf("invoke %s timeout", name)
  161. default:
  162. return nil, fmt.Errorf("invoke %s error: %v", name, err)
  163. }
  164. }
  165. if e != nil {
  166. return nil, fmt.Errorf("error invoking method %s in proto: %v", name, err)
  167. }
  168. odm, err := dynamic.AsDynamicMessage(o)
  169. if err != nil {
  170. return nil, fmt.Errorf("error parsing method %s result: %v", name, err)
  171. }
  172. return d.descriptor.ConvertReturnMessage(name, odm)
  173. }
  174. type httpExecutor struct {
  175. descriptor multiplexDescriptor
  176. *interfaceOpt
  177. restOpt *restOption
  178. conn *http.Client
  179. }
  180. func (h *httpExecutor) InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  181. if h.conn == nil {
  182. tr := &http.Transport{
  183. TLSClientConfig: &tls.Config{InsecureSkipVerify: h.restOpt.InsecureSkipVerify},
  184. }
  185. h.conn = &http.Client{
  186. Transport: tr,
  187. Timeout: time.Duration(h.timeout) * time.Millisecond}
  188. }
  189. hm, err := h.descriptor.ConvertHttpMapping(name, params)
  190. if err != nil {
  191. return nil, err
  192. }
  193. u := h.addr.String() + hm.Uri
  194. _, err = url.Parse(u)
  195. if err != nil {
  196. return nil, err
  197. }
  198. resp, err := httpx.Send(ctx.GetLogger(), h.conn, "json", hm.Method, u, h.restOpt.Headers, false, hm.Body)
  199. if err != nil {
  200. return nil, err
  201. }
  202. defer resp.Body.Close()
  203. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  204. buf, _ := io.ReadAll(resp.Body)
  205. ctx.GetLogger().Debugf("%s\n", string(buf))
  206. return nil, fmt.Errorf("http executor fails to err http return code: %d and error message %s", resp.StatusCode, string(buf))
  207. } else {
  208. buf, bodyErr := io.ReadAll(resp.Body)
  209. if bodyErr != nil {
  210. return nil, fmt.Errorf("http executor read response body error: %v", bodyErr)
  211. }
  212. contentType := resp.Header.Get("Content-Type")
  213. if strings.HasPrefix(contentType, "application/json") {
  214. return h.descriptor.ConvertReturnJson(name, buf)
  215. } else if strings.HasPrefix(contentType, "text/plain") {
  216. return h.descriptor.ConvertReturnText(name, buf)
  217. } else {
  218. return nil, fmt.Errorf("unsupported resposne content type %s", contentType)
  219. }
  220. }
  221. }
  222. type msgpackExecutor struct {
  223. descriptor interfaceDescriptor
  224. *interfaceOpt
  225. sync.Mutex
  226. connected bool
  227. conn *rpc.Client
  228. }
  229. // InvokeFunction flat the params and result
  230. func (m *msgpackExecutor) InvokeFunction(_ api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  231. if !m.connected {
  232. m.Lock()
  233. if !m.connected {
  234. h := &codec.MsgpackHandle{}
  235. h.MapType = reflect.TypeOf(map[string]interface{}(nil))
  236. conn, err := net.Dial(m.addr.Scheme, m.addr.Host)
  237. if err != nil {
  238. return nil, err
  239. }
  240. rpcCodec := codec.MsgpackSpecRpc.ClientCodec(conn, h)
  241. m.conn = rpc.NewClientWithCodec(rpcCodec)
  242. }
  243. m.connected = true
  244. m.Unlock()
  245. }
  246. ps, err := m.descriptor.ConvertParams(name, params)
  247. if err != nil {
  248. return nil, err
  249. }
  250. var (
  251. reply interface{}
  252. args interface{}
  253. )
  254. // TODO argument flat
  255. switch len(ps) {
  256. case 0:
  257. // do nothing
  258. case 1:
  259. args = ps[0]
  260. default:
  261. args = codec.MsgpackSpecRpcMultiArgs(ps)
  262. }
  263. err = m.conn.Call(name, args, &reply)
  264. if err != nil {
  265. if err == rpc.ErrShutdown {
  266. m.connected = false
  267. }
  268. return nil, err
  269. }
  270. return m.descriptor.ConvertReturn(name, reply)
  271. }