executors.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "io"
  20. "net/http"
  21. "net/url"
  22. "strings"
  23. "time"
  24. // TODO: replace with `google.golang.org/protobuf/proto` pkg.
  25. "github.com/golang/protobuf/proto" //nolint:staticcheck
  26. "github.com/jhump/protoreflect/dynamic"
  27. "github.com/jhump/protoreflect/dynamic/grpcdynamic"
  28. "google.golang.org/grpc"
  29. "google.golang.org/grpc/credentials/insecure"
  30. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  31. "github.com/lf-edge/ekuiper/pkg/api"
  32. "github.com/lf-edge/ekuiper/pkg/cast"
  33. "github.com/lf-edge/ekuiper/pkg/infra"
  34. )
  35. type exeIns func(desc descriptor, opt *interfaceOpt, i *interfaceInfo) (executor, error)
  36. var executors = map[protocol]exeIns{
  37. GRPC: newGrpcExecutor,
  38. REST: newHttpExecutor,
  39. }
  40. func newHttpExecutor(desc descriptor, opt *interfaceOpt, i *interfaceInfo) (executor, error) {
  41. d, ok := desc.(multiplexDescriptor)
  42. if !ok {
  43. return nil, fmt.Errorf("invalid descriptor type for rest")
  44. }
  45. o := &restOption{}
  46. e := cast.MapToStruct(i.Options, o)
  47. if e != nil {
  48. return nil, fmt.Errorf("incorrect rest option: %v", e)
  49. }
  50. exe := &httpExecutor{
  51. descriptor: d,
  52. interfaceOpt: opt,
  53. restOpt: o,
  54. }
  55. return exe, nil
  56. }
  57. func newGrpcExecutor(desc descriptor, opt *interfaceOpt, _ *interfaceInfo) (executor, error) {
  58. d, ok := desc.(protoDescriptor)
  59. if !ok {
  60. return nil, fmt.Errorf("invalid descriptor type for grpc")
  61. }
  62. exe := &grpcExecutor{
  63. descriptor: d,
  64. interfaceOpt: opt,
  65. }
  66. return exe, nil
  67. }
  68. // NewExecutor
  69. // Each interface definition maps to one executor instance. It is supposed to have only one thread running.
  70. func NewExecutor(i *interfaceInfo) (executor, error) {
  71. // No validation here, suppose the validation has been done in json parsing
  72. descriptor, err := parse(i.Schema.SchemaType, i.Schema.SchemaFile)
  73. if err != nil {
  74. return nil, err
  75. }
  76. u, err := url.Parse(i.Addr)
  77. if err != nil {
  78. return nil, fmt.Errorf("invalid url %s", i.Addr)
  79. }
  80. opt := &interfaceOpt{
  81. addr: u,
  82. timeout: 5000,
  83. }
  84. if ins, ok := executors[i.Protocol]; ok {
  85. return ins(descriptor, opt, i)
  86. } else {
  87. return nil, fmt.Errorf("unsupported protocol %s", i.Protocol)
  88. }
  89. }
  90. type executor interface {
  91. InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error)
  92. }
  // interfaceOpt holds the options common to every executor.
  type interfaceOpt struct {
  	// addr is the parsed address of the external service.
  	addr *url.URL
  	// timeout limits connecting and invoking; the executors in this file
  	// multiply it by time.Millisecond, i.e. it is expressed in milliseconds.
  	timeout int64
  }
  97. type grpcExecutor struct {
  98. descriptor protoDescriptor
  99. *interfaceOpt
  100. conn *grpc.ClientConn
  101. }
  102. func (d *grpcExecutor) InvokeFunction(_ api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  103. if d.conn == nil {
  104. dialCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  105. var (
  106. conn *grpc.ClientConn
  107. e error
  108. )
  109. go infra.SafeRun(func() error {
  110. defer cancel()
  111. conn, e = grpc.DialContext(dialCtx, d.addr.Host, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
  112. return e
  113. })
  114. select {
  115. case <-dialCtx.Done():
  116. err := dialCtx.Err()
  117. switch err {
  118. case context.Canceled:
  119. // connect successfully, do nothing
  120. case context.DeadlineExceeded:
  121. return nil, fmt.Errorf("connect to %s timeout", d.addr.String())
  122. default:
  123. return nil, fmt.Errorf("connect to %s error: %v", d.addr.String(), err)
  124. }
  125. }
  126. if e != nil {
  127. return nil, e
  128. }
  129. d.conn = conn
  130. }
  131. // TODO reconnect if fail and error handling
  132. stub := grpcdynamic.NewStubWithMessageFactory(d.conn, d.descriptor.MessageFactory())
  133. message, err := d.descriptor.ConvertParamsToMessage(name, params)
  134. if err != nil {
  135. return nil, err
  136. }
  137. timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Duration(d.timeout)*time.Millisecond)
  138. var (
  139. o proto.Message
  140. e error
  141. )
  142. go infra.SafeRun(func() error {
  143. defer cancel()
  144. o, e = stub.InvokeRpc(timeoutCtx, d.descriptor.MethodDescriptor(name), message)
  145. return e
  146. })
  147. select {
  148. case <-timeoutCtx.Done():
  149. err := timeoutCtx.Err()
  150. switch err {
  151. case context.Canceled:
  152. // connect successfully, do nothing
  153. case context.DeadlineExceeded:
  154. return nil, fmt.Errorf("invoke %s timeout", name)
  155. default:
  156. return nil, fmt.Errorf("invoke %s error: %v", name, err)
  157. }
  158. }
  159. if e != nil {
  160. return nil, fmt.Errorf("error invoking method %s in proto: %v", name, err)
  161. }
  162. odm, err := dynamic.AsDynamicMessage(o)
  163. if err != nil {
  164. return nil, fmt.Errorf("error parsing method %s result: %v", name, err)
  165. }
  166. return d.descriptor.ConvertReturnMessage(name, odm)
  167. }
  168. type httpExecutor struct {
  169. descriptor multiplexDescriptor
  170. *interfaceOpt
  171. restOpt *restOption
  172. conn *http.Client
  173. }
  174. func (h *httpExecutor) InvokeFunction(ctx api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  175. if h.conn == nil {
  176. tr := &http.Transport{
  177. TLSClientConfig: &tls.Config{InsecureSkipVerify: h.restOpt.InsecureSkipVerify},
  178. }
  179. h.conn = &http.Client{
  180. Transport: tr,
  181. Timeout: time.Duration(h.timeout) * time.Millisecond,
  182. }
  183. }
  184. hm, err := h.descriptor.ConvertHttpMapping(name, params)
  185. if err != nil {
  186. return nil, err
  187. }
  188. u := h.addr.String() + hm.Uri
  189. _, err = url.Parse(u)
  190. if err != nil {
  191. return nil, err
  192. }
  193. resp, err := httpx.Send(ctx.GetLogger(), h.conn, "json", hm.Method, u, h.restOpt.Headers, false, hm.Body)
  194. if err != nil {
  195. return nil, err
  196. }
  197. defer resp.Body.Close()
  198. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  199. buf, _ := io.ReadAll(resp.Body)
  200. ctx.GetLogger().Debugf("%s\n", string(buf))
  201. return nil, fmt.Errorf("http executor fails to err http return code: %d and error message %s", resp.StatusCode, string(buf))
  202. } else {
  203. buf, bodyErr := io.ReadAll(resp.Body)
  204. if bodyErr != nil {
  205. return nil, fmt.Errorf("http executor read response body error: %v", bodyErr)
  206. }
  207. contentType := resp.Header.Get("Content-Type")
  208. if strings.HasPrefix(contentType, "application/json") {
  209. return h.descriptor.ConvertReturnJson(name, buf)
  210. } else if strings.HasPrefix(contentType, "text/plain") {
  211. return h.descriptor.ConvertReturnText(name, buf)
  212. } else {
  213. return nil, fmt.Errorf("unsupported resposne content type %s", contentType)
  214. }
  215. }
  216. }