schema.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. package services
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/golang/protobuf/proto"
  8. dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
  9. "github.com/jhump/protoreflect/desc"
  10. "github.com/jhump/protoreflect/desc/protoparse"
  11. "github.com/jhump/protoreflect/dynamic"
  12. "sync"
  13. )
  14. var WRAPPER_TYPES = map[string]struct{}{
  15. "google.protobuf.BoolValue": {},
  16. "google.protobuf.BytesValue": {},
  17. "google.protobuf.DoubleValue": {},
  18. "google.protobuf.FloatValue": {},
  19. "google.protobuf.Int32Value": {},
  20. "google.protobuf.Int64Value": {},
  21. "google.protobuf.StringValue": {},
  22. "google.protobuf.UInt32Value": {},
  23. "google.protobuf.UInt64Value": {},
  24. }
  25. const VOID = "google.protobuf.EMPTY"
  26. type descriptor interface {
  27. GetFunctions() []string
  28. }
  29. type protoDescriptor interface {
  30. ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error)
  31. ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error)
  32. MethodDescriptor(method string) *desc.MethodDescriptor
  33. MessageFactory() *dynamic.MessageFactory
  34. }
  35. type jsonDescriptor interface {
  36. ConvertParamsToJson(method string, params []interface{}) ([]byte, error)
  37. ConvertReturnJson(method string, returnVal []byte) (interface{}, error)
  38. }
  39. type textDescriptor interface {
  40. ConvertParamsToText(method string, params []interface{}) ([]byte, error)
  41. ConvertReturnText(method string, returnVal []byte) (interface{}, error)
  42. }
  43. type interfaceDescriptor interface {
  44. ConvertParams(method string, params []interface{}) ([]interface{}, error)
  45. ConvertReturn(method string, returnVal interface{}) (interface{}, error)
  46. }
  47. type multiplexDescriptor interface {
  48. jsonDescriptor
  49. textDescriptor
  50. interfaceDescriptor
  51. }
  52. var ( //Do not call these directly, use the get methods
  53. protoParser *protoparse.Parser
  54. // A buffer of descriptor for schemas
  55. reg = &sync.Map{}
  56. )
  57. func ProtoParser() *protoparse.Parser {
  58. once.Do(func() {
  59. dir := "/etc/services/schemas/"
  60. if common.IsTesting {
  61. dir = "/services/test/schemas/"
  62. }
  63. schemaDir, _ := common.GetLoc(dir)
  64. protoParser = &protoparse.Parser{ImportPaths: []string{schemaDir}}
  65. })
  66. return protoParser
  67. }
  68. func parse(schema schema, file string) (descriptor, error) {
  69. info := &schemaInfo{
  70. SchemaType: schema,
  71. SchemaFile: file,
  72. }
  73. switch schema {
  74. case PROTOBUFF:
  75. if v, ok := reg.Load(info); ok {
  76. return v.(descriptor), nil
  77. }
  78. if fds, err := ProtoParser().ParseFiles(file); err != nil {
  79. return nil, err
  80. } else {
  81. result := &wrappedProtoDescriptor{
  82. FileDescriptor: fds[0],
  83. methods: make(map[string]*desc.MethodDescriptor),
  84. mf: dynamic.NewMessageFactoryWithDefaults(),
  85. }
  86. reg.Store(info, result)
  87. return result, nil
  88. }
  89. default:
  90. return nil, fmt.Errorf("unsupported schema %s", schema)
  91. }
  92. }
  93. type wrappedProtoDescriptor struct {
  94. *desc.FileDescriptor
  95. methods map[string]*desc.MethodDescriptor
  96. mf *dynamic.MessageFactory
  97. }
  98. //TODO support for duplicate names
  99. func (d *wrappedProtoDescriptor) GetFunctions() (result []string) {
  100. for _, s := range d.GetServices() {
  101. for _, m := range s.GetMethods() {
  102. result = append(result, m.GetName())
  103. }
  104. }
  105. return
  106. }
  107. func (d *wrappedProtoDescriptor) MessageFactory() *dynamic.MessageFactory {
  108. return d.mf
  109. }
  110. // ConvertParams TODO support optional field, support enum type
  111. // Parameter mapping for protobuf
  112. // 1. If param length is 1, it can either a map contains all field or a field only.
  113. // 2. If param length is more then 1, they will map to message fields in the order
  114. func (d *wrappedProtoDescriptor) ConvertParams(method string, params []interface{}) ([]interface{}, error) {
  115. m := d.MethodDescriptor(method)
  116. if m == nil {
  117. return nil, fmt.Errorf("can't find method %s in proto", method)
  118. }
  119. im := m.GetInputType()
  120. return convertParams(im, params)
  121. }
  122. func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error) {
  123. m := d.MethodDescriptor(method)
  124. if m == nil {
  125. return nil, fmt.Errorf("can't find method %s in proto", method)
  126. }
  127. im := m.GetInputType()
  128. message := d.mf.NewDynamicMessage(im)
  129. typedParams, err := convertParams(im, params)
  130. if err != nil {
  131. return nil, err
  132. }
  133. for i, typeParam := range typedParams {
  134. message.SetFieldByNumber(i+1, typeParam)
  135. }
  136. return message, nil
  137. }
  138. func (d *wrappedProtoDescriptor) ConvertParamsToJson(method string, params []interface{}) ([]byte, error) {
  139. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  140. return nil, err
  141. } else {
  142. return message.MarshalJSON()
  143. }
  144. }
  145. func (d *wrappedProtoDescriptor) ConvertParamsToText(method string, params []interface{}) ([]byte, error) {
  146. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  147. return nil, err
  148. } else {
  149. return message.MarshalText()
  150. }
  151. }
  152. func convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interface{}, error) {
  153. fields := im.GetFields()
  154. var result []interface{}
  155. switch len(params) {
  156. case 0:
  157. if len(fields) == 0 {
  158. return result, nil
  159. } else {
  160. return nil, fmt.Errorf("require %d parameters but none", len(fields))
  161. }
  162. case 1:
  163. // If it is map, try unfold it
  164. // TODO custom error for non map or map name not match
  165. if r, err := unfoldMap(im, params[0]); err != nil {
  166. common.Log.Debugf("try unfold param for message %s fail: %v", im.GetName(), err)
  167. } else {
  168. return r, nil
  169. }
  170. // For non map params, treat it as special case of multiple params
  171. if len(fields) == 1 {
  172. param0, err := encodeField(fields[0], params[0])
  173. if err != nil {
  174. return nil, err
  175. }
  176. return append(result, param0), nil
  177. } else {
  178. return nil, fmt.Errorf("require %d parameters but only got 1", len(fields))
  179. }
  180. default:
  181. if len(fields) == len(params) {
  182. for i, field := range fields {
  183. param, err := encodeField(field, params[i])
  184. if err != nil {
  185. return nil, err
  186. }
  187. result = append(result, param)
  188. }
  189. return result, nil
  190. } else {
  191. return nil, fmt.Errorf("require %d parameters but only got %d", len(fields), len(params))
  192. }
  193. }
  194. }
  195. func (d *wrappedProtoDescriptor) ConvertReturn(method string, returnVal interface{}) (interface{}, error) {
  196. m := d.MethodDescriptor(method)
  197. t := m.GetOutputType()
  198. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  199. return decodeField(returnVal, t.FindFieldByNumber(1), common.STRICT)
  200. } else { // MUST be a map
  201. if retMap, ok := returnVal.(map[string]interface{}); ok {
  202. return decodeMap(retMap, t, common.CONVERT_SAMEKIND)
  203. } else {
  204. return nil, fmt.Errorf("fail to convert return val, must be a map but got %v", returnVal)
  205. }
  206. }
  207. }
  208. func (d *wrappedProtoDescriptor) ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error) {
  209. m := d.MethodDescriptor(method)
  210. return decodeMessage(returnVal, m.GetOutputType()), nil
  211. }
  212. func (d *wrappedProtoDescriptor) ConvertReturnJson(method string, returnVal []byte) (interface{}, error) {
  213. r := make(map[string]interface{})
  214. err := json.Unmarshal(returnVal, &r)
  215. if err != nil {
  216. return nil, err
  217. }
  218. m := d.MethodDescriptor(method)
  219. return decodeMap(r, m.GetOutputType(), common.CONVERT_SAMEKIND)
  220. }
  221. func (d *wrappedProtoDescriptor) ConvertReturnText(method string, returnVal []byte) (interface{}, error) {
  222. m := d.MethodDescriptor(method)
  223. t := m.GetOutputType()
  224. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  225. return decodeField(string(returnVal), t.FindFieldByNumber(1), common.CONVERT_ALL)
  226. } else {
  227. return nil, fmt.Errorf("fail to convert return val to text, return type must be primitive type but got %s", t.GetName())
  228. }
  229. }
  230. func (d *wrappedProtoDescriptor) MethodDescriptor(name string) *desc.MethodDescriptor {
  231. var m *desc.MethodDescriptor
  232. for _, s := range d.GetServices() {
  233. m = s.FindMethodByName(name)
  234. if m != nil {
  235. break
  236. }
  237. }
  238. return m
  239. }
  240. func unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error) {
  241. fields := ft.GetFields()
  242. result := make([]interface{}, len(fields))
  243. if m, ok := xsql.ToMessage(i); ok {
  244. for _, field := range fields {
  245. v, ok := m.Value(field.GetName())
  246. if !ok {
  247. return nil, fmt.Errorf("field %s not found", field.GetName())
  248. }
  249. fv, err := encodeField(field, v)
  250. if err != nil {
  251. return nil, err
  252. }
  253. result[field.GetNumber()-1] = fv
  254. }
  255. } else {
  256. return nil, fmt.Errorf("not a map")
  257. }
  258. return result, nil
  259. }
  260. func encodeMap(fields []*desc.FieldDescriptor, i interface{}) (map[string]interface{}, error) {
  261. var result map[string]interface{}
  262. if m, ok := i.(map[string]interface{}); ok && len(m) == len(fields) {
  263. for _, field := range fields {
  264. v, ok := m[field.GetName()]
  265. if !ok {
  266. return nil, fmt.Errorf("field %s not found", field.GetName())
  267. }
  268. fv, err := encodeField(field, v)
  269. if err != nil {
  270. return nil, err
  271. }
  272. result[field.GetName()] = fv
  273. }
  274. }
  275. return result, nil
  276. }
  277. func encodeField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
  278. fn := field.GetName()
  279. ft := field.GetType()
  280. if field.IsRepeated() {
  281. var (
  282. result interface{}
  283. err error
  284. )
  285. switch ft {
  286. case dpb.FieldDescriptorProto_TYPE_DOUBLE:
  287. result, err = common.ToFloat64Slice(v, common.STRICT)
  288. case dpb.FieldDescriptorProto_TYPE_FLOAT:
  289. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  290. r, err := common.ToFloat64(input, sn)
  291. if err != nil {
  292. return 0, nil
  293. } else {
  294. return float32(r), nil
  295. }
  296. }, "float", common.STRICT)
  297. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
  298. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  299. r, err := common.ToInt(input, sn)
  300. if err != nil {
  301. return 0, nil
  302. } else {
  303. return int32(r), nil
  304. }
  305. }, "int", common.STRICT)
  306. case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
  307. result, err = common.ToInt64Slice(v, common.STRICT)
  308. case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
  309. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  310. r, err := common.ToUint64(input, sn)
  311. if err != nil {
  312. return 0, nil
  313. } else {
  314. return uint32(r), nil
  315. }
  316. }, "uint", common.STRICT)
  317. case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  318. result, err = common.ToUint64Slice(v, common.STRICT)
  319. case dpb.FieldDescriptorProto_TYPE_BOOL:
  320. result, err = common.ToBoolSlice(v, common.STRICT)
  321. case dpb.FieldDescriptorProto_TYPE_STRING:
  322. result, err = common.ToStringSlice(v, common.STRICT)
  323. case dpb.FieldDescriptorProto_TYPE_BYTES:
  324. result, err = common.ToBytesSlice(v, common.STRICT)
  325. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  326. result, err = common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  327. r, err := common.ToStringMap(v)
  328. if err == nil {
  329. return encodeMap(field.GetMessageType().GetFields(), r)
  330. } else {
  331. return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
  332. }
  333. }, "map", common.STRICT)
  334. default:
  335. return nil, fmt.Errorf("invalid type for field '%s'", fn)
  336. }
  337. if err != nil {
  338. err = fmt.Errorf("failed to encode field '%s':%v", fn, err)
  339. }
  340. return result, err
  341. } else {
  342. return encodeSingleField(field, v)
  343. }
  344. }
  345. func encodeSingleField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
  346. fn := field.GetName()
  347. switch field.GetType() {
  348. case dpb.FieldDescriptorProto_TYPE_DOUBLE:
  349. r, err := common.ToFloat64(v, common.STRICT)
  350. if err == nil {
  351. return r, nil
  352. } else {
  353. return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
  354. }
  355. case dpb.FieldDescriptorProto_TYPE_FLOAT:
  356. r, err := common.ToFloat64(v, common.STRICT)
  357. if err == nil {
  358. return float32(r), nil
  359. } else {
  360. return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
  361. }
  362. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
  363. r, err := common.ToInt(v, common.STRICT)
  364. if err == nil {
  365. return int32(r), nil
  366. } else {
  367. return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
  368. }
  369. case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
  370. r, err := common.ToInt64(v, common.STRICT)
  371. if err == nil {
  372. return r, nil
  373. } else {
  374. return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
  375. }
  376. case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
  377. r, err := common.ToUint64(v, common.STRICT)
  378. if err == nil {
  379. return uint32(r), nil
  380. } else {
  381. return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
  382. }
  383. case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  384. r, err := common.ToUint64(v, common.STRICT)
  385. if err == nil {
  386. return r, nil
  387. } else {
  388. return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
  389. }
  390. case dpb.FieldDescriptorProto_TYPE_BOOL:
  391. r, err := common.ToBool(v, common.STRICT)
  392. if err == nil {
  393. return r, nil
  394. } else {
  395. return nil, fmt.Errorf("invalid type for bool type field '%s': %v", fn, err)
  396. }
  397. case dpb.FieldDescriptorProto_TYPE_STRING:
  398. r, err := common.ToString(v, common.STRICT)
  399. if err == nil {
  400. return r, nil
  401. } else {
  402. return nil, fmt.Errorf("invalid type for string type field '%s': %v", fn, err)
  403. }
  404. case dpb.FieldDescriptorProto_TYPE_BYTES:
  405. r, err := common.ToBytes(v, common.STRICT)
  406. if err == nil {
  407. return r, nil
  408. } else {
  409. return nil, fmt.Errorf("invalid type for bytes type field '%s': %v", fn, err)
  410. }
  411. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  412. r, err := common.ToStringMap(v)
  413. if err == nil {
  414. return encodeMap(field.GetMessageType().GetFields(), r)
  415. } else {
  416. return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
  417. }
  418. default:
  419. return nil, fmt.Errorf("invalid type for field '%s'", fn)
  420. }
  421. }
  422. func decodeMessage(message *dynamic.Message, outputType *desc.MessageDescriptor) interface{} {
  423. if _, ok := WRAPPER_TYPES[outputType.GetFullyQualifiedName()]; ok {
  424. return message.GetFieldByNumber(1)
  425. } else if VOID == outputType.GetFullyQualifiedName() {
  426. return nil
  427. }
  428. result := make(map[string]interface{})
  429. for _, field := range outputType.GetFields() {
  430. decodeMessageField(message.GetField(field), field, result, common.STRICT)
  431. }
  432. return result
  433. }
  434. func decodeMessageField(src interface{}, field *desc.FieldDescriptor, result map[string]interface{}, sn common.Strictness) error {
  435. if f, err := decodeField(src, field, sn); err != nil {
  436. return err
  437. } else {
  438. result[field.GetName()] = f
  439. return nil
  440. }
  441. }
  442. func decodeField(src interface{}, field *desc.FieldDescriptor, sn common.Strictness) (interface{}, error) {
  443. var (
  444. r interface{}
  445. e error
  446. )
  447. fn := field.GetName()
  448. switch field.GetType() {
  449. case dpb.FieldDescriptorProto_TYPE_DOUBLE, dpb.FieldDescriptorProto_TYPE_FLOAT:
  450. if field.IsRepeated() {
  451. r, e = common.ToFloat64Slice(src, sn)
  452. } else {
  453. r, e = common.ToFloat64(src, sn)
  454. }
  455. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32, dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64, dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32, dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  456. if field.IsRepeated() {
  457. r, e = common.ToInt64Slice(src, sn)
  458. } else {
  459. r, e = common.ToInt64(src, sn)
  460. }
  461. case dpb.FieldDescriptorProto_TYPE_BOOL:
  462. if field.IsRepeated() {
  463. r, e = common.ToBoolSlice(src, sn)
  464. } else {
  465. r, e = common.ToBool(src, sn)
  466. }
  467. case dpb.FieldDescriptorProto_TYPE_STRING:
  468. if field.IsRepeated() {
  469. r, e = common.ToStringSlice(src, sn)
  470. } else {
  471. r, e = common.ToString(src, sn)
  472. }
  473. case dpb.FieldDescriptorProto_TYPE_BYTES:
  474. if field.IsRepeated() {
  475. r, e = common.ToBytesSlice(src, sn)
  476. } else {
  477. r, e = common.ToBytes(src, sn)
  478. }
  479. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  480. if field.IsRepeated() {
  481. r, e = common.ToTypedSlice(src, func(input interface{}, ssn common.Strictness) (interface{}, error) {
  482. return decodeSubMessage(input, field.GetMessageType(), ssn)
  483. }, "map", sn)
  484. } else {
  485. r, e = decodeSubMessage(src, field.GetMessageType(), sn)
  486. }
  487. default:
  488. return nil, fmt.Errorf("unsupported type for %s", fn)
  489. }
  490. if e != nil {
  491. e = fmt.Errorf("invalid type of return value for '%s': %v", fn, e)
  492. }
  493. return r, e
  494. }
  495. func decodeMap(src map[string]interface{}, ft *desc.MessageDescriptor, sn common.Strictness) (map[string]interface{}, error) {
  496. result := make(map[string]interface{})
  497. for _, field := range ft.GetFields() {
  498. val, ok := src[field.GetName()]
  499. if !ok {
  500. continue
  501. }
  502. err := decodeMessageField(val, field, result, sn)
  503. if err != nil {
  504. return nil, err
  505. }
  506. }
  507. return result, nil
  508. }
  509. func decodeSubMessage(input interface{}, ft *desc.MessageDescriptor, sn common.Strictness) (interface{}, error) {
  510. var m = map[string]interface{}{}
  511. switch v := input.(type) {
  512. case map[interface{}]interface{}:
  513. for k, val := range v {
  514. m[common.ToStringAlways(k)] = val
  515. }
  516. return decodeMap(m, ft, sn)
  517. case map[string]interface{}:
  518. return decodeMap(v, ft, sn)
  519. case proto.Message:
  520. message, err := dynamic.AsDynamicMessage(v)
  521. if err != nil {
  522. return nil, err
  523. }
  524. return decodeMessage(message, ft), nil
  525. case *dynamic.Message:
  526. return decodeMessage(v, ft), nil
  527. default:
  528. return nil, fmt.Errorf("cannot decode %[1]T(%[1]v) to map", input)
  529. }
  530. }