Jelajahi Sumber

doc(graph): update source node and join op

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 1 tahun lalu
induk
melakukan
226c188638

+ 47 - 0
docs/en_US/guide/rules/graph_rule.md

@@ -72,6 +72,34 @@ For sink node, the nodeType is the type of the sink like `mqtt` and `edgex`. Ple
 
 For operator node, the nodeType are newly defined. Each nodeType will have different properties.
 
+### Source Node
+
+The source node is the data source of the rule. It can be a stream or table. **User needs to define the stream/table before using it in the rule**. The `sourceType` property defines the type of the source. It can be `stream` or `table`. The `sourceName` property defines the name of the stream/table. The below example defines a source node which reads from a stream named `demoStream`. Please make sure the nodeType is the same as the type of the stream/table.
+
+```json
+  {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "sourceType": "stream",
+        "sourceName": "demoStream"
+      }
+  }
+```
+
+Currently, users can define the source node to refer to table as well. But only lookup table can be connected to Join node, scan table is not supported. The below example defines a source node which reads from a lookup table named `demoTable`. Please make sure the nodeType is the same as the type of the stream/table.
+
+```json
+  {
+      "type": "source",
+      "nodeType": "redis",
+      "props": {
+        "sourceType": "table",
+        "sourceName": "demoTable"
+      }
+  }
+```
+
 ### Built-in Operator Node Types
 
 Currently, we supported the below node types for operator type.
@@ -201,6 +229,25 @@ Example:
   }
 ```
 
+Join operator supports to connect stream/stream join and stream/lookup table join. Stream/scan table join is not supported. If using stream/stream join, the prior node must be a window node. If using stream/lookup table join, only one join condition is supported. Below is an example of stream/lookup table join.
+
+```json
+   {
+    "type": "operator",
+    "nodeType": "join",
+    "props": {
+      "from": "demoStream",
+      "joins": [
+        {
+          "name": "demoTable",
+          "type": "inner",
+          "on": "deviceStream.id = demoTable.id"
+        }
+      ]
+    }
+  }
+```
+
 #### groupby
 
 This node defines the dimension to group by. The input must be a collection of rows. The output is a collection of grouped tuples. The properties are:

+ 47 - 0
docs/zh_CN/guide/rules/graph_rule.md

@@ -71,6 +71,34 @@
 
 对于 operator 节点,nodeType 是新定义的,而且每个 nodeType 有不同的属性。
 
+### 源节点
+
+源节点是规则的数据源。它可以是一个流或表。**用户需要在规则中使用流/表之前定义它**。`sourceType`属性定义了源的类型。它可以是 "流 "或 "表"。`sourceName`属性定义了流/表的名称。下面的例子定义了一个源节点,它从一个名为 `demoStream` 的流中读取。请确保 nodeType 与流/表的类型相同。
+
+```json
+  {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "sourceType": "stream",
+        "sourceName": "demoStream"
+      }
+  }
+```
+
+目前,用户也可以定义源节点来引用表。但只有查询表可以连接到 Join 节点,扫描表暂不支持。下面的例子定义了一个源节点,它从一个名为 `demoTable` 的查询表中读取数据。
+
+```json
+  {
+      "type": "source",
+      "nodeType": "redis",
+      "props": {
+        "sourceType": "table",
+        "sourceName": "demoTable"
+      }
+  }
+```
+
 ### 内置 operator 节点类型
 
 目前,我们支持以下节点类型的运算符类型。
@@ -199,6 +227,25 @@
   }
 ```
 
+连接运算符支持连接流/流连接和流/查询表连接。不支持流/扫描表连接。如果使用流/流连接,前面的节点必须是一个窗口节点。如果使用流/查询表连接,只支持一个连接条件。下面是一个流/查找表连接的例子。
+
+```json
+   {
+    "type": "operator",
+    "nodeType": "join",
+    "props": {
+      "from": "demoStream",
+      "joins": [
+        {
+          "name": "demoTable",
+          "type": "inner",
+          "on": "deviceStream.id = demoTable.id"
+        }
+      ]
+    }
+  }
+```
+
 #### groupby
 
 这个节点定义了要分组的维度。输入必须是一个行的集合。输出是一个分组数据的集合。其属性为:

+ 1 - 4
internal/topo/planner/planner_graph.go

@@ -104,9 +104,6 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 				return nil, err
 			}
 			if sInfo.stmt.StreamType == ast.TypeTable && sInfo.stmt.Options.KIND == ast.StreamKindLookup {
-				if lookupTableChildren == nil {
-					lookupTableChildren = make(map[string]*ast.Options)
-				}
 				lookupTableChildren[string(sInfo.stmt.Name)] = sInfo.stmt.Options
 			} else {
 				// Use the plan to calculate the schema and other meta info
@@ -219,7 +216,7 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 					return nil, err
 				}
 				fromNode := stmt.Sources[0].(*ast.Table)
-				if _, ok := streamEmitters[string(fromNode.Name)]; !ok {
+				if _, ok := streamEmitters[fromNode.Name]; !ok {
 					return nil, fmt.Errorf("join source %s is not a stream", fromNode.Name)
 				}
 				hasLookup := false