Ver código fonte

fix: fix encode protobuf optional field (#2216)

Signed-off-by: Rui-Gan <1171530954@qq.com>
Regina 1 ano atrás
pai
commit
2e0f705845

+ 61 - 1
internal/service/external_service_rule_test.go

@@ -213,7 +213,7 @@ func TestRestService(t *testing.T) {
 
 	defer server.Close()
 	// Reset
-	streamList := []string{"helloStr", "commands", "fakeBin", "shelves", "demo", "mes"}
+	streamList := []string{"helloStr", "commands", "fakeBin", "shelves", "demo", "mes", "optional_commands"}
 	topotest.HandleStream(false, streamList, t)
 	// Data setup
 	tests := []topotest.RuleTest{
@@ -509,6 +509,42 @@ func TestRestService(t *testing.T) {
 				"sink_mockSink_0_records_in_total":  int64(3),
 				"sink_mockSink_0_records_out_total": int64(3),
 			},
+		}, {
+			Name: `TestRestRule12`,
+			Sql:  `SELECT objectDetectFromRest(*) AS res FROM optional_commands`,
+			R: [][]map[string]interface{}{
+				{{
+					"res": map[string]interface{}{
+						"image":  "my image1",
+						"result": " success",
+						"type":   "S",
+					},
+				}},
+				{{
+					"res": map[string]interface{}{
+						"image":  "my image2",
+						"result": " success",
+						"type":   "S",
+					},
+				}},
+				{{
+					"res": map[string]interface{}{
+						"image":  "my image3",
+						"result": " success",
+						"type":   "S",
+					},
+				}},
+			},
+			M: map[string]interface{}{
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(3),
+				"op_2_project_0_records_out_total":  int64(3),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(3),
+				"sink_mockSink_0_records_out_total": int64(3),
+			},
 		},
 	}
 	topotest.HandleStream(true, streamList, t)
@@ -752,6 +788,30 @@ func TestGrpcService(t *testing.T) {
 				"sink_mockSink_0_records_in_total":  int64(3),
 				"sink_mockSink_0_records_out_total": int64(3),
 			},
+		}, {
+			Name: `TestRestRule5`,
+			Sql:  `SELECT objectDetectFromGrpc(*) -> image AS res FROM optional_commands`,
+			R: [][]map[string]interface{}{
+				{{
+					"res": "my image1",
+				}},
+				{{
+					"res": "my image2",
+				}},
+				{{
+					"res": "my image3",
+				}},
+			},
+			M: map[string]interface{}{
+				"op_2_project_0_exceptions_total":   int64(0),
+				"op_2_project_0_process_latency_us": int64(0),
+				"op_2_project_0_records_in_total":   int64(3),
+				"op_2_project_0_records_out_total":  int64(3),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(3),
+				"sink_mockSink_0_records_out_total": int64(3),
+			},
 		},
 	}
 	topotest.HandleStream(true, streamList, t)

+ 17 - 4
internal/service/schema.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -158,7 +158,9 @@ func (d *wrappedProtoDescriptor) ConvertParamsToMessage(method string, params []
 		return nil, err
 	}
 	for i, typeParam := range typedParams {
-		message.SetFieldByNumber(i+1, typeParam)
+		if typeParam != nil {
+			message.SetFieldByNumber(i+1, typeParam)
+		}
 	}
 	return message, nil
 }
@@ -211,7 +213,11 @@ func (d *wrappedProtoDescriptor) convertParams(im *desc.MessageDescriptor, param
 		if r, err := d.unfoldMap(im, params[0]); err != nil {
 			kconf.Log.Debugf("try unfold param for message %s fail: %v", im.GetName(), err)
 		} else {
-			return r, nil
+			for _, v := range r {
+				if v != nil {
+					return r, nil
+				}
+			}
 		}
 		// For non map params, treat it as special case of multiple params
 		if len(fields) == 1 {
@@ -296,7 +302,14 @@ func (d *wrappedProtoDescriptor) unfoldMap(ft *desc.MessageDescriptor, i interfa
 		for _, field := range fields {
 			v, ok := m.Value(field.GetName(), "")
 			if !ok {
-				return nil, fmt.Errorf("field %s not found", field.GetName())
+				if field.IsRequired() {
+					return nil, fmt.Errorf("field %s not found", field.GetName())
+				} else {
+					continue
+				}
+			}
+			if v == nil && !field.IsRequired() {
+				continue
 			}
 			fv, err := d.fc.EncodeField(field, v)
 			if err != nil {

+ 26 - 0
internal/service/schema_test.go

@@ -127,6 +127,32 @@ func TestConvertParams(t *testing.T) {
 			jresult: []byte("{\"name\":\"encoded json\",\"size\":1}"),
 			tresult: []byte(`value:"{\"name\":\"encoded json\",\"size\":1}"`),
 		},
+		{ // 7
+			method: "RestEncodedJson",
+			params: []interface{}{
+				[]byte("{\"name\":\"encoded json\"}"),
+			},
+			iresult: []interface{}{
+				"{\"name\":\"encoded json\"}",
+			},
+			jresult: []byte("{\"name\":\"encoded json\"}"),
+			tresult: []byte(`value:"{\"name\":\"encoded json\"}"`),
+		},
+		{ // 8
+			method: "Compute",
+			params: []interface{}{
+				map[string]interface{}{
+					"rid":    "rid",
+					"outlet": nil,
+					"data":   []byte("data"),
+				},
+			},
+			iresult: []interface{}{
+				"rid", nil, nil, nil, []byte("data"), nil,
+			},
+			jresult: []byte(`{"rid":"rid","data":"ZGF0YQ=="}`),
+			tresult: []byte(`rid:"rid" data:"data"`),
+		},
 	}
 
 	for i, descriptor := range descriptors {

+ 2 - 0
internal/topo/topotest/mock_topo.go

@@ -371,6 +371,8 @@ func HandleStream(createOrDrop bool, names []string, t *testing.T) {
 				) WITH (DATASOURCE="shelves", TYPE="mock", FORMAT="json");`
 			case "mes":
 				sql = `CREATE STREAM mes (message_id string, text string) WITH (DATASOURCE="mes", TYPE="mock", FORMAT="JSON")`
+			case "optional_commands":
+				sql = `CREATE STREAM optional_commands (base64_img string) WITH (DATASOURCE="optional_commands", FORMAT="JSON", TYPE="mock")`
 			default:
 				t.Errorf("create stream %s fail", name)
 			}

+ 23 - 0
internal/topo/topotest/mocknode/mock_data.go

@@ -1122,6 +1122,29 @@ var TestData = map[string][]*xsql.Tuple{
 			Timestamp: 1541152489500,
 		},
 	},
+	"optional_commands": {
+		{
+			Emitter: "optional_commands",
+			Message: map[string]interface{}{
+				"base64_img": "my image1",
+			},
+			Timestamp: 1541152486013,
+		},
+		{
+			Emitter: "optional_commands",
+			Message: map[string]interface{}{
+				"base64_img": "my image2",
+			},
+			Timestamp: 1541152487013,
+		},
+		{
+			Emitter: "optional_commands",
+			Message: map[string]interface{}{
+				"base64_img": "my image3",
+			},
+			Timestamp: 1541152488013,
+		},
+	},
 }
 
 var Image, _ = getImg()