schema.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/golang/protobuf/proto"
  19. dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
  20. "github.com/jhump/protoreflect/desc"
  21. "github.com/jhump/protoreflect/desc/protoparse"
  22. "github.com/jhump/protoreflect/dynamic"
  23. kconf "github.com/lf-edge/ekuiper/internal/conf"
  24. "github.com/lf-edge/ekuiper/internal/xsql"
  25. "github.com/lf-edge/ekuiper/pkg/cast"
  26. _ "google.golang.org/genproto/googleapis/api/annotations"
  27. "sync"
  28. )
  29. const (
  30. wrapperBool = "google.protobuf.BoolValue"
  31. wrapperBytes = "google.protobuf.BytesValue"
  32. wrapperDouble = "google.protobuf.DoubleValue"
  33. wrapperFloat = "google.protobuf.FloatValue"
  34. wrapperInt32 = "google.protobuf.Int32Value"
  35. wrapperInt64 = "google.protobuf.Int64Value"
  36. wrapperString = "google.protobuf.StringValue"
  37. wrapperUInt32 = "google.protobuf.UInt32Value"
  38. wrapperUInt64 = "google.protobuf.UInt64Value"
  39. wrapperVoid = "google.protobuf.EMPTY"
  40. )
  41. var WRAPPER_TYPES = map[string]struct{}{
  42. wrapperBool: {},
  43. wrapperBytes: {},
  44. wrapperDouble: {},
  45. wrapperFloat: {},
  46. wrapperInt32: {},
  47. wrapperInt64: {},
  48. wrapperString: {},
  49. wrapperUInt32: {},
  50. wrapperUInt64: {},
  51. }
  52. type descriptor interface {
  53. GetFunctions() []string
  54. }
  55. type protoDescriptor interface {
  56. ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error)
  57. ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error)
  58. MethodDescriptor(method string) *desc.MethodDescriptor
  59. MessageFactory() *dynamic.MessageFactory
  60. }
  61. type jsonDescriptor interface {
  62. ConvertParamsToJson(method string, params []interface{}) ([]byte, error)
  63. ConvertReturnJson(method string, returnVal []byte) (interface{}, error)
  64. }
  65. type textDescriptor interface {
  66. ConvertParamsToText(method string, params []interface{}) ([]byte, error)
  67. ConvertReturnText(method string, returnVal []byte) (interface{}, error)
  68. }
  69. type interfaceDescriptor interface {
  70. ConvertParams(method string, params []interface{}) ([]interface{}, error)
  71. ConvertReturn(method string, returnVal interface{}) (interface{}, error)
  72. }
  73. type multiplexDescriptor interface {
  74. jsonDescriptor
  75. textDescriptor
  76. interfaceDescriptor
  77. httpMapping
  78. }
  79. var ( //Do not call these directly, use the get methods
  80. protoParser *protoparse.Parser
  81. // A buffer of descriptor for schemas
  82. reg = &sync.Map{}
  83. )
  84. func ProtoParser() *protoparse.Parser {
  85. once.Do(func() {
  86. dir := "etc/services/schemas/"
  87. if kconf.IsTesting {
  88. dir = "service/test/schemas/"
  89. }
  90. schemaDir, _ := kconf.GetLoc(dir)
  91. protoParser = &protoparse.Parser{ImportPaths: []string{schemaDir}}
  92. })
  93. return protoParser
  94. }
  95. func parse(schema schema, file string) (descriptor, error) {
  96. info := &schemaInfo{
  97. SchemaType: schema,
  98. SchemaFile: file,
  99. }
  100. switch schema {
  101. case PROTOBUFF:
  102. if v, ok := reg.Load(info); ok {
  103. return v.(descriptor), nil
  104. }
  105. if fds, err := ProtoParser().ParseFiles(file); err != nil {
  106. return nil, err
  107. } else {
  108. result := &wrappedProtoDescriptor{
  109. FileDescriptor: fds[0],
  110. mf: dynamic.NewMessageFactoryWithDefaults(),
  111. }
  112. err := result.parseHttpOptions()
  113. if err != nil {
  114. return nil, err
  115. }
  116. reg.Store(info, result)
  117. return result, nil
  118. }
  119. default:
  120. return nil, fmt.Errorf("unsupported schema %s", schema)
  121. }
  122. }
  123. type wrappedProtoDescriptor struct {
  124. *desc.FileDescriptor
  125. methodOptions map[string]*httpOptions
  126. mf *dynamic.MessageFactory
  127. }
  128. //TODO support for duplicate names
  129. func (d *wrappedProtoDescriptor) GetFunctions() (result []string) {
  130. for _, s := range d.GetServices() {
  131. for _, m := range s.GetMethods() {
  132. result = append(result, m.GetName())
  133. }
  134. }
  135. return
  136. }
  137. func (d *wrappedProtoDescriptor) MessageFactory() *dynamic.MessageFactory {
  138. return d.mf
  139. }
  140. // ConvertParams TODO support optional field, support enum type
  141. // Parameter mapping for protobuf
  142. // 1. If param length is 1, it can either a map contains all field or a field only.
  143. // 2. If param length is more then 1, they will map to message fields in the order
  144. func (d *wrappedProtoDescriptor) ConvertParams(method string, params []interface{}) ([]interface{}, error) {
  145. m := d.MethodDescriptor(method)
  146. if m == nil {
  147. return nil, fmt.Errorf("can't find method %s in proto", method)
  148. }
  149. im := m.GetInputType()
  150. return d.convertParams(im, params)
  151. }
  152. func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error) {
  153. m := d.MethodDescriptor(method)
  154. if m == nil {
  155. return nil, fmt.Errorf("can't find method %s in proto", method)
  156. }
  157. im := m.GetInputType()
  158. message := d.mf.NewDynamicMessage(im)
  159. typedParams, err := d.convertParams(im, params)
  160. if err != nil {
  161. return nil, err
  162. }
  163. for i, typeParam := range typedParams {
  164. message.SetFieldByNumber(i+1, typeParam)
  165. }
  166. return message, nil
  167. }
  168. func (d *wrappedProtoDescriptor) ConvertParamsToJson(method string, params []interface{}) ([]byte, error) {
  169. // Deal with encoded json string. Just return the string
  170. if len(params) == 1 {
  171. m := d.MethodDescriptor(method)
  172. if m == nil {
  173. return nil, fmt.Errorf("can't find method %s in proto", method)
  174. }
  175. im := m.GetInputType()
  176. if im.GetFullyQualifiedName() == wrapperString {
  177. ss, err := cast.ToString(params[0], cast.STRICT)
  178. if err != nil {
  179. return nil, err
  180. }
  181. return []byte(ss), nil
  182. }
  183. }
  184. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  185. return nil, err
  186. } else {
  187. return message.MarshalJSON()
  188. }
  189. }
  190. func (d *wrappedProtoDescriptor) ConvertParamsToText(method string, params []interface{}) ([]byte, error) {
  191. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  192. return nil, err
  193. } else {
  194. return message.MarshalText()
  195. }
  196. }
  197. func (d *wrappedProtoDescriptor) convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interface{}, error) {
  198. fields := im.GetFields()
  199. var result []interface{}
  200. switch len(params) {
  201. case 0:
  202. if len(fields) == 0 {
  203. return result, nil
  204. } else {
  205. return nil, fmt.Errorf("require %d parameters but none", len(fields))
  206. }
  207. case 1:
  208. // If it is map, try unfold it
  209. // TODO custom error for non map or map name not match
  210. if r, err := d.unfoldMap(im, params[0]); err != nil {
  211. kconf.Log.Debugf("try unfold param for message %s fail: %v", im.GetName(), err)
  212. } else {
  213. return r, nil
  214. }
  215. // For non map params, treat it as special case of multiple params
  216. if len(fields) == 1 {
  217. param0, err := d.encodeField(fields[0], params[0])
  218. if err != nil {
  219. return nil, err
  220. }
  221. return append(result, param0), nil
  222. } else {
  223. return nil, fmt.Errorf("require %d parameters but only got 1", len(fields))
  224. }
  225. default:
  226. if len(fields) == len(params) {
  227. for i, field := range fields {
  228. param, err := d.encodeField(field, params[i])
  229. if err != nil {
  230. return nil, err
  231. }
  232. result = append(result, param)
  233. }
  234. return result, nil
  235. } else {
  236. return nil, fmt.Errorf("require %d parameters but only got %d", len(fields), len(params))
  237. }
  238. }
  239. }
  240. func (d *wrappedProtoDescriptor) ConvertReturn(method string, returnVal interface{}) (interface{}, error) {
  241. m := d.MethodDescriptor(method)
  242. t := m.GetOutputType()
  243. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  244. return decodeField(returnVal, t.FindFieldByNumber(1), cast.STRICT)
  245. } else { // MUST be a map
  246. if retMap, ok := returnVal.(map[string]interface{}); ok {
  247. return decodeMap(retMap, t, cast.CONVERT_SAMEKIND)
  248. } else {
  249. return nil, fmt.Errorf("fail to convert return val, must be a map but got %v", returnVal)
  250. }
  251. }
  252. }
  253. func (d *wrappedProtoDescriptor) ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error) {
  254. m := d.MethodDescriptor(method)
  255. return decodeMessage(returnVal, m.GetOutputType()), nil
  256. }
  257. func (d *wrappedProtoDescriptor) ConvertReturnJson(method string, returnVal []byte) (interface{}, error) {
  258. r := make(map[string]interface{})
  259. err := json.Unmarshal(returnVal, &r)
  260. if err != nil {
  261. return nil, err
  262. }
  263. m := d.MethodDescriptor(method)
  264. return decodeMap(r, m.GetOutputType(), cast.CONVERT_SAMEKIND)
  265. }
  266. func (d *wrappedProtoDescriptor) ConvertReturnText(method string, returnVal []byte) (interface{}, error) {
  267. m := d.MethodDescriptor(method)
  268. t := m.GetOutputType()
  269. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  270. return decodeField(string(returnVal), t.FindFieldByNumber(1), cast.CONVERT_ALL)
  271. } else {
  272. return nil, fmt.Errorf("fail to convert return val to text, return type must be primitive type but got %s", t.GetName())
  273. }
  274. }
  275. func (d *wrappedProtoDescriptor) MethodDescriptor(name string) *desc.MethodDescriptor {
  276. var m *desc.MethodDescriptor
  277. for _, s := range d.GetServices() {
  278. m = s.FindMethodByName(name)
  279. if m != nil {
  280. break
  281. }
  282. }
  283. return m
  284. }
  285. func (d *wrappedProtoDescriptor) unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error) {
  286. fields := ft.GetFields()
  287. result := make([]interface{}, len(fields))
  288. if m, ok := xsql.ToMessage(i); ok {
  289. for _, field := range fields {
  290. v, ok := m.Value(field.GetName(), "")
  291. if !ok {
  292. return nil, fmt.Errorf("field %s not found", field.GetName())
  293. }
  294. fv, err := d.encodeField(field, v)
  295. if err != nil {
  296. return nil, err
  297. }
  298. result[field.GetNumber()-1] = fv
  299. }
  300. } else {
  301. return nil, fmt.Errorf("not a map")
  302. }
  303. return result, nil
  304. }
  305. func (d *wrappedProtoDescriptor) encodeMap(im *desc.MessageDescriptor, i interface{}) (*dynamic.Message, error) {
  306. result := d.mf.NewDynamicMessage(im)
  307. fields := im.GetFields()
  308. if m, ok := i.(map[string]interface{}); ok {
  309. for _, field := range fields {
  310. v, ok := m[field.GetName()]
  311. if !ok {
  312. return nil, fmt.Errorf("field %s not found", field.GetName())
  313. }
  314. fv, err := d.encodeField(field, v)
  315. if err != nil {
  316. return nil, err
  317. }
  318. result.SetFieldByName(field.GetName(), fv)
  319. }
  320. }
  321. return result, nil
  322. }
  323. func (d *wrappedProtoDescriptor) encodeField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
  324. fn := field.GetName()
  325. ft := field.GetType()
  326. if field.IsRepeated() {
  327. var (
  328. result interface{}
  329. err error
  330. )
  331. switch ft {
  332. case dpb.FieldDescriptorProto_TYPE_DOUBLE:
  333. result, err = cast.ToFloat64Slice(v, cast.STRICT)
  334. case dpb.FieldDescriptorProto_TYPE_FLOAT:
  335. result, err = cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  336. r, err := cast.ToFloat64(input, sn)
  337. if err != nil {
  338. return 0, nil
  339. } else {
  340. return float32(r), nil
  341. }
  342. }, "float", cast.STRICT)
  343. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
  344. result, err = cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  345. r, err := cast.ToInt(input, sn)
  346. if err != nil {
  347. return 0, nil
  348. } else {
  349. return int32(r), nil
  350. }
  351. }, "int", cast.STRICT)
  352. case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
  353. result, err = cast.ToInt64Slice(v, cast.STRICT)
  354. case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
  355. result, err = cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  356. r, err := cast.ToUint64(input, sn)
  357. if err != nil {
  358. return 0, nil
  359. } else {
  360. return uint32(r), nil
  361. }
  362. }, "uint", cast.STRICT)
  363. case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  364. result, err = cast.ToUint64Slice(v, cast.STRICT)
  365. case dpb.FieldDescriptorProto_TYPE_BOOL:
  366. result, err = cast.ToBoolSlice(v, cast.STRICT)
  367. case dpb.FieldDescriptorProto_TYPE_STRING:
  368. result, err = cast.ToStringSlice(v, cast.STRICT)
  369. case dpb.FieldDescriptorProto_TYPE_BYTES:
  370. result, err = cast.ToBytesSlice(v, cast.STRICT)
  371. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  372. result, err = cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  373. r, err := cast.ToStringMap(v)
  374. if err == nil {
  375. return d.encodeMap(field.GetMessageType(), r)
  376. } else {
  377. return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
  378. }
  379. }, "map", cast.STRICT)
  380. default:
  381. return nil, fmt.Errorf("invalid type for field '%s'", fn)
  382. }
  383. if err != nil {
  384. err = fmt.Errorf("failed to encode field '%s':%v", fn, err)
  385. }
  386. return result, err
  387. } else {
  388. return d.encodeSingleField(field, v)
  389. }
  390. }
  391. func (d *wrappedProtoDescriptor) encodeSingleField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
  392. fn := field.GetName()
  393. switch field.GetType() {
  394. case dpb.FieldDescriptorProto_TYPE_DOUBLE:
  395. r, err := cast.ToFloat64(v, cast.STRICT)
  396. if err == nil {
  397. return r, nil
  398. } else {
  399. return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
  400. }
  401. case dpb.FieldDescriptorProto_TYPE_FLOAT:
  402. r, err := cast.ToFloat64(v, cast.STRICT)
  403. if err == nil {
  404. return float32(r), nil
  405. } else {
  406. return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
  407. }
  408. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
  409. r, err := cast.ToInt(v, cast.STRICT)
  410. if err == nil {
  411. return int32(r), nil
  412. } else {
  413. return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
  414. }
  415. case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
  416. r, err := cast.ToInt64(v, cast.STRICT)
  417. if err == nil {
  418. return r, nil
  419. } else {
  420. return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
  421. }
  422. case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
  423. r, err := cast.ToUint64(v, cast.STRICT)
  424. if err == nil {
  425. return uint32(r), nil
  426. } else {
  427. return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
  428. }
  429. case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  430. r, err := cast.ToUint64(v, cast.STRICT)
  431. if err == nil {
  432. return r, nil
  433. } else {
  434. return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
  435. }
  436. case dpb.FieldDescriptorProto_TYPE_BOOL:
  437. r, err := cast.ToBool(v, cast.STRICT)
  438. if err == nil {
  439. return r, nil
  440. } else {
  441. return nil, fmt.Errorf("invalid type for bool type field '%s': %v", fn, err)
  442. }
  443. case dpb.FieldDescriptorProto_TYPE_STRING:
  444. r, err := cast.ToString(v, cast.STRICT)
  445. if err == nil {
  446. return r, nil
  447. } else {
  448. return nil, fmt.Errorf("invalid type for string type field '%s': %v", fn, err)
  449. }
  450. case dpb.FieldDescriptorProto_TYPE_BYTES:
  451. r, err := cast.ToBytes(v, cast.STRICT)
  452. if err == nil {
  453. return r, nil
  454. } else {
  455. return nil, fmt.Errorf("invalid type for bytes type field '%s': %v", fn, err)
  456. }
  457. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  458. r, err := cast.ToStringMap(v)
  459. if err == nil {
  460. return d.encodeMap(field.GetMessageType(), r)
  461. } else {
  462. return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
  463. }
  464. default:
  465. return nil, fmt.Errorf("invalid type for field '%s'", fn)
  466. }
  467. }
  468. func decodeMessage(message *dynamic.Message, outputType *desc.MessageDescriptor) interface{} {
  469. if _, ok := WRAPPER_TYPES[outputType.GetFullyQualifiedName()]; ok {
  470. return message.GetFieldByNumber(1)
  471. } else if wrapperVoid == outputType.GetFullyQualifiedName() {
  472. return nil
  473. }
  474. result := make(map[string]interface{})
  475. for _, field := range outputType.GetFields() {
  476. decodeMessageField(message.GetField(field), field, result, cast.STRICT)
  477. }
  478. return result
  479. }
  480. func decodeMessageField(src interface{}, field *desc.FieldDescriptor, result map[string]interface{}, sn cast.Strictness) error {
  481. if f, err := decodeField(src, field, sn); err != nil {
  482. return err
  483. } else {
  484. result[field.GetName()] = f
  485. return nil
  486. }
  487. }
  488. func decodeField(src interface{}, field *desc.FieldDescriptor, sn cast.Strictness) (interface{}, error) {
  489. var (
  490. r interface{}
  491. e error
  492. )
  493. fn := field.GetName()
  494. switch field.GetType() {
  495. case dpb.FieldDescriptorProto_TYPE_DOUBLE, dpb.FieldDescriptorProto_TYPE_FLOAT:
  496. if field.IsRepeated() {
  497. r, e = cast.ToFloat64Slice(src, sn)
  498. } else {
  499. r, e = cast.ToFloat64(src, sn)
  500. }
  501. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32, dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64, dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32, dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  502. if field.IsRepeated() {
  503. r, e = cast.ToInt64Slice(src, sn)
  504. } else {
  505. r, e = cast.ToInt64(src, sn)
  506. }
  507. case dpb.FieldDescriptorProto_TYPE_BOOL:
  508. if field.IsRepeated() {
  509. r, e = cast.ToBoolSlice(src, sn)
  510. } else {
  511. r, e = cast.ToBool(src, sn)
  512. }
  513. case dpb.FieldDescriptorProto_TYPE_STRING:
  514. if field.IsRepeated() {
  515. r, e = cast.ToStringSlice(src, sn)
  516. } else {
  517. r, e = cast.ToString(src, sn)
  518. }
  519. case dpb.FieldDescriptorProto_TYPE_BYTES:
  520. if field.IsRepeated() {
  521. r, e = cast.ToBytesSlice(src, sn)
  522. } else {
  523. r, e = cast.ToBytes(src, sn)
  524. }
  525. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  526. if field.IsRepeated() {
  527. r, e = cast.ToTypedSlice(src, func(input interface{}, ssn cast.Strictness) (interface{}, error) {
  528. return decodeSubMessage(input, field.GetMessageType(), ssn)
  529. }, "map", sn)
  530. } else {
  531. r, e = decodeSubMessage(src, field.GetMessageType(), sn)
  532. }
  533. default:
  534. return nil, fmt.Errorf("unsupported type for %s", fn)
  535. }
  536. if e != nil {
  537. e = fmt.Errorf("invalid type of return value for '%s': %v", fn, e)
  538. }
  539. return r, e
  540. }
  541. func decodeMap(src map[string]interface{}, ft *desc.MessageDescriptor, sn cast.Strictness) (map[string]interface{}, error) {
  542. result := make(map[string]interface{})
  543. for _, field := range ft.GetFields() {
  544. val, ok := src[field.GetName()]
  545. if !ok {
  546. continue
  547. }
  548. err := decodeMessageField(val, field, result, sn)
  549. if err != nil {
  550. return nil, err
  551. }
  552. }
  553. return result, nil
  554. }
  555. func decodeSubMessage(input interface{}, ft *desc.MessageDescriptor, sn cast.Strictness) (interface{}, error) {
  556. var m = map[string]interface{}{}
  557. switch v := input.(type) {
  558. case map[interface{}]interface{}:
  559. for k, val := range v {
  560. m[cast.ToStringAlways(k)] = val
  561. }
  562. return decodeMap(m, ft, sn)
  563. case map[string]interface{}:
  564. return decodeMap(v, ft, sn)
  565. case proto.Message:
  566. message, err := dynamic.AsDynamicMessage(v)
  567. if err != nil {
  568. return nil, err
  569. }
  570. return decodeMessage(message, ft), nil
  571. case *dynamic.Message:
  572. return decodeMessage(v, ft), nil
  573. default:
  574. return nil, fmt.Errorf("cannot decode %[1]T(%[1]v) to map", input)
  575. }
  576. }