executors.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "github.com/golang/protobuf/proto"
  20. "github.com/jhump/protoreflect/dynamic"
  21. "github.com/jhump/protoreflect/dynamic/grpcdynamic"
  22. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  23. "github.com/lf-edge/ekuiper/pkg/api"
  24. "github.com/lf-edge/ekuiper/pkg/cast"
  25. "github.com/lf-edge/ekuiper/pkg/infra"
  26. "github.com/ugorji/go/codec"
  27. "google.golang.org/grpc"
  28. "io/ioutil"
  29. "net"
  30. "net/http"
  31. "net/rpc"
  32. "net/url"
  33. "reflect"
  34. "strings"
  35. "sync"
  36. "time"
  37. )
  38. // NewExecutor
  39. // Each interface definition maps to one executor instance. It is suppose to have only one thread running.
  40. func NewExecutor(i *interfaceInfo) (executor, error) {
  41. // No validation here, suppose the validation has been done in json parsing
  42. descriptor, err := parse(i.Schema.SchemaType, i.Schema.SchemaFile)
  43. if err != nil {
  44. return nil, err
  45. }
  46. u, err := url.Parse(i.Addr)
  47. if err != nil {
  48. return nil, fmt.Errorf("invalid url %s", i.Addr)
  49. }
  50. opt := &interfaceOpt{
  51. addr: u,
  52. timeout: 5000,
  53. }
  54. switch i.Protocol {
  55. case GRPC:
  56. d, ok := descriptor.(protoDescriptor)
  57. if !ok {
  58. return nil, fmt.Errorf("invalid descriptor type for grpc")
  59. }
  60. exe := &grpcExecutor{
  61. descriptor: d,
  62. interfaceOpt: opt,
  63. }
  64. return exe, nil
  65. case REST:
  66. d, ok := descriptor.(multiplexDescriptor)
  67. if !ok {
  68. return nil, fmt.Errorf("invalid descriptor type for rest")
  69. }
  70. o := &restOption{}
  71. e := cast.MapToStruct(i.Options, o)
  72. if e != nil {
  73. return nil, fmt.Errorf("incorrect rest option: %v", e)
  74. }
  75. exe := &httpExecutor{
  76. descriptor: d,
  77. interfaceOpt: opt,
  78. restOpt: o,
  79. }
  80. return exe, nil
  81. case MSGPACK:
  82. d, ok := descriptor.(interfaceDescriptor)
  83. if !ok {
  84. return nil, fmt.Errorf("invalid descriptor type for msgpack-rpc")
  85. }
  86. exe := &msgpackExecutor{
  87. descriptor: d,
  88. interfaceOpt: opt,
  89. }
  90. return exe, nil
  91. default:
  92. return nil, fmt.Errorf("unsupported protocol %s", i.Protocol)
  93. }
  94. }
  95. type executor interface {
  96. InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error)
  97. }
  98. type interfaceOpt struct {
  99. addr *url.URL
  100. timeout int64
  101. }
  102. type grpcExecutor struct {
  103. descriptor protoDescriptor
  104. *interfaceOpt
  105. conn *grpc.ClientConn
  106. }
  107. func (d *grpcExecutor) InvokeFunction(_ api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  108. if d.conn == nil {
  109. dialCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  110. var (
  111. conn *grpc.ClientConn
  112. e error
  113. )
  114. go infra.SafeRun(func() error {
  115. defer cancel()
  116. conn, e = grpc.DialContext(dialCtx, d.addr.Host, grpc.WithInsecure(), grpc.WithBlock())
  117. return e
  118. })
  119. select {
  120. case <-dialCtx.Done():
  121. err := dialCtx.Err()
  122. switch err {
  123. case context.Canceled:
  124. // connect successfully, do nothing
  125. case context.DeadlineExceeded:
  126. return nil, fmt.Errorf("connect to %s timeout", d.addr.String())
  127. default:
  128. return nil, fmt.Errorf("connect to %s error: %v", d.addr.String(), err)
  129. }
  130. }
  131. if e != nil {
  132. return nil, e
  133. }
  134. d.conn = conn
  135. }
  136. // TODO reconnect if fail and error handling
  137. stub := grpcdynamic.NewStubWithMessageFactory(d.conn, d.descriptor.MessageFactory())
  138. message, err := d.descriptor.ConvertParamsToMessage(name, params)
  139. if err != nil {
  140. return nil, err
  141. }
  142. timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  143. var (
  144. o proto.Message
  145. e error
  146. )
  147. go infra.SafeRun(func() error {
  148. defer cancel()
  149. o, e = stub.InvokeRpc(timeoutCtx, d.descriptor.MethodDescriptor(name), message)
  150. return e
  151. })
  152. select {
  153. case <-timeoutCtx.Done():
  154. err := timeoutCtx.Err()
  155. switch err {
  156. case context.Canceled:
  157. // connect successfully, do nothing
  158. case context.DeadlineExceeded:
  159. return nil, fmt.Errorf("invoke %s timeout", name)
  160. default:
  161. return nil, fmt.Errorf("invoke %s error: %v", name, err)
  162. }
  163. }
  164. if e != nil {
  165. return nil, fmt.Errorf("error invoking method %s in proto: %v", name, err)
  166. }
  167. odm, err := dynamic.AsDynamicMessage(o)
  168. if err != nil {
  169. return nil, fmt.Errorf("error parsing method %s result: %v", name, err)
  170. }
  171. return d.descriptor.ConvertReturnMessage(name, odm)
  172. }
  173. type httpExecutor struct {
  174. descriptor multiplexDescriptor
  175. *interfaceOpt
  176. restOpt *restOption
  177. conn *http.Client
  178. }
  179. func (h *httpExecutor) InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  180. if h.conn == nil {
  181. tr := &http.Transport{
  182. TLSClientConfig: &tls.Config{InsecureSkipVerify: h.restOpt.InsecureSkipVerify},
  183. }
  184. h.conn = &http.Client{
  185. Transport: tr,
  186. Timeout: time.Duration(h.timeout) * time.Millisecond}
  187. }
  188. hm, err := h.descriptor.ConvertHttpMapping(name, params)
  189. if err != nil {
  190. return nil, err
  191. }
  192. u := h.addr.String() + hm.Uri
  193. _, err = url.Parse(u)
  194. if err != nil {
  195. return nil, err
  196. }
  197. resp, err := httpx.Send(ctx.GetLogger(), h.conn, "json", hm.Method, u, h.restOpt.Headers, false, hm.Body)
  198. if err != nil {
  199. return nil, err
  200. }
  201. defer resp.Body.Close()
  202. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  203. buf, _ := ioutil.ReadAll(resp.Body)
  204. ctx.GetLogger().Debugf("%s\n", string(buf))
  205. return nil, fmt.Errorf("http executor fails to err http return code: %d and error message %s", resp.StatusCode, string(buf))
  206. } else {
  207. buf, bodyErr := ioutil.ReadAll(resp.Body)
  208. if bodyErr != nil {
  209. return nil, fmt.Errorf("http executor read response body error: %v", bodyErr)
  210. }
  211. contentType := resp.Header.Get("Content-Type")
  212. if strings.HasPrefix(contentType, "application/json") {
  213. return h.descriptor.ConvertReturnJson(name, buf)
  214. } else if strings.HasPrefix(contentType, "text/plain") {
  215. return h.descriptor.ConvertReturnText(name, buf)
  216. } else {
  217. return nil, fmt.Errorf("unsupported resposne content type %s", contentType)
  218. }
  219. }
  220. }
  221. type msgpackExecutor struct {
  222. descriptor interfaceDescriptor
  223. *interfaceOpt
  224. sync.Mutex
  225. connected bool
  226. conn *rpc.Client
  227. }
  228. // InvokeFunction flat the params and result
  229. func (m *msgpackExecutor) InvokeFunction(_ api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  230. if !m.connected {
  231. m.Lock()
  232. if !m.connected {
  233. h := &codec.MsgpackHandle{}
  234. h.MapType = reflect.TypeOf(map[string]interface{}(nil))
  235. conn, err := net.Dial(m.addr.Scheme, m.addr.Host)
  236. if err != nil {
  237. return nil, err
  238. }
  239. rpcCodec := codec.MsgpackSpecRpc.ClientCodec(conn, h)
  240. m.conn = rpc.NewClientWithCodec(rpcCodec)
  241. }
  242. m.connected = true
  243. m.Unlock()
  244. }
  245. ps, err := m.descriptor.ConvertParams(name, params)
  246. if err != nil {
  247. return nil, err
  248. }
  249. var (
  250. reply interface{}
  251. args interface{}
  252. )
  253. // TODO argument flat
  254. switch len(ps) {
  255. case 0:
  256. // do nothing
  257. case 1:
  258. args = ps[0]
  259. default:
  260. args = codec.MsgpackSpecRpcMultiArgs(ps)
  261. }
  262. err = m.conn.Call(name, args, &reply)
  263. if err != nil {
  264. if err == rpc.ErrShutdown {
  265. m.connected = false
  266. }
  267. return nil, err
  268. }
  269. return m.descriptor.ConvertReturn(name, reply)
  270. }