schema.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "sync"
  19. "github.com/jhump/protoreflect/desc"
  20. "github.com/jhump/protoreflect/desc/protoparse"
  21. "github.com/jhump/protoreflect/dynamic"
  22. // introduce annotations
  23. _ "google.golang.org/genproto/googleapis/api/annotations"
  24. kconf "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/converter/protobuf"
  26. "github.com/lf-edge/ekuiper/internal/xsql"
  27. "github.com/lf-edge/ekuiper/pkg/cast"
  28. )
  29. type descriptor interface {
  30. GetFunctions() []string
  31. }
  32. type protoDescriptor interface {
  33. ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error)
  34. ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error)
  35. MethodDescriptor(method string) *desc.MethodDescriptor
  36. MessageFactory() *dynamic.MessageFactory
  37. }
  38. type jsonDescriptor interface {
  39. ConvertParamsToJson(method string, params []interface{}) ([]byte, error)
  40. ConvertReturnJson(method string, returnVal []byte) (interface{}, error)
  41. }
  42. type textDescriptor interface {
  43. ConvertParamsToText(method string, params []interface{}) ([]byte, error)
  44. ConvertReturnText(method string, returnVal []byte) (interface{}, error)
  45. }
  46. type interfaceDescriptor interface {
  47. ConvertParams(method string, params []interface{}) ([]interface{}, error)
  48. ConvertReturn(method string, returnVal interface{}) (interface{}, error)
  49. }
  50. type multiplexDescriptor interface {
  51. jsonDescriptor
  52. textDescriptor
  53. interfaceDescriptor
  54. httpMapping
  55. }
  56. var ( // Do not call these directly, use the get methods
  57. protoParser *protoparse.Parser
  58. // A buffer of descriptor for schemas
  59. reg = &sync.Map{}
  60. )
  61. func ProtoParser() *protoparse.Parser {
  62. once.Do(func() {
  63. dir := "data/services/schemas/"
  64. if kconf.IsTesting {
  65. dir = "service/test/schemas/"
  66. }
  67. schemaDir, _ := kconf.GetLoc(dir)
  68. protoParser = &protoparse.Parser{ImportPaths: []string{schemaDir}}
  69. })
  70. return protoParser
  71. }
  72. func parse(schema schema, file string) (descriptor, error) {
  73. info := &schemaInfo{
  74. SchemaType: schema,
  75. SchemaFile: file,
  76. }
  77. switch schema {
  78. case PROTOBUFF:
  79. if v, ok := reg.Load(info); ok {
  80. return v.(descriptor), nil
  81. }
  82. if fds, err := ProtoParser().ParseFiles(file); err != nil {
  83. return nil, err
  84. } else {
  85. result := &wrappedProtoDescriptor{
  86. FileDescriptor: fds[0],
  87. mf: dynamic.NewMessageFactoryWithDefaults(),
  88. fc: protobuf.GetFieldConverter(),
  89. }
  90. err := result.parseHttpOptions()
  91. if err != nil {
  92. return nil, err
  93. }
  94. reg.Store(info, result)
  95. return result, nil
  96. }
  97. default:
  98. return nil, fmt.Errorf("unsupported schema %s", schema)
  99. }
  100. }
  101. type wrappedProtoDescriptor struct {
  102. *desc.FileDescriptor
  103. methodOptions map[string]*httpOptions
  104. mf *dynamic.MessageFactory
  105. fc *protobuf.FieldConverter
  106. }
  107. // GetFunctions TODO support for duplicate names
  108. func (d *wrappedProtoDescriptor) GetFunctions() (result []string) {
  109. for _, s := range d.GetServices() {
  110. for _, m := range s.GetMethods() {
  111. result = append(result, m.GetName())
  112. }
  113. }
  114. return
  115. }
  116. func (d *wrappedProtoDescriptor) MessageFactory() *dynamic.MessageFactory {
  117. return d.mf
  118. }
  119. // ConvertParams TODO support optional field, support enum type
  120. // Parameter mapping for protobuf
  121. // 1. If param length is 1, it can either a map contains all field or a field only.
  122. // 2. If param length is more then 1, they will map to message fields in the order
  123. func (d *wrappedProtoDescriptor) ConvertParams(method string, params []interface{}) ([]interface{}, error) {
  124. m := d.MethodDescriptor(method)
  125. if m == nil {
  126. return nil, fmt.Errorf("can't find method %s in proto", method)
  127. }
  128. im := m.GetInputType()
  129. return d.convertParams(im, params)
  130. }
  131. func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error) {
  132. m := d.MethodDescriptor(method)
  133. if m == nil {
  134. return nil, fmt.Errorf("can't find method %s in proto", method)
  135. }
  136. im := m.GetInputType()
  137. message := d.mf.NewDynamicMessage(im)
  138. typedParams, err := d.convertParams(im, params)
  139. if err != nil {
  140. return nil, err
  141. }
  142. for i, typeParam := range typedParams {
  143. message.SetFieldByNumber(i+1, typeParam)
  144. }
  145. return message, nil
  146. }
  147. func (d *wrappedProtoDescriptor) ConvertParamsToJson(method string, params []interface{}) ([]byte, error) {
  148. // Deal with encoded json string. Just return the string
  149. if len(params) == 1 {
  150. m := d.MethodDescriptor(method)
  151. if m == nil {
  152. return nil, fmt.Errorf("can't find method %s in proto", method)
  153. }
  154. im := m.GetInputType()
  155. if im.GetFullyQualifiedName() == protobuf.WrapperString {
  156. ss, err := cast.ToString(params[0], cast.STRICT)
  157. if err != nil {
  158. return nil, err
  159. }
  160. return []byte(ss), nil
  161. }
  162. }
  163. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  164. return nil, err
  165. } else {
  166. return message.MarshalJSON()
  167. }
  168. }
  169. func (d *wrappedProtoDescriptor) ConvertParamsToText(method string, params []interface{}) ([]byte, error) {
  170. if message, err := d.ConvertParamsToMessage(method, params); err != nil {
  171. return nil, err
  172. } else {
  173. return message.MarshalText()
  174. }
  175. }
  176. func (d *wrappedProtoDescriptor) convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interface{}, error) {
  177. fields := im.GetFields()
  178. var result []interface{}
  179. switch len(params) {
  180. case 0:
  181. if len(fields) == 0 {
  182. return result, nil
  183. } else {
  184. return nil, fmt.Errorf("require %d parameters but none", len(fields))
  185. }
  186. case 1:
  187. // If it is map, try unfold it
  188. // TODO custom error for non map or map name not match
  189. if r, err := d.unfoldMap(im, params[0]); err != nil {
  190. kconf.Log.Debugf("try unfold param for message %s fail: %v", im.GetName(), err)
  191. } else {
  192. return r, nil
  193. }
  194. // For non map params, treat it as special case of multiple params
  195. if len(fields) == 1 {
  196. param0, err := d.fc.EncodeField(fields[0], params[0])
  197. if err != nil {
  198. return nil, err
  199. }
  200. return append(result, param0), nil
  201. } else {
  202. return nil, fmt.Errorf("require %d parameters but only got 1", len(fields))
  203. }
  204. default:
  205. if len(fields) == len(params) {
  206. for i, field := range fields {
  207. param, err := d.fc.EncodeField(field, params[i])
  208. if err != nil {
  209. return nil, err
  210. }
  211. result = append(result, param)
  212. }
  213. return result, nil
  214. } else {
  215. return nil, fmt.Errorf("require %d parameters but only got %d", len(fields), len(params))
  216. }
  217. }
  218. }
  219. func (d *wrappedProtoDescriptor) ConvertReturn(method string, returnVal interface{}) (interface{}, error) {
  220. m := d.MethodDescriptor(method)
  221. t := m.GetOutputType()
  222. if _, ok := protobuf.WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  223. return d.fc.DecodeField(returnVal, t.FindFieldByNumber(1), cast.STRICT)
  224. } else { // MUST be a map
  225. if retMap, ok := returnVal.(map[string]interface{}); ok {
  226. return d.fc.DecodeMap(retMap, t, cast.CONVERT_SAMEKIND)
  227. } else {
  228. return nil, fmt.Errorf("fail to convert return val, must be a map but got %v", returnVal)
  229. }
  230. }
  231. }
  232. func (d *wrappedProtoDescriptor) ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error) {
  233. m := d.MethodDescriptor(method)
  234. return d.fc.DecodeMessage(returnVal, m.GetOutputType()), nil
  235. }
  236. func (d *wrappedProtoDescriptor) ConvertReturnJson(method string, returnVal []byte) (interface{}, error) {
  237. r := make(map[string]interface{})
  238. err := json.Unmarshal(returnVal, &r)
  239. if err != nil {
  240. return nil, err
  241. }
  242. m := d.MethodDescriptor(method)
  243. return d.fc.DecodeMap(r, m.GetOutputType(), cast.CONVERT_SAMEKIND)
  244. }
  245. func (d *wrappedProtoDescriptor) ConvertReturnText(method string, returnVal []byte) (interface{}, error) {
  246. m := d.MethodDescriptor(method)
  247. t := m.GetOutputType()
  248. if _, ok := protobuf.WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
  249. return d.fc.DecodeField(string(returnVal), t.FindFieldByNumber(1), cast.CONVERT_ALL)
  250. } else {
  251. return nil, fmt.Errorf("fail to convert return val to text, return type must be primitive type but got %s", t.GetName())
  252. }
  253. }
  254. func (d *wrappedProtoDescriptor) MethodDescriptor(name string) *desc.MethodDescriptor {
  255. var m *desc.MethodDescriptor
  256. for _, s := range d.GetServices() {
  257. m = s.FindMethodByName(name)
  258. if m != nil {
  259. break
  260. }
  261. }
  262. return m
  263. }
  264. func (d *wrappedProtoDescriptor) unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error) {
  265. fields := ft.GetFields()
  266. result := make([]interface{}, len(fields))
  267. if m, ok := xsql.ToMessage(i); ok {
  268. for _, field := range fields {
  269. v, ok := m.Value(field.GetName(), "")
  270. if !ok {
  271. return nil, fmt.Errorf("field %s not found", field.GetName())
  272. }
  273. fv, err := d.fc.EncodeField(field, v)
  274. if err != nil {
  275. return nil, err
  276. }
  277. result[field.GetNumber()-1] = fv
  278. }
  279. } else {
  280. return nil, fmt.Errorf("not a map")
  281. }
  282. return result, nil
  283. }