|
@@ -78,14 +78,14 @@ func (p *StreamProcessor) execCreateStream(stmt *xsql.StreamStmt, db *badger.DB)
|
|
|
if err != nil {
|
|
|
return "", err
|
|
|
}else{
|
|
|
- return fmt.Sprintf("stream %s created", stmt.Name), nil
|
|
|
+ return fmt.Sprintf("Stream %s is created.\n", stmt.Name), nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (p *StreamProcessor) execShowStream(stmt *xsql.ShowStreamsStatement, db *badger.DB) ([]string,error) {
|
|
|
keys, err := common.DbKeys(db)
|
|
|
if len(keys) == 0 {
|
|
|
- keys = append(keys, "no stream definition found")
|
|
|
+ keys = append(keys, "No stream definitions are found.\n")
|
|
|
}
|
|
|
return keys, err
|
|
|
}
|
|
@@ -93,14 +93,14 @@ func (p *StreamProcessor) execShowStream(stmt *xsql.ShowStreamsStatement, db *ba
|
|
|
func (p *StreamProcessor) execDescribeStream(stmt *xsql.DescribeStreamStatement, db *badger.DB) (string,error) {
|
|
|
s, err := common.DbGet(db, string(stmt.Name))
|
|
|
if err != nil {
|
|
|
- return "", fmt.Errorf("stream %s not found", stmt.Name)
|
|
|
+ return "", fmt.Errorf("Stream %s is not found.\n", stmt.Name)
|
|
|
}
|
|
|
|
|
|
parser := xsql.NewParser(strings.NewReader(s))
|
|
|
stream, err := xsql.Language.Parse(parser)
|
|
|
streamStmt, ok := stream.(*xsql.StreamStmt)
|
|
|
if !ok{
|
|
|
- return "", fmt.Errorf("error resolving the stream %s, the data in db may be corrupted", stmt.Name)
|
|
|
+ return "", fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.\n", stmt.Name)
|
|
|
}
|
|
|
var buff bytes.Buffer
|
|
|
buff.WriteString("Fields\n--------------------------------------------------------------------------------\n")
|
|
@@ -117,7 +117,7 @@ func (p *StreamProcessor) execDescribeStream(stmt *xsql.DescribeStreamStatement,
|
|
|
func (p *StreamProcessor) execExplainStream(stmt *xsql.ExplainStreamStatement, db *badger.DB) (string,error) {
|
|
|
_, err := common.DbGet(db, string(stmt.Name))
|
|
|
if err != nil{
|
|
|
- return "", fmt.Errorf("stream %s not found", stmt.Name)
|
|
|
+ return "", fmt.Errorf("Stream %s is not found.\n", stmt.Name)
|
|
|
}
|
|
|
return "TO BE SUPPORTED", nil
|
|
|
}
|
|
@@ -127,7 +127,7 @@ func (p *StreamProcessor) execDropStream(stmt *xsql.DropStreamStatement, db *bad
|
|
|
if err != nil {
|
|
|
return "", err
|
|
|
}else{
|
|
|
- return fmt.Sprintf("stream %s dropped", stmt.Name), nil
|
|
|
+ return fmt.Sprintf("Stream %s is dropped.\n", stmt.Name), nil
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -141,7 +141,7 @@ func GetStream(db *badger.DB, name string) (stmt *xsql.StreamStmt, err error){
|
|
|
stream, err := xsql.Language.Parse(parser)
|
|
|
stmt, ok := stream.(*xsql.StreamStmt)
|
|
|
if !ok{
|
|
|
- err = fmt.Errorf("error resolving the stream %s, the data in db may be corrupted", name)
|
|
|
+ err = fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.\n", name)
|
|
|
}
|
|
|
return
|
|
|
}
|
|
@@ -172,7 +172,7 @@ func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
|
|
|
common.DbClose(db)
|
|
|
return nil, err
|
|
|
}else{
|
|
|
- log.Infof("rule %s created", name)
|
|
|
+ log.Infof("Rule %s is created.\n", name)
|
|
|
common.DbClose(db)
|
|
|
}
|
|
|
return rule, nil
|
|
@@ -186,7 +186,7 @@ func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
|
|
|
defer common.DbClose(db)
|
|
|
s, err := common.DbGet(db, string(name))
|
|
|
if err != nil {
|
|
|
- return nil, fmt.Errorf("rule %s not found", name)
|
|
|
+ return nil, fmt.Errorf("Rule %s is not found.\n", name)
|
|
|
}
|
|
|
return p.getRuleByJson(name, s)
|
|
|
}
|
|
@@ -194,18 +194,18 @@ func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
|
|
|
func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error) {
|
|
|
var rule api.Rule
|
|
|
if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
|
|
|
- return nil, fmt.Errorf("parse rule %s error : %s", ruleJson, err)
|
|
|
+ return nil, fmt.Errorf("Parse rule %s error : %s.\n", ruleJson, err)
|
|
|
}
|
|
|
rule.Id = name
|
|
|
//validation
|
|
|
if name == ""{
|
|
|
- return nil, fmt.Errorf("missing rule id")
|
|
|
+ return nil, fmt.Errorf("Missing rule id.\n")
|
|
|
}
|
|
|
if rule.Sql == ""{
|
|
|
- return nil, fmt.Errorf("missing rule sql")
|
|
|
+ return nil, fmt.Errorf("Missing rule SQL.\n")
|
|
|
}
|
|
|
if rule.Actions == nil || len(rule.Actions) == 0{
|
|
|
- return nil, fmt.Errorf("missing rule actions")
|
|
|
+ return nil, fmt.Errorf("Missing rule actions.\n")
|
|
|
}
|
|
|
return &rule, nil
|
|
|
}
|
|
@@ -218,17 +218,17 @@ func (p *RuleProcessor) ExecInitRule(rule *api.Rule) (*xstream.TopologyNew, erro
|
|
|
for name, action := range m {
|
|
|
switch name {
|
|
|
case "log":
|
|
|
- log.Printf("Create log sink with %s", action)
|
|
|
+ log.Printf("Create log sink with %s.\n", action)
|
|
|
tp.AddSink(inputs, nodes.NewSinkNode("sink_log", sinks.NewLogSink()))
|
|
|
case "mqtt":
|
|
|
- log.Printf("Create mqtt sink with %s", action)
|
|
|
+ log.Printf("Create mqtt sink with %s.\n", action)
|
|
|
if ms, err := sinks.NewMqttSink(action); err != nil{
|
|
|
return nil, err
|
|
|
}else{
|
|
|
tp.AddSink(inputs, nodes.NewSinkNode("sink_mqtt", ms))
|
|
|
}
|
|
|
default:
|
|
|
- return nil, fmt.Errorf("unsupported action: %s", name)
|
|
|
+ return nil, fmt.Errorf("unsupported action: %s.\n", name)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -260,7 +260,7 @@ func (p *RuleProcessor) ExecDesc(name string) (string, error) {
|
|
|
defer common.DbClose(db)
|
|
|
s, err := common.DbGet(db, string(name))
|
|
|
if err != nil {
|
|
|
- return "", fmt.Errorf("rule %s not found", name)
|
|
|
+ return "", fmt.Errorf("Rule %s is not found.\n", name)
|
|
|
}
|
|
|
dst := &bytes.Buffer{}
|
|
|
if err := json.Indent(dst, []byte(s), "", " "); err != nil {
|
|
@@ -276,7 +276,7 @@ func (p *RuleProcessor) ExecShow() (string, error) {
|
|
|
return "", err
|
|
|
}
|
|
|
if len(keys) == 0 {
|
|
|
- keys = append(keys, "no rule definition found")
|
|
|
+ keys = append(keys, "No rule definitions are found.\n")
|
|
|
}
|
|
|
var result string
|
|
|
for _, c := range keys{
|
|
@@ -304,7 +304,7 @@ func (p *RuleProcessor) ExecDrop(name string) (string, error) {
|
|
|
if err != nil {
|
|
|
return "", err
|
|
|
}else{
|
|
|
- return fmt.Sprintf("rule %s dropped", name), nil
|
|
|
+ return fmt.Sprintf("Rule %s is dropped.\n", name), nil
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -321,7 +321,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
|
|
|
if iet, ok := rule.Options["isEventTime"]; ok{
|
|
|
isEventTime, ok = iet.(bool)
|
|
|
if !ok{
|
|
|
- return nil, nil, fmt.Errorf("invalid rule option isEventTime %v, bool type required", iet)
|
|
|
+ return nil, nil, fmt.Errorf("Invalid rule option isEventTime %v, bool type is required.\n", iet)
|
|
|
}
|
|
|
}
|
|
|
if isEventTime {
|
|
@@ -329,23 +329,23 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
|
|
|
if fl, ok := l.(float64); ok{
|
|
|
lateTol = int64(fl)
|
|
|
}else{
|
|
|
- return nil, nil, fmt.Errorf("invalid rule option lateTolerance %v, int type required", l)
|
|
|
+ return nil, nil, fmt.Errorf("Invalid rule option lateTolerance %v, int type is required.\n", l)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
shouldCreateSource := sources == nil
|
|
|
parser := xsql.NewParser(strings.NewReader(sql))
|
|
|
if stmt, err := xsql.Language.Parse(parser); err != nil{
|
|
|
- return nil, nil, fmt.Errorf("parse sql %s error: %s", sql , err)
|
|
|
+ return nil, nil, fmt.Errorf("Parse SQL %s error: %s.\n", sql , err)
|
|
|
}else {
|
|
|
if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
|
|
|
- return nil, nil, fmt.Errorf("sql %s is not a select statement", sql)
|
|
|
+ return nil, nil, fmt.Errorf("SQL %s is not a select statement.\n", sql)
|
|
|
} else {
|
|
|
tp := xstream.NewWithName(name)
|
|
|
var inputs []api.Emitter
|
|
|
streamsFromStmt := xsql.GetStreams(selectStmt)
|
|
|
if !shouldCreateSource && len(streamsFromStmt) != len(sources){
|
|
|
- return nil, nil, fmt.Errorf("invalid parameter sources or streams, the length cannot match the statement, expect %d sources", len(streamsFromStmt))
|
|
|
+ return nil, nil, fmt.Errorf("Invalid parameter sources or streams, the length cannot match the statement, expect %d sources.\n", len(streamsFromStmt))
|
|
|
}
|
|
|
db, err := common.DbOpen(path.Join(p.badgerDir, "stream"))
|
|
|
if err != nil {
|