|
@@ -270,6 +270,22 @@ func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
|
|
|
return p.getRuleByJson(name, s1)
|
|
|
}
|
|
|
|
|
|
+func (p *RuleProcessor) getDefaultRule(name, sql string) *api.Rule {
|
|
|
+ return &api.Rule{
|
|
|
+ Id: name,
|
|
|
+ Sql: sql,
|
|
|
+ Options: &api.RuleOption{
|
|
|
+ IsEventTime: false,
|
|
|
+ LateTol: 1000,
|
|
|
+ Concurrency: 1,
|
|
|
+ BufferLength: 1024,
|
|
|
+ SendMetaToSink: false,
|
|
|
+ Qos: api.AtMostOnce,
|
|
|
+ CheckpointInterval: 300000,
|
|
|
+ },
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error) {
|
|
|
opt := common.Config.Rule
|
|
|
//set default rule options
|
|
@@ -333,7 +349,7 @@ func (p *RuleProcessor) ExecInitRule(rule *api.Rule) (*xstream.TopologyNew, erro
|
|
|
}
|
|
|
|
|
|
func (p *RuleProcessor) ExecQuery(ruleid, sql string) (*xstream.TopologyNew, error) {
|
|
|
- if tp, inputs, err := p.createTopo(&api.Rule{Id: ruleid, Sql: sql}); err != nil {
|
|
|
+ if tp, inputs, err := p.createTopo(p.getDefaultRule(ruleid, sql)); err != nil {
|
|
|
return nil, err
|
|
|
} else {
|
|
|
tp.AddSink(inputs, nodes.NewSinkNode("sink_memory_log", "logToMemory", nil))
|