|
@@ -286,6 +286,19 @@ func (p *RuleProcessor) getDefaultRule(name, sql string) *api.Rule {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func getStatementFromSql(sql string) (*xsql.SelectStatement, error) {
|
|
|
+ parser := xsql.NewParser(strings.NewReader(sql))
|
|
|
+ if stmt, err := xsql.Language.Parse(parser); err != nil {
|
|
|
+ return nil, fmt.Errorf("Parse SQL %s error: %s.", sql, err)
|
|
|
+ } else {
|
|
|
+ if r, ok := stmt.(*xsql.SelectStatement); !ok {
|
|
|
+ return nil, fmt.Errorf("SQL %s is not a select statement.", sql)
|
|
|
+ } else {
|
|
|
+ return r, nil
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error) {
|
|
|
opt := common.Config.Rule
|
|
|
//set default rule options
|
|
@@ -309,6 +322,9 @@ func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error)
|
|
|
if rule.Sql == "" {
|
|
|
return nil, fmt.Errorf("Missing rule SQL.")
|
|
|
}
|
|
|
+ if _, err := getStatementFromSql(rule.Sql); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
if rule.Actions == nil || len(rule.Actions) == 0 {
|
|
|
return nil, fmt.Errorf("Missing rule actions.")
|
|
|
}
|
|
@@ -467,131 +483,127 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
|
|
|
|
|
|
log.Infof("Init rule with options %+v", rule.Options)
|
|
|
shouldCreateSource := sources == nil
|
|
|
- parser := xsql.NewParser(strings.NewReader(sql))
|
|
|
- if stmt, err := xsql.Language.Parse(parser); err != nil {
|
|
|
- return nil, nil, fmt.Errorf("Parse SQL %s error: %s.", sql, err)
|
|
|
+
|
|
|
+ if selectStmt, err := getStatementFromSql(sql); err != nil {
|
|
|
+ return nil, nil, err
|
|
|
} else {
|
|
|
- if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
|
|
|
- return nil, nil, fmt.Errorf("SQL %s is not a select statement.", sql)
|
|
|
- } else {
|
|
|
- tp, err := xstream.NewWithNameAndQos(name, rule.Options.Qos, rule.Options.CheckpointInterval)
|
|
|
- if err != nil {
|
|
|
- return nil, nil, err
|
|
|
- }
|
|
|
- var inputs []api.Emitter
|
|
|
- streamsFromStmt := xsql.GetStreams(selectStmt)
|
|
|
- dimensions := selectStmt.Dimensions
|
|
|
- if !shouldCreateSource && len(streamsFromStmt) != len(sources) {
|
|
|
- return nil, nil, fmt.Errorf("Invalid parameter sources or streams, the length cannot match the statement, expect %d sources.", len(streamsFromStmt))
|
|
|
+ tp, err := xstream.NewWithNameAndQos(name, rule.Options.Qos, rule.Options.CheckpointInterval)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, err
|
|
|
+ }
|
|
|
+ var inputs []api.Emitter
|
|
|
+ streamsFromStmt := xsql.GetStreams(selectStmt)
|
|
|
+ dimensions := selectStmt.Dimensions
|
|
|
+ if !shouldCreateSource && len(streamsFromStmt) != len(sources) {
|
|
|
+ return nil, nil, fmt.Errorf("Invalid parameter sources or streams, the length cannot match the statement, expect %d sources.", len(streamsFromStmt))
|
|
|
+ }
|
|
|
+ if rule.Options.SendMetaToSink && (len(streamsFromStmt) > 1 || dimensions != nil) {
|
|
|
+ return nil, nil, fmt.Errorf("Invalid option sendMetaToSink, it can not be applied to window")
|
|
|
+ }
|
|
|
+ store := common.GetSimpleKVStore(path.Join(p.rootDbDir, "stream"))
|
|
|
+ err = store.Open()
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, err
|
|
|
+ }
|
|
|
+ defer store.Close()
|
|
|
+
|
|
|
+ var alias, aggregateAlias xsql.Fields
|
|
|
+ for _, f := range selectStmt.Fields {
|
|
|
+ if f.AName != "" {
|
|
|
+ if !xsql.HasAggFuncs(f.Expr) {
|
|
|
+ alias = append(alias, f)
|
|
|
+ } else {
|
|
|
+ aggregateAlias = append(aggregateAlias, f)
|
|
|
+ }
|
|
|
}
|
|
|
- if rule.Options.SendMetaToSink && (len(streamsFromStmt) > 1 || dimensions != nil) {
|
|
|
- return nil, nil, fmt.Errorf("Invalid option sendMetaToSink, it can not be applied to window")
|
|
|
+ }
|
|
|
+ for i, s := range streamsFromStmt {
|
|
|
+ streamStmt, err := GetStream(store, s)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
|
|
|
}
|
|
|
- store := common.GetSimpleKVStore(path.Join(p.rootDbDir, "stream"))
|
|
|
- err = store.Open()
|
|
|
+ pp, err := plans.NewPreprocessor(streamStmt, alias, rule.Options.IsEventTime)
|
|
|
if err != nil {
|
|
|
return nil, nil, err
|
|
|
}
|
|
|
- defer store.Close()
|
|
|
-
|
|
|
- var alias, aggregateAlias xsql.Fields
|
|
|
- for _, f := range selectStmt.Fields {
|
|
|
- if f.AName != "" {
|
|
|
- if !xsql.HasAggFuncs(f.Expr) {
|
|
|
- alias = append(alias, f)
|
|
|
- } else {
|
|
|
- aggregateAlias = append(aggregateAlias, f)
|
|
|
- }
|
|
|
- }
|
|
|
+ var srcNode *nodes.SourceNode
|
|
|
+ if shouldCreateSource {
|
|
|
+ node := nodes.NewSourceNode(s, streamStmt.Options)
|
|
|
+ srcNode = node
|
|
|
+ } else {
|
|
|
+ srcNode = sources[i]
|
|
|
}
|
|
|
- for i, s := range streamsFromStmt {
|
|
|
- streamStmt, err := GetStream(store, s)
|
|
|
- if err != nil {
|
|
|
- return nil, nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
|
|
|
+ tp.AddSrc(srcNode)
|
|
|
+ preprocessorOp := xstream.Transform(pp, "preprocessor_"+s, rule.Options.BufferLength)
|
|
|
+ preprocessorOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
+ tp.AddOperator([]api.Emitter{srcNode}, preprocessorOp)
|
|
|
+ inputs = append(inputs, preprocessorOp)
|
|
|
+ }
|
|
|
+
|
|
|
+ var w *xsql.Window
|
|
|
+ if dimensions != nil {
|
|
|
+ w = dimensions.GetWindow()
|
|
|
+ if w != nil {
|
|
|
+ if w.Filter != nil {
|
|
|
+ wfilterOp := xstream.Transform(&plans.FilterPlan{Condition: w.Filter}, "windowFilter", rule.Options.BufferLength)
|
|
|
+ wfilterOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
+ tp.AddOperator(inputs, wfilterOp)
|
|
|
+ inputs = []api.Emitter{wfilterOp}
|
|
|
}
|
|
|
- pp, err := plans.NewPreprocessor(streamStmt, alias, rule.Options.IsEventTime)
|
|
|
+ wop, err := nodes.NewWindowOp("window", w, rule.Options.IsEventTime, rule.Options.LateTol, streamsFromStmt, rule.Options.BufferLength)
|
|
|
if err != nil {
|
|
|
return nil, nil, err
|
|
|
}
|
|
|
- var srcNode *nodes.SourceNode
|
|
|
- if shouldCreateSource {
|
|
|
- node := nodes.NewSourceNode(s, streamStmt.Options)
|
|
|
- srcNode = node
|
|
|
- } else {
|
|
|
- srcNode = sources[i]
|
|
|
- }
|
|
|
- tp.AddSrc(srcNode)
|
|
|
- preprocessorOp := xstream.Transform(pp, "preprocessor_"+s, rule.Options.BufferLength)
|
|
|
- preprocessorOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
- tp.AddOperator([]api.Emitter{srcNode}, preprocessorOp)
|
|
|
- inputs = append(inputs, preprocessorOp)
|
|
|
- }
|
|
|
-
|
|
|
- var w *xsql.Window
|
|
|
- if dimensions != nil {
|
|
|
- w = dimensions.GetWindow()
|
|
|
- if w != nil {
|
|
|
- if w.Filter != nil {
|
|
|
- wfilterOp := xstream.Transform(&plans.FilterPlan{Condition: w.Filter}, "windowFilter", rule.Options.BufferLength)
|
|
|
- wfilterOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
- tp.AddOperator(inputs, wfilterOp)
|
|
|
- inputs = []api.Emitter{wfilterOp}
|
|
|
- }
|
|
|
- wop, err := nodes.NewWindowOp("window", w, rule.Options.IsEventTime, rule.Options.LateTol, streamsFromStmt, rule.Options.BufferLength)
|
|
|
- if err != nil {
|
|
|
- return nil, nil, err
|
|
|
- }
|
|
|
- tp.AddOperator(inputs, wop)
|
|
|
- inputs = []api.Emitter{wop}
|
|
|
- }
|
|
|
+ tp.AddOperator(inputs, wop)
|
|
|
+ inputs = []api.Emitter{wop}
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- if w != nil && selectStmt.Joins != nil {
|
|
|
- joinOp := xstream.Transform(&plans.JoinPlan{Joins: selectStmt.Joins, From: selectStmt.Sources[0].(*xsql.Table)}, "join", rule.Options.BufferLength)
|
|
|
- joinOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
- tp.AddOperator(inputs, joinOp)
|
|
|
- inputs = []api.Emitter{joinOp}
|
|
|
- }
|
|
|
+ if w != nil && selectStmt.Joins != nil {
|
|
|
+ joinOp := xstream.Transform(&plans.JoinPlan{Joins: selectStmt.Joins, From: selectStmt.Sources[0].(*xsql.Table)}, "join", rule.Options.BufferLength)
|
|
|
+ joinOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
+ tp.AddOperator(inputs, joinOp)
|
|
|
+ inputs = []api.Emitter{joinOp}
|
|
|
+ }
|
|
|
|
|
|
- if selectStmt.Condition != nil {
|
|
|
- filterOp := xstream.Transform(&plans.FilterPlan{Condition: selectStmt.Condition}, "filter", rule.Options.BufferLength)
|
|
|
- filterOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
- tp.AddOperator(inputs, filterOp)
|
|
|
- inputs = []api.Emitter{filterOp}
|
|
|
- }
|
|
|
+ if selectStmt.Condition != nil {
|
|
|
+ filterOp := xstream.Transform(&plans.FilterPlan{Condition: selectStmt.Condition}, "filter", rule.Options.BufferLength)
|
|
|
+ filterOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
+ tp.AddOperator(inputs, filterOp)
|
|
|
+ inputs = []api.Emitter{filterOp}
|
|
|
+ }
|
|
|
|
|
|
- var ds xsql.Dimensions
|
|
|
- if dimensions != nil || len(aggregateAlias) > 0 {
|
|
|
- ds = dimensions.GetGroups()
|
|
|
- if (ds != nil && len(ds) > 0) || len(aggregateAlias) > 0 {
|
|
|
- aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds, Alias: aggregateAlias}, "aggregate", rule.Options.BufferLength)
|
|
|
- aggregateOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
- tp.AddOperator(inputs, aggregateOp)
|
|
|
- inputs = []api.Emitter{aggregateOp}
|
|
|
- }
|
|
|
+ var ds xsql.Dimensions
|
|
|
+ if dimensions != nil || len(aggregateAlias) > 0 {
|
|
|
+ ds = dimensions.GetGroups()
|
|
|
+ if (ds != nil && len(ds) > 0) || len(aggregateAlias) > 0 {
|
|
|
+ aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds, Alias: aggregateAlias}, "aggregate", rule.Options.BufferLength)
|
|
|
+ aggregateOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
+ tp.AddOperator(inputs, aggregateOp)
|
|
|
+ inputs = []api.Emitter{aggregateOp}
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- if selectStmt.Having != nil {
|
|
|
- havingOp := xstream.Transform(&plans.HavingPlan{selectStmt.Having}, "having", rule.Options.BufferLength)
|
|
|
- havingOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
- tp.AddOperator(inputs, havingOp)
|
|
|
- inputs = []api.Emitter{havingOp}
|
|
|
- }
|
|
|
+ if selectStmt.Having != nil {
|
|
|
+ havingOp := xstream.Transform(&plans.HavingPlan{selectStmt.Having}, "having", rule.Options.BufferLength)
|
|
|
+ havingOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
+ tp.AddOperator(inputs, havingOp)
|
|
|
+ inputs = []api.Emitter{havingOp}
|
|
|
+ }
|
|
|
|
|
|
- if selectStmt.SortFields != nil {
|
|
|
- orderOp := xstream.Transform(&plans.OrderPlan{SortFields: selectStmt.SortFields}, "order", rule.Options.BufferLength)
|
|
|
- orderOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
- tp.AddOperator(inputs, orderOp)
|
|
|
- inputs = []api.Emitter{orderOp}
|
|
|
- }
|
|
|
+ if selectStmt.SortFields != nil {
|
|
|
+ orderOp := xstream.Transform(&plans.OrderPlan{SortFields: selectStmt.SortFields}, "order", rule.Options.BufferLength)
|
|
|
+ orderOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
+ tp.AddOperator(inputs, orderOp)
|
|
|
+ inputs = []api.Emitter{orderOp}
|
|
|
+ }
|
|
|
|
|
|
- if selectStmt.Fields != nil {
|
|
|
- projectOp := xstream.Transform(&plans.ProjectPlan{Fields: selectStmt.Fields, IsAggregate: xsql.IsAggStatement(selectStmt), SendMeta: rule.Options.SendMetaToSink}, "project", rule.Options.BufferLength)
|
|
|
- projectOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
- tp.AddOperator(inputs, projectOp)
|
|
|
- inputs = []api.Emitter{projectOp}
|
|
|
- }
|
|
|
- return tp, inputs, nil
|
|
|
+ if selectStmt.Fields != nil {
|
|
|
+ projectOp := xstream.Transform(&plans.ProjectPlan{Fields: selectStmt.Fields, IsAggregate: xsql.IsAggStatement(selectStmt), SendMeta: rule.Options.SendMetaToSink}, "project", rule.Options.BufferLength)
|
|
|
+ projectOp.SetConcurrency(rule.Options.Concurrency)
|
|
|
+ tp.AddOperator(inputs, projectOp)
|
|
|
+ inputs = []api.Emitter{projectOp}
|
|
|
}
|
|
|
+ return tp, inputs, nil
|
|
|
}
|
|
|
}
|