Преглед на файлове

refactor(all): formats and clean up

ngjaying преди 5 години
родител
ревизия
d91a44d04d

+ 16 - 0
common/util.go

@@ -9,6 +9,7 @@ import (
 	"github.com/sirupsen/logrus"
 	"io/ioutil"
 	"os"
+	"path"
 	"path/filepath"
 )
 
@@ -241,6 +242,21 @@ func GetLoc(subdir string)(string, error) {
 	return "", fmt.Errorf("conf dir not found")
 }
 
+func GetAndCreateDataLoc(dir string) (string, error) {
+	dataDir, err := GetDataLoc()
+	if err != nil {
+		return "", err
+	}
+	d := path.Join(path.Dir(dataDir), dir)
+	if _, err := os.Stat(d); os.IsNotExist(err) {
+		err = os.MkdirAll(d, 0755)
+		if err != nil {
+			return "", err
+		}
+	}
+	return d, nil
+}
+
 //Time related. For Mock
 func GetTicker(duration int) Ticker {
 	if IsTesting{

+ 3 - 5
xsql/ast.go

@@ -980,7 +980,6 @@ func (v *ValuerEval) Eval(expr Expr) interface{} {
 	default:
 		return nil
 	}
-	return nil
 }
 
 
@@ -1512,8 +1511,8 @@ func toFloat64(para interface{}) float64 {
 	return 0
 }
 
-func IsAggStatement(node Node) (bool) {
-	var r bool = false
+func IsAggStatement(node Node) bool {
+	var r = false
 	WalkFunc(node, func(n Node) {
 		if f, ok := n.(*Call); ok {
 			fn := strings.ToLower(f.Name)
@@ -1528,10 +1527,9 @@ func IsAggStatement(node Node) (bool) {
 				return
 			}
 		}
-	});
+	})
 	return r
 }
-
 func HasAggFuncs(node Node) bool {
 	if node == nil{
 		return false

+ 7 - 7
xsql/funcs_ast_validator.go

@@ -9,7 +9,7 @@ type AllowTypes struct {
 	types []Literal
 }
 
-func validateFuncs(funcName string, args []Expr) (error) {
+func validateFuncs(funcName string, args []Expr) error {
 	lowerName := strings.ToLower(funcName)
 	if _, ok := mathFuncMap[lowerName]; ok {
 		return validateMathFunc(funcName, args)
@@ -25,7 +25,7 @@ func validateFuncs(funcName string, args []Expr) (error) {
 	return nil
 }
 
-func validateMathFunc(name string, args []Expr) (error) {
+func validateMathFunc(name string, args []Expr) error {
 	len := len(args)
 	switch name {
 	case "abs", "acos", "asin", "atan", "ceil", "cos", "cosh", "exp", "ln", "log", "round", "sign", "sin", "sinh",
@@ -74,7 +74,7 @@ func validateMathFunc(name string, args []Expr) (error) {
 	return nil
 }
 
-func validateStrFunc(name string, args []Expr) (error) {
+func validateStrFunc(name string, args []Expr) error {
 	len := len(args)
 	switch name {
 	case "concat":
@@ -182,7 +182,7 @@ func validateStrFunc(name string, args []Expr) (error) {
 	return nil
 }
 
-func validateConvFunc(name string, args []Expr) (error) {
+func validateConvFunc(name string, args []Expr) error {
 	len := len(args)
 	switch name {
 	case "cast":
@@ -239,7 +239,7 @@ func validateConvFunc(name string, args []Expr) (error) {
 	return nil
 }
 
-func validateHashFunc(name string, args []Expr) (error) {
+func validateHashFunc(name string, args []Expr) error {
 	len := len(args)
 	switch name {
 	case "md5", "sha1", "sha224", "sha256", "sha384", "sha512":
@@ -254,7 +254,7 @@ func validateHashFunc(name string, args []Expr) (error) {
 	return nil
 }
 
-func validateOtherFunc(name string, args []Expr) (error) {
+func validateOtherFunc(name string, args []Expr) error {
 	len := len(args)
 	switch name {
 	case "isNull":
@@ -296,7 +296,7 @@ func produceErrInfo(name string, index int, expect string) (err error) {
 	return
 }
 
-func validateLen(funcName string, exp, actual int) (error) {
+func validateLen(funcName string, exp, actual int) error {
 	if actual != exp {
 		return fmt.Errorf("The arguments for %s should be %d.", funcName, exp)
 	}

+ 21 - 21
xsql/funcs_ast_validator_test.go

@@ -16,14 +16,14 @@ func TestFuncValidator(t *testing.T) {
 	}{
 		{
 			s: `SELECT abs(1) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "abs", Expr:&Call{Name:"abs", Args: []Expr{&IntegerLiteral{Val:1}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "abs", Expr: &Call{Name: "abs", Args: []Expr{&IntegerLiteral{Val: 1}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
 
 		{
 			s: `SELECT abs(field1) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "abs", Expr:&Call{Name:"abs", Args: []Expr{&FieldRef{Name:"field1"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "abs", Expr: &Call{Name: "abs", Args: []Expr{&FieldRef{Name: "field1"}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -36,7 +36,7 @@ func TestFuncValidator(t *testing.T) {
 
 		{
 			s: `SELECT abs(1.1) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "abs", Expr:&Call{Name:"abs", Args: []Expr{&NumberLiteral{Val:1.1}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "abs", Expr: &Call{Name: "abs", Args: []Expr{&NumberLiteral{Val: 1.1}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -63,14 +63,14 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT sin(1) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "sin", Expr:&Call{Name:"sin", Args: []Expr{&IntegerLiteral{Val:1}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "sin", Expr: &Call{Name: "sin", Args: []Expr{&IntegerLiteral{Val: 1}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
 
 		{
 			s: `SELECT sin(1.1) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "sin", Expr:&Call{Name:"sin", Args: []Expr{&NumberLiteral{Val:1.1}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "sin", Expr: &Call{Name: "sin", Args: []Expr{&NumberLiteral{Val: 1.1}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -95,14 +95,14 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT tanh(1) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "tanh", Expr:&Call{Name:"tanh", Args: []Expr{&IntegerLiteral{Val:1}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "tanh", Expr: &Call{Name: "tanh", Args: []Expr{&IntegerLiteral{Val: 1}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
 
 		{
 			s: `SELECT tanh(1.1) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "tanh", Expr:&Call{Name:"tanh", Args: []Expr{&NumberLiteral{Val:1.1}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "tanh", Expr: &Call{Name: "tanh", Args: []Expr{&NumberLiteral{Val: 1.1}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -128,7 +128,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT bitxor(1, 2) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "bitxor", Expr:&Call{Name:"bitxor", Args: []Expr{&IntegerLiteral{Val:1}, &IntegerLiteral{Val:2}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "bitxor", Expr: &Call{Name: "bitxor", Args: []Expr{&IntegerLiteral{Val: 1}, &IntegerLiteral{Val: 2}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -160,7 +160,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT bitnot(1) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "bitnot", Expr:&Call{Name:"bitnot", Args: []Expr{&IntegerLiteral{Val:1}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "bitnot", Expr: &Call{Name: "bitnot", Args: []Expr{&IntegerLiteral{Val: 1}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -180,7 +180,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT mod(1, 2) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "mod", Expr:&Call{Name:"mod", Args: []Expr{&IntegerLiteral{Val:1}, &IntegerLiteral{Val:2}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "mod", Expr: &Call{Name: "mod", Args: []Expr{&IntegerLiteral{Val: 1}, &IntegerLiteral{Val: 2}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -206,7 +206,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT concat(field, "hello") FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "concat", Expr:&Call{Name:"concat", Args: []Expr{&FieldRef{Name:"field"}, &StringLiteral{Val:"hello"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "concat", Expr: &Call{Name: "concat", Args: []Expr{&FieldRef{Name: "field"}, &StringLiteral{Val: "hello"}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -232,7 +232,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT regexp_matches(field, "hello") FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "regexp_matches", Expr:&Call{Name:"regexp_matches", Args: []Expr{&FieldRef{Name:"field"}, &StringLiteral{Val:"hello"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "regexp_matches", Expr: &Call{Name: "regexp_matches", Args: []Expr{&FieldRef{Name: "field"}, &StringLiteral{Val: "hello"}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -252,7 +252,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT regexp_replace(field, "hello", "h") FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "regexp_replace", Expr:&Call{Name:"regexp_replace", Args: []Expr{&FieldRef{Name:"field"}, &StringLiteral{Val:"hello"}, &StringLiteral{Val:"h"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "regexp_replace", Expr: &Call{Name: "regexp_replace", Args: []Expr{&FieldRef{Name: "field"}, &StringLiteral{Val: "hello"}, &StringLiteral{Val: "h"}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -266,7 +266,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT trim(field) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "trim", Expr:&Call{Name:"trim", Args: []Expr{&FieldRef{Name:"field"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "trim", Expr: &Call{Name: "trim", Args: []Expr{&FieldRef{Name: "field"}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -280,7 +280,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT rpad(field, 3) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "rpad", Expr:&Call{Name:"rpad", Args: []Expr{&FieldRef{Name:"field"}, &IntegerLiteral{Val:3}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "rpad", Expr: &Call{Name: "rpad", Args: []Expr{&FieldRef{Name: "field"}, &IntegerLiteral{Val: 3}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -294,7 +294,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT substring(field, 3, 4) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "substring", Expr:&Call{Name:"substring", Args: []Expr{&FieldRef{Name:"field"}, &IntegerLiteral{Val:3}, &IntegerLiteral{Val:4}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "substring", Expr: &Call{Name: "substring", Args: []Expr{&FieldRef{Name: "field"}, &IntegerLiteral{Val: 3}, &IntegerLiteral{Val: 4}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -320,7 +320,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT cast(field, "bigint") FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "cast", Expr:&Call{Name:"cast", Args: []Expr{&FieldRef{Name:"field"}, &StringLiteral{Val:"bigint"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "cast", Expr: &Call{Name: "cast", Args: []Expr{&FieldRef{Name: "field"}, &StringLiteral{Val: "bigint"}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -334,7 +334,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT chr(field) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "chr", Expr:&Call{Name:"chr", Args: []Expr{&FieldRef{Name:"field"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "chr", Expr: &Call{Name: "chr", Args: []Expr{&FieldRef{Name: "field"}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -348,7 +348,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT encode(field, "base64") FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "encode", Expr:&Call{Name:"encode", Args: []Expr{&FieldRef{Name:"field"}, &StringLiteral{Val:"base64"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "encode", Expr: &Call{Name: "encode", Args: []Expr{&FieldRef{Name: "field"}, &StringLiteral{Val: "base64"}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -362,7 +362,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT trunc(field, 3) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "trunc", Expr:&Call{Name:"trunc", Args: []Expr{&FieldRef{Name:"field"}, &IntegerLiteral{Val:3}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "trunc", Expr: &Call{Name: "trunc", Args: []Expr{&FieldRef{Name: "field"}, &IntegerLiteral{Val: 3}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},
@@ -376,7 +376,7 @@ func TestFuncValidator(t *testing.T) {
 		///
 		{
 			s: `SELECT sha512(field) FROM tbl`,
-			stmt: &SelectStatement{Fields: []Field{Field{ AName:"",  Name: "sha512", Expr:&Call{Name:"sha512", Args: []Expr{&FieldRef{Name:"field"}}}}},
+			stmt: &SelectStatement{Fields: []Field{{AName: "", Name: "sha512", Expr: &Call{Name: "sha512", Args: []Expr{&FieldRef{Name: "field"}}}}},
 				Sources: []Source{&Table{Name:"tbl"}},
 			},
 		},

+ 1 - 1
xsql/funcs_math.go

@@ -11,7 +11,7 @@ func mathCall(name string, args []interface{}) (interface{}, bool) {
 	case "abs":
 		if v, ok := args[0].(int); ok {
 			t := float64(v)
-			var ret int = int(math.Abs(t))
+			var ret = int(math.Abs(t))
 			return ret, true
 		} else if v, ok := args[0].(float64); ok {
 			return math.Abs(v), true

+ 5 - 5
xsql/lexical.go

@@ -276,7 +276,7 @@ func (s *Scanner) Scan() (tok Token, lit string) {
 		if r := s.read(); r == '-' {
 			s.skipUntilNewline()
 			return COMMENT, ""
-		} else if (r == '>'){
+		} else if r == '>' {
 			return ARROW, tokens[ARROW]
 		} else if isDigit(r) {
 			s.unread()
@@ -587,20 +587,20 @@ func isWhiteSpace(r rune) bool {
 
 func isLetter(ch rune) bool { return (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') }
 
-func isDigit(ch rune) bool { return (ch >= '0' && ch <= '9') }
+func isDigit(ch rune) bool { return ch >= '0' && ch <= '9' }
 
 func isQuotation(ch rune) bool { return ch == '"' }
 
 func (tok Token) isOperator() bool { return (tok > operatorBeg && tok < operatorEnd) || tok == ASTERISK || tok == LBRACKET }
 
-func (tok Token) isTimeLiteral() bool { return (tok >= DD && tok <= MS) }
+func (tok Token) isTimeLiteral() bool { return tok >= DD && tok <= MS }
 
 func (tok Token) allowedSourceToken() bool {
-	return (tok == IDENT || tok == DIV || tok == HASH || tok == ADD)
+	return tok == IDENT || tok == DIV || tok == HASH || tok == ADD
 }
 
 //Allowed special field name token
-func (tok Token) allowedSFNToken() bool { return (tok == DOT) }
+func (tok Token) allowedSFNToken() bool { return tok == DOT }
 
 func (tok Token) Precedence() int {
 	switch tok {

+ 3 - 3
xsql/parser.go

@@ -236,7 +236,7 @@ func (p *Parser) parseJoins() (Joins, error) {
 	for {
 		if tok, lit := p.scanIgnoreWhitespace(); tok == INNER || tok == LEFT || tok == RIGHT || tok == FULL || tok == CROSS {
 			if tok1, _ := p.scanIgnoreWhitespace(); tok1 == JOIN {
-				var jt JoinType = INNER_JOIN
+				var jt = INNER_JOIN
 				switch tok {
 				case INNER:
 					jt = INNER_JOIN
@@ -659,7 +659,7 @@ func validateWindows(name string, args []Expr) (WindowType, error) {
 	return NOT_WINDOW, nil
 }
 
-func validateWindow(funcName string, expectLen int, args []Expr) (error) {
+func validateWindow(funcName string, expectLen int, args []Expr) error {
 	if len(args) != expectLen {
 		return fmt.Errorf("The arguments for %s should be %d.\n", funcName, expectLen)
 	}
@@ -961,7 +961,7 @@ func (p *Parser) parseStreamStructType() (FieldType, error) {
 }
 
 func (p *Parser) parseStreamOptions() (map[string]string, error) {
-	var opts map[string]string = make(map[string]string)
+	var opts = make(map[string]string)
 	lStack := &stack.Stack{}
 	if tok, lit := p.scanIgnoreWhitespace(); tok == LPAREN {
 		lStack.Push(LPAREN)

Файловите разлики са ограничени, защото са твърде много
+ 486 - 486
xsql/parser_test.go


+ 1 - 1
xsql/plans/join_operator.go

@@ -59,7 +59,7 @@ func getStreamNames(join *xsql.Join) ([]string, error) {
 			}
 			srcs = append(srcs, string(f.StreamName))
 		}
-	});
+	})
 	if len(srcs) != 2 {
 		return nil, fmt.Errorf("Not correct join expression, it requires exactly 2 sources at ON expression.")
 	}

+ 0 - 1
xsql/plans/order_operator.go

@@ -28,5 +28,4 @@ func (p *OrderPlan) Apply(ctx context.Context, data interface{}) interface{} {
 		log.Errorf("Expect xsql.Valuer or its array type.")
 		return nil
 	}
-	return nil
 }

+ 1 - 4
xsql/processors/xsql_processor_test.go

@@ -16,13 +16,10 @@ import (
 
 var BadgerDir string
 func init(){
-	dataDir, err := common.GetDataLoc()
+	BadgerDir, err := common.GetAndCreateDataLoc("test")
 	if err != nil {
 		log.Panic(err)
-	}else{
-		log.Infof("db location is %s", dataDir)
 	}
-	BadgerDir = path.Join(path.Dir(dataDir), "test")
 	log.Infof("badge location is %s", BadgerDir)
 }
 

+ 1 - 1
xsql/util.go

@@ -49,7 +49,7 @@ func GetStreams(stmt *SelectStatement) (result []string){
 	return
 }
 
-func LowercaseKeyMap(m map[string]interface{}) (map[string]interface{}) {
+func LowercaseKeyMap(m map[string]interface{}) map[string]interface{} {
 	m1 := make(map[string]interface{})
 	for k, v := range m {
 		if m2, ok := v.(map[string]interface{}); ok {

+ 1 - 1
xsql/xsql_manager.go

@@ -29,7 +29,7 @@ func (t *ParseTree) Handle(tok Token, fn func(*Parser) (Statement, error)) {
 
 
 func (pt *ParseTree) Parse(p *Parser) (Statement, error) {
-	tok, _ := p.scanIgnoreWhitespace();
+	tok, _ := p.scanIgnoreWhitespace()
 	p.unscan()
 	if f, ok  := pt.Handlers[tok]; ok {
 		return f(p)

+ 1 - 1
xsql/xsql_parser_tree_test.go

@@ -20,7 +20,7 @@ func TestParser_ParseTree(t *testing.T) {
 			stmt: &StreamStmt{
 				Name: StreamName("demo"),
 				StreamFields: []StreamField{
-					StreamField{Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
+					{Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
 				},
 				Options: map[string]string{
 					"DATASOURCE" : "users",

+ 1 - 2
xstream/cli/main.go

@@ -150,7 +150,6 @@ func main() {
 								args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
 								return streamProcess(client, args)
 							}
-							return nil
 						} else {
 							return streamProcess(client, "")
 						}
@@ -442,6 +441,6 @@ func main() {
 
 	err = app.Run(os.Args)
 	if err != nil {
-		fmt.Errorf("%s", err)
+		fmt.Printf("%v", err)
 	}
 }

+ 1 - 1
xstream/demo/func_visitor.go

@@ -18,7 +18,7 @@ func main() {
 			}
 			srcs = append(srcs, string(f.StreamName))
 		}
-	});
+	})
 
 	for _, src := range srcs {
 		fmt.Println(src)

+ 0 - 63
xstream/demo/test.go

@@ -1,63 +0,0 @@
-package main
-
-import (
-	"engine/common"
-	"engine/xsql"
-	"engine/xsql/plans"
-	"engine/xstream"
-	"engine/xstream/collectors"
-	"engine/xstream/extensions"
-	"strings"
-)
-
-func main() {
-
-	log := common.Log
-
-	demo1Stream, err := xsql.NewParser(strings.NewReader("CREATE STREAM demo1 (count bigint) WITH (source=\"users\", FORMAT=\"AVRO\", KEY=\"USERID\")")).ParseCreateStreamStmt()
-	demo2Stream, err := xsql.NewParser(strings.NewReader("CREATE STREAM demo2 (abc bigint) WITH (source=\"users\", FORMAT=\"AVRO\", KEY=\"USERID\")")).ParseCreateStreamStmt()
-	stmt, err := xsql.NewParser(strings.NewReader("SELECT count FROM demo1 where demo1.count > 3")).Parse()
-	if err != nil {
-		log.Fatal("Failed to parse SQL for %s. \n", err)
-	}
-
-	tp := xstream.New()
-
-	mqs1, err := extensions.NewWithName("srv1", "demo1", "")
-	if err != nil {
-		log.Fatalf("Found error %s.\n", err)
-		return
-	}
-	tp.AddSrc(mqs1)
-
-	mqs2, err := extensions.NewWithName("srv2", "demo2", "")
-	if err != nil {
-		log.Fatalf("Found error %s.\n", err)
-		return
-	}
-	tp.AddSrc(mqs2)
-
-	preprocessorOp1 := xstream.Transform(&plans.Preprocessor{StreamStmt: demo1Stream}, "preprocessor1")
-	tp.AddOperator([]xstream.Emitter{mqs1}, preprocessorOp1)
-
-	preprocessorOp2 := xstream.Transform(&plans.Preprocessor{StreamStmt: demo2Stream}, "preprocessor2")
-	tp.AddOperator([]xstream.Emitter{mqs2}, preprocessorOp2)
-
-	filterOp := xstream.Transform(&plans.FilterPlan{Condition: stmt.Condition}, "filter plan")
-	filterOp.SetConcurrency(3)
-	tp.AddOperator([]xstream.Emitter{preprocessorOp1, preprocessorOp2}, filterOp)
-
-	projectOp := xstream.Transform(&plans.ProjectPlan{Fields: stmt.Fields}, "project plan")
-	tp.AddOperator([]xstream.Emitter{filterOp}, projectOp)
-
-
-	tp.AddSink([]xstream.Emitter{projectOp}, collectors.Func(func(data interface{}) error {
-		log.Println("sink result %s", data)
-		return nil
-	}))
-
-	if err := <-tp.Open(); err != nil {
-		log.Fatal(err)
-		return
-	}
-}

+ 0 - 92
xstream/demo/testWindow.go

@@ -1,92 +0,0 @@
-package main
-
-import (
-	"engine/common"
-	"engine/xsql"
-	"engine/xsql/plans"
-	"engine/xstream"
-	"engine/xstream/collectors"
-	"engine/xstream/extensions"
-	"engine/xstream/operators"
-	"strings"
-)
-
-func main() {
-
-	log := common.Log
-
-	demo1Stream, err := xsql.NewParser(strings.NewReader("CREATE STREAM demo (count bigint) WITH (datasource=\"demo\", FORMAT=\"AVRO\", KEY=\"USERID\")")).ParseCreateStreamStmt()
-	//demo2Stream, err := xsql.NewParser(strings.NewReader("CREATE STREAM demo2 (abc bigint) WITH (datasource=\"demo2\", FORMAT=\"AVRO\", KEY=\"USERID\")")).ParseCreateStreamStmt()
-	//stmt, err := xsql.NewParser(strings.NewReader("SELECT count FROM demo1 where demo1.count > 3")).Parse()
-	if err != nil {
-		log.Fatal("Failed to parse SQL for %s. \n", err)
-	}
-
-	tp := xstream.New()
-
-	mqs1, err := extensions.NewWithName("srv1", "demo", "")
-	if err != nil {
-		log.Fatalf("Found error %s.\n", err)
-		return
-	}
-	tp.AddSrc(mqs1)
-
-	//mqs2, err := extensions.NewWithName("srv1", "demo2")
-	//if err != nil {
-	//	log.Fatalf("Found error %s.\n", err)
-	//	return
-	//}
-	//tp.AddSrc(mqs2)
-
-	preprocessorOp1 := xstream.Transform(&plans.Preprocessor{StreamStmt: demo1Stream}, "preprocessor1")
-	tp.AddOperator([]xstream.Emitter{mqs1}, preprocessorOp1)
-
-	//preprocessorOp2 := xstream.Transform(&plans.Preprocessor{StreamStmt: demo2Stream}, "preprocessor2")
-	//tp.AddOperator([]xstream.Emitter{mqs2}, preprocessorOp2)
-
-	//filterOp := xstream.Transform(&plans.FilterPlan{Condition: stmt.Condition}, "filter plan")
-	//filterOp.SetConcurrency(3)
-	//tp.AddOperator([]xstream.Emitter{preprocessorOp1, preprocessorOp2}, filterOp)
-	//
-	//projectOp := xstream.Transform(&plans.ProjectPlan{Fields: stmt.Fields}, "project plan")
-	//tp.AddOperator([]xstream.Emitter{filterOp}, projectOp)
-
-	//windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
-	//	Type: operators.NO_WINDOW,
-	//})
-
-	//windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
-	//	Type: operators.TUMBLING_WINDOW,
-	//	Length: 30000,
-	//})
-
-	//windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
-	//	Type: operators.HOPPING_WINDOW,
-	//	Length: 20000,
-	//	Interval: 10000,
-	//})
-	//
-	//windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
-	//	Type: operators.SLIDING_WINDOW,
-	//	Length: 20000,
-	//})
-
-	windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
-		Type: operators.SESSION_WINDOW,
-		Length: 20000,
-		Interval: 6000,
-	})
-
-	tp.AddOperator([]xstream.Emitter{preprocessorOp1}, windowOp)
-
-
-	tp.AddSink([]xstream.Emitter{windowOp}, collectors.Func(func(data interface{}) error {
-		log.Println("sink result %s", data)
-		return nil
-	}))
-
-	if err := <-tp.Open(); err != nil {
-		log.Fatal(err)
-		return
-	}
-}

+ 1 - 2
xstream/funcs.go

@@ -2,10 +2,9 @@ package xstream
 
 import (
 	"context"
-	"fmt"
 	"engine/xstream/operators"
+	"fmt"
 	"reflect"
-
 )
 
 type unaryFuncForm byte

+ 1 - 1
xstream/server/main.go

@@ -28,7 +28,7 @@ var processor *processors.RuleProcessor
 
 type Server int
 
-var QUERY_RULE_ID string = "internal-xstream_query_rule"
+var QUERY_RULE_ID = "internal-xstream_query_rule"
 func (t *Server) CreateQuery(sql string, reply *string) error {
 	if _, ok := registry[QUERY_RULE_ID]; ok {
 		stopQuery()

+ 7 - 7
xstream/util_test.go

@@ -7,31 +7,31 @@ import (
 func TestConf(t *testing.T) {
 	var file = "test/testconf.json"
 
-	if v, e := GetConfAsString(file, "conf_string"); (e != nil || (v != "test")) {
+	if v, e := GetConfAsString(file, "conf_string"); e != nil || (v != "test") {
 		t.Errorf("Expect %s, actual %s; error is %s. \n", "test", v, e)
 	}
 
-	if v, e := GetConfAsInt(file, "conf_int"); (e != nil || (v != 10)) {
+	if v, e := GetConfAsInt(file, "conf_int"); e != nil || (v != 10) {
 		t.Errorf("Expect %s, actual %d. error is %s. \n ", "10", v, e)
 	}
 
-	if v, e := GetConfAsFloat(file, "conf_float"); (e != nil || (v != 32.3)) {
+	if v, e := GetConfAsFloat(file, "conf_float"); e != nil || (v != 32.3) {
 		t.Errorf("Expect %s, actual %f. error is %s. \n ", "32.3", v, e)
 	}
 
-	if v, e := GetConfAsBool(file, "conf_bool"); (e != nil || (v != true)) {
+	if v, e := GetConfAsBool(file, "conf_bool"); e != nil || (v != true) {
 		t.Errorf("Expect %s, actual %v. error is %s. \n", "true", v, e)
 	}
 
-	if v, e := GetConfAsString(file, "servers.srv1.addr"); (e != nil || (v != "127.0.0.1")) {
+	if v, e := GetConfAsString(file, "servers.srv1.addr"); e != nil || (v != "127.0.0.1") {
 		t.Errorf("Expect %s, actual %s. error is %s. \n", "127.0.0.1", v, e)
 	}
 
-	if v, e := GetConfAsString(file, "servers.srv1.clientid"); (e != nil || (v != "")) {
+	if v, e := GetConfAsString(file, "servers.srv1.clientid"); e != nil || (v != "") {
 		t.Errorf("Expect %s, actual %s. error is %s. \n", "", v, e)
 	}
 
-	if v, e := GetConfAsInt(file, "servers.srv2.port"); (e != nil || (v != 1883)) {
+	if v, e := GetConfAsInt(file, "servers.srv2.port"); e != nil || (v != 1883) {
 		t.Errorf("Expect %s, actual %d. error is %s. \n", "1883", v, e)
 	}