schema.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/golang/protobuf/proto"
  19. dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
  20. "github.com/jhump/protoreflect/desc"
  21. "github.com/jhump/protoreflect/desc/protoparse"
  22. "github.com/jhump/protoreflect/dynamic"
  23. kconf "github.com/lf-edge/ekuiper/internal/conf"
  24. "github.com/lf-edge/ekuiper/internal/schema/protobuf"
  25. "github.com/lf-edge/ekuiper/internal/xsql"
  26. "github.com/lf-edge/ekuiper/pkg/cast"
  27. _ "google.golang.org/genproto/googleapis/api/annotations"
  28. "sync"
  29. )
  // Fully qualified protobuf names of the well-known message types that are
  // handled specially when parameters and return values are converted.
  const (
  	wrapperBool   = "google.protobuf.BoolValue"
  	wrapperBytes  = "google.protobuf.BytesValue"
  	wrapperDouble = "google.protobuf.DoubleValue"
  	wrapperFloat  = "google.protobuf.FloatValue"
  	wrapperInt32  = "google.protobuf.Int32Value"
  	wrapperInt64  = "google.protobuf.Int64Value"
  	wrapperString = "google.protobuf.StringValue"
  	wrapperUInt32 = "google.protobuf.UInt32Value"
  	wrapperUInt64 = "google.protobuf.UInt64Value"
  	// wrapperVoid names the well-known empty message. It is declared as
  	// `message Empty {}` in google/protobuf/empty.proto, so its fully
  	// qualified name is "google.protobuf.Empty". Descriptor names are case
  	// sensitive, hence an upper-case "EMPTY" would never equal the value
  	// returned by GetFullyQualifiedName.
  	wrapperVoid = "google.protobuf.Empty"
  )
  42. var WRAPPER_TYPES = map[string]struct{}{
  43. wrapperBool: {},
  44. wrapperBytes: {},
  45. wrapperDouble: {},
  46. wrapperFloat: {},
  47. wrapperInt32: {},
  48. wrapperInt64: {},
  49. wrapperString: {},
  50. wrapperUInt32: {},
  51. wrapperUInt64: {},
  52. }
  // descriptor is the common contract of every parsed schema; it is the type
  // returned by parse.
  type descriptor interface {
  	// GetFunctions returns the names of the functions the schema provides.
  	GetFunctions() []string
  }
  56. type protoDescriptor interface {
  57. ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error)
  58. ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error)
  59. MethodDescriptor(method string) *desc.MethodDescriptor
  60. MessageFactory() *dynamic.MessageFactory
  61. }
  // jsonDescriptor is a descriptor capability for callers that exchange JSON
  // encoded payloads.
  type jsonDescriptor interface {
  	// ConvertParamsToJson encodes params as the JSON input of method.
  	ConvertParamsToJson(method string, params []interface{}) ([]byte, error)
  	// ConvertReturnJson decodes the JSON output of method.
  	ConvertReturnJson(method string, returnVal []byte) (interface{}, error)
  }
  // textDescriptor is a descriptor capability for callers that exchange text
  // encoded payloads.
  type textDescriptor interface {
  	// ConvertParamsToText encodes params as the text input of method.
  	ConvertParamsToText(method string, params []interface{}) ([]byte, error)
  	// ConvertReturnText decodes the text output of method.
  	ConvertReturnText(method string, returnVal []byte) (interface{}, error)
  }
  // interfaceDescriptor is a descriptor capability for callers that exchange
  // plain Go values.
  type interfaceDescriptor interface {
  	// ConvertParams converts params into the typed arguments of method.
  	ConvertParams(method string, params []interface{}) ([]interface{}, error)
  	// ConvertReturn converts the raw return value of method.
  	ConvertReturn(method string, returnVal interface{}) (interface{}, error)
  }
  74. type multiplexDescriptor interface {
  75. jsonDescriptor
  76. textDescriptor
  77. interfaceDescriptor
  78. httpMapping
  79. }
  80. var ( //Do not call these directly, use the get methods
  81. protoParser *protoparse.Parser
  82. // A buffer of descriptor for schemas
  83. reg = &sync.Map{}
  84. )
  85. func ProtoParser() *protoparse.Parser {
  86. once.Do(func() {
  87. dir := "etc/services/schemas/"
  88. if kconf.IsTesting {
  89. dir = "service/test/schemas/"
  90. }
  91. schemaDir, _ := kconf.GetLoc(dir)
  92. protoParser = &protoparse.Parser{ImportPaths: []string{schemaDir}}
  93. })
  94. return protoParser
  95. }
  96. func parse(schema schema, file string) (descriptor, error) {
  97. info := &schemaInfo{
  98. SchemaType: schema,
  99. SchemaFile: file,
  100. }
  101. switch schema {
  102. case PROTOBUFF:
  103. if v, ok := reg.Load(info); ok {
  104. return v.(descriptor), nil
  105. }
  106. if fds, err := ProtoParser().ParseFiles(file); err != nil {
  107. return nil, err
  108. } else {
  109. result := &wrappedProtoDescriptor{
  110. FileDescriptor: fds[0],
  111. mf: dynamic.NewMessageFactoryWithDefaults(),
  112. fc: protobuf.GetFieldConverter(),
  113. }
  114. err := result.parseHttpOptions()
  115. if err != nil {
  116. return nil, err
  117. }
  118. reg.Store(info, result)
  119. return result, nil
  120. }
  121. default:
  122. return nil, fmt.Errorf("unsupported schema %s", schema)
  123. }
  124. }
  125. type wrappedProtoDescriptor struct {
  126. *desc.FileDescriptor
  127. methodOptions map[string]*httpOptions
  128. mf *dynamic.MessageFactory
  129. fc *protobuf.FieldConverter
  130. }
  131. //TODO support for duplicate names
  132. func (d *wrappedProtoDescriptor) GetFunctions() (result []string) {
  133. for _, s := range d.GetServices() {
  134. for _, m := range s.GetMethods() {
  135. result = append(result, m.GetName())
  136. }
  137. }
  138. return
  139. }
  140. func (d *wrappedProtoDescriptor) MessageFactory() *dynamic.MessageFactory {
  141. return d.mf
  142. }
  143. // ConvertParams TODO support optional field, support enum type
  144. // Parameter mapping for protobuf
  145. // 1. If param length is 1, it can either a map contains all field or a field only.
  146. // 2. If param length is more then 1, they will map to message fields in the order
  147. func (d *wrappedProtoDescriptor) ConvertParams(method string, params []interface{}) ([]interface{}, error) {
  148. m := d.MethodDescriptor(method)
  149. if m == nil {
  150. return nil, fmt.Errorf("can't find method %s in proto", method)
  151. }
  152. im := m.GetInputType()
  153. return d.convertParams(im, params)
  154. }
  155. func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error) {
  156. m := d.MethodDescriptor(method)
  157. if m == nil {
  158. return nil, fmt.Errorf("can't find method %s in proto", method)
  159. }
  160. im := m.GetInputType()
  161. message := d.mf.NewDynamicMessage(im)
  162. typedParams, err := d.convertParams(im, params)
  163. if err != nil {
  164. return nil, err
  165. }
  166. for i, typeParam := range typedParams {
  167. message.SetFieldByNumber(i+1, typeParam)
  168. }
  169. return message, nil
  170. }
  171. func (d *wrappedProtoDescriptor) ConvertParamsToJson(method string, params []interface{}) ([]byte, error) {
  172. // Deal with encoded json string. Just return the string
  173. if len(params) == 1 {
  174. m := d.MethodDescriptor(method)
  175. if m == nil {
  176. return nil, fmt.Errorf("can't find method %s in proto", method)
  177. }
  178. im := m.GetInputType()
  179. if im.GetFullyQualifiedName() == wrapperString {
  180. ss, err := cast.ToString(params[0], cast.STRICT)
  181. if err != nil {
  182. return nil, err
  183. }
  184. return []byte(ss), nil
  185. }
  186. }
  187. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  188. return nil, err
  189. } else {
  190. return message.MarshalJSON()
  191. }
  192. }
  193. func (d *wrappedProtoDescriptor) ConvertParamsToText(method string, params []interface{}) ([]byte, error) {
  194. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  195. return nil, err
  196. } else {
  197. return message.MarshalText()
  198. }
  199. }
  200. func (d *wrappedProtoDescriptor) convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interface{}, error) {
  201. fields := im.GetFields()
  202. var result []interface{}
  203. switch len(params) {
  204. case 0:
  205. if len(fields) == 0 {
  206. return result, nil
  207. } else {
  208. return nil, fmt.Errorf("require %d parameters but none", len(fields))
  209. }
  210. case 1:
  211. // If it is map, try unfold it
  212. // TODO custom error for non map or map name not match
  213. if r, err := d.unfoldMap(im, params[0]); err != nil {
  214. kconf.Log.Debugf("try unfold param for message %s fail: %v", im.GetName(), err)
  215. } else {
  216. return r, nil
  217. }
  218. // For non map params, treat it as special case of multiple params
  219. if len(fields) == 1 {
  220. param0, err := d.fc.EncodeField(fields[0], params[0])
  221. if err != nil {
  222. return nil, err
  223. }
  224. return append(result, param0), nil
  225. } else {
  226. return nil, fmt.Errorf("require %d parameters but only got 1", len(fields))
  227. }
  228. default:
  229. if len(fields) == len(params) {
  230. for i, field := range fields {
  231. param, err := d.fc.EncodeField(field, params[i])
  232. if err != nil {
  233. return nil, err
  234. }
  235. result = append(result, param)
  236. }
  237. return result, nil
  238. } else {
  239. return nil, fmt.Errorf("require %d parameters but only got %d", len(fields), len(params))
  240. }
  241. }
  242. }
  243. func (d *wrappedProtoDescriptor) ConvertReturn(method string, returnVal interface{}) (interface{}, error) {
  244. m := d.MethodDescriptor(method)
  245. t := m.GetOutputType()
  246. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  247. return decodeField(returnVal, t.FindFieldByNumber(1), cast.STRICT)
  248. } else { // MUST be a map
  249. if retMap, ok := returnVal.(map[string]interface{}); ok {
  250. return decodeMap(retMap, t, cast.CONVERT_SAMEKIND)
  251. } else {
  252. return nil, fmt.Errorf("fail to convert return val, must be a map but got %v", returnVal)
  253. }
  254. }
  255. }
  256. func (d *wrappedProtoDescriptor) ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error) {
  257. m := d.MethodDescriptor(method)
  258. return decodeMessage(returnVal, m.GetOutputType()), nil
  259. }
  260. func (d *wrappedProtoDescriptor) ConvertReturnJson(method string, returnVal []byte) (interface{}, error) {
  261. r := make(map[string]interface{})
  262. err := json.Unmarshal(returnVal, &r)
  263. if err != nil {
  264. return nil, err
  265. }
  266. m := d.MethodDescriptor(method)
  267. return decodeMap(r, m.GetOutputType(), cast.CONVERT_SAMEKIND)
  268. }
  269. func (d *wrappedProtoDescriptor) ConvertReturnText(method string, returnVal []byte) (interface{}, error) {
  270. m := d.MethodDescriptor(method)
  271. t := m.GetOutputType()
  272. if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  273. return decodeField(string(returnVal), t.FindFieldByNumber(1), cast.CONVERT_ALL)
  274. } else {
  275. return nil, fmt.Errorf("fail to convert return val to text, return type must be primitive type but got %s", t.GetName())
  276. }
  277. }
  278. func (d *wrappedProtoDescriptor) MethodDescriptor(name string) *desc.MethodDescriptor {
  279. var m *desc.MethodDescriptor
  280. for _, s := range d.GetServices() {
  281. m = s.FindMethodByName(name)
  282. if m != nil {
  283. break
  284. }
  285. }
  286. return m
  287. }
  288. func (d *wrappedProtoDescriptor) unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error) {
  289. fields := ft.GetFields()
  290. result := make([]interface{}, len(fields))
  291. if m, ok := xsql.ToMessage(i); ok {
  292. for _, field := range fields {
  293. v, ok := m.Value(field.GetName(), "")
  294. if !ok {
  295. return nil, fmt.Errorf("field %s not found", field.GetName())
  296. }
  297. fv, err := d.fc.EncodeField(field, v)
  298. if err != nil {
  299. return nil, err
  300. }
  301. result[field.GetNumber()-1] = fv
  302. }
  303. } else {
  304. return nil, fmt.Errorf("not a map")
  305. }
  306. return result, nil
  307. }
  308. func decodeMessage(message *dynamic.Message, outputType *desc.MessageDescriptor) interface{} {
  309. if _, ok := WRAPPER_TYPES[outputType.GetFullyQualifiedName()]; ok {
  310. return message.GetFieldByNumber(1)
  311. } else if wrapperVoid == outputType.GetFullyQualifiedName() {
  312. return nil
  313. }
  314. result := make(map[string]interface{})
  315. for _, field := range outputType.GetFields() {
  316. decodeMessageField(message.GetField(field), field, result, cast.STRICT)
  317. }
  318. return result
  319. }
  320. func decodeMessageField(src interface{}, field *desc.FieldDescriptor, result map[string]interface{}, sn cast.Strictness) error {
  321. if f, err := decodeField(src, field, sn); err != nil {
  322. return err
  323. } else {
  324. result[field.GetName()] = f
  325. return nil
  326. }
  327. }
  328. func decodeField(src interface{}, field *desc.FieldDescriptor, sn cast.Strictness) (interface{}, error) {
  329. var (
  330. r interface{}
  331. e error
  332. )
  333. fn := field.GetName()
  334. switch field.GetType() {
  335. case dpb.FieldDescriptorProto_TYPE_DOUBLE, dpb.FieldDescriptorProto_TYPE_FLOAT:
  336. if field.IsRepeated() {
  337. r, e = cast.ToFloat64Slice(src, sn)
  338. } else {
  339. r, e = cast.ToFloat64(src, sn)
  340. }
  341. case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32, dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64, dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32, dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
  342. if field.IsRepeated() {
  343. r, e = cast.ToInt64Slice(src, sn)
  344. } else {
  345. r, e = cast.ToInt64(src, sn)
  346. }
  347. case dpb.FieldDescriptorProto_TYPE_BOOL:
  348. if field.IsRepeated() {
  349. r, e = cast.ToBoolSlice(src, sn)
  350. } else {
  351. r, e = cast.ToBool(src, sn)
  352. }
  353. case dpb.FieldDescriptorProto_TYPE_STRING:
  354. if field.IsRepeated() {
  355. r, e = cast.ToStringSlice(src, sn)
  356. } else {
  357. r, e = cast.ToString(src, sn)
  358. }
  359. case dpb.FieldDescriptorProto_TYPE_BYTES:
  360. if field.IsRepeated() {
  361. r, e = cast.ToBytesSlice(src, sn)
  362. } else {
  363. r, e = cast.ToBytes(src, sn)
  364. }
  365. case dpb.FieldDescriptorProto_TYPE_MESSAGE:
  366. if field.IsRepeated() {
  367. r, e = cast.ToTypedSlice(src, func(input interface{}, ssn cast.Strictness) (interface{}, error) {
  368. return decodeSubMessage(input, field.GetMessageType(), ssn)
  369. }, "map", sn)
  370. } else {
  371. r, e = decodeSubMessage(src, field.GetMessageType(), sn)
  372. }
  373. default:
  374. return nil, fmt.Errorf("unsupported type for %s", fn)
  375. }
  376. if e != nil {
  377. e = fmt.Errorf("invalid type of return value for '%s': %v", fn, e)
  378. }
  379. return r, e
  380. }
  381. func decodeMap(src map[string]interface{}, ft *desc.MessageDescriptor, sn cast.Strictness) (map[string]interface{}, error) {
  382. result := make(map[string]interface{})
  383. for _, field := range ft.GetFields() {
  384. val, ok := src[field.GetName()]
  385. if !ok {
  386. continue
  387. }
  388. err := decodeMessageField(val, field, result, sn)
  389. if err != nil {
  390. return nil, err
  391. }
  392. }
  393. return result, nil
  394. }
  395. func decodeSubMessage(input interface{}, ft *desc.MessageDescriptor, sn cast.Strictness) (interface{}, error) {
  396. var m = map[string]interface{}{}
  397. switch v := input.(type) {
  398. case map[interface{}]interface{}:
  399. for k, val := range v {
  400. m[cast.ToStringAlways(k)] = val
  401. }
  402. return decodeMap(m, ft, sn)
  403. case map[string]interface{}:
  404. return decodeMap(v, ft, sn)
  405. case proto.Message:
  406. message, err := dynamic.AsDynamicMessage(v)
  407. if err != nil {
  408. return nil, err
  409. }
  410. return decodeMessage(message, ft), nil
  411. case *dynamic.Message:
  412. return decodeMessage(v, ft), nil
  413. default:
  414. return nil, fmt.Errorf("cannot decode %[1]T(%[1]v) to map", input)
  415. }
  416. }