executors.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "github.com/golang/protobuf/proto"
  20. "github.com/jhump/protoreflect/dynamic"
  21. "github.com/jhump/protoreflect/dynamic/grpcdynamic"
  22. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  23. "github.com/lf-edge/ekuiper/pkg/api"
  24. "github.com/lf-edge/ekuiper/pkg/cast"
  25. "github.com/ugorji/go/codec"
  26. "google.golang.org/grpc"
  27. "io/ioutil"
  28. "net"
  29. "net/http"
  30. "net/rpc"
  31. "net/url"
  32. "reflect"
  33. "strings"
  34. "sync"
  35. "time"
  36. )
  37. // NewExecutor
  38. // Each interface definition maps to one executor instance. It is suppose to have only one thread running.
  39. func NewExecutor(i *interfaceInfo) (executor, error) {
  40. // No validation here, suppose the validation has been done in json parsing
  41. descriptor, err := parse(i.Schema.SchemaType, i.Schema.SchemaFile)
  42. if err != nil {
  43. return nil, err
  44. }
  45. u, err := url.Parse(i.Addr)
  46. if err != nil {
  47. return nil, fmt.Errorf("invalid url %s", i.Addr)
  48. }
  49. opt := &interfaceOpt{
  50. addr: u,
  51. timeout: 5000,
  52. }
  53. switch i.Protocol {
  54. case GRPC:
  55. d, ok := descriptor.(protoDescriptor)
  56. if !ok {
  57. return nil, fmt.Errorf("invalid descriptor type for grpc")
  58. }
  59. exe := &grpcExecutor{
  60. descriptor: d,
  61. interfaceOpt: opt,
  62. }
  63. return exe, nil
  64. case REST:
  65. d, ok := descriptor.(multiplexDescriptor)
  66. if !ok {
  67. return nil, fmt.Errorf("invalid descriptor type for rest")
  68. }
  69. o := &restOption{}
  70. e := cast.MapToStruct(i.Options, o)
  71. if e != nil {
  72. return nil, fmt.Errorf("incorrect rest option: %v", e)
  73. }
  74. exe := &httpExecutor{
  75. descriptor: d,
  76. interfaceOpt: opt,
  77. restOpt: o,
  78. }
  79. return exe, nil
  80. case MSGPACK:
  81. d, ok := descriptor.(interfaceDescriptor)
  82. if !ok {
  83. return nil, fmt.Errorf("invalid descriptor type for msgpack-rpc")
  84. }
  85. exe := &msgpackExecutor{
  86. descriptor: d,
  87. interfaceOpt: opt,
  88. }
  89. return exe, nil
  90. default:
  91. return nil, fmt.Errorf("unsupported protocol %s", i.Protocol)
  92. }
  93. }
  94. type executor interface {
  95. InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error)
  96. }
  97. type interfaceOpt struct {
  98. addr *url.URL
  99. timeout int64
  100. }
  101. type grpcExecutor struct {
  102. descriptor protoDescriptor
  103. *interfaceOpt
  104. conn *grpc.ClientConn
  105. }
  106. func (d *grpcExecutor) InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  107. if d.conn == nil {
  108. dialCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  109. var (
  110. conn *grpc.ClientConn
  111. e error
  112. )
  113. go func() {
  114. defer cancel()
  115. conn, e = grpc.DialContext(dialCtx, d.addr.Host, grpc.WithInsecure(), grpc.WithBlock())
  116. }()
  117. select {
  118. case <-dialCtx.Done():
  119. err := dialCtx.Err()
  120. switch err {
  121. case context.Canceled:
  122. // connect successfully, do nothing
  123. case context.DeadlineExceeded:
  124. return nil, fmt.Errorf("connect to %s timeout", d.addr.String())
  125. default:
  126. return nil, fmt.Errorf("connect to %s error: %v", d.addr.String(), err)
  127. }
  128. }
  129. if e != nil {
  130. return nil, e
  131. }
  132. d.conn = conn
  133. }
  134. // TODO reconnect if fail and error handling
  135. stub := grpcdynamic.NewStubWithMessageFactory(d.conn, d.descriptor.MessageFactory())
  136. message, err := d.descriptor.ConvertParamsToMessage(name, params)
  137. if err != nil {
  138. return nil, err
  139. }
  140. timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  141. var (
  142. o proto.Message
  143. e error
  144. )
  145. go func() {
  146. defer cancel()
  147. o, e = stub.InvokeRpc(timeoutCtx, d.descriptor.MethodDescriptor(name), message)
  148. }()
  149. select {
  150. case <-timeoutCtx.Done():
  151. err := timeoutCtx.Err()
  152. switch err {
  153. case context.Canceled:
  154. // connect successfully, do nothing
  155. case context.DeadlineExceeded:
  156. return nil, fmt.Errorf("invoke %s timeout", name)
  157. default:
  158. return nil, fmt.Errorf("invoke %s error: %v", name, err)
  159. }
  160. }
  161. if e != nil {
  162. return nil, fmt.Errorf("error invoking method %s in proto: %v", name, err)
  163. }
  164. odm, err := dynamic.AsDynamicMessage(o)
  165. if err != nil {
  166. return nil, fmt.Errorf("error parsing method %s result: %v", name, err)
  167. }
  168. return d.descriptor.ConvertReturnMessage(name, odm)
  169. }
  170. type httpExecutor struct {
  171. descriptor multiplexDescriptor
  172. *interfaceOpt
  173. restOpt *restOption
  174. conn *http.Client
  175. }
  176. func (h *httpExecutor) InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  177. if h.conn == nil {
  178. tr := &http.Transport{
  179. TLSClientConfig: &tls.Config{InsecureSkipVerify: h.restOpt.InsecureSkipVerify},
  180. }
  181. h.conn = &http.Client{
  182. Transport: tr,
  183. Timeout: time.Duration(h.timeout) * time.Millisecond}
  184. }
  185. hm, err := h.descriptor.ConvertHttpMapping(name, params)
  186. if err != nil {
  187. return nil, err
  188. }
  189. u := h.addr.String() + hm.Uri
  190. _, err = url.Parse(u)
  191. if err != nil {
  192. return nil, err
  193. }
  194. resp, err := httpx.Send(ctx.GetLogger(), h.conn, "json", hm.Method, u, h.restOpt.Headers, false, hm.Body)
  195. if err != nil {
  196. return nil, err
  197. }
  198. defer resp.Body.Close()
  199. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  200. buf, _ := ioutil.ReadAll(resp.Body)
  201. ctx.GetLogger().Debugf("%s\n", string(buf))
  202. return nil, fmt.Errorf("http executor fails to err http return code: %d and error message %s", resp.StatusCode, string(buf))
  203. } else {
  204. buf, bodyErr := ioutil.ReadAll(resp.Body)
  205. if bodyErr != nil {
  206. return nil, fmt.Errorf("http executor read response body error: %v", bodyErr)
  207. }
  208. contentType := resp.Header.Get("Content-Type")
  209. if strings.HasPrefix(contentType, "application/json") {
  210. return h.descriptor.ConvertReturnJson(name, buf)
  211. } else if strings.HasPrefix(contentType, "text/plain") {
  212. return h.descriptor.ConvertReturnText(name, buf)
  213. } else {
  214. return nil, fmt.Errorf("unsupported resposne content type %s", contentType)
  215. }
  216. }
  217. }
  218. type msgpackExecutor struct {
  219. descriptor interfaceDescriptor
  220. *interfaceOpt
  221. sync.Mutex
  222. connected bool
  223. conn *rpc.Client
  224. }
  225. // InvokeFunction flat the params and result
  226. func (m *msgpackExecutor) InvokeFunction(_ api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  227. if !m.connected {
  228. m.Lock()
  229. if !m.connected {
  230. h := &codec.MsgpackHandle{}
  231. h.MapType = reflect.TypeOf(map[string]interface{}(nil))
  232. conn, err := net.Dial(m.addr.Scheme, m.addr.Host)
  233. if err != nil {
  234. return nil, err
  235. }
  236. rpcCodec := codec.MsgpackSpecRpc.ClientCodec(conn, h)
  237. m.conn = rpc.NewClientWithCodec(rpcCodec)
  238. }
  239. m.connected = true
  240. m.Unlock()
  241. }
  242. ps, err := m.descriptor.ConvertParams(name, params)
  243. if err != nil {
  244. return nil, err
  245. }
  246. var (
  247. reply interface{}
  248. args interface{}
  249. )
  250. // TODO argument flat
  251. switch len(ps) {
  252. case 0:
  253. // do nothing
  254. case 1:
  255. args = ps[0]
  256. default:
  257. args = codec.MsgpackSpecRpcMultiArgs(ps)
  258. }
  259. err = m.conn.Call(name, args, &reply)
  260. if err != nil {
  261. if err == rpc.ErrShutdown {
  262. m.connected = false
  263. }
  264. return nil, err
  265. }
  266. return m.descriptor.ConvertReturn(name, reply)
  267. }