123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612 |
- // Copyright 2021 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package service
- import (
- "encoding/json"
- "fmt"
- "github.com/golang/protobuf/proto"
- dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
- "github.com/jhump/protoreflect/desc"
- "github.com/jhump/protoreflect/desc/protoparse"
- "github.com/jhump/protoreflect/dynamic"
- kconf "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/internal/xsql"
- "github.com/lf-edge/ekuiper/pkg/cast"
- _ "google.golang.org/genproto/googleapis/api/annotations"
- "sync"
- )
- const (
- wrapperBool = "google.protobuf.BoolValue"
- wrapperBytes = "google.protobuf.BytesValue"
- wrapperDouble = "google.protobuf.DoubleValue"
- wrapperFloat = "google.protobuf.FloatValue"
- wrapperInt32 = "google.protobuf.Int32Value"
- wrapperInt64 = "google.protobuf.Int64Value"
- wrapperString = "google.protobuf.StringValue"
- wrapperUInt32 = "google.protobuf.UInt32Value"
- wrapperUInt64 = "google.protobuf.UInt64Value"
- wrapperVoid = "google.protobuf.EMPTY"
- )
- var WRAPPER_TYPES = map[string]struct{}{
- wrapperBool: {},
- wrapperBytes: {},
- wrapperDouble: {},
- wrapperFloat: {},
- wrapperInt32: {},
- wrapperInt64: {},
- wrapperString: {},
- wrapperUInt32: {},
- wrapperUInt64: {},
- }
- type descriptor interface {
- GetFunctions() []string
- }
- type protoDescriptor interface {
- ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error)
- ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error)
- MethodDescriptor(method string) *desc.MethodDescriptor
- MessageFactory() *dynamic.MessageFactory
- }
- type jsonDescriptor interface {
- ConvertParamsToJson(method string, params []interface{}) ([]byte, error)
- ConvertReturnJson(method string, returnVal []byte) (interface{}, error)
- }
- type textDescriptor interface {
- ConvertParamsToText(method string, params []interface{}) ([]byte, error)
- ConvertReturnText(method string, returnVal []byte) (interface{}, error)
- }
- type interfaceDescriptor interface {
- ConvertParams(method string, params []interface{}) ([]interface{}, error)
- ConvertReturn(method string, returnVal interface{}) (interface{}, error)
- }
- type multiplexDescriptor interface {
- jsonDescriptor
- textDescriptor
- interfaceDescriptor
- httpMapping
- }
- var ( //Do not call these directly, use the get methods
- protoParser *protoparse.Parser
- // A buffer of descriptor for schemas
- reg = &sync.Map{}
- )
- func ProtoParser() *protoparse.Parser {
- once.Do(func() {
- dir := "etc/services/schemas/"
- if kconf.IsTesting {
- dir = "service/test/schemas/"
- }
- schemaDir, _ := kconf.GetLoc(dir)
- protoParser = &protoparse.Parser{ImportPaths: []string{schemaDir}}
- })
- return protoParser
- }
- func parse(schema schema, file string) (descriptor, error) {
- info := &schemaInfo{
- SchemaType: schema,
- SchemaFile: file,
- }
- switch schema {
- case PROTOBUFF:
- if v, ok := reg.Load(info); ok {
- return v.(descriptor), nil
- }
- if fds, err := ProtoParser().ParseFiles(file); err != nil {
- return nil, err
- } else {
- result := &wrappedProtoDescriptor{
- FileDescriptor: fds[0],
- mf: dynamic.NewMessageFactoryWithDefaults(),
- }
- err := result.parseHttpOptions()
- if err != nil {
- return nil, err
- }
- reg.Store(info, result)
- return result, nil
- }
- default:
- return nil, fmt.Errorf("unsupported schema %s", schema)
- }
- }
- type wrappedProtoDescriptor struct {
- *desc.FileDescriptor
- methodOptions map[string]*httpOptions
- mf *dynamic.MessageFactory
- }
- //TODO support for duplicate names
- func (d *wrappedProtoDescriptor) GetFunctions() (result []string) {
- for _, s := range d.GetServices() {
- for _, m := range s.GetMethods() {
- result = append(result, m.GetName())
- }
- }
- return
- }
- func (d *wrappedProtoDescriptor) MessageFactory() *dynamic.MessageFactory {
- return d.mf
- }
- // ConvertParams TODO support optional field, support enum type
- // Parameter mapping for protobuf
- // 1. If param length is 1, it can either a map contains all field or a field only.
- // 2. If param length is more then 1, they will map to message fields in the order
- func (d *wrappedProtoDescriptor) ConvertParams(method string, params []interface{}) ([]interface{}, error) {
- m := d.MethodDescriptor(method)
- if m == nil {
- return nil, fmt.Errorf("can't find method %s in proto", method)
- }
- im := m.GetInputType()
- return d.convertParams(im, params)
- }
- func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []interface{}) (*dynamic.Message, error) {
- m := d.MethodDescriptor(method)
- if m == nil {
- return nil, fmt.Errorf("can't find method %s in proto", method)
- }
- im := m.GetInputType()
- message := d.mf.NewDynamicMessage(im)
- typedParams, err := d.convertParams(im, params)
- if err != nil {
- return nil, err
- }
- for i, typeParam := range typedParams {
- message.SetFieldByNumber(i+1, typeParam)
- }
- return message, nil
- }
- func (d *wrappedProtoDescriptor) ConvertParamsToJson(method string, params []interface{}) ([]byte, error) {
- // Deal with encoded json string. Just return the string
- if len(params) == 1 {
- m := d.MethodDescriptor(method)
- if m == nil {
- return nil, fmt.Errorf("can't find method %s in proto", method)
- }
- im := m.GetInputType()
- if im.GetFullyQualifiedName() == wrapperString {
- ss, err := cast.ToString(params[0], cast.STRICT)
- if err != nil {
- return nil, err
- }
- return []byte(ss), nil
- }
- }
- if message, err := d.ConvertParamsToMessage(method, params); err != nil {
- return nil, err
- } else {
- return message.MarshalJSON()
- }
- }
- func (d *wrappedProtoDescriptor) ConvertParamsToText(method string, params []interface{}) ([]byte, error) {
- if message, err := d.ConvertParamsToMessage(method, params); err != nil {
- return nil, err
- } else {
- return message.MarshalText()
- }
- }
- func (d *wrappedProtoDescriptor) convertParams(im *desc.MessageDescriptor, params []interface{}) ([]interface{}, error) {
- fields := im.GetFields()
- var result []interface{}
- switch len(params) {
- case 0:
- if len(fields) == 0 {
- return result, nil
- } else {
- return nil, fmt.Errorf("require %d parameters but none", len(fields))
- }
- case 1:
- // If it is map, try unfold it
- // TODO custom error for non map or map name not match
- if r, err := d.unfoldMap(im, params[0]); err != nil {
- kconf.Log.Debugf("try unfold param for message %s fail: %v", im.GetName(), err)
- } else {
- return r, nil
- }
- // For non map params, treat it as special case of multiple params
- if len(fields) == 1 {
- param0, err := d.encodeField(fields[0], params[0])
- if err != nil {
- return nil, err
- }
- return append(result, param0), nil
- } else {
- return nil, fmt.Errorf("require %d parameters but only got 1", len(fields))
- }
- default:
- if len(fields) == len(params) {
- for i, field := range fields {
- param, err := d.encodeField(field, params[i])
- if err != nil {
- return nil, err
- }
- result = append(result, param)
- }
- return result, nil
- } else {
- return nil, fmt.Errorf("require %d parameters but only got %d", len(fields), len(params))
- }
- }
- }
- func (d *wrappedProtoDescriptor) ConvertReturn(method string, returnVal interface{}) (interface{}, error) {
- m := d.MethodDescriptor(method)
- t := m.GetOutputType()
- if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
- return decodeField(returnVal, t.FindFieldByNumber(1), cast.STRICT)
- } else { // MUST be a map
- if retMap, ok := returnVal.(map[string]interface{}); ok {
- return decodeMap(retMap, t, cast.CONVERT_SAMEKIND)
- } else {
- return nil, fmt.Errorf("fail to convert return val, must be a map but got %v", returnVal)
- }
- }
- }
- func (d *wrappedProtoDescriptor) ConvertReturnMessage(method string, returnVal *dynamic.Message) (interface{}, error) {
- m := d.MethodDescriptor(method)
- return decodeMessage(returnVal, m.GetOutputType()), nil
- }
- func (d *wrappedProtoDescriptor) ConvertReturnJson(method string, returnVal []byte) (interface{}, error) {
- r := make(map[string]interface{})
- err := json.Unmarshal(returnVal, &r)
- if err != nil {
- return nil, err
- }
- m := d.MethodDescriptor(method)
- return decodeMap(r, m.GetOutputType(), cast.CONVERT_SAMEKIND)
- }
- func (d *wrappedProtoDescriptor) ConvertReturnText(method string, returnVal []byte) (interface{}, error) {
- m := d.MethodDescriptor(method)
- t := m.GetOutputType()
- if _, ok := WRAPPER_TYPES[t.GetFullyQualifiedName()]; ok {
- return decodeField(string(returnVal), t.FindFieldByNumber(1), cast.CONVERT_ALL)
- } else {
- return nil, fmt.Errorf("fail to convert return val to text, return type must be primitive type but got %s", t.GetName())
- }
- }
- func (d *wrappedProtoDescriptor) MethodDescriptor(name string) *desc.MethodDescriptor {
- var m *desc.MethodDescriptor
- for _, s := range d.GetServices() {
- m = s.FindMethodByName(name)
- if m != nil {
- break
- }
- }
- return m
- }
- func (d *wrappedProtoDescriptor) unfoldMap(ft *desc.MessageDescriptor, i interface{}) ([]interface{}, error) {
- fields := ft.GetFields()
- result := make([]interface{}, len(fields))
- if m, ok := xsql.ToMessage(i); ok {
- for _, field := range fields {
- v, ok := m.Value(field.GetName())
- if !ok {
- return nil, fmt.Errorf("field %s not found", field.GetName())
- }
- fv, err := d.encodeField(field, v)
- if err != nil {
- return nil, err
- }
- result[field.GetNumber()-1] = fv
- }
- } else {
- return nil, fmt.Errorf("not a map")
- }
- return result, nil
- }
- func (d *wrappedProtoDescriptor) encodeMap(im *desc.MessageDescriptor, i interface{}) (*dynamic.Message, error) {
- result := d.mf.NewDynamicMessage(im)
- fields := im.GetFields()
- if m, ok := i.(map[string]interface{}); ok {
- for _, field := range fields {
- v, ok := m[field.GetName()]
- if !ok {
- return nil, fmt.Errorf("field %s not found", field.GetName())
- }
- fv, err := d.encodeField(field, v)
- if err != nil {
- return nil, err
- }
- result.SetFieldByName(field.GetName(), fv)
- }
- }
- return result, nil
- }
- func (d *wrappedProtoDescriptor) encodeField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
- fn := field.GetName()
- ft := field.GetType()
- if field.IsRepeated() {
- var (
- result interface{}
- err error
- )
- switch ft {
- case dpb.FieldDescriptorProto_TYPE_DOUBLE:
- result, err = cast.ToFloat64Slice(v, cast.STRICT)
- case dpb.FieldDescriptorProto_TYPE_FLOAT:
- result, err = cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
- r, err := cast.ToFloat64(input, sn)
- if err != nil {
- return 0, nil
- } else {
- return float32(r), nil
- }
- }, "float", cast.STRICT)
- case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
- result, err = cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
- r, err := cast.ToInt(input, sn)
- if err != nil {
- return 0, nil
- } else {
- return int32(r), nil
- }
- }, "int", cast.STRICT)
- case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
- result, err = cast.ToInt64Slice(v, cast.STRICT)
- case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
- result, err = cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
- r, err := cast.ToUint64(input, sn)
- if err != nil {
- return 0, nil
- } else {
- return uint32(r), nil
- }
- }, "uint", cast.STRICT)
- case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
- result, err = cast.ToUint64Slice(v, cast.STRICT)
- case dpb.FieldDescriptorProto_TYPE_BOOL:
- result, err = cast.ToBoolSlice(v, cast.STRICT)
- case dpb.FieldDescriptorProto_TYPE_STRING:
- result, err = cast.ToStringSlice(v, cast.STRICT)
- case dpb.FieldDescriptorProto_TYPE_BYTES:
- result, err = cast.ToBytesSlice(v, cast.STRICT)
- case dpb.FieldDescriptorProto_TYPE_MESSAGE:
- result, err = cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
- r, err := cast.ToStringMap(v)
- if err == nil {
- return d.encodeMap(field.GetMessageType(), r)
- } else {
- return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
- }
- }, "map", cast.STRICT)
- default:
- return nil, fmt.Errorf("invalid type for field '%s'", fn)
- }
- if err != nil {
- err = fmt.Errorf("failed to encode field '%s':%v", fn, err)
- }
- return result, err
- } else {
- return d.encodeSingleField(field, v)
- }
- }
- func (d *wrappedProtoDescriptor) encodeSingleField(field *desc.FieldDescriptor, v interface{}) (interface{}, error) {
- fn := field.GetName()
- switch field.GetType() {
- case dpb.FieldDescriptorProto_TYPE_DOUBLE:
- r, err := cast.ToFloat64(v, cast.STRICT)
- if err == nil {
- return r, nil
- } else {
- return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
- }
- case dpb.FieldDescriptorProto_TYPE_FLOAT:
- r, err := cast.ToFloat64(v, cast.STRICT)
- if err == nil {
- return float32(r), nil
- } else {
- return nil, fmt.Errorf("invalid type for float type field '%s': %v", fn, err)
- }
- case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32:
- r, err := cast.ToInt(v, cast.STRICT)
- if err == nil {
- return int32(r), nil
- } else {
- return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
- }
- case dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64:
- r, err := cast.ToInt64(v, cast.STRICT)
- if err == nil {
- return r, nil
- } else {
- return nil, fmt.Errorf("invalid type for int type field '%s': %v", fn, err)
- }
- case dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32:
- r, err := cast.ToUint64(v, cast.STRICT)
- if err == nil {
- return uint32(r), nil
- } else {
- return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
- }
- case dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
- r, err := cast.ToUint64(v, cast.STRICT)
- if err == nil {
- return r, nil
- } else {
- return nil, fmt.Errorf("invalid type for uint type field '%s': %v", fn, err)
- }
- case dpb.FieldDescriptorProto_TYPE_BOOL:
- r, err := cast.ToBool(v, cast.STRICT)
- if err == nil {
- return r, nil
- } else {
- return nil, fmt.Errorf("invalid type for bool type field '%s': %v", fn, err)
- }
- case dpb.FieldDescriptorProto_TYPE_STRING:
- r, err := cast.ToString(v, cast.STRICT)
- if err == nil {
- return r, nil
- } else {
- return nil, fmt.Errorf("invalid type for string type field '%s': %v", fn, err)
- }
- case dpb.FieldDescriptorProto_TYPE_BYTES:
- r, err := cast.ToBytes(v, cast.STRICT)
- if err == nil {
- return r, nil
- } else {
- return nil, fmt.Errorf("invalid type for bytes type field '%s': %v", fn, err)
- }
- case dpb.FieldDescriptorProto_TYPE_MESSAGE:
- r, err := cast.ToStringMap(v)
- if err == nil {
- return d.encodeMap(field.GetMessageType(), r)
- } else {
- return nil, fmt.Errorf("invalid type for map type field '%s': %v", fn, err)
- }
- default:
- return nil, fmt.Errorf("invalid type for field '%s'", fn)
- }
- }
- func decodeMessage(message *dynamic.Message, outputType *desc.MessageDescriptor) interface{} {
- if _, ok := WRAPPER_TYPES[outputType.GetFullyQualifiedName()]; ok {
- return message.GetFieldByNumber(1)
- } else if wrapperVoid == outputType.GetFullyQualifiedName() {
- return nil
- }
- result := make(map[string]interface{})
- for _, field := range outputType.GetFields() {
- decodeMessageField(message.GetField(field), field, result, cast.STRICT)
- }
- return result
- }
- func decodeMessageField(src interface{}, field *desc.FieldDescriptor, result map[string]interface{}, sn cast.Strictness) error {
- if f, err := decodeField(src, field, sn); err != nil {
- return err
- } else {
- result[field.GetName()] = f
- return nil
- }
- }
- func decodeField(src interface{}, field *desc.FieldDescriptor, sn cast.Strictness) (interface{}, error) {
- var (
- r interface{}
- e error
- )
- fn := field.GetName()
- switch field.GetType() {
- case dpb.FieldDescriptorProto_TYPE_DOUBLE, dpb.FieldDescriptorProto_TYPE_FLOAT:
- if field.IsRepeated() {
- r, e = cast.ToFloat64Slice(src, sn)
- } else {
- r, e = cast.ToFloat64(src, sn)
- }
- case dpb.FieldDescriptorProto_TYPE_INT32, dpb.FieldDescriptorProto_TYPE_SFIXED32, dpb.FieldDescriptorProto_TYPE_SINT32, dpb.FieldDescriptorProto_TYPE_INT64, dpb.FieldDescriptorProto_TYPE_SFIXED64, dpb.FieldDescriptorProto_TYPE_SINT64, dpb.FieldDescriptorProto_TYPE_FIXED32, dpb.FieldDescriptorProto_TYPE_UINT32, dpb.FieldDescriptorProto_TYPE_FIXED64, dpb.FieldDescriptorProto_TYPE_UINT64:
- if field.IsRepeated() {
- r, e = cast.ToInt64Slice(src, sn)
- } else {
- r, e = cast.ToInt64(src, sn)
- }
- case dpb.FieldDescriptorProto_TYPE_BOOL:
- if field.IsRepeated() {
- r, e = cast.ToBoolSlice(src, sn)
- } else {
- r, e = cast.ToBool(src, sn)
- }
- case dpb.FieldDescriptorProto_TYPE_STRING:
- if field.IsRepeated() {
- r, e = cast.ToStringSlice(src, sn)
- } else {
- r, e = cast.ToString(src, sn)
- }
- case dpb.FieldDescriptorProto_TYPE_BYTES:
- if field.IsRepeated() {
- r, e = cast.ToBytesSlice(src, sn)
- } else {
- r, e = cast.ToBytes(src, sn)
- }
- case dpb.FieldDescriptorProto_TYPE_MESSAGE:
- if field.IsRepeated() {
- r, e = cast.ToTypedSlice(src, func(input interface{}, ssn cast.Strictness) (interface{}, error) {
- return decodeSubMessage(input, field.GetMessageType(), ssn)
- }, "map", sn)
- } else {
- r, e = decodeSubMessage(src, field.GetMessageType(), sn)
- }
- default:
- return nil, fmt.Errorf("unsupported type for %s", fn)
- }
- if e != nil {
- e = fmt.Errorf("invalid type of return value for '%s': %v", fn, e)
- }
- return r, e
- }
- func decodeMap(src map[string]interface{}, ft *desc.MessageDescriptor, sn cast.Strictness) (map[string]interface{}, error) {
- result := make(map[string]interface{})
- for _, field := range ft.GetFields() {
- val, ok := src[field.GetName()]
- if !ok {
- continue
- }
- err := decodeMessageField(val, field, result, sn)
- if err != nil {
- return nil, err
- }
- }
- return result, nil
- }
- func decodeSubMessage(input interface{}, ft *desc.MessageDescriptor, sn cast.Strictness) (interface{}, error) {
- var m = map[string]interface{}{}
- switch v := input.(type) {
- case map[interface{}]interface{}:
- for k, val := range v {
- m[cast.ToStringAlways(k)] = val
- }
- return decodeMap(m, ft, sn)
- case map[string]interface{}:
- return decodeMap(v, ft, sn)
- case proto.Message:
- message, err := dynamic.AsDynamicMessage(v)
- if err != nil {
- return nil, err
- }
- return decodeMessage(message, ft), nil
- case *dynamic.Message:
- return decodeMessage(v, ft), nil
- default:
- return nil, fmt.Errorf("cannot decode %[1]T(%[1]v) to map", input)
- }
- }
|