schema.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596
  1. package services
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/golang/protobuf/proto"
  8. dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
  9. "github.com/jhump/protoreflect/desc"
  10. "github.com/jhump/protoreflect/desc/protoparse"
  11. "github.com/jhump/protoreflect/dynamic"
  12. "sync"
  13. )
  14. const (
  15. wrapperBool = "google.protobuf.BoolValue"
  16. wrapperBytes = "google.protobuf.BytesValue"
  17. wrapperDouble = "google.protobuf.DoubleValue"
  18. wrapperFloat = "google.protobuf.FloatValue"
  19. wrapperInt32 = "google.protobuf.Int32Value"
  20. wrapperInt64 = "google.protobuf.Int64Value"
  21. wrapperString = "google.protobuf.StringValue"
  22. wrapperUInt32 = "google.protobuf.UInt32Value"
  23. wrapperUInt64 = "google.protobuf.UInt64Value"
  24. wrapperVoid = "google.protobuf.EMPTY"
  25. )
  26. var WRAPPER_TYPES = map[string]struct{}{
  27. wrapperBool: {},
  28. wrapperBytes: {},
  29. wrapperDouble: {},
  30. wrapperFloat: {},
  31. wrapperInt32: {},
  32. wrapperInt64: {},
  33. wrapperString: {},
  34. wrapperUInt32: {},
  35. wrapperUInt64: {},
  36. }
  37. type descriptor interface {
  38. GetFunctions() []string
  39. }
  40. type protoDescriptor interface {
  41. ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error)
  42. ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error)
  43. MethodDescriptor(method string) *desc.MethodDescriptor
  44. MessageFactory() *dynamic.MessageFactory
  45. }
  46. type jsonDescriptor interface {
  47. ConvertParamsToJson(method string, params []interface{}) ([]byte, error)
  48. ConvertReturnJson(method string, returnVal []byte) (interface{}, error)
  49. }
  50. type textDescriptor interface {
  51. ConvertParamsToText(method string, params []interface{}) ([]byte, error)
  52. ConvertReturnText(method string, returnVal []byte) (interface{}, error)
  53. }
  54. type interfaceDescriptor interface {
  55. ConvertParams(method string, params []interface{}) ([]interface{}, error)
  56. ConvertReturn(method string, returnVal interface{}) (interface{}, error)
  57. }
  58. type multiplexDescriptor interface {
  59. jsonDescriptor
  60. textDescriptor
  61. interfaceDescriptor
  62. httpMapping
  63. }
  64. var ( //Do not call these directly, use the get methods
  65. protoParser *protoparse.Parser
  66. // A buffer of descriptor for schemas
  67. reg = &sync.Map{}
  68. )
  69. func ProtoParser() *protoparse.Parser {
  70. once.Do(func() {
  71. dir := "/etc/services/schemas/"
  72. if common.IsTesting {
  73. dir = "/services/test/schemas/"
  74. }
  75. schemaDir, _ := common.GetLoc(dir)
  76. protoParser = &protoparse.Parser{ImportPaths: []string{schemaDir}}
  77. })
  78. return protoParser
  79. }
  80. func parse(schema schema, file string) (descriptor, error) {
  81. info := &schemaInfo{
  82. SchemaType: schema,
  83. SchemaFile: file,
  84. }
  85. switch schema {
  86. case PROTOBUFF:
  87. if v, ok := reg.Load(info); ok {
  88. return v.(descriptor), nil
  89. }
  90. if fds, err := ProtoParser().ParseFiles(file); err != nil {
  91. return nil, err
  92. } else {
  93. result := &wrappedProtoDescriptor{
  94. FileDescriptor: fds[0],
  95. mf: dynamic.NewMessageFactoryWithDefaults(),
  96. }
  97. err := result.parseHttpOptions()
  98. if err != nil {
  99. return nil, err
  100. }
  101. reg.Store(info, result)
  102. return result, nil
  103. }
  104. default:
  105. return nil, fmt.Errorf("unsupported schema %s", schema)
  106. }
  107. }
  108. type wrappedProtoDescriptor struct {
  109. *desc.FileDescriptor
  110. methodOptions map[string]*httpOptions
  111. mf *dynamic.MessageFactory
  112. }
  113. //TODO support for duplicate names
  114. func (d *wrappedProtoDescriptor) GetFunctions() (result []string) {
  115. for _, s := range d.GetServices() {
  116. for _, m := range s.GetMethods() {
  117. result = append(result, m.GetName())
  118. }
  119. }
  120. return
  121. }
  122. func (d *wrappedProtoDescriptor) MessageFactory() *dynamic.MessageFactory {
  123. return d.mf
  124. }
  125. // ConvertParams TODO support optional field, support enum type
  126. // Parameter mapping for protobuf
  127. // 1. If param length is 1, it can either a map contains all field or a field only.
  128. // 2. If param length is more then 1, they will map to message fields in the order
  129. func (d *wrappedProtoDescriptor) ConvertParams(method string, params []interface{}) ([]interface{}, error) {
  130. m := d.MethodDescriptor(method)
  131. if m == nil {
  132. return nil, fmt.Errorf("can't find method %s in proto", method)
  133. }
  134. im := m.GetInputType()
  135. return d.convertParams(im, params)
  136. }
  137. func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error) {
  138. m := d.MethodDescriptor(method)
  139. if m == nil {
  140. return nil, fmt.Errorf("can't find method %s in proto", method)
  141. }
  142. im := m.GetInputType()
  143. message := d.mf.NewDynamicMessage(im)
  144. typedParams, err := d.convertParams(im, params)
  145. if err != nil {
  146. return nil, err
  147. }
  148. for i, typeParam := range typedParams {
  149. message.SetFieldByNumber(i+1, typeParam)
  150. }
  151. return message, nil
  152. }
  153. func (d *wrappedProtoDescriptor) ConvertParamsToJson(method string, params []interface{}) ([]byte, error) {
  154. // Deal with encoded json string. Just return the string
  155. if len(params) == 1 {
  156. m := d.MethodDescriptor(method)
  157. if m == nil {
  158. return nil, fmt.Errorf("can't find method %s in proto", method)
  159. }
  160. im := m.GetInputType()
  161. if im.GetFullyQualifiedName() == wrapperString {
  162. ss, err := common.ToString(params[0], common.STRICT)
  163. if err != nil {
  164. return nil, err
  165. }
  166. return []byte(ss), nil
  167. }
  168. }
  169. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  170. return nil, err
  171. } else {
  172. return message.MarshalJSON()
  173. }
  174. }
  175. func (d *wrappedProtoDescriptor) ConvertParamsToText(method string, params []interface{}) ([]byte, error) {
  176. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  177. return nil, err
  178. } else {
  179. return message.MarshalText()
  180. }
  181. }
  182. func (d *wrappedProtoDescriptor) convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interface{}, error) {
  183. fields := im.GetFields()
  184. var result []interface{}
  185. switch len(params) {
  186. case 0:
  187. if len(fields) == 0 {
  188. return result, nil
  189. } else {
  190. return nil, fmt.Errorf("require %d parameters but none", len(fields))
  191. }
  192. case 1:
  193. // If it is map, try unfold it
  194. // TODO custom error for non map or map name not match
  195. if r, err := d.unfoldMap(im, params[0]); err != nil {
  196. common.Log.Debugf("try unfold param for message %s fail: %v", im.GetName(), err)
  197. } else {
  198. return r, nil
  199. }
  200. // For non map params, treat it as special case of multiple params
  201. if len(fields) == 1 {
  202. param0, err := d.encodeField(fields[0], params[0])
  203. if err != nil {
  204. return nil, err
  205. }
  206. return append(result, param0), nil
  207. } else {
  208. return nil, fmt.Errorf("require %d parameters but only got 1", len(fields))
  209. }
  210. default:
  211. if len(fields) == len(params) {
  212. for i, field := range fields {
  213. param, err := d.encodeField(field, params[i])
  214. if err != nil {
  215. return nil, err
  216. }
  217. result = append(result, param)
  218. }
  219. return result, nil
  220. } else {
  221. return nil, fmt.Errorf("require %d parameters but only got %d", len(fields), len(params))
  222. }
  223. }
  224. }
  225. func (d *wrappedProtoDescriptor) ConvertReturn(method string, returnVal interface{}) (interface{}, error) {
  226. m := d.MethodDescriptor(method)
  227. t := m.GetOutputType()
  228. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  229. return decodeField(returnVal, t.FindFieldByNumber(1), common.STRICT)
  230. } else { // MUST be a map
  231. if retMap, ok := returnVal.(map[string]interface{}); ok {
  232. return decodeMap(retMap, t, common.CONVERT_SAMEKIND)
  233. } else {
  234. return nil, fmt.Errorf("fail to convert return val, must be a map but got %v", returnVal)
  235. }
  236. }
  237. }
  238. func (d *wrappedProtoDescriptor) ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error) {
  239. m := d.MethodDescriptor(method)
  240. return decodeMessage(returnVal, m.GetOutputType()), nil
  241. }
  242. func (d *wrappedProtoDescriptor) ConvertReturnJson(method string, returnVal []byte) (interface{}, error) {
  243. r := make(map[string]interface{})
  244. err := json.Unmarshal(returnVal, &r)
  245. if err != nil {
  246. return nil, err
  247. }
  248. m := d.MethodDescriptor(method)
  249. return decodeMap(r, m.GetOutputType(), common.CONVERT_SAMEKIND)
  250. }
  251. func (d *wrappedProtoDescriptor) ConvertReturnText(method string, returnVal []byte) (interface{}, error) {
  252. m := d.MethodDescriptor(method)
  253. t := m.GetOutputType()
  254. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  255. return decodeField(string(returnVal), t.FindFieldByNumber(1), common.CONVERT_ALL)
  256. } else {
  257. return nil, fmt.Errorf("fail to convert return val to text, return type must be primitive type but got %s", t.GetName())
  258. }
  259. }
  260. func (d *wrappedProtoDescriptor) MethodDescriptor(name string) *desc.MethodDescriptor {
  261. var m *desc.MethodDescriptor
  262. for _, s := range d.GetServices() {
  263. m = s.FindMethodByName(name)
  264. if m != nil {
  265. break
  266. }
  267. }
  268. return m
  269. }
  270. func (d *wrappedProtoDescriptor) unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error) {
  271. fields := ft.GetFields()
  272. result := make([]interface{}, len(fields))
  273. if m, ok := xsql.ToMessage(i); ok {
  274. for _, field := range fields {
  275. v, ok := m.Value(field.GetName())
  276. if !ok {
  277. return nil, fmt.Errorf("field %s not found", field.GetName())
  278. }
  279. fv, err := d.encodeField(field, v)
  280. if err != nil {
  281. return nil, err
  282. }
  283. result[field.GetNumber()-1] = fv
  284. }
  285. } else {
  286. return nil, fmt.Errorf("not a map")
  287. }
  288. return result, nil
  289. }
  290. func (d *wrappedProtoDescriptor) encodeMap(im *desc.MessageDescriptor, i interface{}) (*dynamic.Message, error) {
  291. result := d.mf.NewDynamicMessage(im)
  292. fields := im.GetFields()
  293. if m, ok := i.(map[string]interface{}); ok {
  294. for _, field := range fields {
  295. v, ok := m[field.GetName()]
  296. if !ok {
  297. return nil, fmt.Errorf("field %s not found", field.GetName())
  298. }
  299. fv, err := d.encodeField(field, v)
  300. if err != nil {
  301. return nil, err
  302. }
  303. result.SetFieldByName(field.GetName(), fv)
  304. }
  305. }
  306. return result, nil
  307. }
  308. func (d *wrappedProtoDescriptor) encodeField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
  309. fn := field.GetName()
  310. ft := field.GetType()
  311. if field.IsRepeated() {
  312. var (
  313. result interface{}
  314. err error
  315. )
  316. switch ft {
  317. case dpb.FieldDescriptorProto_TYPE_DOUBLE:
  318. result, err = common.ToFloat64Slice(v, common.STRICT)
  319. case dpb.FieldDescriptorProto_TYPE_FLOAT:
  320. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  321. r, err := common.ToFloat64(input, sn)
  322. if err != nil {
  323. return 0, nil
  324. } else {
  325. return float32(r), nil
  326. }
  327. }, "float", common.STRICT)
  328. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
  329. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  330. r, err := common.ToInt(input, sn)
  331. if err != nil {
  332. return 0, nil
  333. } else {
  334. return int32(r), nil
  335. }
  336. }, "int", common.STRICT)
  337. case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
  338. result, err = common.ToInt64Slice(v, common.STRICT)
  339. case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
  340. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  341. r, err := common.ToUint64(input, sn)
  342. if err != nil {
  343. return 0, nil
  344. } else {
  345. return uint32(r), nil
  346. }
  347. }, "uint", common.STRICT)
  348. case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  349. result, err = common.ToUint64Slice(v, common.STRICT)
  350. case dpb.FieldDescriptorProto_TYPE_BOOL:
  351. result, err = common.ToBoolSlice(v, common.STRICT)
  352. case dpb.FieldDescriptorProto_TYPE_STRING:
  353. result, err = common.ToStringSlice(v, common.STRICT)
  354. case dpb.FieldDescriptorProto_TYPE_BYTES:
  355. result, err = common.ToBytesSlice(v, common.STRICT)
  356. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  357. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  358. r, err := common.ToStringMap(v)
  359. if err == nil {
  360. return d.encodeMap(field.GetMessageType(), r)
  361. } else {
  362. return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
  363. }
  364. }, "map", common.STRICT)
  365. default:
  366. return nil, fmt.Errorf("invalid type for field '%s'", fn)
  367. }
  368. if err != nil {
  369. err = fmt.Errorf("failed to encode field '%s':%v", fn, err)
  370. }
  371. return result, err
  372. } else {
  373. return d.encodeSingleField(field, v)
  374. }
  375. }
  376. func (d *wrappedProtoDescriptor) encodeSingleField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
  377. fn := field.GetName()
  378. switch field.GetType() {
  379. case dpb.FieldDescriptorProto_TYPE_DOUBLE:
  380. r, err := common.ToFloat64(v, common.STRICT)
  381. if err == nil {
  382. return r, nil
  383. } else {
  384. return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
  385. }
  386. case dpb.FieldDescriptorProto_TYPE_FLOAT:
  387. r, err := common.ToFloat64(v, common.STRICT)
  388. if err == nil {
  389. return float32(r), nil
  390. } else {
  391. return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
  392. }
  393. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
  394. r, err := common.ToInt(v, common.STRICT)
  395. if err == nil {
  396. return int32(r), nil
  397. } else {
  398. return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
  399. }
  400. case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
  401. r, err := common.ToInt64(v, common.STRICT)
  402. if err == nil {
  403. return r, nil
  404. } else {
  405. return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
  406. }
  407. case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
  408. r, err := common.ToUint64(v, common.STRICT)
  409. if err == nil {
  410. return uint32(r), nil
  411. } else {
  412. return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
  413. }
  414. case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  415. r, err := common.ToUint64(v, common.STRICT)
  416. if err == nil {
  417. return r, nil
  418. } else {
  419. return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
  420. }
  421. case dpb.FieldDescriptorProto_TYPE_BOOL:
  422. r, err := common.ToBool(v, common.STRICT)
  423. if err == nil {
  424. return r, nil
  425. } else {
  426. return nil, fmt.Errorf("invalid type for bool type field '%s': %v", fn, err)
  427. }
  428. case dpb.FieldDescriptorProto_TYPE_STRING:
  429. r, err := common.ToString(v, common.STRICT)
  430. if err == nil {
  431. return r, nil
  432. } else {
  433. return nil, fmt.Errorf("invalid type for string type field '%s': %v", fn, err)
  434. }
  435. case dpb.FieldDescriptorProto_TYPE_BYTES:
  436. r, err := common.ToBytes(v, common.STRICT)
  437. if err == nil {
  438. return r, nil
  439. } else {
  440. return nil, fmt.Errorf("invalid type for bytes type field '%s': %v", fn, err)
  441. }
  442. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  443. r, err := common.ToStringMap(v)
  444. if err == nil {
  445. return d.encodeMap(field.GetMessageType(), r)
  446. } else {
  447. return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
  448. }
  449. default:
  450. return nil, fmt.Errorf("invalid type for field '%s'", fn)
  451. }
  452. }
  453. func decodeMessage(message *dynamic.Message, outputType *desc.MessageDescriptor) interface{} {
  454. if _, ok := WRAPPER_TYPES[outputType.GetFullyQualifiedName()]; ok {
  455. return message.GetFieldByNumber(1)
  456. } else if wrapperVoid == outputType.GetFullyQualifiedName() {
  457. return nil
  458. }
  459. result := make(map[string]interface{})
  460. for _, field := range outputType.GetFields() {
  461. decodeMessageField(message.GetField(field), field, result, common.STRICT)
  462. }
  463. return result
  464. }
  465. func decodeMessageField(src interface{}, field *desc.FieldDescriptor, result map[string]interface{}, sn common.Strictness) error {
  466. if f, err := decodeField(src, field, sn); err != nil {
  467. return err
  468. } else {
  469. result[field.GetName()] = f
  470. return nil
  471. }
  472. }
  473. func decodeField(src interface{}, field *desc.FieldDescriptor, sn common.Strictness) (interface{}, error) {
  474. var (
  475. r interface{}
  476. e error
  477. )
  478. fn := field.GetName()
  479. switch field.GetType() {
  480. case dpb.FieldDescriptorProto_TYPE_DOUBLE, dpb.FieldDescriptorProto_TYPE_FLOAT:
  481. if field.IsRepeated() {
  482. r, e = common.ToFloat64Slice(src, sn)
  483. } else {
  484. r, e = common.ToFloat64(src, sn)
  485. }
  486. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32, dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64, dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32, dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  487. if field.IsRepeated() {
  488. r, e = common.ToInt64Slice(src, sn)
  489. } else {
  490. r, e = common.ToInt64(src, sn)
  491. }
  492. case dpb.FieldDescriptorProto_TYPE_BOOL:
  493. if field.IsRepeated() {
  494. r, e = common.ToBoolSlice(src, sn)
  495. } else {
  496. r, e = common.ToBool(src, sn)
  497. }
  498. case dpb.FieldDescriptorProto_TYPE_STRING:
  499. if field.IsRepeated() {
  500. r, e = common.ToStringSlice(src, sn)
  501. } else {
  502. r, e = common.ToString(src, sn)
  503. }
  504. case dpb.FieldDescriptorProto_TYPE_BYTES:
  505. if field.IsRepeated() {
  506. r, e = common.ToBytesSlice(src, sn)
  507. } else {
  508. r, e = common.ToBytes(src, sn)
  509. }
  510. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  511. if field.IsRepeated() {
  512. r, e = common.ToTypedSlice(src, func(input interface{}, ssn common.Strictness) (interface{}, error) {
  513. return decodeSubMessage(input, field.GetMessageType(), ssn)
  514. }, "map", sn)
  515. } else {
  516. r, e = decodeSubMessage(src, field.GetMessageType(), sn)
  517. }
  518. default:
  519. return nil, fmt.Errorf("unsupported type for %s", fn)
  520. }
  521. if e != nil {
  522. e = fmt.Errorf("invalid type of return value for '%s': %v", fn, e)
  523. }
  524. return r, e
  525. }
  526. func decodeMap(src map[string]interface{}, ft *desc.MessageDescriptor, sn common.Strictness) (map[string]interface{}, error) {
  527. result := make(map[string]interface{})
  528. for _, field := range ft.GetFields() {
  529. val, ok := src[field.GetName()]
  530. if !ok {
  531. continue
  532. }
  533. err := decodeMessageField(val, field, result, sn)
  534. if err != nil {
  535. return nil, err
  536. }
  537. }
  538. return result, nil
  539. }
  540. func decodeSubMessage(input interface{}, ft *desc.MessageDescriptor, sn common.Strictness) (interface{}, error) {
  541. var m = map[string]interface{}{}
  542. switch v := input.(type) {
  543. case map[interface{}]interface{}:
  544. for k, val := range v {
  545. m[common.ToStringAlways(k)] = val
  546. }
  547. return decodeMap(m, ft, sn)
  548. case map[string]interface{}:
  549. return decodeMap(v, ft, sn)
  550. case proto.Message:
  551. message, err := dynamic.AsDynamicMessage(v)
  552. if err != nil {
  553. return nil, err
  554. }
  555. return decodeMessage(message, ft), nil
  556. case *dynamic.Message:
  557. return decodeMessage(v, ft), nil
  558. default:
  559. return nil, fmt.Errorf("cannot decode %[1]T(%[1]v) to map", input)
  560. }
  561. }