Explorar el Código

fix(graph): set schema id by name/message (#2116)

* fix(sourceNode): set sourceOption.SCHEMAID by props

Signed-off-by: cea5 <396157168@qq.com>

* fix: check Props before set

Signed-off-by: cea5 <396157168@qq.com>

* fix: go fmt

Signed-off-by: cea5 <396157168@qq.com>

* add: graph source schema unit test

Signed-off-by: cea5 <396157168@qq.com>

* fix: go fmt

Signed-off-by: cea5 <396157168@qq.com>

---------

Signed-off-by: cea5 <396157168@qq.com>
cea5 hace 1 año
padre
commit
399f7d2c0e

+ 7 - 0
internal/topo/planner/planner_graph.go

@@ -498,6 +498,13 @@ func parseSource(nodeName string, gn *api.GraphNode, rule *api.Rule, store kv.Ke
 			return nil, ILLEGAL, "", err
 		}
 		sourceOption.TYPE = gn.NodeType
+		if sourceOption.SCHEMAID == "" && gn.Props["schemaName"] != nil && gn.Props["schemaMessage"] != nil {
+			schemaName, ok1 := gn.Props["schemaName"].(string)
+			schemaMessage, ok2 := gn.Props["schemaMessage"].(string)
+			if ok1 && ok2 {
+				sourceOption.SCHEMAID = schemaName + "." + schemaMessage
+			}
+		}
 		switch sourceMeta.SourceType {
 		case "stream":
 			pp, err := operator.NewPreprocessor(true, nil, true, nil, rule.Options.IsEventTime, sourceOption.TIMESTAMP, sourceOption.TIMESTAMP_FORMAT, strings.EqualFold(sourceOption.FORMAT, message.FormatBinary), sourceOption.STRICT_VALIDATION)

+ 33 - 0
internal/topo/planner/planner_graph_test.go

@@ -583,6 +583,39 @@ func TestPlannerGraphValidate(t *testing.T) {
 }`,
 			err: "",
 		},
+		{
+			graph: `{
+				"nodes": {
+					"log": {
+						"type": "sink",
+						"nodeType": "log",
+						"props": {}
+					},
+					"mqtt": {
+						"type": "source",
+						"nodeType": "mqtt",
+						"props": {
+							"datasource": "demo",
+							"format": "protobuf",
+							"schemaMessage": "PropertiesReport",
+							"schemaName": "EventBusMessage",
+							"shared": false
+						}
+					}
+				},
+				"topo": {
+					"sources": [
+						"mqtt"
+					],
+					"edges": {
+						"mqtt": [
+						"log"
+					]
+				}
+			}
+		}`,
+			err: "",
+		},
 	}
 
 	t.Logf("The test bucket size is %d.\n\n", len(tests))