schemaHttp.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "fmt"
  17. dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
  18. "github.com/jhump/protoreflect/desc"
  19. "github.com/jhump/protoreflect/dynamic"
  20. "github.com/lf-edge/ekuiper/pkg/cast"
  21. "google.golang.org/protobuf/reflect/protoreflect"
  22. "net/http"
  23. "regexp"
  24. "strings"
  25. )
  26. type httpConnMeta struct {
  27. Method string
  28. Uri string // The Uri is a relative path which must start with /
  29. Body []byte
  30. }
  31. type httpMapping interface {
  32. ConvertHttpMapping(method string, params []interface{}) (*httpConnMeta, error)
  33. }
  34. const (
  35. httpAPI = "google.api.http"
  36. wildcardBody = "*"
  37. emptyBody = ""
  38. )
  39. type httpOptions struct {
  40. Method string
  41. UriTemplate *uriTempalte // must not nil
  42. BodyField string
  43. }
  44. type uriTempalte struct {
  45. Template string
  46. Fields []*field
  47. }
  48. type field struct {
  49. name string
  50. prefix string
  51. }
  52. func (d *wrappedProtoDescriptor) parseHttpOptions() error {
  53. optionsMap := make(map[string]*httpOptions)
  54. var err error
  55. for _, s := range d.GetServices() {
  56. for _, m := range s.GetMethods() {
  57. options := m.GetMethodOptions()
  58. var ho *httpOptions
  59. // Find http option and exit loop at once. If not found, http option is nil
  60. options.ProtoReflect().Range(func(d protoreflect.FieldDescriptor, v protoreflect.Value) bool {
  61. if d.FullName() == httpAPI {
  62. if d.Kind() == protoreflect.MessageKind {
  63. var (
  64. uriOpt string
  65. bodyOpt string
  66. err error
  67. )
  68. ho = &httpOptions{}
  69. v.Message().Range(func(din protoreflect.FieldDescriptor, vin protoreflect.Value) bool {
  70. switch din.Name() {
  71. case "get":
  72. ho.Method = http.MethodGet
  73. uriOpt, err = getUriOpt(din, vin)
  74. case "put":
  75. ho.Method = http.MethodPut
  76. uriOpt, err = getUriOpt(din, vin)
  77. case "delete":
  78. ho.Method = http.MethodDelete
  79. uriOpt, err = getUriOpt(din, vin)
  80. case "post":
  81. ho.Method = http.MethodPost
  82. uriOpt, err = getUriOpt(din, vin)
  83. case "patch":
  84. ho.Method = http.MethodPatch
  85. uriOpt, err = getUriOpt(din, vin)
  86. case "body":
  87. bodyOpt, err = getUriOpt(din, vin)
  88. default:
  89. err = fmt.Errorf("unsupported option %s", din.Name())
  90. }
  91. if err != nil {
  92. return false
  93. }
  94. return true
  95. })
  96. if err != nil {
  97. return false
  98. }
  99. err = ho.convertUri(m, uriOpt, bodyOpt)
  100. if err != nil {
  101. return false
  102. }
  103. } else {
  104. err = fmt.Errorf("invalid http option for method %s in proto", m.GetName())
  105. }
  106. return false
  107. }
  108. if err != nil {
  109. return false
  110. }
  111. return true
  112. })
  113. if err != nil {
  114. return err
  115. }
  116. if ho != nil {
  117. optionsMap[m.GetName()] = ho
  118. }
  119. }
  120. }
  121. d.methodOptions = optionsMap
  122. return err
  123. }
  124. func (d *wrappedProtoDescriptor) ConvertHttpMapping(method string, params []interface{}) (*httpConnMeta, error) {
  125. hcm := &httpConnMeta{}
  126. var (
  127. json []byte
  128. err error
  129. )
  130. if ho, ok := d.methodOptions[method]; ok {
  131. message, err := d.ConvertParamsToMessage(method, params)
  132. if err != nil {
  133. return nil, err
  134. }
  135. if len(ho.UriTemplate.Fields) > 0 {
  136. args := make([]interface{}, len(ho.UriTemplate.Fields))
  137. for i, v := range ho.UriTemplate.Fields {
  138. fv, err := getMessageFieldWithDots(message, v.name)
  139. if err != nil {
  140. return nil, err
  141. }
  142. args[i], err = cast.ToString(fv, cast.CONVERT_ALL)
  143. if err != nil {
  144. return nil, fmt.Errorf("invalid field %s(%v) as http option, must be string", v.name, fv)
  145. }
  146. // Remove all params to be used in the params, the left params are for BODY
  147. level1Names := strings.Split(v.name, ".")
  148. message.ClearFieldByName(level1Names[0])
  149. if v.prefix != "" {
  150. if strings.HasPrefix(args[i].(string), v.prefix) {
  151. continue
  152. }
  153. return nil, fmt.Errorf("invalid field %s(%s) as http option, must have prefix %s", v.name, args[i], v.prefix)
  154. }
  155. }
  156. hcm.Uri = fmt.Sprintf(ho.UriTemplate.Template, args...)
  157. } else {
  158. hcm.Uri = ho.UriTemplate.Template
  159. }
  160. hcm.Method = ho.Method
  161. switch ho.BodyField {
  162. case wildcardBody:
  163. json, err = message.MarshalJSON()
  164. case emptyBody:
  165. json = nil
  166. default:
  167. bodyMessage := message.GetFieldByName(ho.BodyField)
  168. if bm, ok := bodyMessage.(*dynamic.Message); ok {
  169. json, err = bm.MarshalJSON()
  170. } else {
  171. return nil, fmt.Errorf("invalid body field %s, must be a message", ho.BodyField)
  172. }
  173. }
  174. } else { // If options are not set, use the default setting
  175. hcm.Method = "POST"
  176. hcm.Uri = "/" + method
  177. json, err = d.ConvertParamsToJson(method, params)
  178. }
  179. if err != nil {
  180. return nil, err
  181. }
  182. hcm.Body = json
  183. return hcm, nil
  184. }
  185. func getMessageFieldWithDots(message *dynamic.Message, name string) (interface{}, error) {
  186. secs := strings.Split(name, ".")
  187. currentMessage := message
  188. for i, sec := range secs {
  189. if i == len(secs)-1 {
  190. return currentMessage.GetFieldByName(sec), nil
  191. } else {
  192. c := currentMessage.GetFieldByName(sec)
  193. if cm, ok := c.(*dynamic.Message); ok {
  194. currentMessage = cm
  195. } else {
  196. return nil, fmt.Errorf("fail to find field %s", name)
  197. }
  198. }
  199. }
  200. return nil, fmt.Errorf("fail to find field %s", name)
  201. }
  202. func getUriOpt(d protoreflect.FieldDescriptor, v protoreflect.Value) (string, error) {
  203. if d.Kind() != protoreflect.StringKind {
  204. return "", fmt.Errorf("invalid type for %s option, string required", d.Name())
  205. }
  206. return v.String(), nil
  207. }
  208. func (ho *httpOptions) convertUri(md *desc.MethodDescriptor, uriOpt string, bodyOpt string) error {
  209. fmap := make(map[string]bool) // the value represents if the key is still available (not used) so that they can be removed from *
  210. for _, f := range md.GetInputType().GetFields() {
  211. fmap[f.GetName()] = true
  212. }
  213. result := &uriTempalte{}
  214. re := regexp.MustCompile(`\{(.*?)\}`)
  215. m := re.FindAllStringSubmatch(uriOpt, -1)
  216. if len(m) > 0 {
  217. result.Template = re.ReplaceAllString(uriOpt, "%s")
  218. var fields []*field
  219. for _, e := range m {
  220. f := &field{}
  221. rr := strings.Split(e[1], "=")
  222. if len(rr) == 2 {
  223. if strings.HasSuffix(rr[1], "*") {
  224. f.name = rr[0]
  225. f.prefix = rr[1][:len(rr[1])-1]
  226. } else {
  227. return fmt.Errorf("invalid uri %s in http option", uriOpt)
  228. }
  229. } else if len(rr) == 1 {
  230. f.name = e[1]
  231. } else {
  232. return fmt.Errorf("invalid uri %s in http option", uriOpt)
  233. }
  234. _, ok := fmap[f.name]
  235. if !ok {
  236. return fmt.Errorf("invalid uri %s in http option, %s field not found", uriOpt, f.name)
  237. }
  238. fmap[f.name] = false
  239. fields = append(fields, f)
  240. }
  241. result.Fields = fields
  242. } else {
  243. result.Template = uriOpt
  244. }
  245. switch bodyOpt {
  246. case wildcardBody:
  247. ho.BodyField = bodyOpt
  248. default:
  249. if bodyOpt != emptyBody {
  250. if _, ok := fmap[bodyOpt]; !ok {
  251. return fmt.Errorf("invalid body %s, field not found", bodyOpt)
  252. } else {
  253. fmap[bodyOpt] = false
  254. }
  255. }
  256. ho.BodyField = bodyOpt
  257. paramAdded := false
  258. result.updateUriParams(md.GetInputType(), "", fmap, paramAdded)
  259. }
  260. ho.UriTemplate = result
  261. return nil
  262. }
  263. func (u *uriTempalte) updateUriParams(md *desc.MessageDescriptor, prefix string, fmap map[string]bool, paramAdded bool) bool {
  264. var jointer string
  265. for _, mf := range md.GetFields() {
  266. if fmap[mf.GetName()] || prefix != "" { // The first level field which are not consumed or the second level field
  267. if mf.GetType() == dpb.FieldDescriptorProto_TYPE_MESSAGE {
  268. paramAdded = u.updateUriParams(mf.GetMessageType(), prefix+mf.GetName()+".", fmap, paramAdded)
  269. continue
  270. }
  271. if !paramAdded {
  272. paramAdded = true
  273. jointer = "?"
  274. } else {
  275. jointer = "&"
  276. }
  277. u.Template = fmt.Sprintf("%s%s%s%s=%s", u.Template, jointer, prefix, mf.GetName(), "%s")
  278. u.Fields = append(u.Fields, &field{name: prefix + mf.GetName()})
  279. }
  280. }
  281. return paramAdded
  282. }