schema.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591
  1. package services
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/golang/protobuf/proto"
  8. dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
  9. "github.com/jhump/protoreflect/desc"
  10. "github.com/jhump/protoreflect/desc/protoparse"
  11. "github.com/jhump/protoreflect/dynamic"
  12. "sync"
  13. )
  // Fully-qualified names of the protobuf well-known types that receive special
  // treatment when parameters and return values are converted.
  const (
  	wrapperBool   = "google.protobuf.BoolValue"
  	wrapperBytes  = "google.protobuf.BytesValue"
  	wrapperDouble = "google.protobuf.DoubleValue"
  	wrapperFloat  = "google.protobuf.FloatValue"
  	wrapperInt32  = "google.protobuf.Int32Value"
  	wrapperInt64  = "google.protobuf.Int64Value"
  	wrapperString = "google.protobuf.StringValue"
  	wrapperUInt32 = "google.protobuf.UInt32Value"
  	wrapperUInt64 = "google.protobuf.UInt64Value"
  	// wrapperVoid names the well-known empty message. It is compared against
  	// a descriptor's fully-qualified name, which is case-sensitive, so it has
  	// to be spelled exactly as the message is declared: "Empty".
  	wrapperVoid = "google.protobuf.Empty"
  )
  // WRAPPER_TYPES is the set of fully-qualified message names that are treated
  // as a single wrapped value: when a method's output type is in this set, the
  // converters in this file work on field number 1 of the message instead of
  // building a map. wrapperVoid is intentionally not a member.
  26. var WRAPPER_TYPES = map[string]struct{}{
  27. wrapperBool: {},
  28. wrapperBytes: {},
  29. wrapperDouble: {},
  30. wrapperFloat: {},
  31. wrapperInt32: {},
  32. wrapperInt64: {},
  33. wrapperString: {},
  34. wrapperUInt32: {},
  35. wrapperUInt64: {},
  36. }
  // descriptor is the result type of parse: the minimal contract every parsed
  // schema has to satisfy.
  37. type descriptor interface {
  // GetFunctions returns the function names of the schema; the protobuf
  // implementation in this file returns the name of every service method.
  38. GetFunctions() []string
  39. }
  // protoDescriptor converts between plain Go values and dynamic protobuf
  // messages for a named method.
  40. type protoDescriptor interface {
  // ConvertParamsToMessage builds the input message of method from params.
  41. ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error)
  // ConvertReturnMessage converts the output message of method to a Go value.
  42. ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error)
  // MethodDescriptor looks a method up by name.
  43. MethodDescriptor(method string) *desc.MethodDescriptor
  // MessageFactory returns the factory used to create dynamic messages.
  44. MessageFactory() *dynamic.MessageFactory
  45. }
  // jsonDescriptor converts the parameters and the return value of a named
  // method to and from JSON-encoded bytes.
  46. type jsonDescriptor interface {
  // ConvertParamsToJson encodes params as the JSON payload for method.
  47. ConvertParamsToJson(method string, params []interface{}) ([]byte, error)
  // ConvertReturnJson decodes the JSON payload returned by method.
  48. ConvertReturnJson(method string, returnVal []byte) (interface{}, error)
  49. }
  // textDescriptor converts the parameters and the return value of a named
  // method to and from a textual byte payload.
  50. type textDescriptor interface {
  // ConvertParamsToText encodes params as the text payload for method.
  51. ConvertParamsToText(method string, params []interface{}) ([]byte, error)
  // ConvertReturnText decodes the text payload returned by method.
  52. ConvertReturnText(method string, returnVal []byte) (interface{}, error)
  53. }
  // interfaceDescriptor converts the parameters and the return value of a named
  // method while keeping them as plain Go values.
  54. type interfaceDescriptor interface {
  // ConvertParams converts params to the values expected by method.
  55. ConvertParams(method string, params []interface{}) ([]interface{}, error)
  // ConvertReturn converts the value returned by method.
  56. ConvertReturn(method string, returnVal interface{}) (interface{}, error)
  57. }
  // multiplexDescriptor is the union of the JSON, text and plain-value
  // conversion contracts.
  58. type multiplexDescriptor interface {
  59. jsonDescriptor
  60. textDescriptor
  61. interfaceDescriptor
  62. }
  // Package-level state used by ProtoParser and parse.
  63. var ( //Do not call these directly, use the get methods
  // protoParser is assigned inside the once.Do callback of ProtoParser.
  64. protoParser *protoparse.Parser
  65. // A buffer of descriptor for schemas
  // reg is read with Load and filled with Store by parse.
  66. reg = &sync.Map{}
  67. )
  68. func ProtoParser() *protoparse.Parser {
  69. once.Do(func() {
  70. dir := "/etc/services/schemas/"
  71. if common.IsTesting {
  72. dir = "/services/test/schemas/"
  73. }
  74. schemaDir, _ := common.GetLoc(dir)
  75. protoParser = &protoparse.Parser{ImportPaths: []string{schemaDir}}
  76. })
  77. return protoParser
  78. }
  79. func parse(schema schema, file string) (descriptor, error) {
  80. info := &schemaInfo{
  81. SchemaType: schema,
  82. SchemaFile: file,
  83. }
  84. switch schema {
  85. case PROTOBUFF:
  86. if v, ok := reg.Load(info); ok {
  87. return v.(descriptor), nil
  88. }
  89. if fds, err := ProtoParser().ParseFiles(file); err != nil {
  90. return nil, err
  91. } else {
  92. result := &wrappedProtoDescriptor{
  93. FileDescriptor: fds[0],
  94. methods: make(map[string]*desc.MethodDescriptor),
  95. mf: dynamic.NewMessageFactoryWithDefaults(),
  96. }
  97. reg.Store(info, result)
  98. return result, nil
  99. }
  100. default:
  101. return nil, fmt.Errorf("unsupported schema %s", schema)
  102. }
  103. }
  // wrappedProtoDescriptor is the protobuf-backed descriptor built by parse. It
  // embeds the parsed file descriptor and adds the conversion methods defined
  // in this file.
  104. type wrappedProtoDescriptor struct {
  105. *desc.FileDescriptor
  // methods is allocated empty by parse; the code visible here never reads it.
  106. methods map[string]*desc.MethodDescriptor
  // mf creates the dynamic messages built by ConvertParamsToMessage.
  107. mf *dynamic.MessageFactory
  108. }
  109. //TODO support for duplicate names
  110. func (d *wrappedProtoDescriptor) GetFunctions() (result []string) {
  111. for _, s := range d.GetServices() {
  112. for _, m := range s.GetMethods() {
  113. result = append(result, m.GetName())
  114. }
  115. }
  116. return
  117. }
  118. func (d *wrappedProtoDescriptor) MessageFactory() *dynamic.MessageFactory {
  119. return d.mf
  120. }
  121. // ConvertParams TODO support optional field, support enum type
  122. // Parameter mapping for protobuf
  123. // 1. If param length is 1, it can either a map contains all field or a field only.
  124. // 2. If param length is more then 1, they will map to message fields in the order
  125. func (d *wrappedProtoDescriptor) ConvertParams(method string, params []interface{}) ([]interface{}, error) {
  126. m := d.MethodDescriptor(method)
  127. if m == nil {
  128. return nil, fmt.Errorf("can't find method %s in proto", method)
  129. }
  130. im := m.GetInputType()
  131. return convertParams(im, params)
  132. }
  133. func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error) {
  134. m := d.MethodDescriptor(method)
  135. if m == nil {
  136. return nil, fmt.Errorf("can't find method %s in proto", method)
  137. }
  138. im := m.GetInputType()
  139. message := d.mf.NewDynamicMessage(im)
  140. typedParams, err := convertParams(im, params)
  141. if err != nil {
  142. return nil, err
  143. }
  144. for i, typeParam := range typedParams {
  145. message.SetFieldByNumber(i+1, typeParam)
  146. }
  147. return message, nil
  148. }
  149. func (d *wrappedProtoDescriptor) ConvertParamsToJson(method string, params []interface{}) ([]byte, error) {
  150. // Deal with encoded json string. Just return the string
  151. if len(params) == 1 {
  152. m := d.MethodDescriptor(method)
  153. if m == nil {
  154. return nil, fmt.Errorf("can't find method %s in proto", method)
  155. }
  156. im := m.GetInputType()
  157. if im.GetFullyQualifiedName() == wrapperString {
  158. ss, err := common.ToString(params[0], common.STRICT)
  159. if err != nil {
  160. return nil, err
  161. }
  162. return []byte(ss), nil
  163. }
  164. }
  165. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  166. return nil, err
  167. } else {
  168. return message.MarshalJSON()
  169. }
  170. }
  171. func (d *wrappedProtoDescriptor) ConvertParamsToText(method string, params []interface{}) ([]byte, error) {
  172. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  173. return nil, err
  174. } else {
  175. return message.MarshalText()
  176. }
  177. }
  178. func convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interface{}, error) {
  179. fields := im.GetFields()
  180. var result []interface{}
  181. switch len(params) {
  182. case 0:
  183. if len(fields) == 0 {
  184. return result, nil
  185. } else {
  186. return nil, fmt.Errorf("require %d parameters but none", len(fields))
  187. }
  188. case 1:
  189. // If it is map, try unfold it
  190. // TODO custom error for non map or map name not match
  191. if r, err := unfoldMap(im, params[0]); err != nil {
  192. common.Log.Debugf("try unfold param for message %s fail: %v", im.GetName(), err)
  193. } else {
  194. return r, nil
  195. }
  196. // For non map params, treat it as special case of multiple params
  197. if len(fields) == 1 {
  198. param0, err := encodeField(fields[0], params[0])
  199. if err != nil {
  200. return nil, err
  201. }
  202. return append(result, param0), nil
  203. } else {
  204. return nil, fmt.Errorf("require %d parameters but only got 1", len(fields))
  205. }
  206. default:
  207. if len(fields) == len(params) {
  208. for i, field := range fields {
  209. param, err := encodeField(field, params[i])
  210. if err != nil {
  211. return nil, err
  212. }
  213. result = append(result, param)
  214. }
  215. return result, nil
  216. } else {
  217. return nil, fmt.Errorf("require %d parameters but only got %d", len(fields), len(params))
  218. }
  219. }
  220. }
  221. func (d *wrappedProtoDescriptor) ConvertReturn(method string, returnVal interface{}) (interface{}, error) {
  222. m := d.MethodDescriptor(method)
  223. t := m.GetOutputType()
  224. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  225. return decodeField(returnVal, t.FindFieldByNumber(1), common.STRICT)
  226. } else { // MUST be a map
  227. if retMap, ok := returnVal.(map[string]interface{}); ok {
  228. return decodeMap(retMap, t, common.CONVERT_SAMEKIND)
  229. } else {
  230. return nil, fmt.Errorf("fail to convert return val, must be a map but got %v", returnVal)
  231. }
  232. }
  233. }
  234. func (d *wrappedProtoDescriptor) ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error) {
  235. m := d.MethodDescriptor(method)
  236. return decodeMessage(returnVal, m.GetOutputType()), nil
  237. }
  238. func (d *wrappedProtoDescriptor) ConvertReturnJson(method string, returnVal []byte) (interface{}, error) {
  239. r := make(map[string]interface{})
  240. err := json.Unmarshal(returnVal, &r)
  241. if err != nil {
  242. return nil, err
  243. }
  244. m := d.MethodDescriptor(method)
  245. return decodeMap(r, m.GetOutputType(), common.CONVERT_SAMEKIND)
  246. }
  247. func (d *wrappedProtoDescriptor) ConvertReturnText(method string, returnVal []byte) (interface{}, error) {
  248. m := d.MethodDescriptor(method)
  249. t := m.GetOutputType()
  250. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  251. return decodeField(string(returnVal), t.FindFieldByNumber(1), common.CONVERT_ALL)
  252. } else {
  253. return nil, fmt.Errorf("fail to convert return val to text, return type must be primitive type but got %s", t.GetName())
  254. }
  255. }
  256. func (d *wrappedProtoDescriptor) MethodDescriptor(name string) *desc.MethodDescriptor {
  257. var m *desc.MethodDescriptor
  258. for _, s := range d.GetServices() {
  259. m = s.FindMethodByName(name)
  260. if m != nil {
  261. break
  262. }
  263. }
  264. return m
  265. }
  266. func unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error) {
  267. fields := ft.GetFields()
  268. result := make([]interface{}, len(fields))
  269. if m, ok := xsql.ToMessage(i); ok {
  270. for _, field := range fields {
  271. v, ok := m.Value(field.GetName())
  272. if !ok {
  273. return nil, fmt.Errorf("field %s not found", field.GetName())
  274. }
  275. fv, err := encodeField(field, v)
  276. if err != nil {
  277. return nil, err
  278. }
  279. result[field.GetNumber()-1] = fv
  280. }
  281. } else {
  282. return nil, fmt.Errorf("not a map")
  283. }
  284. return result, nil
  285. }
  286. func encodeMap(fields []*desc.FieldDescriptor, i interface{}) (map[string]interface{}, error) {
  287. var result map[string]interface{}
  288. if m, ok := i.(map[string]interface{}); ok && len(m) == len(fields) {
  289. for _, field := range fields {
  290. v, ok := m[field.GetName()]
  291. if !ok {
  292. return nil, fmt.Errorf("field %s not found", field.GetName())
  293. }
  294. fv, err := encodeField(field, v)
  295. if err != nil {
  296. return nil, err
  297. }
  298. result[field.GetName()] = fv
  299. }
  300. }
  301. return result, nil
  302. }
  303. func encodeField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
  304. fn := field.GetName()
  305. ft := field.GetType()
  306. if field.IsRepeated() {
  307. var (
  308. result interface{}
  309. err error
  310. )
  311. switch ft {
  312. case dpb.FieldDescriptorProto_TYPE_DOUBLE:
  313. result, err = common.ToFloat64Slice(v, common.STRICT)
  314. case dpb.FieldDescriptorProto_TYPE_FLOAT:
  315. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  316. r, err := common.ToFloat64(input, sn)
  317. if err != nil {
  318. return 0, nil
  319. } else {
  320. return float32(r), nil
  321. }
  322. }, "float", common.STRICT)
  323. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
  324. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  325. r, err := common.ToInt(input, sn)
  326. if err != nil {
  327. return 0, nil
  328. } else {
  329. return int32(r), nil
  330. }
  331. }, "int", common.STRICT)
  332. case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
  333. result, err = common.ToInt64Slice(v, common.STRICT)
  334. case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
  335. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  336. r, err := common.ToUint64(input, sn)
  337. if err != nil {
  338. return 0, nil
  339. } else {
  340. return uint32(r), nil
  341. }
  342. }, "uint", common.STRICT)
  343. case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  344. result, err = common.ToUint64Slice(v, common.STRICT)
  345. case dpb.FieldDescriptorProto_TYPE_BOOL:
  346. result, err = common.ToBoolSlice(v, common.STRICT)
  347. case dpb.FieldDescriptorProto_TYPE_STRING:
  348. result, err = common.ToStringSlice(v, common.STRICT)
  349. case dpb.FieldDescriptorProto_TYPE_BYTES:
  350. result, err = common.ToBytesSlice(v, common.STRICT)
  351. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  352. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  353. r, err := common.ToStringMap(v)
  354. if err == nil {
  355. return encodeMap(field.GetMessageType().GetFields(), r)
  356. } else {
  357. return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
  358. }
  359. }, "map", common.STRICT)
  360. default:
  361. return nil, fmt.Errorf("invalid type for field '%s'", fn)
  362. }
  363. if err != nil {
  364. err = fmt.Errorf("failed to encode field '%s':%v", fn, err)
  365. }
  366. return result, err
  367. } else {
  368. return encodeSingleField(field, v)
  369. }
  370. }
  371. func encodeSingleField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
  372. fn := field.GetName()
  373. switch field.GetType() {
  374. case dpb.FieldDescriptorProto_TYPE_DOUBLE:
  375. r, err := common.ToFloat64(v, common.STRICT)
  376. if err == nil {
  377. return r, nil
  378. } else {
  379. return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
  380. }
  381. case dpb.FieldDescriptorProto_TYPE_FLOAT:
  382. r, err := common.ToFloat64(v, common.STRICT)
  383. if err == nil {
  384. return float32(r), nil
  385. } else {
  386. return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
  387. }
  388. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
  389. r, err := common.ToInt(v, common.STRICT)
  390. if err == nil {
  391. return int32(r), nil
  392. } else {
  393. return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
  394. }
  395. case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
  396. r, err := common.ToInt64(v, common.STRICT)
  397. if err == nil {
  398. return r, nil
  399. } else {
  400. return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
  401. }
  402. case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
  403. r, err := common.ToUint64(v, common.STRICT)
  404. if err == nil {
  405. return uint32(r), nil
  406. } else {
  407. return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
  408. }
  409. case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  410. r, err := common.ToUint64(v, common.STRICT)
  411. if err == nil {
  412. return r, nil
  413. } else {
  414. return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
  415. }
  416. case dpb.FieldDescriptorProto_TYPE_BOOL:
  417. r, err := common.ToBool(v, common.STRICT)
  418. if err == nil {
  419. return r, nil
  420. } else {
  421. return nil, fmt.Errorf("invalid type for bool type field '%s': %v", fn, err)
  422. }
  423. case dpb.FieldDescriptorProto_TYPE_STRING:
  424. r, err := common.ToString(v, common.STRICT)
  425. if err == nil {
  426. return r, nil
  427. } else {
  428. return nil, fmt.Errorf("invalid type for string type field '%s': %v", fn, err)
  429. }
  430. case dpb.FieldDescriptorProto_TYPE_BYTES:
  431. r, err := common.ToBytes(v, common.STRICT)
  432. if err == nil {
  433. return r, nil
  434. } else {
  435. return nil, fmt.Errorf("invalid type for bytes type field '%s': %v", fn, err)
  436. }
  437. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  438. r, err := common.ToStringMap(v)
  439. if err == nil {
  440. return encodeMap(field.GetMessageType().GetFields(), r)
  441. } else {
  442. return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
  443. }
  444. default:
  445. return nil, fmt.Errorf("invalid type for field '%s'", fn)
  446. }
  447. }
  448. func decodeMessage(message *dynamic.Message, outputType *desc.MessageDescriptor) interface{} {
  449. if _, ok := WRAPPER_TYPES[outputType.GetFullyQualifiedName()]; ok {
  450. return message.GetFieldByNumber(1)
  451. } else if wrapperVoid == outputType.GetFullyQualifiedName() {
  452. return nil
  453. }
  454. result := make(map[string]interface{})
  455. for _, field := range outputType.GetFields() {
  456. decodeMessageField(message.GetField(field), field, result, common.STRICT)
  457. }
  458. return result
  459. }
  460. func decodeMessageField(src interface{}, field *desc.FieldDescriptor, result map[string]interface{}, sn common.Strictness) error {
  461. if f, err := decodeField(src, field, sn); err != nil {
  462. return err
  463. } else {
  464. result[field.GetName()] = f
  465. return nil
  466. }
  467. }
  468. func decodeField(src interface{}, field *desc.FieldDescriptor, sn common.Strictness) (interface{}, error) {
  469. var (
  470. r interface{}
  471. e error
  472. )
  473. fn := field.GetName()
  474. switch field.GetType() {
  475. case dpb.FieldDescriptorProto_TYPE_DOUBLE, dpb.FieldDescriptorProto_TYPE_FLOAT:
  476. if field.IsRepeated() {
  477. r, e = common.ToFloat64Slice(src, sn)
  478. } else {
  479. r, e = common.ToFloat64(src, sn)
  480. }
  481. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32, dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64, dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32, dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  482. if field.IsRepeated() {
  483. r, e = common.ToInt64Slice(src, sn)
  484. } else {
  485. r, e = common.ToInt64(src, sn)
  486. }
  487. case dpb.FieldDescriptorProto_TYPE_BOOL:
  488. if field.IsRepeated() {
  489. r, e = common.ToBoolSlice(src, sn)
  490. } else {
  491. r, e = common.ToBool(src, sn)
  492. }
  493. case dpb.FieldDescriptorProto_TYPE_STRING:
  494. if field.IsRepeated() {
  495. r, e = common.ToStringSlice(src, sn)
  496. } else {
  497. r, e = common.ToString(src, sn)
  498. }
  499. case dpb.FieldDescriptorProto_TYPE_BYTES:
  500. if field.IsRepeated() {
  501. r, e = common.ToBytesSlice(src, sn)
  502. } else {
  503. r, e = common.ToBytes(src, sn)
  504. }
  505. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  506. if field.IsRepeated() {
  507. r, e = common.ToTypedSlice(src, func(input interface{}, ssn common.Strictness) (interface{}, error) {
  508. return decodeSubMessage(input, field.GetMessageType(), ssn)
  509. }, "map", sn)
  510. } else {
  511. r, e = decodeSubMessage(src, field.GetMessageType(), sn)
  512. }
  513. default:
  514. return nil, fmt.Errorf("unsupported type for %s", fn)
  515. }
  516. if e != nil {
  517. e = fmt.Errorf("invalid type of return value for '%s': %v", fn, e)
  518. }
  519. return r, e
  520. }
  521. func decodeMap(src map[string]interface{}, ft *desc.MessageDescriptor, sn common.Strictness) (map[string]interface{}, error) {
  522. result := make(map[string]interface{})
  523. for _, field := range ft.GetFields() {
  524. val, ok := src[field.GetName()]
  525. if !ok {
  526. continue
  527. }
  528. err := decodeMessageField(val, field, result, sn)
  529. if err != nil {
  530. return nil, err
  531. }
  532. }
  533. return result, nil
  534. }
  535. func decodeSubMessage(input interface{}, ft *desc.MessageDescriptor, sn common.Strictness) (interface{}, error) {
  536. var m = map[string]interface{}{}
  537. switch v := input.(type) {
  538. case map[interface{}]interface{}:
  539. for k, val := range v {
  540. m[common.ToStringAlways(k)] = val
  541. }
  542. return decodeMap(m, ft, sn)
  543. case map[string]interface{}:
  544. return decodeMap(v, ft, sn)
  545. case proto.Message:
  546. message, err := dynamic.AsDynamicMessage(v)
  547. if err != nil {
  548. return nil, err
  549. }
  550. return decodeMessage(message, ft), nil
  551. case *dynamic.Message:
  552. return decodeMessage(v, ft), nil
  553. default:
  554. return nil, fmt.Errorf("cannot decode %[1]T(%[1]v) to map", input)
  555. }
  556. }