ソースを参照

feat(graph): add switch node

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 年 前
コミット
a693faae17

+ 0 - 1
etc/ops/filter.json

@@ -20,7 +20,6 @@
   "properties": [
     {
       "name": "expr",
-      "default": "12",
       "optional": false,
       "control": "text",
       "type": "string",

+ 68 - 0
etc/ops/switch.json

@@ -0,0 +1,68 @@
+{
+  "name": "switch",
+  "about": {
+    "trial": false,
+    "author": {
+      "name": "EMQ",
+      "email": "contact@emqx.io",
+      "company": "EMQ Technologies Co., Ltd",
+      "website": "https://www.emqx.io"
+    },
+    "helpUrl": {
+      "en_US": "https://github.com/lf-edge/ekuiper/blob/master/docs/en_US/sqls/query_language_elements.md",
+      "zh_CN": "https://github.com/lf-edge/ekuiper/blob/master/docs/zh_CN/sqls/query_language_elements.md"
+    },
+    "description": {
+      "en_US": "An operation to route events to different branches based on a case condition similar to switch statement in programming languages.",
+      "zh_CN": "用于根据条件分流数据的操作,类似编程语言中的 switch 语句。"
+    }
+  },
+  "properties": [
+    {
+      "name": "cases",
+      "optional": false,
+      "control": "list",
+      "type": "string",
+      "hint": {
+        "en_US": "case condition expression",
+        "zh_CN": "分流条件语句"
+      },
+      "label": {
+        "en_US": "Cases",
+        "zh_CN": "条件"
+      }
+    },{
+      "name": "stopAtFirstMatch",
+      "default": false,
+      "optional": false,
+      "control": "checkbox",
+      "type": "boolean",
+      "hint": {
+        "en_US": "Stop at first match",
+        "zh_CN": "接受第一条匹配信息后停止"
+      },
+      "label": {
+        "en_US": "Stop at first match",
+        "zh_CN": "接受第一条匹配信息后停止"
+      }
+    }
+  ],
+  "node": {
+    "display": true,
+    "category": "operator",
+    "input": {
+      "type": "any",
+      "rowType": "any",
+      "collectionType": "any"
+    },
+    "output": {
+      "type": "same",
+      "strategy": "keep"
+    },
+    "icon": "iconPath",
+    "label": {
+      "en": "Switch",
+      "zh": "Switch"
+    }
+  }
+}

+ 4 - 0
internal/topo/graph/io.go

@@ -149,4 +149,8 @@ var OpIO = map[string][]*IOType{
 		{Type: IOINPUT_TYPE_ROW, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY, AllowMulti: true},
 		{Type: IOINPUT_TYPE_COLLECTION, CollectionType: IOCOLLECTION_TYPE_SINGLE, RowType: IOROW_TYPE_SINGLE},
 	},
+	"switch": {
+		{Type: IOINPUT_TYPE_ANY, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY},
+		{Type: IOINPUT_TYPE_SAME},
+	},
 }

+ 5 - 0
internal/topo/graph/node.go

@@ -52,3 +52,8 @@ type Orderby struct {
 		Desc  bool   `json:"desc"`
 	}
 }
+
+type Switch struct {
+	Cases            []string `json:"cases"`
+	StopAtFirstMatch bool     `json:"stopAtFirstMatch"`
+}

+ 160 - 0
internal/topo/node/switch_node.go

@@ -0,0 +1,160 @@
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package node
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/topo/node/metric"
+	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"github.com/lf-edge/ekuiper/pkg/infra"
+)
+
+type SwitchConfig struct {
+	Cases            []ast.Expr
+	StopAtFirstMatch bool
+}
+
+type SwitchNode struct {
+	*defaultSinkNode
+	conf        *SwitchConfig
+	statManager metric.StatManager
+	outputNodes []defaultNode
+}
+
+// GetEmitter returns the nth emitter of the node. SwtichNode is the only node that has multiple emitters
+// In planner graph, fromNodes is a multi-dim array, switch node is the only node that could be in the second dim
+// The dim is the index
+func (n *SwitchNode) GetEmitter(outputIndex int) api.Emitter {
+	return &n.outputNodes[outputIndex]
+}
+
+// AddOutput SwitchNode overrides the defaultSinkNode's AddOutput to add output to the outputNodes
+// SwitchNode itself has multiple outlets defined by the outputNodes.
+// This default function will add the output to the first outlet
+func (n *SwitchNode) AddOutput(output chan<- interface{}, name string) error {
+	if len(n.outputNodes) == 0 { // should never happen
+		return fmt.Errorf("no output node is available")
+	}
+	return n.outputNodes[0].AddOutput(output, name)
+}
+
+func NewSwitchNode(name string, conf *SwitchConfig, options *api.RuleOption) (*SwitchNode, error) {
+	sn := &SwitchNode{
+		conf: conf,
+	}
+	sn.defaultSinkNode = &defaultSinkNode{
+		input: make(chan interface{}, options.BufferLength),
+		defaultNode: &defaultNode{
+			outputs:   nil,
+			name:      name,
+			sendError: options.SendError,
+		},
+	}
+	outputs := make([]defaultNode, len(conf.Cases))
+	for i := range conf.Cases {
+		outputs[i] = defaultNode{
+			outputs:   make(map[string]chan<- interface{}),
+			name:      name + fmt.Sprintf("_%d", i),
+			sendError: options.SendError,
+		}
+	}
+	sn.outputNodes = outputs
+	return sn, nil
+}
+
+func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error) {
+	ctx.GetLogger().Infof("SwitchNode %s is started", n.name)
+	stats, err := metric.NewStatManager(ctx, "op")
+	if err != nil {
+		infra.DrainError(ctx, fmt.Errorf("cannot create state for switch node %s", n.name), errCh)
+		return
+	}
+	n.statManager = stats
+	n.ctx = ctx
+	for i, _ := range n.outputNodes {
+		n.outputNodes[i].ctx = ctx
+	}
+	fv, afv := xsql.NewFunctionValuersForOp(ctx)
+	go func() {
+		err := infra.SafeRun(func() error {
+			for {
+				ctx.GetLogger().Debugf("Switch node %s is looping", n.name)
+				select {
+				// process incoming item from both streams(transformed) and tables
+				case item, opened := <-n.input:
+					processed := false
+					if item, processed = n.preprocess(item); processed {
+						break
+					}
+					n.statManager.IncTotalRecordsIn()
+					n.statManager.ProcessTimeStart()
+					if !opened {
+						n.statManager.IncTotalExceptions("input channel closed")
+						break
+					}
+					var ve *xsql.ValuerEval
+					switch d := item.(type) {
+					case error:
+						n.statManager.IncTotalExceptions(d.Error())
+						break
+					case xsql.TupleRow:
+						ctx.GetLogger().Debugf("SwitchNode receive tuple input %s", d)
+						ve = &xsql.ValuerEval{Valuer: xsql.MultiValuer(d, fv)}
+
+					case xsql.SingleCollection:
+						ctx.GetLogger().Debugf("SwitchNode receive window input %s", d)
+						afv.SetData(d)
+						ve = &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(d, fv, d, fv, afv, &xsql.WildcardValuer{Data: d})}
+					default:
+						e := fmt.Errorf("run switch node error: invalid input type but got %[1]T(%[1]v)", d)
+						n.Broadcast(e)
+						n.statManager.IncTotalExceptions(e.Error())
+						break
+					}
+				caseLoop:
+					for i, c := range n.conf.Cases {
+						result := ve.Eval(c)
+						switch r := result.(type) {
+						case error:
+							ctx.GetLogger().Errorf("run switch node %s, case %s error: %s", n.name, c, r)
+							n.statManager.IncTotalExceptions(r.Error())
+						case bool:
+							if r {
+								n.outputNodes[i].Broadcast(item)
+								if n.conf.StopAtFirstMatch {
+									break caseLoop
+								}
+							}
+						case nil: // nil is false
+							break
+						default:
+							m := fmt.Sprintf("run switch node %s, case %s error: invalid condition that returns non-bool value %[1]T(%[1]v)", n.name, c, r)
+							ctx.GetLogger().Errorf(m)
+							n.statManager.IncTotalExceptions(m)
+						}
+					}
+				case <-ctx.Done():
+					ctx.GetLogger().Infoln("Cancelling switch node....")
+					return nil
+				}
+			}
+		})
+		if err != nil {
+			infra.DrainError(ctx, err, errCh)
+		}
+	}()
+}

+ 397 - 0
internal/topo/node/switch_node_test.go

@@ -0,0 +1,397 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package node
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"reflect"
+	"testing"
+	"time"
+)
+
+func TestTuple(t *testing.T) {
+	inputs := []*xsql.Tuple{
+		{
+			Message: map[string]interface{}{
+				"f1": "v1",
+				"f2": 45.6,
+			},
+		}, {
+			Message: map[string]interface{}{
+				"f1": "v2",
+				"f2": 46.6,
+			},
+		}, {
+			Message: map[string]interface{}{
+				"f1": "v2",
+				"f2": 26.6,
+			},
+		}, {
+			Message: map[string]interface{}{
+				"f1": "v2",
+				"f2": 54.3,
+			},
+		}, {
+			Message: map[string]interface{}{
+				"f1": "v1",
+				"f2": 36.6,
+			},
+		}, {
+			Message: map[string]interface{}{
+				"f1": "v1",
+				"f2": 76.6,
+			},
+		}, {
+			Message: map[string]interface{}{
+				"f1": "v2",
+				"f2": 41.2,
+			},
+		}, {
+			Message: map[string]interface{}{
+				"f1": "v2",
+				"f2": 86.6,
+			},
+		},
+	}
+	outputs := [][]*xsql.Tuple{
+		{ // f2 > 40
+			{
+				Message: map[string]interface{}{
+					"f1": "v1",
+					"f2": 45.6,
+				},
+			}, {
+				Message: map[string]interface{}{
+					"f1": "v2",
+					"f2": 46.6,
+				},
+			}, {
+				Message: map[string]interface{}{
+					"f1": "v2",
+					"f2": 54.3,
+				},
+			}, {
+				Message: map[string]interface{}{
+					"f1": "v1",
+					"f2": 76.6,
+				},
+			}, {
+				Message: map[string]interface{}{
+					"f1": "v2",
+					"f2": 41.2,
+				},
+			}, {
+				Message: map[string]interface{}{
+					"f1": "v2",
+					"f2": 86.6,
+				},
+			},
+		},
+		{ // f1 == v1
+			{
+				Message: map[string]interface{}{
+					"f1": "v1",
+					"f2": 45.6,
+				},
+			}, {
+				Message: map[string]interface{}{
+					"f1": "v1",
+					"f2": 36.6,
+				},
+			}, {
+				Message: map[string]interface{}{
+					"f1": "v1",
+					"f2": 76.6,
+				},
+			},
+		},
+		{ // f1 == v2 && f2 < 40
+			{
+				Message: map[string]interface{}{
+					"f1": "v2",
+					"f2": 26.6,
+				},
+			},
+		},
+	}
+
+	sn, err := NewSwitchNode("test", &SwitchConfig{
+		Cases: []ast.Expr{
+			&ast.BinaryExpr{
+				LHS: &ast.FieldRef{Name: "f2"},
+				OP:  ast.GT,
+				RHS: &ast.NumberLiteral{Val: 40},
+			},
+			&ast.BinaryExpr{
+				LHS: &ast.FieldRef{Name: "f1"},
+				OP:  ast.EQ,
+				RHS: &ast.StringLiteral{Val: "v1"},
+			},
+			&ast.BinaryExpr{
+				LHS: &ast.BinaryExpr{
+					LHS: &ast.FieldRef{Name: "f1"},
+					OP:  ast.EQ,
+					RHS: &ast.StringLiteral{Val: "v2"},
+				},
+				OP: ast.AND,
+				RHS: &ast.BinaryExpr{
+					LHS: &ast.FieldRef{Name: "f2"},
+					OP:  ast.LT,
+					RHS: &ast.NumberLiteral{Val: 40},
+				},
+			},
+		},
+		StopAtFirstMatch: false,
+	}, &api.RuleOption{})
+	if err != nil {
+		t.Fatalf("Failed to create switch node: %v", err)
+	}
+	contextLogger := conf.Log.WithField("rule", "TestSwitchTuple")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+	errCh := make(chan error)
+	output1 := make(chan interface{}, 10)
+	output2 := make(chan interface{}, 10)
+	output3 := make(chan interface{}, 10)
+	sn.outputNodes[0].AddOutput(output1, "output1")
+	sn.outputNodes[1].AddOutput(output2, "output2")
+	sn.outputNodes[2].AddOutput(output3, "output3")
+	go sn.Exec(ctx, errCh)
+	go func() {
+		for i, input := range inputs {
+			select {
+			case sn.input <- input:
+				fmt.Println("send input", i)
+			case <-time.After(time.Second):
+				t.Fatalf("Timeout sending input %d", i)
+			}
+		}
+	}()
+	actualOuts := make([][]*xsql.Tuple, 3)
+outterFor:
+	for {
+		select {
+		case err := <-errCh:
+			t.Fatalf("Error received: %v", err)
+		case out1 := <-output1:
+			actualOuts[0] = append(actualOuts[0], out1.(*xsql.Tuple))
+		case out2 := <-output2:
+			actualOuts[1] = append(actualOuts[1], out2.(*xsql.Tuple))
+		case out3 := <-output3:
+			actualOuts[2] = append(actualOuts[2], out3.(*xsql.Tuple))
+		case <-time.After(100 * time.Millisecond):
+			break outterFor
+		}
+	}
+	if !reflect.DeepEqual(actualOuts, outputs) {
+		t.Errorf("Expected: %v, actual: %v", outputs, actualOuts)
+	}
+}
+
+func TestCollection(t *testing.T) {
+	inputs := []*xsql.WindowTuples{
+		{
+			Content: []xsql.TupleRow{
+				&xsql.Tuple{
+					Message: map[string]interface{}{
+						"f1": "v1",
+						"f2": 45.6,
+					},
+				},
+				&xsql.Tuple{
+					Message: map[string]interface{}{
+						"f1": "v1",
+						"f2": 65.6,
+					},
+				},
+			},
+		}, {
+			Content: []xsql.TupleRow{
+				&xsql.Tuple{
+					Message: map[string]interface{}{
+						"f1": "v2",
+						"f2": 46.6,
+					},
+				},
+				&xsql.Tuple{
+					Message: map[string]interface{}{
+						"f1": "v2",
+						"f2": 26.6,
+					},
+				},
+				&xsql.Tuple{
+					Message: map[string]interface{}{
+						"f1": "v2",
+						"f2": 54.3,
+					},
+				},
+			},
+		}, {
+			Content: []xsql.TupleRow{
+				&xsql.Tuple{
+					Message: map[string]interface{}{
+						"f1": "v1",
+						"f2": 36.6,
+					},
+				},
+				&xsql.Tuple{
+					Message: map[string]interface{}{
+						"f1": "v1",
+						"f2": 76.6,
+					},
+				},
+				&xsql.Tuple{
+					Message: map[string]interface{}{
+						"f1": "v2",
+						"f2": 41.2,
+					},
+				},
+			},
+		},
+	}
+	outputs := [][]*xsql.WindowTuples{
+		{ // avg(f2) > 50
+			{
+				Content: []xsql.TupleRow{
+					&xsql.Tuple{
+						Message: map[string]interface{}{
+							"f1": "v1",
+							"f2": 45.6,
+						},
+					},
+					&xsql.Tuple{
+						Message: map[string]interface{}{
+							"f1": "v1",
+							"f2": 65.6,
+						},
+					},
+				},
+			}, {
+				Content: []xsql.TupleRow{
+					&xsql.Tuple{
+						Message: map[string]interface{}{
+							"f1": "v1",
+							"f2": 36.6,
+						},
+					},
+					&xsql.Tuple{
+						Message: map[string]interface{}{
+							"f1": "v1",
+							"f2": 76.6,
+						},
+					},
+					&xsql.Tuple{
+						Message: map[string]interface{}{
+							"f1": "v2",
+							"f2": 41.2,
+						},
+					},
+				},
+			},
+		},
+		{ // else
+			{
+				Content: []xsql.TupleRow{
+					&xsql.Tuple{
+						Message: map[string]interface{}{
+							"f1": "v2",
+							"f2": 46.6,
+						},
+					},
+					&xsql.Tuple{
+						Message: map[string]interface{}{
+							"f1": "v2",
+							"f2": 26.6,
+						},
+					},
+					&xsql.Tuple{
+						Message: map[string]interface{}{
+							"f1": "v2",
+							"f2": 54.3,
+						},
+					},
+				},
+			},
+		},
+	}
+
+	sn, err := NewSwitchNode("test", &SwitchConfig{
+		Cases: []ast.Expr{
+			&ast.BinaryExpr{
+				LHS: &ast.Call{
+					Name:     "avg",
+					FuncId:   0,
+					FuncType: ast.FuncTypeAgg,
+					Args:     []ast.Expr{&ast.FieldRef{Name: "f2"}},
+				},
+				OP:  ast.GT,
+				RHS: &ast.NumberLiteral{Val: 50},
+			},
+			&ast.BinaryExpr{
+				LHS: &ast.Call{
+					Name:     "avg",
+					FuncId:   0,
+					FuncType: ast.FuncTypeAgg,
+					Args:     []ast.Expr{&ast.FieldRef{Name: "f2"}},
+				},
+				OP:  ast.LTE,
+				RHS: &ast.NumberLiteral{Val: 50},
+			},
+		},
+		StopAtFirstMatch: true,
+	}, &api.RuleOption{})
+	if err != nil {
+		t.Fatalf("Failed to create switch node: %v", err)
+	}
+	contextLogger := conf.Log.WithField("rule", "TestSwitchWindow")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+	errCh := make(chan error)
+	output1 := make(chan interface{}, 10)
+	output2 := make(chan interface{}, 10)
+	sn.outputNodes[0].AddOutput(output1, "output1")
+	sn.outputNodes[1].AddOutput(output2, "output2")
+	go sn.Exec(ctx, errCh)
+	go func() {
+		for i, input := range inputs {
+			select {
+			case sn.input <- input:
+				fmt.Println("send input", i)
+			case <-time.After(time.Second):
+				t.Fatalf("Timeout sending input %d", i)
+			}
+		}
+	}()
+	actualOuts := make([][]*xsql.WindowTuples, 2)
+outterFor:
+	for {
+		select {
+		case err := <-errCh:
+			t.Fatalf("Error received: %v", err)
+		case out1 := <-output1:
+			actualOuts[0] = append(actualOuts[0], out1.(*xsql.WindowTuples))
+		case out2 := <-output2:
+			actualOuts[1] = append(actualOuts[1], out2.(*xsql.WindowTuples))
+		case <-time.After(100 * time.Millisecond):
+			break outterFor
+		}
+	}
+	if !reflect.DeepEqual(actualOuts, outputs) {
+		t.Errorf("Expected: %v, actual: %v", outputs, actualOuts)
+	}
+}

+ 101 - 14
internal/topo/planner/planner_graph.go

@@ -154,6 +154,16 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 				}
 				op := Transform(oop, nodeName, rule.Options)
 				nodeMap[nodeName] = op
+			case "switch":
+				sconf, err := parseSwitch(gn.Props)
+				if err != nil {
+					return nil, fmt.Errorf("parse switch %s error: %v", nodeName, err)
+				}
+				op, err := node.NewSwitchNode(nodeName, sconf, rule.Options)
+				if err != nil {
+					return nil, fmt.Errorf("create switch %s error: %v", nodeName, err)
+				}
+				nodeMap[nodeName] = op
 			default: // TODO other node type
 				return nil, fmt.Errorf("unknown operator type %s", gn.NodeType)
 			}
@@ -169,19 +179,40 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 		}
 	}
 
-	// reverse edges
-	reversedEdges := make(map[string][]string)
+	// reverse edges, value is a 2-dim array. Only switch node will have the second dim
+	reversedEdges := make(map[string][][]string)
 	rclone := make(map[string][]string)
 	for fromNode, toNodes := range ruleGraph.Topo.Edges {
 		if _, ok := ruleGraph.Nodes[fromNode]; !ok {
 			return nil, fmt.Errorf("node %s is not defined", fromNode)
 		}
-		for _, toNode := range toNodes {
-			if _, ok := ruleGraph.Nodes[toNode]; !ok {
-				return nil, fmt.Errorf("node %s is not defined", toNode)
+		for i, toNode := range toNodes {
+			switch tn := toNode.(type) {
+			case string:
+				if _, ok := ruleGraph.Nodes[tn]; !ok {
+					return nil, fmt.Errorf("node %s is not defined", tn)
+				}
+				if _, ok := reversedEdges[tn]; !ok {
+					reversedEdges[tn] = make([][]string, 1)
+				}
+				reversedEdges[tn][0] = append(reversedEdges[tn][0], fromNode)
+				rclone[tn] = append(rclone[tn], fromNode)
+			case []interface{}:
+				for _, tni := range tn {
+					tnn, ok := tni.(string)
+					if !ok { // never happen
+						return nil, fmt.Errorf("invalid edge toNode %v", toNode)
+					}
+					if _, ok := ruleGraph.Nodes[tnn]; !ok {
+						return nil, fmt.Errorf("node %s is not defined", tnn)
+					}
+					for len(reversedEdges[tnn]) <= i {
+						reversedEdges[tnn] = append(reversedEdges[tnn], []string{})
+					}
+					reversedEdges[tnn][i] = append(reversedEdges[tnn][i], fromNode)
+					rclone[tnn] = append(rclone[tnn], fromNode)
+				}
 			}
-			reversedEdges[toNode] = append(reversedEdges[toNode], fromNode)
-			rclone[toNode] = append(rclone[toNode], fromNode)
 		}
 	}
 	// sort the nodes by topological order
@@ -212,7 +243,11 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 				return nil, fmt.Errorf("can't find the io definiton for node type %s", gn.NodeType)
 			}
 			dataInCondition := nodeIO[0]
-			innodes := reversedEdges[n]
+			indim := reversedEdges[n]
+			var innodes []string
+			for _, in := range indim {
+				innodes = append(innodes, in...)
+			}
 			if len(innodes) > 1 {
 				if dataInCondition.AllowMulti {
 					for _, innode := range innodes {
@@ -248,9 +283,24 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 	}
 	// add the linkages
 	for nodeName, fromNodes := range reversedEdges {
-		inputs := make([]api.Emitter, len(fromNodes))
+		totalLen := 0
+		for _, fromNode := range fromNodes {
+			totalLen += len(fromNode)
+		}
+		inputs := make([]api.Emitter, 0, totalLen)
 		for i, fromNode := range fromNodes {
-			inputs[i] = nodeMap[fromNode].(api.Emitter)
+			for _, from := range fromNode {
+				if i == 0 {
+					inputs = append(inputs, nodeMap[from].(api.Emitter))
+				} else {
+					switch sn := nodeMap[from].(type) {
+					case *node.SwitchNode:
+						inputs = append(inputs, sn.GetEmitter(i))
+					default:
+						return nil, fmt.Errorf("node %s is not a switch node but have multiple output", from)
+					}
+				}
+			}
 		}
 		n := nodeMap[nodeName]
 		if n == nil {
@@ -265,15 +315,26 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 	return tp, nil
 }
 
-func genNodesInOrder(toNodes []string, edges map[string][]string, reversedEdges map[string][]string, nodesInOrder []string, i int) int {
+func genNodesInOrder(toNodes []string, edges map[string][]interface{}, flatReversedEdges map[string][]string, nodesInOrder []string, i int) int {
 	for _, src := range toNodes {
-		if len(reversedEdges[src]) > 1 {
-			reversedEdges[src] = reversedEdges[src][1:]
+		if len(flatReversedEdges[src]) > 1 {
+			flatReversedEdges[src] = flatReversedEdges[src][1:]
 			continue
 		}
 		nodesInOrder[i] = src
 		i++
-		i = genNodesInOrder(edges[src], edges, reversedEdges, nodesInOrder, i)
+		tns := make([]string, 0, len(edges[src]))
+		for _, toNode := range edges[src] {
+			switch toNode.(type) {
+			case string:
+				tns = append(tns, toNode.(string))
+			case []interface{}:
+				for _, tni := range toNode.([]interface{}) {
+					tns = append(tns, tni.(string))
+				}
+			}
+		}
+		i = genNodesInOrder(tns, edges, flatReversedEdges, nodesInOrder, i)
 	}
 	return i
 }
@@ -503,3 +564,29 @@ func parseHaving(props map[string]interface{}) (*operator.HavingOp, error) {
 	}
 	return nil, fmt.Errorf("expr %v is not a condition", m)
 }
+
+func parseSwitch(props map[string]interface{}) (*node.SwitchConfig, error) {
+	n := &graph.Switch{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	if len(n.Cases) == 0 {
+		return nil, fmt.Errorf("switch node must have at least one case")
+	}
+	caseExprs := make([]ast.Expr, len(n.Cases))
+	for i, c := range n.Cases {
+		p := xsql.NewParser(strings.NewReader("where " + c))
+		if exp, err := p.ParseCondition(); err != nil {
+			return nil, fmt.Errorf("parse case %d error: %v", i, err)
+		} else {
+			if exp != nil {
+				caseExprs[i] = exp
+			}
+		}
+	}
+	return &node.SwitchConfig{
+		Cases:            caseExprs,
+		StopAtFirstMatch: n.StopAtFirstMatch,
+	}, nil
+}

+ 2 - 2
internal/topo/topo.go

@@ -52,7 +52,7 @@ func NewWithNameAndQos(name string, qos api.Qos, checkpointInterval int) (*Topo,
 		checkpointInterval: checkpointInterval,
 		topo: &api.PrintableTopo{
 			Sources: make([]string, 0),
-			Edges:   make(map[string][]string),
+			Edges:   make(map[string][]interface{}),
 		},
 	}
 	return tp, nil
@@ -110,7 +110,7 @@ func (s *Topo) addEdge(from api.TopNode, to api.TopNode, toType string) {
 	t := fmt.Sprintf("%s_%s", toType, to.GetName())
 	e, ok := s.topo.Edges[f]
 	if !ok {
-		e = make([]string, 0)
+		e = make([]interface{}, 0)
 	}
 	s.topo.Edges[f] = append(e, t)
 }

+ 2 - 2
internal/topo/topotest/rule_test.go

@@ -78,7 +78,7 @@ func TestSingleSQL(t *testing.T) {
 			},
 			T: &api.PrintableTopo{
 				Sources: []string{"source_demo"},
-				Edges: map[string][]string{
+				Edges: map[string][]interface{}{
 					"source_demo":  {"op_2_project"},
 					"op_2_project": {"sink_mockSink"},
 				},
@@ -373,7 +373,7 @@ func TestSingleSQL(t *testing.T) {
 			},
 			T: &api.PrintableTopo{
 				Sources: []string{"source_demo"},
-				Edges: map[string][]string{
+				Edges: map[string][]interface{}{
 					"source_demo":  {"op_2_project"},
 					"op_2_project": {"sink_mockSink"},
 				},

+ 1 - 1
internal/topo/topotest/window_rule_test.go

@@ -226,7 +226,7 @@ func TestWindow(t *testing.T) {
 			},
 			T: &api.PrintableTopo{
 				Sources: []string{"source_demo", "source_demo1"},
-				Edges: map[string][]string{
+				Edges: map[string][]interface{}{
 					"source_demo":  {"op_3_window"},
 					"source_demo1": {"op_3_window"},
 					"op_3_window":  {"op_4_join"},

+ 2 - 2
pkg/api/stream.go

@@ -137,8 +137,8 @@ type RestartStrategy struct {
 }
 
 type PrintableTopo struct {
-	Sources []string            `json:"sources"`
-	Edges   map[string][]string `json:"edges"`
+	Sources []string                 `json:"sources"`
+	Edges   map[string][]interface{} `json:"edges"`
 }
 
 type GraphNode struct {