schema.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/jhump/protoreflect/desc"
  19. "github.com/jhump/protoreflect/desc/protoparse"
  20. "github.com/jhump/protoreflect/dynamic"
  21. kconf "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/internal/converter/protobuf"
  23. "github.com/lf-edge/ekuiper/internal/xsql"
  24. "github.com/lf-edge/ekuiper/pkg/cast"
  25. _ "google.golang.org/genproto/googleapis/api/annotations"
  26. "sync"
  27. )
  28. type descriptor interface {
  29. GetFunctions() []string
  30. }
  31. type protoDescriptor interface {
  32. ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error)
  33. ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error)
  34. MethodDescriptor(method string) *desc.MethodDescriptor
  35. MessageFactory() *dynamic.MessageFactory
  36. }
  37. type jsonDescriptor interface {
  38. ConvertParamsToJson(method string, params []interface{}) ([]byte, error)
  39. ConvertReturnJson(method string, returnVal []byte) (interface{}, error)
  40. }
  41. type textDescriptor interface {
  42. ConvertParamsToText(method string, params []interface{}) ([]byte, error)
  43. ConvertReturnText(method string, returnVal []byte) (interface{}, error)
  44. }
  45. type interfaceDescriptor interface {
  46. ConvertParams(method string, params []interface{}) ([]interface{}, error)
  47. ConvertReturn(method string, returnVal interface{}) (interface{}, error)
  48. }
  49. type multiplexDescriptor interface {
  50. jsonDescriptor
  51. textDescriptor
  52. interfaceDescriptor
  53. httpMapping
  54. }
  55. var ( //Do not call these directly, use the get methods
  56. protoParser *protoparse.Parser
  57. // A buffer of descriptor for schemas
  58. reg = &sync.Map{}
  59. )
  60. func ProtoParser() *protoparse.Parser {
  61. once.Do(func() {
  62. dir := "data/services/schemas/"
  63. if kconf.IsTesting {
  64. dir = "service/test/schemas/"
  65. }
  66. schemaDir, _ := kconf.GetLoc(dir)
  67. protoParser = &protoparse.Parser{ImportPaths: []string{schemaDir}}
  68. })
  69. return protoParser
  70. }
  71. func parse(schema schema, file string) (descriptor, error) {
  72. info := &schemaInfo{
  73. SchemaType: schema,
  74. SchemaFile: file,
  75. }
  76. switch schema {
  77. case PROTOBUFF:
  78. if v, ok := reg.Load(info); ok {
  79. return v.(descriptor), nil
  80. }
  81. if fds, err := ProtoParser().ParseFiles(file); err != nil {
  82. return nil, err
  83. } else {
  84. result := &wrappedProtoDescriptor{
  85. FileDescriptor: fds[0],
  86. mf: dynamic.NewMessageFactoryWithDefaults(),
  87. fc: protobuf.GetFieldConverter(),
  88. }
  89. err := result.parseHttpOptions()
  90. if err != nil {
  91. return nil, err
  92. }
  93. reg.Store(info, result)
  94. return result, nil
  95. }
  96. default:
  97. return nil, fmt.Errorf("unsupported schema %s", schema)
  98. }
  99. }
  100. type wrappedProtoDescriptor struct {
  101. *desc.FileDescriptor
  102. methodOptions map[string]*httpOptions
  103. mf *dynamic.MessageFactory
  104. fc *protobuf.FieldConverter
  105. }
  106. // GetFunctions TODO support for duplicate names
  107. func (d *wrappedProtoDescriptor) GetFunctions() (result []string) {
  108. for _, s := range d.GetServices() {
  109. for _, m := range s.GetMethods() {
  110. result = append(result, m.GetName())
  111. }
  112. }
  113. return
  114. }
  115. func (d *wrappedProtoDescriptor) MessageFactory() *dynamic.MessageFactory {
  116. return d.mf
  117. }
  118. // ConvertParams TODO support optional field, support enum type
  119. // Parameter mapping for protobuf
  120. // 1. If param length is 1, it can either a map contains all field or a field only.
  121. // 2. If param length is more then 1, they will map to message fields in the order
  122. func (d *wrappedProtoDescriptor) ConvertParams(method string, params []interface{}) ([]interface{}, error) {
  123. m := d.MethodDescriptor(method)
  124. if m == nil {
  125. return nil, fmt.Errorf("can't find method %s in proto", method)
  126. }
  127. im := m.GetInputType()
  128. return d.convertParams(im, params)
  129. }
  130. func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error) {
  131. m := d.MethodDescriptor(method)
  132. if m == nil {
  133. return nil, fmt.Errorf("can't find method %s in proto", method)
  134. }
  135. im := m.GetInputType()
  136. message := d.mf.NewDynamicMessage(im)
  137. typedParams, err := d.convertParams(im, params)
  138. if err != nil {
  139. return nil, err
  140. }
  141. for i, typeParam := range typedParams {
  142. message.SetFieldByNumber(i+1, typeParam)
  143. }
  144. return message, nil
  145. }
  146. func (d *wrappedProtoDescriptor) ConvertParamsToJson(method string, params []interface{}) ([]byte, error) {
  147. // Deal with encoded json string. Just return the string
  148. if len(params) == 1 {
  149. m := d.MethodDescriptor(method)
  150. if m == nil {
  151. return nil, fmt.Errorf("can't find method %s in proto", method)
  152. }
  153. im := m.GetInputType()
  154. if im.GetFullyQualifiedName() == protobuf.WrapperString {
  155. ss, err := cast.ToString(params[0], cast.STRICT)
  156. if err != nil {
  157. return nil, err
  158. }
  159. return []byte(ss), nil
  160. }
  161. }
  162. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  163. return nil, err
  164. } else {
  165. return message.MarshalJSON()
  166. }
  167. }
  168. func (d *wrappedProtoDescriptor) ConvertParamsToText(method string, params []interface{}) ([]byte, error) {
  169. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  170. return nil, err
  171. } else {
  172. return message.MarshalText()
  173. }
  174. }
  175. func (d *wrappedProtoDescriptor) convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interface{}, error) {
  176. fields := im.GetFields()
  177. var result []interface{}
  178. switch len(params) {
  179. case 0:
  180. if len(fields) == 0 {
  181. return result, nil
  182. } else {
  183. return nil, fmt.Errorf("require %d parameters but none", len(fields))
  184. }
  185. case 1:
  186. // If it is map, try unfold it
  187. // TODO custom error for non map or map name not match
  188. if r, err := d.unfoldMap(im, params[0]); err != nil {
  189. kconf.Log.Debugf("try unfold param for message %s fail: %v", im.GetName(), err)
  190. } else {
  191. return r, nil
  192. }
  193. // For non map params, treat it as special case of multiple params
  194. if len(fields) == 1 {
  195. param0, err := d.fc.EncodeField(fields[0], params[0])
  196. if err != nil {
  197. return nil, err
  198. }
  199. return append(result, param0), nil
  200. } else {
  201. return nil, fmt.Errorf("require %d parameters but only got 1", len(fields))
  202. }
  203. default:
  204. if len(fields) == len(params) {
  205. for i, field := range fields {
  206. param, err := d.fc.EncodeField(field, params[i])
  207. if err != nil {
  208. return nil, err
  209. }
  210. result = append(result, param)
  211. }
  212. return result, nil
  213. } else {
  214. return nil, fmt.Errorf("require %d parameters but only got %d", len(fields), len(params))
  215. }
  216. }
  217. }
  218. func (d *wrappedProtoDescriptor) ConvertReturn(method string, returnVal interface{}) (interface{}, error) {
  219. m := d.MethodDescriptor(method)
  220. t := m.GetOutputType()
  221. if _, ok := protobuf.WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  222. return d.fc.DecodeField(returnVal, t.FindFieldByNumber(1), cast.STRICT)
  223. } else { // MUST be a map
  224. if retMap, ok := returnVal.(map[string]interface{}); ok {
  225. return d.fc.DecodeMap(retMap, t, cast.CONVERT_SAMEKIND)
  226. } else {
  227. return nil, fmt.Errorf("fail to convert return val, must be a map but got %v", returnVal)
  228. }
  229. }
  230. }
  231. func (d *wrappedProtoDescriptor) ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error) {
  232. m := d.MethodDescriptor(method)
  233. return d.fc.DecodeMessage(returnVal, m.GetOutputType()), nil
  234. }
  235. func (d *wrappedProtoDescriptor) ConvertReturnJson(method string, returnVal []byte) (interface{}, error) {
  236. r := make(map[string]interface{})
  237. err := json.Unmarshal(returnVal, &r)
  238. if err != nil {
  239. return nil, err
  240. }
  241. m := d.MethodDescriptor(method)
  242. return d.fc.DecodeMap(r, m.GetOutputType(), cast.CONVERT_SAMEKIND)
  243. }
  244. func (d *wrappedProtoDescriptor) ConvertReturnText(method string, returnVal []byte) (interface{}, error) {
  245. m := d.MethodDescriptor(method)
  246. t := m.GetOutputType()
  247. if _, ok := protobuf.WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  248. return d.fc.DecodeField(string(returnVal), t.FindFieldByNumber(1), cast.CONVERT_ALL)
  249. } else {
  250. return nil, fmt.Errorf("fail to convert return val to text, return type must be primitive type but got %s", t.GetName())
  251. }
  252. }
  253. func (d *wrappedProtoDescriptor) MethodDescriptor(name string) *desc.MethodDescriptor {
  254. var m *desc.MethodDescriptor
  255. for _, s := range d.GetServices() {
  256. m = s.FindMethodByName(name)
  257. if m != nil {
  258. break
  259. }
  260. }
  261. return m
  262. }
  263. func (d *wrappedProtoDescriptor) unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error) {
  264. fields := ft.GetFields()
  265. result := make([]interface{}, len(fields))
  266. if m, ok := xsql.ToMessage(i); ok {
  267. for _, field := range fields {
  268. v, ok := m.Value(field.GetName(), "")
  269. if !ok {
  270. return nil, fmt.Errorf("field %s not found", field.GetName())
  271. }
  272. fv, err := d.fc.EncodeField(field, v)
  273. if err != nil {
  274. return nil, err
  275. }
  276. result[field.GetNumber()-1] = fv
  277. }
  278. } else {
  279. return nil, fmt.Errorf("not a map")
  280. }
  281. return result, nil
  282. }