Переглянути джерело

feat(topo): planner graph validation

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 роки тому
батько
коміт
05095889a5

+ 152 - 0
internal/topo/graph/io.go

@@ -0,0 +1,152 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package graph
+
+import "fmt"
+
+type IoInputType uint8
+type IoRowType uint8
+type IoCollectionType uint8
+
+const (
+	IOINPUT_TYPE_SAME       IoInputType = iota
+	IOINPUT_TYPE_ROW                    // 0b01
+	IOINPUT_TYPE_COLLECTION             // 0b10
+	IOINPUT_TYPE_ANY                    // 0b11
+)
+
+var inputTypes = map[IoInputType]string{
+	IOINPUT_TYPE_ROW:        "row",
+	IOINPUT_TYPE_COLLECTION: "collection",
+	IOINPUT_TYPE_ANY:        "any",
+	IOINPUT_TYPE_SAME:       "same",
+}
+
+const (
+	IOROW_TYPE_SAME   IoRowType = iota
+	IOROW_TYPE_SINGLE           // 0b01
+	IOROW_TYPE_MERGED           // 0b10
+	IOROW_TYPE_ANY              // 0b11
+)
+
+var rowTypes = map[IoRowType]string{
+	IOROW_TYPE_SINGLE: "single emitter row",
+	IOROW_TYPE_MERGED: "merged row",
+	IOROW_TYPE_ANY:    "any",
+	IOROW_TYPE_SAME:   "same",
+}
+
+const (
+	IOCOLLECTION_TYPE_SAME IoCollectionType = iota
+	IOCOLLECTION_TYPE_SINGLE
+	IOCOLLECTION_TYPE_GROUPED
+	IOCOLLECTION_TYPE_ANY
+)
+
+var collectionsTypes = map[IoCollectionType]string{
+	IOCOLLECTION_TYPE_SINGLE:  "non-grouped collection",
+	IOCOLLECTION_TYPE_GROUPED: "grouped collection",
+	IOCOLLECTION_TYPE_ANY:     "any",
+	IOCOLLECTION_TYPE_SAME:    "same",
+}
+
+// IOType is the type of input/output
+// all fields are default to any
+type IOType struct {
+	Type           IoInputType      `json:"type"`
+	RowType        IoRowType        `json:"rowType"`
+	CollectionType IoCollectionType `json:"collectionType"`
+	AllowMulti     bool             `json:"allowMulti"`
+}
+
+// NewIOType creates a new IOType
+func NewIOType() *IOType {
+	return &IOType{
+		Type:           IOINPUT_TYPE_ANY,
+		RowType:        IOROW_TYPE_ANY,
+		CollectionType: IOCOLLECTION_TYPE_ANY,
+	}
+}
+
+func Fit(value, condition *IOType) (bool, error) {
+	if value.Type&condition.Type == 0 {
+		return false, fmt.Errorf("input type mismatch, expect %s, got %s", inputTypes[condition.Type], inputTypes[value.Type])
+	}
+	if value.RowType&condition.RowType == 0 {
+		return false, fmt.Errorf("row type mismatch, expect %s, got %s", rowTypes[condition.RowType], rowTypes[value.RowType])
+	}
+	if value.CollectionType&condition.CollectionType == 0 {
+		return false, fmt.Errorf("collection type mismatch, expect %s, got %s", collectionsTypes[condition.CollectionType], collectionsTypes[value.CollectionType])
+	}
+	return true, nil
+}
+
+func MapOut(previous, origin *IOType) (result *IOType) {
+	result = NewIOType()
+	if origin.Type == IOINPUT_TYPE_SAME {
+		result.Type = previous.Type
+		result.RowType = previous.RowType
+		result.CollectionType = previous.CollectionType
+	} else {
+		result.Type = origin.Type
+		if origin.RowType == IOROW_TYPE_SAME {
+			result.RowType = previous.RowType
+		} else {
+			result.RowType = origin.RowType
+		}
+		if origin.CollectionType == IOCOLLECTION_TYPE_SAME {
+			result.CollectionType = previous.CollectionType
+		} else {
+			result.CollectionType = origin.CollectionType
+		}
+	}
+	return
+}
+
+// OpIO The io constraints for a node
+var OpIO = map[string][]*IOType{
+	"aggfunc": {
+		{Type: IOINPUT_TYPE_COLLECTION, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY},
+		{Type: IOINPUT_TYPE_COLLECTION, CollectionType: IOCOLLECTION_TYPE_GROUPED},
+	},
+	"filter": {
+		{Type: IOINPUT_TYPE_ANY, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY},
+		{Type: IOINPUT_TYPE_SAME},
+	},
+	"function": {
+		{Type: IOINPUT_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_SINGLE, RowType: IOROW_TYPE_ANY},
+		{Type: IOINPUT_TYPE_SAME},
+	},
+	"groupby": {
+		{Type: IOINPUT_TYPE_COLLECTION, CollectionType: IOCOLLECTION_TYPE_SINGLE, RowType: IOROW_TYPE_ANY},
+		{Type: IOINPUT_TYPE_COLLECTION, CollectionType: IOCOLLECTION_TYPE_GROUPED},
+	},
+	"join": {
+		{Type: IOINPUT_TYPE_COLLECTION, CollectionType: IOCOLLECTION_TYPE_SINGLE, RowType: IOROW_TYPE_SINGLE},
+		{Type: IOINPUT_TYPE_COLLECTION, CollectionType: IOCOLLECTION_TYPE_SINGLE, RowType: IOROW_TYPE_MERGED},
+	},
+	"orderby": {
+		{Type: IOINPUT_TYPE_COLLECTION, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY},
+		{Type: IOINPUT_TYPE_SAME},
+	},
+	"pick": {
+		{Type: IOINPUT_TYPE_ANY, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY},
+		{Type: IOINPUT_TYPE_SAME},
+	},
+	"window": {
+		{Type: IOINPUT_TYPE_ROW, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY, AllowMulti: true},
+		{Type: IOINPUT_TYPE_COLLECTION, CollectionType: IOCOLLECTION_TYPE_SINGLE, RowType: IOROW_TYPE_SINGLE},
+	},
+}

+ 1 - 0
internal/topo/graph/node.go

@@ -36,6 +36,7 @@ type Window struct {
 type Join struct {
 	From  string `json:"from"`
 	Joins []struct {
+		Name string `json:"name"`
 		Type string `json:"type"`
 		On   string `json:"on"`
 	}

+ 79 - 0
internal/topo/planner/planner_graph.go

@@ -47,6 +47,9 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 	for nodeName, gn := range ruleGraph.Nodes {
 		switch gn.Type {
 		case "source":
+			if _, ok := ruleGraph.Topo.Edges[nodeName]; !ok {
+				return nil, fmt.Errorf("no edge defined for source node %s", nodeName)
+			}
 			sourceType, ok := gn.Props["source_type"]
 			if !ok {
 				sourceType = "stream"
@@ -74,6 +77,7 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 				tp.AddSrc(srcNode)
 			case "table":
 				// TODO add table
+				return nil, fmt.Errorf("table source is not supported yet")
 			default:
 				return nil, fmt.Errorf("unknown source type %s", st)
 			}
@@ -85,6 +89,9 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 			nodeMap[nodeName] = node.NewSinkNode(nodeName, gn.NodeType, gn.Props)
 			sinks[nodeName] = true
 		case "operator":
+			if _, ok := ruleGraph.Topo.Edges[nodeName]; !ok {
+				return nil, fmt.Errorf("no edge defined for operator node %s", nodeName)
+			}
 			switch strings.ToLower(gn.NodeType) {
 			case "function":
 				fop, err := parseFunc(gn.Props)
@@ -153,17 +160,76 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 			return nil, fmt.Errorf("unknown node type %s", gn.Type)
 		}
 	}
+
 	// validate source node
 	for _, nodeName := range ruleGraph.Topo.Sources {
 		if _, ok := sources[nodeName]; !ok {
 			return nil, fmt.Errorf("source %s is not a source type node", nodeName)
 		}
 	}
+
 	// reverse edges
 	reversedEdges := make(map[string][]string)
+	rclone := make(map[string][]string)
 	for fromNode, toNodes := range ruleGraph.Topo.Edges {
+		if _, ok := ruleGraph.Nodes[fromNode]; !ok {
+			return nil, fmt.Errorf("node %s is not defined", fromNode)
+		}
 		for _, toNode := range toNodes {
+			if _, ok := ruleGraph.Nodes[toNode]; !ok {
+				return nil, fmt.Errorf("node %s is not defined", toNode)
+			}
 			reversedEdges[toNode] = append(reversedEdges[toNode], fromNode)
+			rclone[toNode] = append(rclone[toNode], fromNode)
+		}
+	}
+	// sort the nodes by topological order
+	nodesInOrder := make([]string, len(ruleGraph.Nodes))
+	i := 0
+	genNodesInOrder(ruleGraph.Topo.Sources, ruleGraph.Topo.Edges, rclone, nodesInOrder, i)
+
+	// validate the typo
+	// the map is to record the output for each node
+	dataFlow := make(map[string]*graph.IOType)
+	for _, n := range nodesInOrder {
+		gn := ruleGraph.Nodes[n]
+		if gn.Type == "source" {
+			dataFlow[n] = &graph.IOType{
+				Type:           graph.IOINPUT_TYPE_ROW,
+				RowType:        graph.IOROW_TYPE_SINGLE,
+				CollectionType: graph.IOCOLLECTION_TYPE_ANY,
+				AllowMulti:     false,
+			}
+		} else if gn.Type == "sink" {
+			continue
+		} else {
+			nodeIO, ok := graph.OpIO[strings.ToLower(gn.NodeType)]
+			if !ok {
+				return nil, fmt.Errorf("can't find the io definiton for node type %s", gn.NodeType)
+			}
+			dataInCondition := nodeIO[0]
+			innodes := reversedEdges[n]
+			if len(innodes) > 1 {
+				if dataInCondition.AllowMulti {
+					for _, innode := range innodes {
+						_, err = graph.Fit(dataFlow[innode], dataInCondition)
+						if err != nil {
+							return nil, fmt.Errorf("node %s output does not match node %s input: %v", innode, n, err)
+						}
+					}
+				} else {
+					return nil, fmt.Errorf("operator %s of type %s does not allow multiple inputs", n, gn.NodeType)
+				}
+			} else if len(innodes) == 1 {
+				_, err := graph.Fit(dataFlow[innodes[0]], dataInCondition)
+				if err != nil {
+					return nil, fmt.Errorf("node %s output does not match node %s input: %v", innodes[0], n, err)
+				}
+			} else {
+				return nil, fmt.Errorf("operator %s of type %s has no input", n, gn.NodeType)
+			}
+			out := nodeIO[1]
+			dataFlow[n] = graph.MapOut(dataFlow[innodes[0]], out)
 		}
 	}
 	// add the linkages
@@ -185,6 +251,19 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 	return tp, nil
 }
 
+func genNodesInOrder(toNodes []string, edges map[string][]string, reversedEdges map[string][]string, nodesInOrder []string, i int) int {
+	for _, src := range toNodes {
+		if len(reversedEdges[src]) > 1 {
+			reversedEdges[src] = reversedEdges[src][1:]
+			continue
+		}
+		nodesInOrder[i] = src
+		i++
+		i = genNodesInOrder(edges[src], edges, reversedEdges, nodesInOrder, i)
+	}
+	return i
+}
+
 func parseOrderBy(props map[string]interface{}) (*operator.OrderOp, error) {
 	n := &graph.Orderby{}
 	err := cast.MapToStruct(props, n)

+ 523 - 0
internal/topo/planner/planner_graph_test.go

@@ -0,0 +1,523 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package planner
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/testx"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"reflect"
+	"testing"
+)
+
+func TestPlannerGraphValidate(t *testing.T) {
+	var tests = []struct {
+		graph string
+		err   string
+	}{
+		{
+			graph: `{
+  "nodes": {
+    "abc": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "datasource": "demo"
+      }
+    },
+    "myfilter": {
+      "type": "operator",
+      "nodeType": "filter",
+      "props": {
+        "expr": "temperature > 20"
+      }
+    },
+    "logfunc": {
+      "type": "operator",
+      "nodeType": "function",
+      "props": {
+        "expr": "log(temperature) as log_temperature"
+      }
+    },
+    "sinfunc": {
+      "type": "operator",
+      "nodeType": "function",
+      "props": {
+        "expr": "sin(temperature) as sin_temperature"
+      }
+    },
+    "pick": {
+      "type": "operator",
+      "nodeType": "pick",
+      "props": {
+        "fields": [
+          "log_temperature",
+          "humidity"
+        ]
+      }
+    },
+    "mqttpv": {
+      "type": "sink",
+      "nodeType": "mqtt",
+      "props": {
+        "server": "tcp://syno.home:1883",
+        "topic": "result",
+        "sendSingle": true
+      }
+    },
+    "mqtt2": {
+      "type": "sink",
+      "nodeType": "mqtt",
+      "props": {
+        "server": "tcp://syno.home:1883",
+        "topic": "result2",
+        "sendSingle": true
+      }
+    }
+  },
+  "topo": {
+    "sources": [
+      "abc"
+    ],
+    "edges": {
+      "abc": [
+        "myfilter",
+        "sinfunc"
+      ],
+      "myfilter": [
+        "logfunc"
+      ],
+      "logfunc": [
+        "pick"
+      ],
+      "pick": [
+        "mqttpv"
+      ],
+      "sinfunc": [
+        "mqtt2"
+      ]
+    }
+  }
+}`,
+			err: "",
+		}, {
+			graph: `{
+  "nodes": {
+    "abc": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "datasource": "demo"
+      }
+    },
+    "mqtt2": {
+      "type": "sink",
+      "nodeType": "mqtt",
+      "props": {
+        "server": "tcp://syno.home:1883",
+        "topic": "result2",
+        "sendSingle": true
+      }
+    }
+  },
+  "topo": {
+    "sources": [
+      "abc"
+    ],
+    "edges": {
+      "abc": [
+        "myfilter"
+      ]
+    }
+  }
+}`,
+			err: "node myfilter is not defined",
+		}, {
+			graph: `{
+  "nodes": {
+    "abc": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "datasource": "demo"
+      }
+    },
+    "mqtt2": {
+      "type": "sink",
+      "nodeType": "mqtt",
+      "props": {
+        "server": "tcp://syno.home:1883",
+        "topic": "result2",
+        "sendSingle": true
+      }
+    }
+  },
+  "topo": {
+    "sources": [
+      "abc"
+    ],
+    "edges": {
+    }
+  }
+}`,
+			err: "no edge defined for source node abc",
+		}, {
+			graph: `{
+  "nodes": {
+    "abc": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "datasource": "demo"
+      }
+    },
+    "aggfunc": {
+      "type": "operator",
+      "nodeType": "aggfunc",
+      "props": {
+        "expr": "avg(temperature) as avg_temperature"
+      }
+    },
+    "mqtt2": {
+      "type": "sink",
+      "nodeType": "mqtt",
+      "props": {
+        "server": "tcp://syno.home:1883",
+        "topic": "result2",
+        "sendSingle": true
+      }
+    }
+  },
+  "topo": {
+    "sources": [
+      "abc"
+    ],
+    "edges": {
+      "abc": ["aggfunc"],
+      "aggfunc": ["mqtt2"]
+    }
+  }
+}`,
+			err: "node abc output does not match node aggfunc input: input type mismatch, expect collection, got row",
+		}, {
+			graph: `{
+  "nodes": {
+    "abc": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "datasource": "demo"
+      }
+    },
+    "abc2": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "datasource": "demo1"
+      }
+    },
+    "joinop": {
+      "type": "operator",
+      "nodeType": "join",
+      "props": {
+        "from": "abc",
+        "joins": [
+          {
+            "name": "abc2",
+            "type": "inner",
+            "on": "abc.id = abc2.id"
+          }
+        ]
+      }
+    },
+    "mqtt2": {
+      "type": "sink",
+      "nodeType": "mqtt",
+      "props": {
+        "server": "tcp://syno.home:1883",
+        "topic": "result2",
+        "sendSingle": true
+      }
+    }
+  },
+  "topo": {
+    "sources": [
+      "abc","abc2"
+    ],
+    "edges": {
+      "abc": ["joinop"],
+      "abc2": ["joinop"],
+      "joinop": ["mqtt2"]
+    }
+  }
+}`,
+			err: "operator joinop of type join does not allow multiple inputs",
+		}, {
+			graph: `{
+  "nodes": {
+    "abc": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "datasource": "demo"
+      }
+    },
+    "abc2": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "datasource": "demo1"
+      }
+    },
+    "windowop": {
+      "type": "operator",
+      "nodeType": "window",
+      "props": {
+        "type": "hoppingwindow",
+        "unit": "ss",
+        "size": 10,
+        "interval": 5
+      }
+    },
+    "joinop": {
+      "type": "operator",
+      "nodeType": "join",
+      "props": {
+        "from": "abc",
+        "joins": [
+          {
+            "name": "abc2",
+            "type": "inner",
+            "on": "abc.id = abc2.id"
+          }
+        ]
+      }
+    },
+    "groupop": {
+      "type": "operator",
+      "nodeType": "groupby",
+      "props": {
+        "dimensions": ["id","userId"]
+      }
+    },
+    "mqtt2": {
+      "type": "sink",
+      "nodeType": "mqtt",
+      "props": {
+        "server": "tcp://syno.home:1883",
+        "topic": "result2",
+        "sendSingle": true
+      }
+    }
+  },
+  "topo": {
+    "sources": [
+      "abc","abc2"
+    ],
+    "edges": {
+      "abc": ["windowop"],
+      "abc2": ["windowop"],
+      "windowop": ["joinop"],
+      "joinop": ["groupop"],
+      "groupop": ["mqtt2"]
+    }
+  }
+}`,
+			err: "",
+		}, {
+			graph: `{
+  "nodes": {
+    "abc": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "datasource": "demo"
+      }
+    },
+    "abc2": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "datasource": "demo1"
+      }
+    },
+    "windowop": {
+      "type": "operator",
+      "nodeType": "window",
+      "props": {
+        "type": "hoppingwindow",
+        "unit": "ss",
+        "size": 10,
+        "interval": 5
+      }
+    },
+    "joinop": {
+      "type": "operator",
+      "nodeType": "join",
+      "props": {
+        "from": "abc",
+        "joins": [
+          {
+            "name": "abc2",
+            "type": "inner",
+            "on": "abc.id = abc2.id"
+          }
+        ]
+      }
+    },
+    "groupop": {
+      "type": "operator",
+      "nodeType": "groupby",
+      "props": {
+        "dimensions": ["id","userId"]
+      }
+    },
+    "mqtt2": {
+      "type": "sink",
+      "nodeType": "mqtt",
+      "props": {
+        "server": "tcp://syno.home:1883",
+        "topic": "result2",
+        "sendSingle": true
+      }
+    }
+  },
+  "topo": {
+    "sources": [
+      "abc","abc2"
+    ],
+    "edges": {
+      "abc": ["windowop"],
+      "abc2": ["windowop"],
+      "windowop": ["groupop"],
+      "joinop": ["mqtt2"],
+      "groupop": ["joinop"]
+    }
+  }
+}`,
+			err: "node groupop output does not match node joinop input: collection type mismatch, expect non-grouped collection, got grouped collection",
+		}, {
+			graph: `{
+  "nodes": {
+    "abc": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "datasource": "demo"
+      }
+    },
+    "abc2": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "datasource": "demo1"
+      }
+    },
+    "windowop": {
+      "type": "operator",
+      "nodeType": "window",
+      "props": {
+        "type": "hoppingwindow",
+        "unit": "ss",
+        "size": 10,
+        "interval": 5
+      }
+    },
+    "joinop": {
+      "type": "operator",
+      "nodeType": "join",
+      "props": {
+        "from": "abc",
+        "joins": [
+          {
+            "name": "abc2",
+            "type": "inner",
+            "on": "abc.id = abc2.id"
+          }
+        ]
+      }
+    },
+    "groupop": {
+      "type": "operator",
+      "nodeType": "groupby",
+      "props": {
+        "dimensions": ["id","userId"]
+      }
+    },
+    "aggfunc": {
+      "type": "operator",
+      "nodeType": "aggFunc",
+      "props": {
+        "expr": "avg(temperature) as avg_temperature"
+      }
+    },
+    "mqtt2": {
+      "type": "sink",
+      "nodeType": "mqtt",
+      "props": {
+        "server": "tcp://syno.home:1883",
+        "topic": "result2",
+        "sendSingle": true
+      }
+    }
+  },
+  "topo": {
+    "sources": [
+      "abc","abc2"
+    ],
+    "edges": {
+      "abc": ["windowop"],
+      "abc2": ["windowop"],
+      "windowop": ["groupop"],
+      "joinop": ["mqtt2"],
+      "groupop": ["aggfunc"],
+      "aggfunc": ["joinop"]
+    }
+  }
+}`,
+			err: "node aggfunc output does not match node joinop input: collection type mismatch, expect non-grouped collection, got grouped collection",
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		rg := &api.RuleGraph{}
+		err := json.Unmarshal([]byte(tt.graph), rg)
+		if err != nil {
+			t.Error(err)
+			continue
+		}
+		_, err = PlanByGraph(&api.Rule{
+			Triggered: false,
+			Id:        fmt.Sprintf("rule%d", i),
+			Name:      fmt.Sprintf("rule%d", i),
+			Graph:     rg,
+			Options: &api.RuleOption{
+				IsEventTime:        false,
+				LateTol:            1000,
+				Concurrency:        1,
+				BufferLength:       1024,
+				SendMetaToSink:     false,
+				SendError:          true,
+				Qos:                api.AtMostOnce,
+				CheckpointInterval: 300000,
+			},
+		})
+		if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
+			t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, err)
+		}
+	}
+}