executors_msgpack.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build msgpack
  15. package service
  16. import (
  17. "fmt"
  18. "net"
  19. "net/rpc"
  20. "reflect"
  21. "sync"
  22. "github.com/ugorji/go/codec"
  23. "github.com/lf-edge/ekuiper/pkg/api"
  24. )
  25. func init() {
  26. executors[MSGPACK] = func(desc descriptor, opt *interfaceOpt, _ *interfaceInfo) (executor, error) {
  27. d, ok := desc.(interfaceDescriptor)
  28. if !ok {
  29. return nil, fmt.Errorf("invalid descriptor type for msgpack-rpc")
  30. }
  31. exe := &msgpackExecutor{
  32. descriptor: d,
  33. interfaceOpt: opt,
  34. }
  35. return exe, nil
  36. }
  37. }
  38. type msgpackExecutor struct {
  39. descriptor interfaceDescriptor
  40. *interfaceOpt
  41. sync.Mutex
  42. connected bool
  43. conn *rpc.Client
  44. }
  45. // InvokeFunction flat the params and result
  46. func (m *msgpackExecutor) InvokeFunction(_ api.FunctionContext, name string, params []interface{}) (interface{}, error) {
  47. if !m.connected {
  48. m.Lock()
  49. if !m.connected {
  50. h := &codec.MsgpackHandle{}
  51. h.MapType = reflect.TypeOf(map[string]interface{}(nil))
  52. conn, err := net.Dial(m.addr.Scheme, m.addr.Host)
  53. if err != nil {
  54. return nil, err
  55. }
  56. rpcCodec := codec.MsgpackSpecRpc.ClientCodec(conn, h)
  57. m.conn = rpc.NewClientWithCodec(rpcCodec)
  58. }
  59. m.connected = true
  60. m.Unlock()
  61. }
  62. ps, err := m.descriptor.ConvertParams(name, params)
  63. if err != nil {
  64. return nil, err
  65. }
  66. var (
  67. reply interface{}
  68. args interface{}
  69. )
  70. // TODO argument flat
  71. switch len(ps) {
  72. case 0:
  73. // do nothing
  74. case 1:
  75. args = ps[0]
  76. default:
  77. args = codec.MsgpackSpecRpcMultiArgs(ps)
  78. }
  79. err = m.conn.Call(name, args, &reply)
  80. if err != nil {
  81. if err == rpc.ErrShutdown {
  82. m.connected = false
  83. }
  84. return nil, err
  85. }
  86. return m.descriptor.ConvertReturn(name, reply)
  87. }