|
@@ -61,99 +61,19 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
if !ok {
|
|
|
return nil, fmt.Errorf("source node %s not defined", srcName)
|
|
|
}
|
|
|
- nodeName := srcName
|
|
|
- if _, ok := ruleGraph.Topo.Edges[nodeName]; !ok {
|
|
|
- return nil, fmt.Errorf("no edge defined for source node %s", nodeName)
|
|
|
+ if _, ok := ruleGraph.Topo.Edges[srcName]; !ok {
|
|
|
+ return nil, fmt.Errorf("no edge defined for source node %s", srcName)
|
|
|
}
|
|
|
- sourceMeta := &api.SourceMeta{
|
|
|
- SourceType: "stream",
|
|
|
- }
|
|
|
- err = cast.MapToStruct(gn.Props, sourceMeta)
|
|
|
+ var srcNode *node.SourceNode
|
|
|
+ srcNode, scanTableEmitters, err = parseSource(srcName, gn, rule, store, lookupTableChildren, streamEmitters)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("parse source %s with %v error: %w", srcName, gn.Props, err)
|
|
|
}
|
|
|
- if sourceMeta.SourceType != "stream" && sourceMeta.SourceType != "table" {
|
|
|
- return nil, fmt.Errorf("source type %s not supported", sourceMeta.SourceType)
|
|
|
+ if srcNode != nil {
|
|
|
+ nodeMap[srcName] = srcNode
|
|
|
+ tp.AddSrc(srcNode)
|
|
|
}
|
|
|
- // If source name is specified, find the created stream/table from store
|
|
|
- if sourceMeta.SourceName != "" {
|
|
|
- if store == nil {
|
|
|
- store, err = store2.GetKV("stream")
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- }
|
|
|
- streamStmt, e := xsql.GetDataSource(store, sourceMeta.SourceName)
|
|
|
- if e != nil {
|
|
|
- return nil, fmt.Errorf("fail to get stream %s, please check if stream is created", sourceMeta.SourceName)
|
|
|
- }
|
|
|
- if streamStmt.StreamType == ast.TypeStream && sourceMeta.SourceType == "table" {
|
|
|
- return nil, fmt.Errorf("stream %s is not a table", sourceMeta.SourceName)
|
|
|
- } else if streamStmt.StreamType == ast.TypeTable && sourceMeta.SourceType == "stream" {
|
|
|
- return nil, fmt.Errorf("table %s is not a stream", sourceMeta.SourceName)
|
|
|
- }
|
|
|
- st := streamStmt.Options.TYPE
|
|
|
- if st == "" {
|
|
|
- st = "mqtt"
|
|
|
- }
|
|
|
- if st != gn.NodeType {
|
|
|
- return nil, fmt.Errorf("source type %s does not match the stream type %s", gn.NodeType, st)
|
|
|
- }
|
|
|
- sInfo, err := convertStreamInfo(streamStmt)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- if sInfo.stmt.StreamType == ast.TypeTable && sInfo.stmt.Options.KIND == ast.StreamKindLookup {
|
|
|
- lookupTableChildren[string(sInfo.stmt.Name)] = sInfo.stmt.Options
|
|
|
- } else {
|
|
|
- // Use the plan to calculate the schema and other meta info
|
|
|
- p := DataSourcePlan{
|
|
|
- name: sInfo.stmt.Name,
|
|
|
- streamStmt: sInfo.stmt,
|
|
|
- streamFields: sInfo.schema.ToJsonSchema(),
|
|
|
- isSchemaless: sInfo.schema == nil,
|
|
|
- iet: rule.Options.IsEventTime,
|
|
|
- allMeta: rule.Options.SendMetaToSink,
|
|
|
- }.Init()
|
|
|
-
|
|
|
- if sInfo.stmt.StreamType == ast.TypeStream {
|
|
|
- err = p.PruneColumns(nil)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- srcNode, e := transformSourceNode(p, nil, rule.Options)
|
|
|
- if e != nil {
|
|
|
- return nil, e
|
|
|
- }
|
|
|
- nodeMap[nodeName] = srcNode
|
|
|
- tp.AddSrc(srcNode)
|
|
|
- streamEmitters[string(sInfo.stmt.Name)] = struct{}{}
|
|
|
- } else {
|
|
|
- scanTableEmitters = append(scanTableEmitters, string(sInfo.stmt.Name))
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- sourceOption := &ast.Options{}
|
|
|
- err = cast.MapToStruct(gn.Props, sourceOption)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- sourceOption.TYPE = gn.NodeType
|
|
|
- switch sourceMeta.SourceType {
|
|
|
- case "stream":
|
|
|
- pp, err := operator.NewPreprocessor(true, nil, true, nil, rule.Options.IsEventTime, sourceOption.TIMESTAMP, sourceOption.TIMESTAMP_FORMAT, strings.EqualFold(sourceOption.FORMAT, message.FormatBinary), sourceOption.STRICT_VALIDATION)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options.SendError)
|
|
|
- nodeMap[nodeName] = srcNode
|
|
|
- tp.AddSrc(srcNode)
|
|
|
- streamEmitters[nodeName] = struct{}{}
|
|
|
- case "table":
|
|
|
- return nil, fmt.Errorf("anonymouse table source is not supported, please create it prior to the rule")
|
|
|
- }
|
|
|
- }
|
|
|
- sources[nodeName] = true
|
|
|
+ sources[srcName] = true
|
|
|
}
|
|
|
for nodeName, gn := range ruleGraph.Nodes {
|
|
|
switch gn.Type {
|
|
@@ -174,14 +94,14 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
case "function":
|
|
|
fop, err := parseFunc(gn.Props)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("parse function %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
op := Transform(fop, nodeName, rule.Options)
|
|
|
nodeMap[nodeName] = op
|
|
|
case "aggfunc":
|
|
|
fop, err := parseFunc(gn.Props)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("parse aggfunc %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
fop.IsAgg = true
|
|
|
op := Transform(fop, nodeName, rule.Options)
|
|
@@ -189,35 +109,35 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
case "filter":
|
|
|
fop, err := parseFilter(gn.Props)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("parse filter %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
op := Transform(fop, nodeName, rule.Options)
|
|
|
nodeMap[nodeName] = op
|
|
|
case "pick":
|
|
|
pop, err := parsePick(gn.Props)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("parse pick %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
op := Transform(pop, nodeName, rule.Options)
|
|
|
nodeMap[nodeName] = op
|
|
|
case "window":
|
|
|
wconf, err := parseWindow(gn.Props)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("parse window conf %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
op, err := node.NewWindowOp(nodeName, *wconf, ruleGraph.Topo.Sources, rule.Options)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("parse window %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
nodeMap[nodeName] = op
|
|
|
case "join":
|
|
|
stmt, err := parseJoinAst(gn.Props)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("parse join %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
fromNode := stmt.Sources[0].(*ast.Table)
|
|
|
if _, ok := streamEmitters[fromNode.Name]; !ok {
|
|
|
- return nil, fmt.Errorf("join source %s is not a stream", fromNode.Name)
|
|
|
+ return nil, fmt.Errorf("parse join %s with %v error: join source %s is not a stream", nodeName, gn.Props, fromNode.Name)
|
|
|
}
|
|
|
hasLookup := false
|
|
|
if stmt.Joins != nil {
|
|
@@ -225,7 +145,7 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
var joins []ast.Join
|
|
|
for _, join := range stmt.Joins {
|
|
|
if hasLookup {
|
|
|
- return nil, fmt.Errorf("parse %v error: only support to join one lookup table with one stream", gn)
|
|
|
+ return nil, fmt.Errorf("parse join %s with %v error: only support to join one lookup table with one stream", nodeName, gn.Props)
|
|
|
}
|
|
|
if streamOpt, ok := lookupTableChildren[join.Name]; ok {
|
|
|
hasLookup = true
|
|
@@ -234,11 +154,11 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
options: streamOpt,
|
|
|
}
|
|
|
if !lookupPlan.validateAndExtractCondition() {
|
|
|
- return nil, fmt.Errorf("join condition %s is invalid, at least one equi-join predicate is required", join.Expr)
|
|
|
+ return nil, fmt.Errorf("parse join %s with %v error: join condition %s is invalid, at least one equi-join predicate is required", nodeName, gn.Props, join.Expr)
|
|
|
}
|
|
|
op, err := node.NewLookupNode(lookupPlan.joinExpr.Name, lookupPlan.fields, lookupPlan.keys, lookupPlan.joinExpr.JoinType, lookupPlan.valvars, lookupPlan.options, rule.Options)
|
|
|
if err != nil {
|
|
|
- return nil, fmt.Errorf("parse %v error: fail to create lookup node", gn)
|
|
|
+ return nil, fmt.Errorf("parse join %s with %v error: fail to create lookup node", nodeName, gn.Props)
|
|
|
}
|
|
|
nodeMap[nodeName] = op
|
|
|
} else {
|
|
@@ -250,9 +170,8 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
// Not all joins are lookup joins, so we need to create a join plan for the remaining joins
|
|
|
if len(stmt.Joins) > 0 && !hasLookup {
|
|
|
if len(scanTableEmitters) > 0 {
|
|
|
- return nil, fmt.Errorf("parse %v error: do not support scan table %s yet", gn, scanTableEmitters)
|
|
|
+ return nil, fmt.Errorf("parse join %s with %v error: do not support scan table %s yet", nodeName, gn.Props, scanTableEmitters)
|
|
|
}
|
|
|
- // TODO extract on filter
|
|
|
jop := &operator.JoinOp{Joins: stmt.Joins, From: fromNode}
|
|
|
op := Transform(jop, nodeName, rule.Options)
|
|
|
nodeMap[nodeName] = op
|
|
@@ -261,25 +180,25 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
case "groupby":
|
|
|
gop, err := parseGroupBy(gn.Props)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("parse groupby %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
op := Transform(gop, nodeName, rule.Options)
|
|
|
nodeMap[nodeName] = op
|
|
|
case "orderby":
|
|
|
oop, err := parseOrderBy(gn.Props)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("parse orderby %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
op := Transform(oop, nodeName, rule.Options)
|
|
|
nodeMap[nodeName] = op
|
|
|
case "switch":
|
|
|
sconf, err := parseSwitch(gn.Props)
|
|
|
if err != nil {
|
|
|
- return nil, fmt.Errorf("parse switch %s error: %v", nodeName, err)
|
|
|
+ return nil, fmt.Errorf("parse switch %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
op, err := node.NewSwitchNode(nodeName, sconf, rule.Options)
|
|
|
if err != nil {
|
|
|
- return nil, fmt.Errorf("create switch %s error: %v", nodeName, err)
|
|
|
+ return nil, fmt.Errorf("create switch %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
nodeMap[nodeName] = op
|
|
|
default:
|
|
@@ -479,6 +398,99 @@ func genNodesInOrder(toNodes []string, edges map[string][]interface{}, flatRever
|
|
|
return i
|
|
|
}
|
|
|
|
|
|
+func parseSource(nodeName string, gn *api.GraphNode, rule *api.Rule, store kv.KeyValue, lookupTableChildren map[string]*ast.Options, streamEmitters map[string]struct{}) (*node.SourceNode, []string, error) {
|
|
|
+ scanTableEmitters := make([]string, 0)
|
|
|
+ sourceMeta := &api.SourceMeta{
|
|
|
+ SourceType: "stream",
|
|
|
+ }
|
|
|
+ err := cast.MapToStruct(gn.Props, sourceMeta)
|
|
|
+ if err != nil {
|
|
|
+ return nil, scanTableEmitters, err
|
|
|
+ }
|
|
|
+ if sourceMeta.SourceType != "stream" && sourceMeta.SourceType != "table" {
|
|
|
+ return nil, scanTableEmitters, fmt.Errorf("source type %s not supported", sourceMeta.SourceType)
|
|
|
+ }
|
|
|
+ // If source name is specified, find the created stream/table from store
|
|
|
+ if sourceMeta.SourceName != "" {
|
|
|
+ if store == nil {
|
|
|
+ store, err = store2.GetKV("stream")
|
|
|
+ if err != nil {
|
|
|
+ return nil, scanTableEmitters, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ streamStmt, e := xsql.GetDataSource(store, sourceMeta.SourceName)
|
|
|
+ if e != nil {
|
|
|
+ return nil, scanTableEmitters, fmt.Errorf("fail to get stream %s, please check if stream is created", sourceMeta.SourceName)
|
|
|
+ }
|
|
|
+ if streamStmt.StreamType == ast.TypeStream && sourceMeta.SourceType == "table" {
|
|
|
+ return nil, scanTableEmitters, fmt.Errorf("stream %s is not a table", sourceMeta.SourceName)
|
|
|
+ } else if streamStmt.StreamType == ast.TypeTable && sourceMeta.SourceType == "stream" {
|
|
|
+ return nil, scanTableEmitters, fmt.Errorf("table %s is not a stream", sourceMeta.SourceName)
|
|
|
+ }
|
|
|
+ st := streamStmt.Options.TYPE
|
|
|
+ if st == "" {
|
|
|
+ st = "mqtt"
|
|
|
+ }
|
|
|
+ if st != gn.NodeType {
|
|
|
+ return nil, scanTableEmitters, fmt.Errorf("source type %s does not match the stream type %s", gn.NodeType, st)
|
|
|
+ }
|
|
|
+ sInfo, err := convertStreamInfo(streamStmt)
|
|
|
+ if err != nil {
|
|
|
+ return nil, scanTableEmitters, err
|
|
|
+ }
|
|
|
+ if sInfo.stmt.StreamType == ast.TypeTable && sInfo.stmt.Options.KIND == ast.StreamKindLookup {
|
|
|
+ lookupTableChildren[string(sInfo.stmt.Name)] = sInfo.stmt.Options
|
|
|
+ return nil, scanTableEmitters, nil
|
|
|
+ } else {
|
|
|
+ // Use the plan to calculate the schema and other meta info
|
|
|
+ p := DataSourcePlan{
|
|
|
+ name: sInfo.stmt.Name,
|
|
|
+ streamStmt: sInfo.stmt,
|
|
|
+ streamFields: sInfo.schema.ToJsonSchema(),
|
|
|
+ isSchemaless: sInfo.schema == nil,
|
|
|
+ iet: rule.Options.IsEventTime,
|
|
|
+ allMeta: rule.Options.SendMetaToSink,
|
|
|
+ }.Init()
|
|
|
+
|
|
|
+ if sInfo.stmt.StreamType == ast.TypeStream {
|
|
|
+ err = p.PruneColumns(nil)
|
|
|
+ if err != nil {
|
|
|
+ return nil, scanTableEmitters, err
|
|
|
+ }
|
|
|
+ srcNode, e := transformSourceNode(p, nil, rule.Options)
|
|
|
+ if e != nil {
|
|
|
+ return nil, scanTableEmitters, e
|
|
|
+ }
|
|
|
+ streamEmitters[string(sInfo.stmt.Name)] = struct{}{}
|
|
|
+ return srcNode, scanTableEmitters, nil
|
|
|
+ } else {
|
|
|
+ scanTableEmitters = append(scanTableEmitters, string(sInfo.stmt.Name))
|
|
|
+ return nil, scanTableEmitters, nil
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ sourceOption := &ast.Options{}
|
|
|
+ err = cast.MapToStruct(gn.Props, sourceOption)
|
|
|
+ if err != nil {
|
|
|
+ return nil, scanTableEmitters, err
|
|
|
+ }
|
|
|
+ sourceOption.TYPE = gn.NodeType
|
|
|
+ switch sourceMeta.SourceType {
|
|
|
+ case "stream":
|
|
|
+ pp, err := operator.NewPreprocessor(true, nil, true, nil, rule.Options.IsEventTime, sourceOption.TIMESTAMP, sourceOption.TIMESTAMP_FORMAT, strings.EqualFold(sourceOption.FORMAT, message.FormatBinary), sourceOption.STRICT_VALIDATION)
|
|
|
+ if err != nil {
|
|
|
+ return nil, scanTableEmitters, err
|
|
|
+ }
|
|
|
+ srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options.SendError)
|
|
|
+ streamEmitters[nodeName] = struct{}{}
|
|
|
+ return srcNode, scanTableEmitters, nil
|
|
|
+ case "table":
|
|
|
+ return nil, scanTableEmitters, fmt.Errorf("anonymouse table source is not supported, please create it prior to the rule")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil, scanTableEmitters, errors.New("invalid source node")
|
|
|
+}
|
|
|
+
|
|
|
func parseOrderBy(props map[string]interface{}) (*operator.OrderOp, error) {
|
|
|
n := &graph.Orderby{}
|
|
|
err := cast.MapToStruct(props, n)
|