Browse Source

bug(kv): check existence for Set and Delete

ngjaying 5 years atrás
parent
commit
f937681d97
3 changed files with 34 additions and 16 deletions
  1. 12 3
      common/util.go
  2. 4 4
      xsql/processors/xsql_processor.go
  3. 18 9
      xsql/processors/xsql_processor_test.go

+ 12 - 3
common/util.go

@@ -166,9 +166,11 @@ func (m *SimpleKVStore) saveToFile() error {
 
 func (m *SimpleKVStore) Set(key string, value interface{}) error  {
 	if m.c == nil {
-		return fmt.Errorf("Cache %s has not been initialized yet.", m.path)
+		return fmt.Errorf("cache %s has not been initialized yet", m.path)
+	}
+	if err := m.c.Add(key, value, cache.NoExpiration); err != nil {
+		return err
 	}
-	m.c.Set(key, value, cache.NoExpiration)
 	return m.saveToFile()
 }
 
@@ -177,7 +179,14 @@ func (m *SimpleKVStore) Get(key string) (interface{}, bool)  {
 }
 
 func (m *SimpleKVStore) Delete(key string) error {
-	m.c.Delete(key)
+	if m.c == nil {
+		return fmt.Errorf("cache %s has not been initialized yet", m.path)
+	}
+	if _, found := m.c.Get(key); found {
+		m.c.Delete(key)
+	}else{
+		return fmt.Errorf("%s is not found", key)
+	}
 	return m.saveToFile()
 }
 

+ 4 - 4
xsql/processors/xsql_processor.go

@@ -77,7 +77,7 @@ func (p *StreamProcessor) Exec() (result []string, err error) {
 func (p *StreamProcessor) execCreateStream(stmt *xsql.StreamStmt, db common.KeyValue) (string, error) {
 	err := db.Set(string(stmt.Name), p.statement)
 	if err != nil {
-		return "", err
+		return "", fmt.Errorf("Create stream fails: %v.", err)
 	}else{
 		return fmt.Sprintf("Stream %s is created.", stmt.Name), nil
 	}
@@ -117,7 +117,7 @@ func (p *StreamProcessor) execDescribeStream(stmt *xsql.DescribeStreamStatement,
 }
 
 func (p *StreamProcessor) execExplainStream(stmt *xsql.ExplainStreamStatement, db common.KeyValue) (string,error) {
-	_, f := db.Get(string(stmt.Name))
+	_, f := db.Get(stmt.Name)
 	if !f {
 		return "", fmt.Errorf("Stream %s is not found.", stmt.Name)
 	}
@@ -125,9 +125,9 @@ func (p *StreamProcessor) execExplainStream(stmt *xsql.ExplainStreamStatement, d
 }
 
 func (p *StreamProcessor) execDropStream(stmt *xsql.DropStreamStatement, db common.KeyValue) (string, error) {
-	err := db.Delete(string(stmt.Name))
+	err := db.Delete(stmt.Name)
 	if err != nil {
-		return "", err
+		return "", fmt.Errorf("Drop stream fails: %v.", err)
 	}else{
 		return fmt.Sprintf("Stream %s is dropped.", stmt.Name), nil
 	}

+ 18 - 9
xsql/processors/xsql_processor_test.go

@@ -15,13 +15,15 @@ import (
 	"time"
 )
 
-var BadgerDir string
-func init(){
-	BadgerDir, err := common.GetAndCreateDataLoc("test")
+var BadgerDir = getBadger()
+
+func getBadger() string{
+	badgerDir, err := common.GetAndCreateDataLoc("test")
 	if err != nil {
 		log.Panic(err)
 	}
-	log.Infof("badge location is %s", BadgerDir)
+	log.Infof("badge location is %s", badgerDir)
+	return badgerDir
 }
 
 func TestStreamCreateProcessor(t *testing.T) {
@@ -32,11 +34,11 @@ func TestStreamCreateProcessor(t *testing.T) {
 	}{
 		{
 			s: `SHOW STREAMS;`,
-			r: []string{"No stream definition found."},
+			r: []string{"No stream definitions are found."},
 		},
 		{
 			s: `EXPLAIN STREAM topic1;`,
-			err: "stream topic1 is not found.",
+			err: "Stream topic1 is not found.",
 		},
 		{
 			s: `CREATE STREAM topic1 (
@@ -50,6 +52,12 @@ func TestStreamCreateProcessor(t *testing.T) {
 			r: []string{"Stream topic1 is created."},
 		},
 		{
+			s: `CREATE STREAM topic1 (
+					USERID BIGINT,
+				) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
+			err: "Create stream fails: Item topic1 already exists.",
+		},
+		{
 			s: `EXPLAIN STREAM topic1;`,
 			r: []string{"TO BE SUPPORTED"},
 		},
@@ -65,7 +73,7 @@ func TestStreamCreateProcessor(t *testing.T) {
 		},
 		{
 			s: `DROP STREAM topic1;`,
-			r: []string{"stream topic1 dropped"},
+			r: []string{"Stream topic1 is dropped."},
 		},
 		{
 			s: `DESCRIBE STREAM topic1;`,
@@ -73,12 +81,13 @@ func TestStreamCreateProcessor(t *testing.T) {
 		},
 		{
 			s: `DROP STREAM topic1;`,
-			err: "Key not found",
+			err: "Drop stream fails: topic1 is not found.",
 		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
-	streamDB := path.Join(BadgerDir, "streamTest")
+
+	streamDB := path.Join(getBadger(), "streamTest")
 	for i, tt := range tests {
 		results, err := NewStreamProcessor(tt.s, streamDB).Exec()
 		if !reflect.DeepEqual(tt.err, errstring(err)) {