schema.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. package services
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/golang/protobuf/proto"
  8. dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
  9. "github.com/jhump/protoreflect/desc"
  10. "github.com/jhump/protoreflect/desc/protoparse"
  11. "github.com/jhump/protoreflect/dynamic"
  12. _ "google.golang.org/genproto/googleapis/api/annotations"
  13. "sync"
  14. )
  // Fully qualified names of the protobuf well-known types that get special
  // treatment when parameters and return values are converted.
  const (
  	wrapperBool   = "google.protobuf.BoolValue"
  	wrapperBytes  = "google.protobuf.BytesValue"
  	wrapperDouble = "google.protobuf.DoubleValue"
  	wrapperFloat  = "google.protobuf.FloatValue"
  	wrapperInt32  = "google.protobuf.Int32Value"
  	wrapperInt64  = "google.protobuf.Int64Value"
  	wrapperString = "google.protobuf.StringValue"
  	wrapperUInt32 = "google.protobuf.UInt32Value"
  	wrapperUInt64 = "google.protobuf.UInt64Value"
  	// wrapperVoid is the well-known empty message. It is compared against a
  	// descriptor's fully qualified name, which is case-sensitive, so it has to
  	// be spelled "Empty"; the former "EMPTY" spelling could never match.
  	wrapperVoid = "google.protobuf.Empty"
  )

  // WRAPPER_TYPES is the set of single-value wrapper messages. A message of one
  // of these types is converted through its field number 1 instead of being
  // treated as a map. wrapperVoid is deliberately not a member because it
  // carries no value at all.
  var WRAPPER_TYPES = map[string]struct{}{
  	wrapperBool:   {},
  	wrapperBytes:  {},
  	wrapperDouble: {},
  	wrapperFloat:  {},
  	wrapperInt32:  {},
  	wrapperInt64:  {},
  	wrapperString: {},
  	wrapperUInt32: {},
  	wrapperUInt64: {},
  }
  38. type descriptor interface {
  39. GetFunctions() []string
  40. }
  41. type protoDescriptor interface {
  42. ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error)
  43. ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error)
  44. MethodDescriptor(method string) *desc.MethodDescriptor
  45. MessageFactory() *dynamic.MessageFactory
  46. }
  47. type jsonDescriptor interface {
  48. ConvertParamsToJson(method string, params []interface{}) ([]byte, error)
  49. ConvertReturnJson(method string, returnVal []byte) (interface{}, error)
  50. }
  51. type textDescriptor interface {
  52. ConvertParamsToText(method string, params []interface{}) ([]byte, error)
  53. ConvertReturnText(method string, returnVal []byte) (interface{}, error)
  54. }
  55. type interfaceDescriptor interface {
  56. ConvertParams(method string, params []interface{}) ([]interface{}, error)
  57. ConvertReturn(method string, returnVal interface{}) (interface{}, error)
  58. }
  59. type multiplexDescriptor interface {
  60. jsonDescriptor
  61. textDescriptor
  62. interfaceDescriptor
  63. httpMapping
  64. }
  65. var ( //Do not call these directly, use the get methods
  66. protoParser *protoparse.Parser
  67. // A buffer of descriptor for schemas
  68. reg = &sync.Map{}
  69. )
  70. func ProtoParser() *protoparse.Parser {
  71. once.Do(func() {
  72. dir := "/etc/services/schemas/"
  73. if common.IsTesting {
  74. dir = "/services/test/schemas/"
  75. }
  76. schemaDir, _ := common.GetLoc(dir)
  77. protoParser = &protoparse.Parser{ImportPaths: []string{schemaDir}}
  78. })
  79. return protoParser
  80. }
  81. func parse(schema schema, file string) (descriptor, error) {
  82. info := &schemaInfo{
  83. SchemaType: schema,
  84. SchemaFile: file,
  85. }
  86. switch schema {
  87. case PROTOBUFF:
  88. if v, ok := reg.Load(info); ok {
  89. return v.(descriptor), nil
  90. }
  91. if fds, err := ProtoParser().ParseFiles(file); err != nil {
  92. return nil, err
  93. } else {
  94. result := &wrappedProtoDescriptor{
  95. FileDescriptor: fds[0],
  96. mf: dynamic.NewMessageFactoryWithDefaults(),
  97. }
  98. err := result.parseHttpOptions()
  99. if err != nil {
  100. return nil, err
  101. }
  102. reg.Store(info, result)
  103. return result, nil
  104. }
  105. default:
  106. return nil, fmt.Errorf("unsupported schema %s", schema)
  107. }
  108. }
  109. type wrappedProtoDescriptor struct {
  110. *desc.FileDescriptor
  111. methodOptions map[string]*httpOptions
  112. mf *dynamic.MessageFactory
  113. }
  114. //TODO support for duplicate names
  115. func (d *wrappedProtoDescriptor) GetFunctions() (result []string) {
  116. for _, s := range d.GetServices() {
  117. for _, m := range s.GetMethods() {
  118. result = append(result, m.GetName())
  119. }
  120. }
  121. return
  122. }
  123. func (d *wrappedProtoDescriptor) MessageFactory() *dynamic.MessageFactory {
  124. return d.mf
  125. }
  126. // ConvertParams TODO support optional field, support enum type
  127. // Parameter mapping for protobuf
  128. // 1. If param length is 1, it can either a map contains all field or a field only.
  129. // 2. If param length is more then 1, they will map to message fields in the order
  130. func (d *wrappedProtoDescriptor) ConvertParams(method string, params []interface{}) ([]interface{}, error) {
  131. m := d.MethodDescriptor(method)
  132. if m == nil {
  133. return nil, fmt.Errorf("can't find method %s in proto", method)
  134. }
  135. im := m.GetInputType()
  136. return d.convertParams(im, params)
  137. }
  138. func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error) {
  139. m := d.MethodDescriptor(method)
  140. if m == nil {
  141. return nil, fmt.Errorf("can't find method %s in proto", method)
  142. }
  143. im := m.GetInputType()
  144. message := d.mf.NewDynamicMessage(im)
  145. typedParams, err := d.convertParams(im, params)
  146. if err != nil {
  147. return nil, err
  148. }
  149. for i, typeParam := range typedParams {
  150. message.SetFieldByNumber(i+1, typeParam)
  151. }
  152. return message, nil
  153. }
  154. func (d *wrappedProtoDescriptor) ConvertParamsToJson(method string, params []interface{}) ([]byte, error) {
  155. // Deal with encoded json string. Just return the string
  156. if len(params) == 1 {
  157. m := d.MethodDescriptor(method)
  158. if m == nil {
  159. return nil, fmt.Errorf("can't find method %s in proto", method)
  160. }
  161. im := m.GetInputType()
  162. if im.GetFullyQualifiedName() == wrapperString {
  163. ss, err := common.ToString(params[0], common.STRICT)
  164. if err != nil {
  165. return nil, err
  166. }
  167. return []byte(ss), nil
  168. }
  169. }
  170. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  171. return nil, err
  172. } else {
  173. return message.MarshalJSON()
  174. }
  175. }
  176. func (d *wrappedProtoDescriptor) ConvertParamsToText(method string, params []interface{}) ([]byte, error) {
  177. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  178. return nil, err
  179. } else {
  180. return message.MarshalText()
  181. }
  182. }
  183. func (d *wrappedProtoDescriptor) convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interface{}, error) {
  184. fields := im.GetFields()
  185. var result []interface{}
  186. switch len(params) {
  187. case 0:
  188. if len(fields) == 0 {
  189. return result, nil
  190. } else {
  191. return nil, fmt.Errorf("require %d parameters but none", len(fields))
  192. }
  193. case 1:
  194. // If it is map, try unfold it
  195. // TODO custom error for non map or map name not match
  196. if r, err := d.unfoldMap(im, params[0]); err != nil {
  197. common.Log.Debugf("try unfold param for message %s fail: %v", im.GetName(), err)
  198. } else {
  199. return r, nil
  200. }
  201. // For non map params, treat it as special case of multiple params
  202. if len(fields) == 1 {
  203. param0, err := d.encodeField(fields[0], params[0])
  204. if err != nil {
  205. return nil, err
  206. }
  207. return append(result, param0), nil
  208. } else {
  209. return nil, fmt.Errorf("require %d parameters but only got 1", len(fields))
  210. }
  211. default:
  212. if len(fields) == len(params) {
  213. for i, field := range fields {
  214. param, err := d.encodeField(field, params[i])
  215. if err != nil {
  216. return nil, err
  217. }
  218. result = append(result, param)
  219. }
  220. return result, nil
  221. } else {
  222. return nil, fmt.Errorf("require %d parameters but only got %d", len(fields), len(params))
  223. }
  224. }
  225. }
  226. func (d *wrappedProtoDescriptor) ConvertReturn(method string, returnVal interface{}) (interface{}, error) {
  227. m := d.MethodDescriptor(method)
  228. t := m.GetOutputType()
  229. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  230. return decodeField(returnVal, t.FindFieldByNumber(1), common.STRICT)
  231. } else { // MUST be a map
  232. if retMap, ok := returnVal.(map[string]interface{}); ok {
  233. return decodeMap(retMap, t, common.CONVERT_SAMEKIND)
  234. } else {
  235. return nil, fmt.Errorf("fail to convert return val, must be a map but got %v", returnVal)
  236. }
  237. }
  238. }
  239. func (d *wrappedProtoDescriptor) ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error) {
  240. m := d.MethodDescriptor(method)
  241. return decodeMessage(returnVal, m.GetOutputType()), nil
  242. }
  243. func (d *wrappedProtoDescriptor) ConvertReturnJson(method string, returnVal []byte) (interface{}, error) {
  244. r := make(map[string]interface{})
  245. err := json.Unmarshal(returnVal, &r)
  246. if err != nil {
  247. return nil, err
  248. }
  249. m := d.MethodDescriptor(method)
  250. return decodeMap(r, m.GetOutputType(), common.CONVERT_SAMEKIND)
  251. }
  252. func (d *wrappedProtoDescriptor) ConvertReturnText(method string, returnVal []byte) (interface{}, error) {
  253. m := d.MethodDescriptor(method)
  254. t := m.GetOutputType()
  255. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  256. return decodeField(string(returnVal), t.FindFieldByNumber(1), common.CONVERT_ALL)
  257. } else {
  258. return nil, fmt.Errorf("fail to convert return val to text, return type must be primitive type but got %s", t.GetName())
  259. }
  260. }
  261. func (d *wrappedProtoDescriptor) MethodDescriptor(name string) *desc.MethodDescriptor {
  262. var m *desc.MethodDescriptor
  263. for _, s := range d.GetServices() {
  264. m = s.FindMethodByName(name)
  265. if m != nil {
  266. break
  267. }
  268. }
  269. return m
  270. }
  271. func (d *wrappedProtoDescriptor) unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error) {
  272. fields := ft.GetFields()
  273. result := make([]interface{}, len(fields))
  274. if m, ok := xsql.ToMessage(i); ok {
  275. for _, field := range fields {
  276. v, ok := m.Value(field.GetName())
  277. if !ok {
  278. return nil, fmt.Errorf("field %s not found", field.GetName())
  279. }
  280. fv, err := d.encodeField(field, v)
  281. if err != nil {
  282. return nil, err
  283. }
  284. result[field.GetNumber()-1] = fv
  285. }
  286. } else {
  287. return nil, fmt.Errorf("not a map")
  288. }
  289. return result, nil
  290. }
  291. func (d *wrappedProtoDescriptor) encodeMap(im *desc.MessageDescriptor, i interface{}) (*dynamic.Message, error) {
  292. result := d.mf.NewDynamicMessage(im)
  293. fields := im.GetFields()
  294. if m, ok := i.(map[string]interface{}); ok {
  295. for _, field := range fields {
  296. v, ok := m[field.GetName()]
  297. if !ok {
  298. return nil, fmt.Errorf("field %s not found", field.GetName())
  299. }
  300. fv, err := d.encodeField(field, v)
  301. if err != nil {
  302. return nil, err
  303. }
  304. result.SetFieldByName(field.GetName(), fv)
  305. }
  306. }
  307. return result, nil
  308. }
  309. func (d *wrappedProtoDescriptor) encodeField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
  310. fn := field.GetName()
  311. ft := field.GetType()
  312. if field.IsRepeated() {
  313. var (
  314. result interface{}
  315. err error
  316. )
  317. switch ft {
  318. case dpb.FieldDescriptorProto_TYPE_DOUBLE:
  319. result, err = common.ToFloat64Slice(v, common.STRICT)
  320. case dpb.FieldDescriptorProto_TYPE_FLOAT:
  321. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  322. r, err := common.ToFloat64(input, sn)
  323. if err != nil {
  324. return 0, nil
  325. } else {
  326. return float32(r), nil
  327. }
  328. }, "float", common.STRICT)
  329. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
  330. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  331. r, err := common.ToInt(input, sn)
  332. if err != nil {
  333. return 0, nil
  334. } else {
  335. return int32(r), nil
  336. }
  337. }, "int", common.STRICT)
  338. case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
  339. result, err = common.ToInt64Slice(v, common.STRICT)
  340. case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
  341. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  342. r, err := common.ToUint64(input, sn)
  343. if err != nil {
  344. return 0, nil
  345. } else {
  346. return uint32(r), nil
  347. }
  348. }, "uint", common.STRICT)
  349. case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  350. result, err = common.ToUint64Slice(v, common.STRICT)
  351. case dpb.FieldDescriptorProto_TYPE_BOOL:
  352. result, err = common.ToBoolSlice(v, common.STRICT)
  353. case dpb.FieldDescriptorProto_TYPE_STRING:
  354. result, err = common.ToStringSlice(v, common.STRICT)
  355. case dpb.FieldDescriptorProto_TYPE_BYTES:
  356. result, err = common.ToBytesSlice(v, common.STRICT)
  357. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  358. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  359. r, err := common.ToStringMap(v)
  360. if err == nil {
  361. return d.encodeMap(field.GetMessageType(), r)
  362. } else {
  363. return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
  364. }
  365. }, "map", common.STRICT)
  366. default:
  367. return nil, fmt.Errorf("invalid type for field '%s'", fn)
  368. }
  369. if err != nil {
  370. err = fmt.Errorf("failed to encode field '%s':%v", fn, err)
  371. }
  372. return result, err
  373. } else {
  374. return d.encodeSingleField(field, v)
  375. }
  376. }
  377. func (d *wrappedProtoDescriptor) encodeSingleField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
  378. fn := field.GetName()
  379. switch field.GetType() {
  380. case dpb.FieldDescriptorProto_TYPE_DOUBLE:
  381. r, err := common.ToFloat64(v, common.STRICT)
  382. if err == nil {
  383. return r, nil
  384. } else {
  385. return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
  386. }
  387. case dpb.FieldDescriptorProto_TYPE_FLOAT:
  388. r, err := common.ToFloat64(v, common.STRICT)
  389. if err == nil {
  390. return float32(r), nil
  391. } else {
  392. return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
  393. }
  394. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
  395. r, err := common.ToInt(v, common.STRICT)
  396. if err == nil {
  397. return int32(r), nil
  398. } else {
  399. return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
  400. }
  401. case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
  402. r, err := common.ToInt64(v, common.STRICT)
  403. if err == nil {
  404. return r, nil
  405. } else {
  406. return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
  407. }
  408. case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
  409. r, err := common.ToUint64(v, common.STRICT)
  410. if err == nil {
  411. return uint32(r), nil
  412. } else {
  413. return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
  414. }
  415. case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  416. r, err := common.ToUint64(v, common.STRICT)
  417. if err == nil {
  418. return r, nil
  419. } else {
  420. return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
  421. }
  422. case dpb.FieldDescriptorProto_TYPE_BOOL:
  423. r, err := common.ToBool(v, common.STRICT)
  424. if err == nil {
  425. return r, nil
  426. } else {
  427. return nil, fmt.Errorf("invalid type for bool type field '%s': %v", fn, err)
  428. }
  429. case dpb.FieldDescriptorProto_TYPE_STRING:
  430. r, err := common.ToString(v, common.STRICT)
  431. if err == nil {
  432. return r, nil
  433. } else {
  434. return nil, fmt.Errorf("invalid type for string type field '%s': %v", fn, err)
  435. }
  436. case dpb.FieldDescriptorProto_TYPE_BYTES:
  437. r, err := common.ToBytes(v, common.STRICT)
  438. if err == nil {
  439. return r, nil
  440. } else {
  441. return nil, fmt.Errorf("invalid type for bytes type field '%s': %v", fn, err)
  442. }
  443. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  444. r, err := common.ToStringMap(v)
  445. if err == nil {
  446. return d.encodeMap(field.GetMessageType(), r)
  447. } else {
  448. return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
  449. }
  450. default:
  451. return nil, fmt.Errorf("invalid type for field '%s'", fn)
  452. }
  453. }
  454. func decodeMessage(message *dynamic.Message, outputType *desc.MessageDescriptor) interface{} {
  455. if _, ok := WRAPPER_TYPES[outputType.GetFullyQualifiedName()]; ok {
  456. return message.GetFieldByNumber(1)
  457. } else if wrapperVoid == outputType.GetFullyQualifiedName() {
  458. return nil
  459. }
  460. result := make(map[string]interface{})
  461. for _, field := range outputType.GetFields() {
  462. decodeMessageField(message.GetField(field), field, result, common.STRICT)
  463. }
  464. return result
  465. }
  466. func decodeMessageField(src interface{}, field *desc.FieldDescriptor, result map[string]interface{}, sn common.Strictness) error {
  467. if f, err := decodeField(src, field, sn); err != nil {
  468. return err
  469. } else {
  470. result[field.GetName()] = f
  471. return nil
  472. }
  473. }
  474. func decodeField(src interface{}, field *desc.FieldDescriptor, sn common.Strictness) (interface{}, error) {
  475. var (
  476. r interface{}
  477. e error
  478. )
  479. fn := field.GetName()
  480. switch field.GetType() {
  481. case dpb.FieldDescriptorProto_TYPE_DOUBLE, dpb.FieldDescriptorProto_TYPE_FLOAT:
  482. if field.IsRepeated() {
  483. r, e = common.ToFloat64Slice(src, sn)
  484. } else {
  485. r, e = common.ToFloat64(src, sn)
  486. }
  487. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32, dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64, dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32, dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  488. if field.IsRepeated() {
  489. r, e = common.ToInt64Slice(src, sn)
  490. } else {
  491. r, e = common.ToInt64(src, sn)
  492. }
  493. case dpb.FieldDescriptorProto_TYPE_BOOL:
  494. if field.IsRepeated() {
  495. r, e = common.ToBoolSlice(src, sn)
  496. } else {
  497. r, e = common.ToBool(src, sn)
  498. }
  499. case dpb.FieldDescriptorProto_TYPE_STRING:
  500. if field.IsRepeated() {
  501. r, e = common.ToStringSlice(src, sn)
  502. } else {
  503. r, e = common.ToString(src, sn)
  504. }
  505. case dpb.FieldDescriptorProto_TYPE_BYTES:
  506. if field.IsRepeated() {
  507. r, e = common.ToBytesSlice(src, sn)
  508. } else {
  509. r, e = common.ToBytes(src, sn)
  510. }
  511. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  512. if field.IsRepeated() {
  513. r, e = common.ToTypedSlice(src, func(input interface{}, ssn common.Strictness) (interface{}, error) {
  514. return decodeSubMessage(input, field.GetMessageType(), ssn)
  515. }, "map", sn)
  516. } else {
  517. r, e = decodeSubMessage(src, field.GetMessageType(), sn)
  518. }
  519. default:
  520. return nil, fmt.Errorf("unsupported type for %s", fn)
  521. }
  522. if e != nil {
  523. e = fmt.Errorf("invalid type of return value for '%s': %v", fn, e)
  524. }
  525. return r, e
  526. }
  527. func decodeMap(src map[string]interface{}, ft *desc.MessageDescriptor, sn common.Strictness) (map[string]interface{}, error) {
  528. result := make(map[string]interface{})
  529. for _, field := range ft.GetFields() {
  530. val, ok := src[field.GetName()]
  531. if !ok {
  532. continue
  533. }
  534. err := decodeMessageField(val, field, result, sn)
  535. if err != nil {
  536. return nil, err
  537. }
  538. }
  539. return result, nil
  540. }
  541. func decodeSubMessage(input interface{}, ft *desc.MessageDescriptor, sn common.Strictness) (interface{}, error) {
  542. var m = map[string]interface{}{}
  543. switch v := input.(type) {
  544. case map[interface{}]interface{}:
  545. for k, val := range v {
  546. m[common.ToStringAlways(k)] = val
  547. }
  548. return decodeMap(m, ft, sn)
  549. case map[string]interface{}:
  550. return decodeMap(v, ft, sn)
  551. case proto.Message:
  552. message, err := dynamic.AsDynamicMessage(v)
  553. if err != nil {
  554. return nil, err
  555. }
  556. return decodeMessage(message, ft), nil
  557. case *dynamic.Message:
  558. return decodeMessage(v, ft), nil
  559. default:
  560. return nil, fmt.Errorf("cannot decode %[1]T(%[1]v) to map", input)
  561. }
  562. }