schema.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598
  1. package service
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. kconf "github.com/emqx/kuiper/internal/conf"
  6. "github.com/emqx/kuiper/internal/xsql"
  7. "github.com/emqx/kuiper/pkg/cast"
  8. "github.com/golang/protobuf/proto"
  9. dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
  10. "github.com/jhump/protoreflect/desc"
  11. "github.com/jhump/protoreflect/desc/protoparse"
  12. "github.com/jhump/protoreflect/dynamic"
  13. _ "google.golang.org/genproto/googleapis/api/annotations"
  14. "sync"
  15. )
  16. const (
  17. wrapperBool = "google.protobuf.BoolValue"
  18. wrapperBytes = "google.protobuf.BytesValue"
  19. wrapperDouble = "google.protobuf.DoubleValue"
  20. wrapperFloat = "google.protobuf.FloatValue"
  21. wrapperInt32 = "google.protobuf.Int32Value"
  22. wrapperInt64 = "google.protobuf.Int64Value"
  23. wrapperString = "google.protobuf.StringValue"
  24. wrapperUInt32 = "google.protobuf.UInt32Value"
  25. wrapperUInt64 = "google.protobuf.UInt64Value"
  26. wrapperVoid = "google.protobuf.EMPTY"
  27. )
  28. var WRAPPER_TYPES = map[string]struct{}{
  29. wrapperBool: {},
  30. wrapperBytes: {},
  31. wrapperDouble: {},
  32. wrapperFloat: {},
  33. wrapperInt32: {},
  34. wrapperInt64: {},
  35. wrapperString: {},
  36. wrapperUInt32: {},
  37. wrapperUInt64: {},
  38. }
  39. type descriptor interface {
  40. GetFunctions() []string
  41. }
  42. type protoDescriptor interface {
  43. ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error)
  44. ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error)
  45. MethodDescriptor(method string) *desc.MethodDescriptor
  46. MessageFactory() *dynamic.MessageFactory
  47. }
  48. type jsonDescriptor interface {
  49. ConvertParamsToJson(method string, params []interface{}) ([]byte, error)
  50. ConvertReturnJson(method string, returnVal []byte) (interface{}, error)
  51. }
  52. type textDescriptor interface {
  53. ConvertParamsToText(method string, params []interface{}) ([]byte, error)
  54. ConvertReturnText(method string, returnVal []byte) (interface{}, error)
  55. }
  56. type interfaceDescriptor interface {
  57. ConvertParams(method string, params []interface{}) ([]interface{}, error)
  58. ConvertReturn(method string, returnVal interface{}) (interface{}, error)
  59. }
  60. type multiplexDescriptor interface {
  61. jsonDescriptor
  62. textDescriptor
  63. interfaceDescriptor
  64. httpMapping
  65. }
  66. var ( //Do not call these directly, use the get methods
  67. protoParser *protoparse.Parser
  68. // A buffer of descriptor for schemas
  69. reg = &sync.Map{}
  70. )
  71. func ProtoParser() *protoparse.Parser {
  72. once.Do(func() {
  73. dir := "etc/services/schemas/"
  74. if kconf.IsTesting {
  75. dir = "service/test/schemas/"
  76. }
  77. schemaDir, _ := kconf.GetLoc(dir)
  78. protoParser = &protoparse.Parser{ImportPaths: []string{schemaDir}}
  79. })
  80. return protoParser
  81. }
  82. func parse(schema schema, file string) (descriptor, error) {
  83. info := &schemaInfo{
  84. SchemaType: schema,
  85. SchemaFile: file,
  86. }
  87. switch schema {
  88. case PROTOBUFF:
  89. if v, ok := reg.Load(info); ok {
  90. return v.(descriptor), nil
  91. }
  92. if fds, err := ProtoParser().ParseFiles(file); err != nil {
  93. return nil, err
  94. } else {
  95. result := &wrappedProtoDescriptor{
  96. FileDescriptor: fds[0],
  97. mf: dynamic.NewMessageFactoryWithDefaults(),
  98. }
  99. err := result.parseHttpOptions()
  100. if err != nil {
  101. return nil, err
  102. }
  103. reg.Store(info, result)
  104. return result, nil
  105. }
  106. default:
  107. return nil, fmt.Errorf("unsupported schema %s", schema)
  108. }
  109. }
  110. type wrappedProtoDescriptor struct {
  111. *desc.FileDescriptor
  112. methodOptions map[string]*httpOptions
  113. mf *dynamic.MessageFactory
  114. }
  115. //TODO support for duplicate names
  116. func (d *wrappedProtoDescriptor) GetFunctions() (result []string) {
  117. for _, s := range d.GetServices() {
  118. for _, m := range s.GetMethods() {
  119. result = append(result, m.GetName())
  120. }
  121. }
  122. return
  123. }
  124. func (d *wrappedProtoDescriptor) MessageFactory() *dynamic.MessageFactory {
  125. return d.mf
  126. }
  127. // ConvertParams TODO support optional field, support enum type
  128. // Parameter mapping for protobuf
  129. // 1. If param length is 1, it can either a map contains all field or a field only.
  130. // 2. If param length is more then 1, they will map to message fields in the order
  131. func (d *wrappedProtoDescriptor) ConvertParams(method string, params []interface{}) ([]interface{}, error) {
  132. m := d.MethodDescriptor(method)
  133. if m == nil {
  134. return nil, fmt.Errorf("can't find method %s in proto", method)
  135. }
  136. im := m.GetInputType()
  137. return d.convertParams(im, params)
  138. }
  139. func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error) {
  140. m := d.MethodDescriptor(method)
  141. if m == nil {
  142. return nil, fmt.Errorf("can't find method %s in proto", method)
  143. }
  144. im := m.GetInputType()
  145. message := d.mf.NewDynamicMessage(im)
  146. typedParams, err := d.convertParams(im, params)
  147. if err != nil {
  148. return nil, err
  149. }
  150. for i, typeParam := range typedParams {
  151. message.SetFieldByNumber(i+1, typeParam)
  152. }
  153. return message, nil
  154. }
  155. func (d *wrappedProtoDescriptor) ConvertParamsToJson(method string, params []interface{}) ([]byte, error) {
  156. // Deal with encoded json string. Just return the string
  157. if len(params) == 1 {
  158. m := d.MethodDescriptor(method)
  159. if m == nil {
  160. return nil, fmt.Errorf("can't find method %s in proto", method)
  161. }
  162. im := m.GetInputType()
  163. if im.GetFullyQualifiedName() == wrapperString {
  164. ss, err := cast.ToString(params[0], cast.STRICT)
  165. if err != nil {
  166. return nil, err
  167. }
  168. return []byte(ss), nil
  169. }
  170. }
  171. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  172. return nil, err
  173. } else {
  174. return message.MarshalJSON()
  175. }
  176. }
  177. func (d *wrappedProtoDescriptor) ConvertParamsToText(method string, params []interface{}) ([]byte, error) {
  178. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  179. return nil, err
  180. } else {
  181. return message.MarshalText()
  182. }
  183. }
  184. func (d *wrappedProtoDescriptor) convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interface{}, error) {
  185. fields := im.GetFields()
  186. var result []interface{}
  187. switch len(params) {
  188. case 0:
  189. if len(fields) == 0 {
  190. return result, nil
  191. } else {
  192. return nil, fmt.Errorf("require %d parameters but none", len(fields))
  193. }
  194. case 1:
  195. // If it is map, try unfold it
  196. // TODO custom error for non map or map name not match
  197. if r, err := d.unfoldMap(im, params[0]); err != nil {
  198. kconf.Log.Debugf("try unfold param for message %s fail: %v", im.GetName(), err)
  199. } else {
  200. return r, nil
  201. }
  202. // For non map params, treat it as special case of multiple params
  203. if len(fields) == 1 {
  204. param0, err := d.encodeField(fields[0], params[0])
  205. if err != nil {
  206. return nil, err
  207. }
  208. return append(result, param0), nil
  209. } else {
  210. return nil, fmt.Errorf("require %d parameters but only got 1", len(fields))
  211. }
  212. default:
  213. if len(fields) == len(params) {
  214. for i, field := range fields {
  215. param, err := d.encodeField(field, params[i])
  216. if err != nil {
  217. return nil, err
  218. }
  219. result = append(result, param)
  220. }
  221. return result, nil
  222. } else {
  223. return nil, fmt.Errorf("require %d parameters but only got %d", len(fields), len(params))
  224. }
  225. }
  226. }
  227. func (d *wrappedProtoDescriptor) ConvertReturn(method string, returnVal interface{}) (interface{}, error) {
  228. m := d.MethodDescriptor(method)
  229. t := m.GetOutputType()
  230. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  231. return decodeField(returnVal, t.FindFieldByNumber(1), cast.STRICT)
  232. } else { // MUST be a map
  233. if retMap, ok := returnVal.(map[string]interface{}); ok {
  234. return decodeMap(retMap, t, cast.CONVERT_SAMEKIND)
  235. } else {
  236. return nil, fmt.Errorf("fail to convert return val, must be a map but got %v", returnVal)
  237. }
  238. }
  239. }
  240. func (d *wrappedProtoDescriptor) ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error) {
  241. m := d.MethodDescriptor(method)
  242. return decodeMessage(returnVal, m.GetOutputType()), nil
  243. }
  244. func (d *wrappedProtoDescriptor) ConvertReturnJson(method string, returnVal []byte) (interface{}, error) {
  245. r := make(map[string]interface{})
  246. err := json.Unmarshal(returnVal, &r)
  247. if err != nil {
  248. return nil, err
  249. }
  250. m := d.MethodDescriptor(method)
  251. return decodeMap(r, m.GetOutputType(), cast.CONVERT_SAMEKIND)
  252. }
  253. func (d *wrappedProtoDescriptor) ConvertReturnText(method string, returnVal []byte) (interface{}, error) {
  254. m := d.MethodDescriptor(method)
  255. t := m.GetOutputType()
  256. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  257. return decodeField(string(returnVal), t.FindFieldByNumber(1), cast.CONVERT_ALL)
  258. } else {
  259. return nil, fmt.Errorf("fail to convert return val to text, return type must be primitive type but got %s", t.GetName())
  260. }
  261. }
  262. func (d *wrappedProtoDescriptor) MethodDescriptor(name string) *desc.MethodDescriptor {
  263. var m *desc.MethodDescriptor
  264. for _, s := range d.GetServices() {
  265. m = s.FindMethodByName(name)
  266. if m != nil {
  267. break
  268. }
  269. }
  270. return m
  271. }
  272. func (d *wrappedProtoDescriptor) unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error) {
  273. fields := ft.GetFields()
  274. result := make([]interface{}, len(fields))
  275. if m, ok := xsql.ToMessage(i); ok {
  276. for _, field := range fields {
  277. v, ok := m.Value(field.GetName())
  278. if !ok {
  279. return nil, fmt.Errorf("field %s not found", field.GetName())
  280. }
  281. fv, err := d.encodeField(field, v)
  282. if err != nil {
  283. return nil, err
  284. }
  285. result[field.GetNumber()-1] = fv
  286. }
  287. } else {
  288. return nil, fmt.Errorf("not a map")
  289. }
  290. return result, nil
  291. }
  292. func (d *wrappedProtoDescriptor) encodeMap(im *desc.MessageDescriptor, i interface{}) (*dynamic.Message, error) {
  293. result := d.mf.NewDynamicMessage(im)
  294. fields := im.GetFields()
  295. if m, ok := i.(map[string]interface{}); ok {
  296. for _, field := range fields {
  297. v, ok := m[field.GetName()]
  298. if !ok {
  299. return nil, fmt.Errorf("field %s not found", field.GetName())
  300. }
  301. fv, err := d.encodeField(field, v)
  302. if err != nil {
  303. return nil, err
  304. }
  305. result.SetFieldByName(field.GetName(), fv)
  306. }
  307. }
  308. return result, nil
  309. }
  310. func (d *wrappedProtoDescriptor) encodeField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
  311. fn := field.GetName()
  312. ft := field.GetType()
  313. if field.IsRepeated() {
  314. var (
  315. result interface{}
  316. err error
  317. )
  318. switch ft {
  319. case dpb.FieldDescriptorProto_TYPE_DOUBLE:
  320. result, err = cast.ToFloat64Slice(v, cast.STRICT)
  321. case dpb.FieldDescriptorProto_TYPE_FLOAT:
  322. result, err = cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  323. r, err := cast.ToFloat64(input, sn)
  324. if err != nil {
  325. return 0, nil
  326. } else {
  327. return float32(r), nil
  328. }
  329. }, "float", cast.STRICT)
  330. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
  331. result, err = cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  332. r, err := cast.ToInt(input, sn)
  333. if err != nil {
  334. return 0, nil
  335. } else {
  336. return int32(r), nil
  337. }
  338. }, "int", cast.STRICT)
  339. case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
  340. result, err = cast.ToInt64Slice(v, cast.STRICT)
  341. case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
  342. result, err = cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  343. r, err := cast.ToUint64(input, sn)
  344. if err != nil {
  345. return 0, nil
  346. } else {
  347. return uint32(r), nil
  348. }
  349. }, "uint", cast.STRICT)
  350. case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  351. result, err = cast.ToUint64Slice(v, cast.STRICT)
  352. case dpb.FieldDescriptorProto_TYPE_BOOL:
  353. result, err = cast.ToBoolSlice(v, cast.STRICT)
  354. case dpb.FieldDescriptorProto_TYPE_STRING:
  355. result, err = cast.ToStringSlice(v, cast.STRICT)
  356. case dpb.FieldDescriptorProto_TYPE_BYTES:
  357. result, err = cast.ToBytesSlice(v, cast.STRICT)
  358. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  359. result, err = cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  360. r, err := cast.ToStringMap(v)
  361. if err == nil {
  362. return d.encodeMap(field.GetMessageType(), r)
  363. } else {
  364. return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
  365. }
  366. }, "map", cast.STRICT)
  367. default:
  368. return nil, fmt.Errorf("invalid type for field '%s'", fn)
  369. }
  370. if err != nil {
  371. err = fmt.Errorf("failed to encode field '%s':%v", fn, err)
  372. }
  373. return result, err
  374. } else {
  375. return d.encodeSingleField(field, v)
  376. }
  377. }
  378. func (d *wrappedProtoDescriptor) encodeSingleField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
  379. fn := field.GetName()
  380. switch field.GetType() {
  381. case dpb.FieldDescriptorProto_TYPE_DOUBLE:
  382. r, err := cast.ToFloat64(v, cast.STRICT)
  383. if err == nil {
  384. return r, nil
  385. } else {
  386. return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
  387. }
  388. case dpb.FieldDescriptorProto_TYPE_FLOAT:
  389. r, err := cast.ToFloat64(v, cast.STRICT)
  390. if err == nil {
  391. return float32(r), nil
  392. } else {
  393. return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
  394. }
  395. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
  396. r, err := cast.ToInt(v, cast.STRICT)
  397. if err == nil {
  398. return int32(r), nil
  399. } else {
  400. return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
  401. }
  402. case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
  403. r, err := cast.ToInt64(v, cast.STRICT)
  404. if err == nil {
  405. return r, nil
  406. } else {
  407. return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
  408. }
  409. case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
  410. r, err := cast.ToUint64(v, cast.STRICT)
  411. if err == nil {
  412. return uint32(r), nil
  413. } else {
  414. return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
  415. }
  416. case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  417. r, err := cast.ToUint64(v, cast.STRICT)
  418. if err == nil {
  419. return r, nil
  420. } else {
  421. return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
  422. }
  423. case dpb.FieldDescriptorProto_TYPE_BOOL:
  424. r, err := cast.ToBool(v, cast.STRICT)
  425. if err == nil {
  426. return r, nil
  427. } else {
  428. return nil, fmt.Errorf("invalid type for bool type field '%s': %v", fn, err)
  429. }
  430. case dpb.FieldDescriptorProto_TYPE_STRING:
  431. r, err := cast.ToString(v, cast.STRICT)
  432. if err == nil {
  433. return r, nil
  434. } else {
  435. return nil, fmt.Errorf("invalid type for string type field '%s': %v", fn, err)
  436. }
  437. case dpb.FieldDescriptorProto_TYPE_BYTES:
  438. r, err := cast.ToBytes(v, cast.STRICT)
  439. if err == nil {
  440. return r, nil
  441. } else {
  442. return nil, fmt.Errorf("invalid type for bytes type field '%s': %v", fn, err)
  443. }
  444. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  445. r, err := cast.ToStringMap(v)
  446. if err == nil {
  447. return d.encodeMap(field.GetMessageType(), r)
  448. } else {
  449. return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
  450. }
  451. default:
  452. return nil, fmt.Errorf("invalid type for field '%s'", fn)
  453. }
  454. }
  455. func decodeMessage(message *dynamic.Message, outputType *desc.MessageDescriptor) interface{} {
  456. if _, ok := WRAPPER_TYPES[outputType.GetFullyQualifiedName()]; ok {
  457. return message.GetFieldByNumber(1)
  458. } else if wrapperVoid == outputType.GetFullyQualifiedName() {
  459. return nil
  460. }
  461. result := make(map[string]interface{})
  462. for _, field := range outputType.GetFields() {
  463. decodeMessageField(message.GetField(field), field, result, cast.STRICT)
  464. }
  465. return result
  466. }
  467. func decodeMessageField(src interface{}, field *desc.FieldDescriptor, result map[string]interface{}, sn cast.Strictness) error {
  468. if f, err := decodeField(src, field, sn); err != nil {
  469. return err
  470. } else {
  471. result[field.GetName()] = f
  472. return nil
  473. }
  474. }
  475. func decodeField(src interface{}, field *desc.FieldDescriptor, sn cast.Strictness) (interface{}, error) {
  476. var (
  477. r interface{}
  478. e error
  479. )
  480. fn := field.GetName()
  481. switch field.GetType() {
  482. case dpb.FieldDescriptorProto_TYPE_DOUBLE, dpb.FieldDescriptorProto_TYPE_FLOAT:
  483. if field.IsRepeated() {
  484. r, e = cast.ToFloat64Slice(src, sn)
  485. } else {
  486. r, e = cast.ToFloat64(src, sn)
  487. }
  488. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32, dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64, dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32, dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  489. if field.IsRepeated() {
  490. r, e = cast.ToInt64Slice(src, sn)
  491. } else {
  492. r, e = cast.ToInt64(src, sn)
  493. }
  494. case dpb.FieldDescriptorProto_TYPE_BOOL:
  495. if field.IsRepeated() {
  496. r, e = cast.ToBoolSlice(src, sn)
  497. } else {
  498. r, e = cast.ToBool(src, sn)
  499. }
  500. case dpb.FieldDescriptorProto_TYPE_STRING:
  501. if field.IsRepeated() {
  502. r, e = cast.ToStringSlice(src, sn)
  503. } else {
  504. r, e = cast.ToString(src, sn)
  505. }
  506. case dpb.FieldDescriptorProto_TYPE_BYTES:
  507. if field.IsRepeated() {
  508. r, e = cast.ToBytesSlice(src, sn)
  509. } else {
  510. r, e = cast.ToBytes(src, sn)
  511. }
  512. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  513. if field.IsRepeated() {
  514. r, e = cast.ToTypedSlice(src, func(input interface{}, ssn cast.Strictness) (interface{}, error) {
  515. return decodeSubMessage(input, field.GetMessageType(), ssn)
  516. }, "map", sn)
  517. } else {
  518. r, e = decodeSubMessage(src, field.GetMessageType(), sn)
  519. }
  520. default:
  521. return nil, fmt.Errorf("unsupported type for %s", fn)
  522. }
  523. if e != nil {
  524. e = fmt.Errorf("invalid type of return value for '%s': %v", fn, e)
  525. }
  526. return r, e
  527. }
  528. func decodeMap(src map[string]interface{}, ft *desc.MessageDescriptor, sn cast.Strictness) (map[string]interface{}, error) {
  529. result := make(map[string]interface{})
  530. for _, field := range ft.GetFields() {
  531. val, ok := src[field.GetName()]
  532. if !ok {
  533. continue
  534. }
  535. err := decodeMessageField(val, field, result, sn)
  536. if err != nil {
  537. return nil, err
  538. }
  539. }
  540. return result, nil
  541. }
  542. func decodeSubMessage(input interface{}, ft *desc.MessageDescriptor, sn cast.Strictness) (interface{}, error) {
  543. var m = map[string]interface{}{}
  544. switch v := input.(type) {
  545. case map[interface{}]interface{}:
  546. for k, val := range v {
  547. m[cast.ToStringAlways(k)] = val
  548. }
  549. return decodeMap(m, ft, sn)
  550. case map[string]interface{}:
  551. return decodeMap(v, ft, sn)
  552. case proto.Message:
  553. message, err := dynamic.AsDynamicMessage(v)
  554. if err != nil {
  555. return nil, err
  556. }
  557. return decodeMessage(message, ft), nil
  558. case *dynamic.Message:
  559. return decodeMessage(v, ft), nil
  560. default:
  561. return nil, fmt.Errorf("cannot decode %[1]T(%[1]v) to map", input)
  562. }
  563. }