Explorar o código

feat(service): support json string parameter for rest service

ngjaying %!s(int64=3) %!d(string=hai) anos
pai
achega
1e8a31373a

+ 1 - 1
docs/en_US/extension/external_func.md

@@ -142,7 +142,7 @@ Since REST and msgpack-rpc are not natively defined by protobuf, there are some
 
 The REST service is **POST** by default currently, and the transmission format is json. In the defined protobuf:
 
-- The input and output format cannot be a basic type, and it must be message
+- The input type must be **Message** or *google.protobuf.StringValue*. If the type is *google.protobuf.StringValue*, the parameter must be an encoded json string like `"{\"name\":\"name1\",\"size\":1}"`.
 
 The msgpack-rpc service has the following limitation:
 - Input can not be empty

+ 1 - 1
docs/zh_CN/extension/external_func.md

@@ -139,7 +139,7 @@ Protobuf 采用 proto3 格式,详细格式请参考 [proto3-spec](https://deve
 由于 REST 和 msgpack-rpc 并非原生采用 protobuf 定义,因此其使用有一些限制。
 
 REST 服务目前默认为 **POST**,且传输格式为 json。定义的protobuf 中:
-- 输入和输出格式不能为基本类型,必须为 message
+- 输入参数仅可以为 message 类型或者 *google.protobuf.StringValue* 类型。若输入参数为 *google.protobuf.StringValue*,则传入的参数必须为已编码的 json 字符串,例如 `"{\"name\":\"name1\",\"size\":1}"`。
 
 msgpack-rpc 服务有以下限制:
 - 输入不能为空

+ 40 - 0
services/external_service_rule_test.go

@@ -53,6 +53,11 @@ type ObjectDetectResponse struct {
 //	Box      Box       `json:"box,omitempty"`
 //}
 
+type EncodedRequest struct {
+	Name string `json:"name,omitempty"`
+	Size int    `json:"size,omitempty"`
+}
+
 func TestRestService(t *testing.T) {
 	// mock server, the port is set in the sample.json
 	l, err := net.Listen("tcp", "127.0.0.1:51234")
@@ -81,6 +86,9 @@ func TestRestService(t *testing.T) {
 			if err != nil {
 				http.Error(w, err.Error(), http.StatusBadRequest)
 			}
+			if req.Image == "" {
+				http.Error(w, "image is not found", http.StatusBadRequest)
+			}
 			out = &ObjectDetectResponse{
 				Info:   req.Command,
 				Code:   200,
@@ -93,6 +101,14 @@ func TestRestService(t *testing.T) {
 			count++
 			io.WriteString(w, fmt.Sprintf("%v", r))
 			return
+		case "/RestEncodedJson":
+			req := &EncodedRequest{}
+			err := json.NewDecoder(r.Body).Decode(req)
+			if err != nil {
+				http.Error(w, err.Error(), http.StatusBadRequest)
+			}
+			io.WriteString(w, req.Name)
+			return
 		default:
 			http.Error(w, "path not supported", http.StatusBadRequest)
 		}
@@ -246,6 +262,30 @@ func TestRestService(t *testing.T) {
 				"sink_mockSink_0_records_in_total":  int64(3),
 				"sink_mockSink_0_records_out_total": int64(3),
 			},
+		}, {
+			Name: `TestRestRule5`,
+			Sql:  `SELECT restEncodedJson(encoded_json) as name FROM commands`,
+			R: [][]map[string]interface{}{
+				{{
+					"name": "name1",
+				}},
+				{{
+					"name": "name2",
+				}},
+				{{
+					"name": "name3",
+				}},
+			},
+			M: map[string]interface{}{
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(3),
+				"op_2_project_0_records_out_total":  int64(3),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(3),
+				"sink_mockSink_0_records_out_total": int64(3),
+			},
 		},
 	}
 	topotest.HandleStream(true, streamList, t)

+ 18 - 0
services/manager_test.go

@@ -48,6 +48,7 @@ func TestInitByFiles(t *testing.T) {
 					"getFeatureFromGrpc",
 					"objectDetectFromGrpc",
 					"getStatusFromGrpc",
+					"notUsedRpc",
 				},
 			},
 			"tsrest": {
@@ -69,6 +70,7 @@ func TestInitByFiles(t *testing.T) {
 					"getFeatureFromRest",
 					"objectDetectFromRest",
 					"getStatusFromRest",
+					"restEncodedJson",
 				},
 			},
 			"tsmsgpack": {
@@ -84,6 +86,7 @@ func TestInitByFiles(t *testing.T) {
 					"getFeatureFromMsgpack",
 					"objectDetectFromMsgpack",
 					"getStatusFromMsgpack",
+					"notUsedMsgpack",
 				},
 			},
 		},
@@ -164,6 +167,21 @@ func TestInitByFiles(t *testing.T) {
 			InterfaceName: "tsmsgpack",
 			MethodName:    "Compute",
 		},
+		"notUsedRpc": {
+			ServiceName:   "sample",
+			InterfaceName: "tsrpc",
+			MethodName:    "RestEncodedJson",
+		},
+		"restEncodedJson": {
+			ServiceName:   "sample",
+			InterfaceName: "tsrest",
+			MethodName:    "RestEncodedJson",
+		},
+		"notUsedMsgpack": {
+			ServiceName:   "sample",
+			InterfaceName: "tsmsgpack",
+			MethodName:    "RestEncodedJson",
+		},
 	}
 
 	err := m.serviceKV.Open()

+ 39 - 12
services/schema.go

@@ -13,20 +13,31 @@ import (
 	"sync"
 )
 
+const (
+	wrapperBool   = "google.protobuf.BoolValue"
+	wrapperBytes  = "google.protobuf.BytesValue"
+	wrapperDouble = "google.protobuf.DoubleValue"
+	wrapperFloat  = "google.protobuf.FloatValue"
+	wrapperInt32  = "google.protobuf.Int32Value"
+	wrapperInt64  = "google.protobuf.Int64Value"
+	wrapperString = "google.protobuf.StringValue"
+	wrapperUInt32 = "google.protobuf.UInt32Value"
+	wrapperUInt64 = "google.protobuf.UInt64Value"
+	wrapperVoid   = "google.protobuf.EMPTY"
+)
+
 var WRAPPER_TYPES = map[string]struct{}{
-	"google.protobuf.BoolValue":   {},
-	"google.protobuf.BytesValue":  {},
-	"google.protobuf.DoubleValue": {},
-	"google.protobuf.FloatValue":  {},
-	"google.protobuf.Int32Value":  {},
-	"google.protobuf.Int64Value":  {},
-	"google.protobuf.StringValue": {},
-	"google.protobuf.UInt32Value": {},
-	"google.protobuf.UInt64Value": {},
+	wrapperBool:   {},
+	wrapperBytes:  {},
+	wrapperDouble: {},
+	wrapperFloat:  {},
+	wrapperInt32:  {},
+	wrapperInt64:  {},
+	wrapperString: {},
+	wrapperUInt32: {},
+	wrapperUInt64: {},
 }
 
-const VOID = "google.protobuf.EMPTY"
-
 type descriptor interface {
 	GetFunctions() []string
 }
@@ -154,6 +165,22 @@ func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []
 }
 
 func (d *wrappedProtoDescriptor) ConvertParamsToJson(method string, params []interface{}) ([]byte, error) {
+	// Deal with encoded json string. Just return the string
+	if len(params) == 1 {
+		m := d.MethodDescriptor(method)
+		if m == nil {
+			return nil, fmt.Errorf("can't find method %s in proto", method)
+		}
+		im := m.GetInputType()
+		if im.GetFullyQualifiedName() == wrapperString {
+			ss, err := common.ToString(params[0], common.STRICT)
+			if err != nil {
+				return nil, err
+			}
+			return []byte(ss), nil
+		}
+	}
+
 	if message, err := d.ConvertParamsToMessage(method, params); err != nil {
 		return nil, err
 	} else {
@@ -452,7 +479,7 @@ func encodeSingleField(field *desc.FieldDescriptor, v interface{}) (interface{},
 func decodeMessage(message *dynamic.Message, outputType *desc.MessageDescriptor) interface{} {
 	if _, ok := WRAPPER_TYPES[outputType.GetFullyQualifiedName()]; ok {
 		return message.GetFieldByNumber(1)
-	} else if VOID == outputType.GetFullyQualifiedName() {
+	} else if wrapperVoid == outputType.GetFullyQualifiedName() {
 		return nil
 	}
 	result := make(map[string]interface{})

+ 10 - 0
services/schema_test.go

@@ -96,6 +96,16 @@ func TestConvertParams(t *testing.T) {
 		//	},
 		//	jresult: []byte(`{"featureA":[0.031646,-0.800592,-1.101858,-0.354359,0.656587],"featureB":[0.354359,0.656587,-0.327047,0.198284,-2.142494,0.76016,1.680131]}`),
 		//},
+		{ // 6
+			method: "RestEncodedJson",
+			params: []interface{}{
+				[]byte("{\"name\":\"encoded json\",\"size\":1}"),
+			},
+			iresult: []interface{}{
+				"{\"name\":\"encoded json\",\"size\":1}",
+			},
+			jresult: []byte("{\"name\":\"encoded json\",\"size\":1}"),
+		},
 	}
 
 	for i, descriptor := range descriptors {

+ 12 - 0
services/test/sample.json

@@ -41,6 +41,10 @@
         {
           "name": "getStatusFromGrpc",
           "serviceName": "getStatus"
+        },
+        {
+          "name": "notUsedRpc",
+          "serviceName": "RestEncodedJson"
         }
       ]
     },
@@ -75,6 +79,10 @@
         {
           "name": "getStatusFromRest",
           "serviceName": "getStatus"
+        },
+        {
+          "name": "restEncodedJson",
+          "serviceName": "RestEncodedJson"
         }
       ]
     },
@@ -103,6 +111,10 @@
         {
           "name": "getStatusFromMsgpack",
           "serviceName": "getStatus"
+        },
+        {
+          "name": "notUsedMsgpack",
+          "serviceName": "RestEncodedJson"
         }
       ]
     }

+ 2 - 1
services/test/schemas/hw.proto

@@ -24,6 +24,7 @@ service Greeter {
     };
   }
   rpc getStatus(google.protobuf.Empty) returns(google.protobuf.BoolValue) {}
+  rpc RestEncodedJson(google.protobuf.StringValue) returns(google.protobuf.StringValue) {}
 }
 
 // The request message containing the user's name.
@@ -69,7 +70,7 @@ message FeatureResponse{
 
 message ObjectDetectionRequest {
   string cmd = 1;
-  string base64_img = 2;
+  string base64_img = 2 [json_name="base64_img"];
 }
 
 message ObjectDetectionResponse {

+ 10 - 7
xstream/topotest/mock_topo.go

@@ -936,24 +936,27 @@ var testData = map[string][]*xsql.Tuple{
 		{
 			Emitter: "commands",
 			Message: map[string]interface{}{
-				"cmd":        "get",
-				"base64_img": "my image",
+				"cmd":          "get",
+				"base64_img":   "my image",
+				"encoded_json": "{\"name\": \"name1\",\"size\": 22}",
 			},
 			Timestamp: 1541152486013,
 		},
 		{
 			Emitter: "commands",
 			Message: map[string]interface{}{
-				"cmd":        "detect",
-				"base64_img": "my image",
+				"cmd":          "detect",
+				"base64_img":   "my image",
+				"encoded_json": "{\"name\": \"name2\",\"size\": 33}",
 			},
 			Timestamp: 1541152487013,
 		},
 		{
 			Emitter: "commands",
 			Message: map[string]interface{}{
-				"cmd":        "delete",
-				"base64_img": "my image",
+				"cmd":          "delete",
+				"base64_img":   "my image",
+				"encoded_json": "{\"name\": \"name3\",\"size\": 11}",
 			},
 			Timestamp: 1541152488013,
 		},
@@ -1262,7 +1265,7 @@ func HandleStream(createOrDrop bool, names []string, t *testing.T) {
 			case "helloStr":
 				sql = `CREATE STREAM helloStr (name string) WITH (DATASOURCE="hello", FORMAT="JSON")`
 			case "commands":
-				sql = `CREATE STREAM commands (cmd string, base64_img string) WITH (DATASOURCE="commands", FORMAT="JSON")`
+				sql = `CREATE STREAM commands (cmd string, base64_img string, encoded_json string) WITH (DATASOURCE="commands", FORMAT="JSON")`
 			case "fakeBin":
 				sql = "CREATE STREAM fakeBin () WITH (DATASOURCE=\"users\", FORMAT=\"BINARY\")"
 			default: