|
@@ -37,6 +37,15 @@ type genNodeFunc func(name string, props map[string]interface{}, options *api.Ru
|
|
|
|
|
|
var extNodes = map[string]genNodeFunc{}
|
|
|
|
|
|
+type sourceType int
|
|
|
+
|
|
|
+const (
|
|
|
+ ILLEGAL sourceType = iota
|
|
|
+ STREAM
|
|
|
+ SCANTABLE
|
|
|
+ LOOKUPTABLE
|
|
|
+)
|
|
|
+
|
|
|
// PlanByGraph returns a topo.Topo object by a graph
|
|
|
func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
ruleGraph := rule.Graph
|
|
@@ -54,6 +63,7 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
store kv.KeyValue
|
|
|
lookupTableChildren = make(map[string]*ast.Options)
|
|
|
scanTableEmitters []string
|
|
|
+ sourceNames []string
|
|
|
streamEmitters = make(map[string]struct{})
|
|
|
)
|
|
|
for _, srcName := range ruleGraph.Topo.Sources {
|
|
@@ -64,11 +74,20 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
if _, ok := ruleGraph.Topo.Edges[srcName]; !ok {
|
|
|
return nil, fmt.Errorf("no edge defined for source node %s", srcName)
|
|
|
}
|
|
|
- var srcNode *node.SourceNode
|
|
|
- srcNode, scanTableEmitters, err = parseSource(srcName, gn, rule, store, lookupTableChildren, streamEmitters)
|
|
|
+ srcNode, srcType, name, err := parseSource(srcName, gn, rule, store, lookupTableChildren)
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("parse source %s with %v error: %w", srcName, gn.Props, err)
|
|
|
}
|
|
|
+ switch srcType {
|
|
|
+ case STREAM:
|
|
|
+ streamEmitters[name] = struct{}{}
|
|
|
+ sourceNames = append(sourceNames, name)
|
|
|
+ case SCANTABLE:
|
|
|
+ scanTableEmitters = append(scanTableEmitters, name)
|
|
|
+ sourceNames = append(sourceNames, name)
|
|
|
+ case LOOKUPTABLE:
|
|
|
+ sourceNames = append(sourceNames, name)
|
|
|
+ }
|
|
|
if srcNode != nil {
|
|
|
nodeMap[srcName] = srcNode
|
|
|
tp.AddSrc(srcNode)
|
|
@@ -92,14 +111,14 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
nt := strings.ToLower(gn.NodeType)
|
|
|
switch nt {
|
|
|
case "function":
|
|
|
- fop, err := parseFunc(gn.Props)
|
|
|
+ fop, err := parseFunc(gn.Props, sourceNames)
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("parse function %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
op := Transform(fop, nodeName, rule.Options)
|
|
|
nodeMap[nodeName] = op
|
|
|
case "aggfunc":
|
|
|
- fop, err := parseFunc(gn.Props)
|
|
|
+ fop, err := parseFunc(gn.Props, sourceNames)
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("parse aggfunc %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
@@ -107,14 +126,14 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
op := Transform(fop, nodeName, rule.Options)
|
|
|
nodeMap[nodeName] = op
|
|
|
case "filter":
|
|
|
- fop, err := parseFilter(gn.Props)
|
|
|
+ fop, err := parseFilter(gn.Props, sourceNames)
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("parse filter %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
op := Transform(fop, nodeName, rule.Options)
|
|
|
nodeMap[nodeName] = op
|
|
|
case "pick":
|
|
|
- pop, err := parsePick(gn.Props)
|
|
|
+ pop, err := parsePick(gn.Props, sourceNames)
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("parse pick %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
@@ -131,7 +150,7 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
}
|
|
|
nodeMap[nodeName] = op
|
|
|
case "join":
|
|
|
- stmt, err := parseJoinAst(gn.Props)
|
|
|
+ stmt, err := parseJoinAst(gn.Props, sourceNames)
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("parse join %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
@@ -178,21 +197,21 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
}
|
|
|
}
|
|
|
case "groupby":
|
|
|
- gop, err := parseGroupBy(gn.Props)
|
|
|
+ gop, err := parseGroupBy(gn.Props, sourceNames)
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("parse groupby %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
op := Transform(gop, nodeName, rule.Options)
|
|
|
nodeMap[nodeName] = op
|
|
|
case "orderby":
|
|
|
- oop, err := parseOrderBy(gn.Props)
|
|
|
+ oop, err := parseOrderBy(gn.Props, sourceNames)
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("parse orderby %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
|
op := Transform(oop, nodeName, rule.Options)
|
|
|
nodeMap[nodeName] = op
|
|
|
case "switch":
|
|
|
- sconf, err := parseSwitch(gn.Props)
|
|
|
+ sconf, err := parseSwitch(gn.Props, sourceNames)
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("parse switch %s with %v error: %w", nodeName, gn.Props, err)
|
|
|
}
|
|
@@ -329,7 +348,7 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
|
|
|
dataFlow[n] = graph.MapOut(in, out)
|
|
|
// convert filter to having if the input is aggregated
|
|
|
if gn.NodeType == "filter" && in.Type == graph.IOINPUT_TYPE_COLLECTION && in.CollectionType == graph.IOCOLLECTION_TYPE_GROUPED {
|
|
|
- fop, err := parseHaving(gn.Props)
|
|
|
+ fop, err := parseHaving(gn.Props, sourceNames)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
@@ -398,49 +417,48 @@ func genNodesInOrder(toNodes []string, edges map[string][]interface{}, flatRever
|
|
|
return i
|
|
|
}
|
|
|
|
|
|
-func parseSource(nodeName string, gn *api.GraphNode, rule *api.Rule, store kv.KeyValue, lookupTableChildren map[string]*ast.Options, streamEmitters map[string]struct{}) (*node.SourceNode, []string, error) {
|
|
|
- scanTableEmitters := make([]string, 0)
|
|
|
+func parseSource(nodeName string, gn *api.GraphNode, rule *api.Rule, store kv.KeyValue, lookupTableChildren map[string]*ast.Options) (*node.SourceNode, sourceType, string, error) {
|
|
|
sourceMeta := &api.SourceMeta{
|
|
|
SourceType: "stream",
|
|
|
}
|
|
|
err := cast.MapToStruct(gn.Props, sourceMeta)
|
|
|
if err != nil {
|
|
|
- return nil, scanTableEmitters, err
|
|
|
+ return nil, ILLEGAL, "", err
|
|
|
}
|
|
|
if sourceMeta.SourceType != "stream" && sourceMeta.SourceType != "table" {
|
|
|
- return nil, scanTableEmitters, fmt.Errorf("source type %s not supported", sourceMeta.SourceType)
|
|
|
+ return nil, ILLEGAL, "", fmt.Errorf("source type %s not supported", sourceMeta.SourceType)
|
|
|
}
|
|
|
// If source name is specified, find the created stream/table from store
|
|
|
if sourceMeta.SourceName != "" {
|
|
|
if store == nil {
|
|
|
store, err = store2.GetKV("stream")
|
|
|
if err != nil {
|
|
|
- return nil, scanTableEmitters, err
|
|
|
+ return nil, ILLEGAL, "", err
|
|
|
}
|
|
|
}
|
|
|
streamStmt, e := xsql.GetDataSource(store, sourceMeta.SourceName)
|
|
|
if e != nil {
|
|
|
- return nil, scanTableEmitters, fmt.Errorf("fail to get stream %s, please check if stream is created", sourceMeta.SourceName)
|
|
|
+ return nil, ILLEGAL, "", fmt.Errorf("fail to get stream %s, please check if stream is created", sourceMeta.SourceName)
|
|
|
}
|
|
|
if streamStmt.StreamType == ast.TypeStream && sourceMeta.SourceType == "table" {
|
|
|
- return nil, scanTableEmitters, fmt.Errorf("stream %s is not a table", sourceMeta.SourceName)
|
|
|
+ return nil, ILLEGAL, "", fmt.Errorf("stream %s is not a table", sourceMeta.SourceName)
|
|
|
} else if streamStmt.StreamType == ast.TypeTable && sourceMeta.SourceType == "stream" {
|
|
|
- return nil, scanTableEmitters, fmt.Errorf("table %s is not a stream", sourceMeta.SourceName)
|
|
|
+ return nil, ILLEGAL, "", fmt.Errorf("table %s is not a stream", sourceMeta.SourceName)
|
|
|
}
|
|
|
st := streamStmt.Options.TYPE
|
|
|
if st == "" {
|
|
|
st = "mqtt"
|
|
|
}
|
|
|
if st != gn.NodeType {
|
|
|
- return nil, scanTableEmitters, fmt.Errorf("source type %s does not match the stream type %s", gn.NodeType, st)
|
|
|
+ return nil, ILLEGAL, "", fmt.Errorf("source type %s does not match the stream type %s", gn.NodeType, st)
|
|
|
}
|
|
|
sInfo, err := convertStreamInfo(streamStmt)
|
|
|
if err != nil {
|
|
|
- return nil, scanTableEmitters, err
|
|
|
+ return nil, ILLEGAL, "", err
|
|
|
}
|
|
|
if sInfo.stmt.StreamType == ast.TypeTable && sInfo.stmt.Options.KIND == ast.StreamKindLookup {
|
|
|
lookupTableChildren[string(sInfo.stmt.Name)] = sInfo.stmt.Options
|
|
|
- return nil, scanTableEmitters, nil
|
|
|
+ return nil, LOOKUPTABLE, string(sInfo.stmt.Name), nil
|
|
|
} else {
|
|
|
// Use the plan to calculate the schema and other meta info
|
|
|
p := DataSourcePlan{
|
|
@@ -455,43 +473,40 @@ func parseSource(nodeName string, gn *api.GraphNode, rule *api.Rule, store kv.Ke
|
|
|
if sInfo.stmt.StreamType == ast.TypeStream {
|
|
|
err = p.PruneColumns(nil)
|
|
|
if err != nil {
|
|
|
- return nil, scanTableEmitters, err
|
|
|
+ return nil, ILLEGAL, "", err
|
|
|
}
|
|
|
srcNode, e := transformSourceNode(p, nil, rule.Options)
|
|
|
if e != nil {
|
|
|
- return nil, scanTableEmitters, e
|
|
|
+ return nil, ILLEGAL, "", e
|
|
|
}
|
|
|
- streamEmitters[string(sInfo.stmt.Name)] = struct{}{}
|
|
|
- return srcNode, scanTableEmitters, nil
|
|
|
+ return srcNode, STREAM, string(sInfo.stmt.Name), nil
|
|
|
} else {
|
|
|
- scanTableEmitters = append(scanTableEmitters, string(sInfo.stmt.Name))
|
|
|
- return nil, scanTableEmitters, nil
|
|
|
+ return nil, SCANTABLE, string(sInfo.stmt.Name), nil
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
sourceOption := &ast.Options{}
|
|
|
err = cast.MapToStruct(gn.Props, sourceOption)
|
|
|
if err != nil {
|
|
|
- return nil, scanTableEmitters, err
|
|
|
+ return nil, ILLEGAL, "", err
|
|
|
}
|
|
|
sourceOption.TYPE = gn.NodeType
|
|
|
switch sourceMeta.SourceType {
|
|
|
case "stream":
|
|
|
pp, err := operator.NewPreprocessor(true, nil, true, nil, rule.Options.IsEventTime, sourceOption.TIMESTAMP, sourceOption.TIMESTAMP_FORMAT, strings.EqualFold(sourceOption.FORMAT, message.FormatBinary), sourceOption.STRICT_VALIDATION)
|
|
|
if err != nil {
|
|
|
- return nil, scanTableEmitters, err
|
|
|
+ return nil, ILLEGAL, "", err
|
|
|
}
|
|
|
srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options.SendError)
|
|
|
- streamEmitters[nodeName] = struct{}{}
|
|
|
- return srcNode, scanTableEmitters, nil
|
|
|
+ return srcNode, STREAM, nodeName, nil
|
|
|
case "table":
|
|
|
- return nil, scanTableEmitters, fmt.Errorf("anonymouse table source is not supported, please create it prior to the rule")
|
|
|
+ return nil, ILLEGAL, "", fmt.Errorf("anonymouse table source is not supported, please create it prior to the rule")
|
|
|
}
|
|
|
}
|
|
|
- return nil, scanTableEmitters, errors.New("invalid source node")
|
|
|
+ return nil, ILLEGAL, "", errors.New("invalid source node")
|
|
|
}
|
|
|
|
|
|
-func parseOrderBy(props map[string]interface{}) (*operator.OrderOp, error) {
|
|
|
+func parseOrderBy(props map[string]interface{}, sourceNames []string) (*operator.OrderOp, error) {
|
|
|
n := &graph.Orderby{}
|
|
|
err := cast.MapToStruct(props, n)
|
|
|
if err != nil {
|
|
@@ -504,7 +519,7 @@ func parseOrderBy(props map[string]interface{}) (*operator.OrderOp, error) {
|
|
|
stmt += "DESC"
|
|
|
}
|
|
|
}
|
|
|
- p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
|
|
|
+ p, err := xsql.NewParserWithSources(strings.NewReader(stmt), sourceNames).Parse()
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("invalid order by statement error: %v", err)
|
|
|
}
|
|
@@ -516,7 +531,7 @@ func parseOrderBy(props map[string]interface{}) (*operator.OrderOp, error) {
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
-func parseGroupBy(props map[string]interface{}) (*operator.AggregateOp, error) {
|
|
|
+func parseGroupBy(props map[string]interface{}, sourceNames []string) (*operator.AggregateOp, error) {
|
|
|
n := &graph.Groupby{}
|
|
|
err := cast.MapToStruct(props, n)
|
|
|
if err != nil {
|
|
@@ -526,14 +541,14 @@ func parseGroupBy(props map[string]interface{}) (*operator.AggregateOp, error) {
|
|
|
return nil, fmt.Errorf("groupby must have at least one dimension")
|
|
|
}
|
|
|
stmt := "SELECT * FROM unknown Group By " + strings.Join(n.Dimensions, ",")
|
|
|
- p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
|
|
|
+ p, err := xsql.NewParserWithSources(strings.NewReader(stmt), sourceNames).Parse()
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("invalid join statement error: %v", err)
|
|
|
}
|
|
|
return &operator.AggregateOp{Dimensions: p.Dimensions}, nil
|
|
|
}
|
|
|
|
|
|
-func parseJoinAst(props map[string]interface{}) (*ast.SelectStatement, error) {
|
|
|
+func parseJoinAst(props map[string]interface{}, sourceNames []string) (*ast.SelectStatement, error) {
|
|
|
n := &graph.Join{}
|
|
|
err := cast.MapToStruct(props, n)
|
|
|
if err != nil {
|
|
@@ -543,7 +558,7 @@ func parseJoinAst(props map[string]interface{}) (*ast.SelectStatement, error) {
|
|
|
for _, join := range n.Joins {
|
|
|
stmt += " " + join.Type + " JOIN " + join.Name + " ON " + join.On
|
|
|
}
|
|
|
- return xsql.NewParser(strings.NewReader(stmt)).Parse()
|
|
|
+ return xsql.NewParserWithSources(strings.NewReader(stmt), sourceNames).Parse()
|
|
|
}
|
|
|
|
|
|
func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
|
|
@@ -628,13 +643,13 @@ func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
-func parsePick(props map[string]interface{}) (*operator.ProjectOp, error) {
|
|
|
+func parsePick(props map[string]interface{}, sourceNames []string) (*operator.ProjectOp, error) {
|
|
|
n := &graph.Select{}
|
|
|
err := cast.MapToStruct(props, n)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- stmt, err := xsql.NewParser(strings.NewReader("select " + strings.Join(n.Fields, ",") + " from nonexist")).Parse()
|
|
|
+ stmt, err := xsql.NewParserWithSources(strings.NewReader("select "+strings.Join(n.Fields, ",")+" from nonexist"), sourceNames).Parse()
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
@@ -645,7 +660,7 @@ func parsePick(props map[string]interface{}) (*operator.ProjectOp, error) {
|
|
|
return &operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, nil
|
|
|
}
|
|
|
|
|
|
-func parseFunc(props map[string]interface{}) (*operator.FuncOp, error) {
|
|
|
+func parseFunc(props map[string]interface{}, sourceNames []string) (*operator.FuncOp, error) {
|
|
|
m, ok := props["expr"]
|
|
|
if !ok {
|
|
|
return nil, errors.New("no expr")
|
|
@@ -654,7 +669,7 @@ func parseFunc(props map[string]interface{}) (*operator.FuncOp, error) {
|
|
|
if !ok {
|
|
|
return nil, fmt.Errorf("expr %v is not string", m)
|
|
|
}
|
|
|
- stmt, err := xsql.NewParser(strings.NewReader("select " + funcExpr + " from nonexist")).Parse()
|
|
|
+ stmt, err := xsql.NewParserWithSources(strings.NewReader("select "+funcExpr+" from nonexist"), sourceNames).Parse()
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
@@ -673,7 +688,7 @@ func parseFunc(props map[string]interface{}) (*operator.FuncOp, error) {
|
|
|
return &operator.FuncOp{CallExpr: c, Name: name, IsAgg: function.IsAggFunc(name)}, nil
|
|
|
}
|
|
|
|
|
|
-func parseFilter(props map[string]interface{}) (*operator.FilterOp, error) {
|
|
|
+func parseFilter(props map[string]interface{}, sourceNames []string) (*operator.FilterOp, error) {
|
|
|
m, ok := props["expr"]
|
|
|
if !ok {
|
|
|
return nil, errors.New("no expr")
|
|
@@ -682,7 +697,7 @@ func parseFilter(props map[string]interface{}) (*operator.FilterOp, error) {
|
|
|
if !ok {
|
|
|
return nil, fmt.Errorf("expr %v is not string", m)
|
|
|
}
|
|
|
- p := xsql.NewParser(strings.NewReader("where " + conditionExpr))
|
|
|
+ p := xsql.NewParserWithSources(strings.NewReader("where "+conditionExpr), sourceNames)
|
|
|
if exp, err := p.ParseCondition(); err != nil {
|
|
|
return nil, err
|
|
|
} else {
|
|
@@ -693,7 +708,7 @@ func parseFilter(props map[string]interface{}) (*operator.FilterOp, error) {
|
|
|
return nil, fmt.Errorf("expr %v is not a condition", m)
|
|
|
}
|
|
|
|
|
|
-func parseHaving(props map[string]interface{}) (*operator.HavingOp, error) {
|
|
|
+func parseHaving(props map[string]interface{}, sourceNames []string) (*operator.HavingOp, error) {
|
|
|
m, ok := props["expr"]
|
|
|
if !ok {
|
|
|
return nil, errors.New("no expr")
|
|
@@ -702,7 +717,7 @@ func parseHaving(props map[string]interface{}) (*operator.HavingOp, error) {
|
|
|
if !ok {
|
|
|
return nil, fmt.Errorf("expr %v is not string", m)
|
|
|
}
|
|
|
- p := xsql.NewParser(strings.NewReader("where " + conditionExpr))
|
|
|
+ p := xsql.NewParserWithSources(strings.NewReader("where "+conditionExpr), sourceNames)
|
|
|
if exp, err := p.ParseCondition(); err != nil {
|
|
|
return nil, err
|
|
|
} else {
|
|
@@ -713,7 +728,7 @@ func parseHaving(props map[string]interface{}) (*operator.HavingOp, error) {
|
|
|
return nil, fmt.Errorf("expr %v is not a condition", m)
|
|
|
}
|
|
|
|
|
|
-func parseSwitch(props map[string]interface{}) (*node.SwitchConfig, error) {
|
|
|
+func parseSwitch(props map[string]interface{}, sourceNames []string) (*node.SwitchConfig, error) {
|
|
|
n := &graph.Switch{}
|
|
|
err := cast.MapToStruct(props, n)
|
|
|
if err != nil {
|
|
@@ -724,7 +739,7 @@ func parseSwitch(props map[string]interface{}) (*node.SwitchConfig, error) {
|
|
|
}
|
|
|
caseExprs := make([]ast.Expr, len(n.Cases))
|
|
|
for i, c := range n.Cases {
|
|
|
- p := xsql.NewParser(strings.NewReader("where " + c))
|
|
|
+ p := xsql.NewParserWithSources(strings.NewReader("where "+c), sourceNames)
|
|
|
if exp, err := p.ParseCondition(); err != nil {
|
|
|
return nil, fmt.Errorf("parse case %d error: %v", i, err)
|
|
|
} else {
|