schemaHttp.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "fmt"
  17. dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
  18. "github.com/jhump/protoreflect/desc"
  19. "github.com/jhump/protoreflect/dynamic"
  20. "github.com/lf-edge/ekuiper/pkg/cast"
  21. "google.golang.org/protobuf/reflect/protoreflect"
  22. "net/http"
  23. "regexp"
  24. "strings"
  25. )
  26. type httpConnMeta struct {
  27. Method string
  28. Uri string // The Uri is a relative path which must start with /
  29. Body []byte
  30. }
  31. type httpMapping interface {
  32. ConvertHttpMapping(method string, params []interface{}) (*httpConnMeta, error)
  33. }
  34. const (
  35. httpAPI = "google.api.http"
  36. wildcardBody = "*"
  37. emptyBody = ""
  38. )
  39. type httpOptions struct {
  40. Method string
  41. UriTemplate *uriTempalte // must not nil
  42. BodyField string
  43. }
  44. type uriTempalte struct {
  45. Template string
  46. Fields []*field
  47. }
  48. type field struct {
  49. name string
  50. prefix string
  51. }
  52. func (d *wrappedProtoDescriptor) parseHttpOptions() error {
  53. optionsMap := make(map[string]*httpOptions)
  54. var err error
  55. for _, s := range d.GetServices() {
  56. for _, m := range s.GetMethods() {
  57. options := m.GetMethodOptions()
  58. var ho *httpOptions
  59. // Find http option and exit loop at once. If not found, http option is nil
  60. options.ProtoReflect().Range(func(d protoreflect.FieldDescriptor, v protoreflect.Value) bool {
  61. if d.FullName() == httpAPI {
  62. if d.Kind() == protoreflect.MessageKind {
  63. var (
  64. uriOpt string
  65. bodyOpt string
  66. err error
  67. )
  68. ho = &httpOptions{}
  69. v.Message().Range(func(din protoreflect.FieldDescriptor, vin protoreflect.Value) bool {
  70. switch din.Name() {
  71. case "get":
  72. ho.Method = http.MethodGet
  73. uriOpt, err = getUriOpt(din, vin)
  74. case "put":
  75. ho.Method = http.MethodPut
  76. uriOpt, err = getUriOpt(din, vin)
  77. case "delete":
  78. ho.Method = http.MethodDelete
  79. uriOpt, err = getUriOpt(din, vin)
  80. case "post":
  81. ho.Method = http.MethodPost
  82. uriOpt, err = getUriOpt(din, vin)
  83. case "patch":
  84. ho.Method = http.MethodPatch
  85. uriOpt, err = getUriOpt(din, vin)
  86. case "body":
  87. bodyOpt, err = getUriOpt(din, vin)
  88. default:
  89. err = fmt.Errorf("unsupported option %s", din.Name())
  90. }
  91. if err != nil {
  92. return false
  93. }
  94. return true
  95. })
  96. if err != nil {
  97. return false
  98. }
  99. err = ho.convertUri(m, uriOpt, bodyOpt)
  100. if err != nil {
  101. return false
  102. }
  103. } else {
  104. err = fmt.Errorf("invalid http option for method %s in proto", m.GetName())
  105. }
  106. return false
  107. }
  108. if err != nil {
  109. return false
  110. }
  111. return true
  112. })
  113. if err != nil {
  114. return err
  115. }
  116. if ho != nil {
  117. optionsMap[m.GetName()] = ho
  118. }
  119. }
  120. }
  121. d.methodOptions = optionsMap
  122. return err
  123. }
  124. func (d *wrappedProtoDescriptor) ConvertHttpMapping(method string, params []interface{}) (*httpConnMeta, error) {
  125. hcm := &httpConnMeta{}
  126. var (
  127. json []byte
  128. err error
  129. )
  130. if ho, ok := d.methodOptions[method]; ok {
  131. message, err := d.ConvertParamsToMessage(method, params)
  132. if err != nil {
  133. return nil, err
  134. }
  135. if len(ho.UriTemplate.Fields) > 0 {
  136. args := make([]interface{}, len(ho.UriTemplate.Fields))
  137. for i, v := range ho.UriTemplate.Fields {
  138. fv, err := getMessageFieldWithDots(message, v.name)
  139. if err != nil {
  140. return nil, err
  141. }
  142. args[i], err = cast.ToString(fv, cast.CONVERT_ALL)
  143. if err != nil {
  144. return nil, fmt.Errorf("invalid field %s(%v) as http option, must be string", v.name, fv)
  145. }
  146. // Remove all params to be used in the params, the left params are for BODY
  147. level1Names := strings.Split(v.name, ".")
  148. message.ClearFieldByName(level1Names[0])
  149. if v.prefix != "" {
  150. if strings.HasPrefix(args[i].(string), v.prefix) {
  151. continue
  152. } else {
  153. return nil, fmt.Errorf("invalid field %s(%s) as http option, must have prefix %s", v.name, args[i], v.prefix)
  154. }
  155. }
  156. }
  157. hcm.Uri = fmt.Sprintf(ho.UriTemplate.Template, args...)
  158. } else {
  159. hcm.Uri = ho.UriTemplate.Template
  160. }
  161. hcm.Method = ho.Method
  162. switch ho.BodyField {
  163. case wildcardBody:
  164. json, err = message.MarshalJSON()
  165. case emptyBody:
  166. json = nil
  167. default:
  168. bodyMessage := message.GetFieldByName(ho.BodyField)
  169. if bm, ok := bodyMessage.(*dynamic.Message); ok {
  170. json, err = bm.MarshalJSON()
  171. } else {
  172. return nil, fmt.Errorf("invalid body field %s, must be a message", ho.BodyField)
  173. }
  174. }
  175. } else { // If options are not set, use the default setting
  176. hcm.Method = "POST"
  177. hcm.Uri = "/" + method
  178. json, err = d.ConvertParamsToJson(method, params)
  179. }
  180. if err != nil {
  181. return nil, err
  182. }
  183. hcm.Body = json
  184. return hcm, nil
  185. }
  186. func getMessageFieldWithDots(message *dynamic.Message, name string) (interface{}, error) {
  187. secs := strings.Split(name, ".")
  188. currentMessage := message
  189. for i, sec := range secs {
  190. if i == len(secs)-1 {
  191. return currentMessage.GetFieldByName(sec), nil
  192. } else {
  193. c := currentMessage.GetFieldByName(sec)
  194. if cm, ok := c.(*dynamic.Message); ok {
  195. currentMessage = cm
  196. } else {
  197. return nil, fmt.Errorf("fail to find field %s", name)
  198. }
  199. }
  200. }
  201. return nil, fmt.Errorf("fail to find field %s", name)
  202. }
  203. func getUriOpt(d protoreflect.FieldDescriptor, v protoreflect.Value) (string, error) {
  204. if d.Kind() != protoreflect.StringKind {
  205. return "", fmt.Errorf("invalid type for %s option, string required", d.Name())
  206. }
  207. return v.String(), nil
  208. }
  209. func (ho *httpOptions) convertUri(md *desc.MethodDescriptor, uriOpt string, bodyOpt string) error {
  210. fmap := make(map[string]bool) // the value represents if the key is still available (not used) so that they can be removed from *
  211. for _, f := range md.GetInputType().GetFields() {
  212. fmap[f.GetName()] = true
  213. }
  214. result := &uriTempalte{}
  215. re := regexp.MustCompile(`\{(.*?)\}`)
  216. m := re.FindAllStringSubmatch(uriOpt, -1)
  217. if len(m) > 0 {
  218. result.Template = re.ReplaceAllString(uriOpt, "%s")
  219. var fields []*field
  220. for _, e := range m {
  221. f := &field{}
  222. rr := strings.Split(e[1], "=")
  223. if len(rr) == 2 {
  224. if strings.HasSuffix(rr[1], "*") {
  225. f.name = rr[0]
  226. f.prefix = rr[1][:len(rr[1])-1]
  227. } else {
  228. return fmt.Errorf("invalid uri %s in http option", uriOpt)
  229. }
  230. } else if len(rr) == 1 {
  231. f.name = e[1]
  232. } else {
  233. return fmt.Errorf("invalid uri %s in http option", uriOpt)
  234. }
  235. _, ok := fmap[f.name]
  236. if !ok {
  237. return fmt.Errorf("invalid uri %s in http option, %s field not found", uriOpt, f.name)
  238. }
  239. fmap[f.name] = false
  240. fields = append(fields, f)
  241. }
  242. result.Fields = fields
  243. } else {
  244. result.Template = uriOpt
  245. }
  246. switch bodyOpt {
  247. case wildcardBody:
  248. ho.BodyField = bodyOpt
  249. default:
  250. if bodyOpt != emptyBody {
  251. if _, ok := fmap[bodyOpt]; !ok {
  252. return fmt.Errorf("invalid body %s, field not found", bodyOpt)
  253. } else {
  254. fmap[bodyOpt] = false
  255. }
  256. }
  257. ho.BodyField = bodyOpt
  258. paramAdded := false
  259. result.updateUriParams(md.GetInputType(), "", fmap, paramAdded)
  260. }
  261. ho.UriTemplate = result
  262. return nil
  263. }
  264. func (u *uriTempalte) updateUriParams(md *desc.MessageDescriptor, prefix string, fmap map[string]bool, paramAdded bool) bool {
  265. var jointer string
  266. for _, mf := range md.GetFields() {
  267. if fmap[mf.GetName()] || prefix != "" { // The first level field which are not consumed or the second level field
  268. if mf.GetType() == dpb.FieldDescriptorProto_TYPE_MESSAGE {
  269. paramAdded = u.updateUriParams(mf.GetMessageType(), prefix+mf.GetName()+".", fmap, paramAdded)
  270. continue
  271. }
  272. if !paramAdded {
  273. paramAdded = true
  274. jointer = "?"
  275. } else {
  276. jointer = "&"
  277. }
  278. u.Template = fmt.Sprintf("%s%s%s%s=%s", u.Template, jointer, prefix, mf.GetName(), "%s")
  279. u.Fields = append(u.Fields, &field{name: prefix + mf.GetName()})
  280. }
  281. }
  282. return paramAdded
  283. }