Преглед изворни кода

refactor(processor): refactor stream processor to not store the statement string as state. Instead, make it a parameter for ExecStmt

ngjaying пре 5 година
родитељ
комит
9e5ee5db2c

+ 2 - 2
examples/testExtension.go

@@ -17,10 +17,10 @@ func main() {
 	log.Infof("db location is %s", dbDir)
 
 	demo := `DROP STREAM ext`
-	processors.NewStreamProcessor(demo, path.Join(dbDir, "stream")).Exec()
+	processors.NewStreamProcessor(path.Join(dbDir, "stream")).ExecStmt(demo)
 
 	demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\")"
-	_, err = processors.NewStreamProcessor(demo, path.Join(dbDir, "stream")).Exec()
+	_, err = processors.NewStreamProcessor(path.Join(dbDir, "stream")).ExecStmt(demo)
 	if err != nil {
 		panic(err)
 	}

+ 4 - 3
xsql/processors/extension_test.go

@@ -24,11 +24,12 @@ func setup() *RuleProcessor {
 	}
 	log.Infof("db location is %s", dbDir)
 
+	p := NewStreamProcessor(path.Join(dbDir, "stream"))
 	demo := `DROP STREAM ext`
-	NewStreamProcessor(demo, path.Join(dbDir, "stream")).Exec()
-	demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
+	p.ExecStmt(demo)
 
-	_, err = NewStreamProcessor(demo, path.Join(dbDir, "stream")).Exec()
+	demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
+	_, err = p.ExecStmt(demo)
 	if err != nil {
 		panic(err)
 	}

+ 8 - 11
xsql/processors/xsql_processor.go

@@ -18,22 +18,19 @@ import (
 var log = common.Log
 
 type StreamProcessor struct {
-	statement string
-	dbDir     string
+	dbDir string
 }
 
-//@params s : the sql string of create stream statement
 //@params d : the directory of the DB to save the stream info
-func NewStreamProcessor(s, d string) *StreamProcessor {
+func NewStreamProcessor(d string) *StreamProcessor {
 	processor := &StreamProcessor{
-		statement: s,
-		dbDir:     d,
+		dbDir: d,
 	}
 	return processor
 }
 
-func (p *StreamProcessor) Exec() (result []string, err error) {
-	parser := xsql.NewParser(strings.NewReader(p.statement))
+func (p *StreamProcessor) ExecStmt(statement string) (result []string, err error) {
+	parser := xsql.NewParser(strings.NewReader(statement))
 	stmt, err := xsql.Language.Parse(parser)
 	if err != nil {
 		return
@@ -49,7 +46,7 @@ func (p *StreamProcessor) Exec() (result []string, err error) {
 	switch s := stmt.(type) {
 	case *xsql.StreamStmt:
 		var r string
-		r, err = p.execCreateStream(s, store)
+		r, err = p.execCreateStream(s, store, statement)
 		result = append(result, r)
 	case *xsql.ShowStreamsStatement:
 		result, err = p.execShowStream(s, store)
@@ -70,8 +67,8 @@ func (p *StreamProcessor) Exec() (result []string, err error) {
 	return
 }
 
-func (p *StreamProcessor) execCreateStream(stmt *xsql.StreamStmt, db common.KeyValue) (string, error) {
-	err := db.Set(string(stmt.Name), p.statement)
+func (p *StreamProcessor) execCreateStream(stmt *xsql.StreamStmt, db common.KeyValue, statement string) (string, error) {
+	err := db.Set(string(stmt.Name), statement)
 	if err != nil {
 		return "", fmt.Errorf("Create stream fails: %v.", err)
 	} else {

+ 17 - 13
xsql/processors/xsql_processor_test.go

@@ -90,7 +90,7 @@ func TestStreamCreateProcessor(t *testing.T) {
 
 	streamDB := path.Join(getDbDir(), "streamTest")
 	for i, tt := range tests {
-		results, err := NewStreamProcessor(tt.s, streamDB).Exec()
+		results, err := NewStreamProcessor(streamDB).ExecStmt(tt.s)
 		if !reflect.DeepEqual(tt.err, errstring(err)) {
 			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.s, tt.err, err)
 		} else if tt.err == "" {
@@ -102,12 +102,13 @@ func TestStreamCreateProcessor(t *testing.T) {
 }
 
 func createStreams(t *testing.T) {
+	p := NewStreamProcessor(path.Join(DbDir, "stream"))
 	demo := `CREATE STREAM demo (
 					color STRING,
 					size BIGINT,
 					ts BIGINT
 				) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts");`
-	_, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
+	_, err := p.ExecStmt(demo)
 	if err != nil {
 		t.Log(err)
 	}
@@ -116,7 +117,7 @@ func createStreams(t *testing.T) {
 					hum BIGINT,
 					ts BIGINT
 				) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
-	_, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
+	_, err = p.ExecStmt(demo1)
 	if err != nil {
 		t.Log(err)
 	}
@@ -125,25 +126,26 @@ func createStreams(t *testing.T) {
 					hum BIGINT,
 					ts BIGINT
 				) WITH (DATASOURCE="sessionDemo", FORMAT="json", KEY="ts");`
-	_, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
+	_, err = p.ExecStmt(sessionDemo)
 	if err != nil {
 		t.Log(err)
 	}
 }
 
 func dropStreams(t *testing.T) {
+	p := NewStreamProcessor(path.Join(DbDir, "stream"))
 	demo := `DROP STREAM demo`
-	_, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
+	_, err := p.ExecStmt(demo)
 	if err != nil {
 		t.Log(err)
 	}
 	demo1 := `DROP STREAM demo1`
-	_, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
+	_, err = p.ExecStmt(demo1)
 	if err != nil {
 		t.Log(err)
 	}
 	sessionDemo := `DROP STREAM sessionDemo`
-	_, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
+	_, err = p.ExecStmt(sessionDemo)
 	if err != nil {
 		t.Log(err)
 	}
@@ -1038,12 +1040,13 @@ func TestWindow(t *testing.T) {
 }
 
 func createEventStreams(t *testing.T) {
+	p := NewStreamProcessor(path.Join(DbDir, "stream"))
 	demo := `CREATE STREAM demoE (
 					color STRING,
 					size BIGINT,
 					ts BIGINT
 				) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
-	_, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
+	_, err := p.ExecStmt(demo)
 	if err != nil {
 		t.Log(err)
 	}
@@ -1052,7 +1055,7 @@ func createEventStreams(t *testing.T) {
 					hum BIGINT,
 					ts BIGINT
 				) WITH (DATASOURCE="demo1E", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
-	_, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
+	_, err = p.ExecStmt(demo1)
 	if err != nil {
 		t.Log(err)
 	}
@@ -1061,25 +1064,26 @@ func createEventStreams(t *testing.T) {
 					hum BIGINT,
 					ts BIGINT
 				) WITH (DATASOURCE="sessionDemoE", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
-	_, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
+	_, err = p.ExecStmt(sessionDemo)
 	if err != nil {
 		t.Log(err)
 	}
 }
 
 func dropEventStreams(t *testing.T) {
+	p := NewStreamProcessor(path.Join(DbDir, "stream"))
 	demo := `DROP STREAM demoE`
-	_, err := NewStreamProcessor(demo, path.Join(DbDir, "stream")).Exec()
+	_, err := p.ExecStmt(demo)
 	if err != nil {
 		t.Log(err)
 	}
 	demo1 := `DROP STREAM demo1E`
-	_, err = NewStreamProcessor(demo1, path.Join(DbDir, "stream")).Exec()
+	_, err = p.ExecStmt(demo1)
 	if err != nil {
 		t.Log(err)
 	}
 	sessionDemo := `DROP STREAM sessionDemoE`
-	_, err = NewStreamProcessor(sessionDemo, path.Join(DbDir, "stream")).Exec()
+	_, err = p.ExecStmt(sessionDemo)
 	if err != nil {
 		t.Log(err)
 	}

+ 20 - 16
xstream/server/server/server.go

@@ -19,8 +19,15 @@ import (
 	"time"
 )
 
-var dataDir string
-var log = common.Log
+var (
+	dataDir         string
+	log             = common.Log
+	registry        RuleRegistry
+	ruleProcessor   *processors.RuleProcessor
+	streamProcessor *processors.StreamProcessor
+)
+
+const QUERY_RULE_ID = "internal-xstream_query_rule"
 
 type RuleState struct {
 	Name      string
@@ -29,13 +36,8 @@ type RuleState struct {
 }
 type RuleRegistry map[string]*RuleState
 
-var registry RuleRegistry
-var processor *processors.RuleProcessor
-
 type Server int
 
-var QUERY_RULE_ID = "internal-xstream_query_rule"
-
 func (t *Server) CreateQuery(sql string, reply *string) error {
 	if _, ok := registry[QUERY_RULE_ID]; ok {
 		stopQuery()
@@ -85,7 +87,7 @@ func (t *Server) GetQueryResult(qid string, reply *string) error {
 }
 
 func (t *Server) Stream(stream string, reply *string) error {
-	content, err := processors.NewStreamProcessor(stream, path.Join(path.Dir(dataDir), "stream")).Exec()
+	content, err := streamProcessor.ExecStmt(stream)
 	if err != nil {
 		return fmt.Errorf("Stream command error: %s", err)
 	} else {
@@ -97,7 +99,7 @@ func (t *Server) Stream(stream string, reply *string) error {
 }
 
 func (t *Server) CreateRule(rule *common.Rule, reply *string) error {
-	r, err := processor.ExecCreate(rule.Name, rule.Json)
+	r, err := ruleProcessor.ExecCreate(rule.Name, rule.Json)
 	if err != nil {
 		return fmt.Errorf("Create rule error : %s.", err)
 	} else {
@@ -116,7 +118,7 @@ func (t *Server) CreateRule(rule *common.Rule, reply *string) error {
 }
 
 func (t *Server) createRuleState(rule *api.Rule) (*RuleState, error) {
-	if tp, err := processor.ExecInitRule(rule); err != nil {
+	if tp, err := ruleProcessor.ExecInitRule(rule); err != nil {
 		return nil, err
 	} else {
 		rs := &RuleState{
@@ -178,7 +180,7 @@ func (t *Server) StartRule(name string, reply *string) error {
 	var rs *RuleState
 	rs, ok := registry[name]
 	if !ok {
-		r, err := processor.GetRuleByName(name)
+		r, err := ruleProcessor.GetRuleByName(name)
 		if err != nil {
 			return err
 		}
@@ -234,7 +236,7 @@ func (t *Server) RestartRule(name string, reply *string) error {
 }
 
 func (t *Server) DescRule(name string, reply *string) error {
-	r, err := processor.ExecDesc(name)
+	r, err := ruleProcessor.ExecDesc(name)
 	if err != nil {
 		return fmt.Errorf("Desc rule error : %s.", err)
 	} else {
@@ -244,7 +246,7 @@ func (t *Server) DescRule(name string, reply *string) error {
 }
 
 func (t *Server) ShowRules(_ int, reply *string) error {
-	r, err := processor.ExecShow()
+	r, err := ruleProcessor.ExecShow()
 	if err != nil {
 		return fmt.Errorf("Show rule error : %s.", err)
 	} else {
@@ -254,7 +256,7 @@ func (t *Server) ShowRules(_ int, reply *string) error {
 }
 
 func (t *Server) DropRule(name string, reply *string) error {
-	r, err := processor.ExecDrop(name)
+	r, err := ruleProcessor.ExecDrop(name)
 	if err != nil {
 		return fmt.Errorf("Drop rule error : %s.", err)
 	} else {
@@ -297,12 +299,14 @@ func StartUp(Version string) {
 		log.Infof("db location is %s", dr)
 		dataDir = dr
 	}
-	processor = processors.NewRuleProcessor(path.Dir(dataDir))
+	ruleProcessor = processors.NewRuleProcessor(path.Dir(dataDir))
+	streamProcessor = processors.NewStreamProcessor(path.Join(path.Dir(dataDir), "stream"))
+
 	registry = make(RuleRegistry)
 
 	server := new(Server)
 	//Start rules
-	if rules, err := processor.GetAllRules(); err != nil {
+	if rules, err := ruleProcessor.GetAllRules(); err != nil {
 		log.Infof("Start rules error: %s", err)
 	} else {
 		log.Info("Starting rules")

+ 0 - 1
xstream/test/mock_source.go

@@ -13,7 +13,6 @@ type MockSource struct {
 	isEventTime bool
 }
 
-// New creates a new CsvSource
 func NewMockSource(data []*xsql.Tuple, done <-chan int, isEventTime bool) *MockSource {
 	mock := &MockSource{
 		data:        data,