Pārlūkot izejas kodu

refactor(kv): move kv to a standalone package to prevent loading sqlite at startup

ngjaying 4 gadi atpakaļ
vecāks
revīzija
25a67c02a6

+ 19 - 0
common/kv/kv.go

@@ -0,0 +1,19 @@
+package kv
+
+type KeyValue interface {
+	Open() error
+	Close() error
+	// Set key to hold string value if key does not exist otherwise return an error
+	Setnx(key string, value interface{}) error
+	// Set key to hold the string value. If key already holds a value, it is overwritten
+	Set(key string, value interface{}) error
+	Get(key string, val interface{}) (bool, error)
+	//Must return *common.Error with NOT_FOUND error
+	Delete(key string) error
+	Keys() (keys []string, err error)
+	Clean() error
+}
+
+func GetDefaultKVStore(fpath string) (ret KeyValue) {
+	return GetSqliteKVStore(fpath)
+}

+ 3 - 16
common/kv.go

@@ -1,10 +1,11 @@
-package common
+package kv
 
 
 import (
 import (
 	"bytes"
 	"bytes"
 	"database/sql"
 	"database/sql"
 	"encoding/gob"
 	"encoding/gob"
 	"fmt"
 	"fmt"
+	"github.com/emqx/kuiper/common"
 	_ "github.com/mattn/go-sqlite3"
 	_ "github.com/mattn/go-sqlite3"
 	"os"
 	"os"
 	"path"
 	"path"
@@ -12,20 +13,6 @@ import (
 	"strings"
 	"strings"
 )
 )
 
 
-type KeyValue interface {
-	Open() error
-	Close() error
-	// Set key to hold string value if key does not exist otherwise return an error
-	Setnx(key string, value interface{}) error
-	// Set key to hold the string value. If key already holds a value, it is overwritten
-	Set(key string, value interface{}) error
-	Get(key string, val interface{}) (bool, error)
-	//Must return *common.Error with NOT_FOUND error
-	Delete(key string) error
-	Keys() (keys []string, err error)
-	Clean() error
-}
-
 type SqliteKVStore struct {
 type SqliteKVStore struct {
 	db    *sql.DB
 	db    *sql.DB
 	table string
 	table string
@@ -120,7 +107,7 @@ func (m *SqliteKVStore) Delete(key string) error {
 	var tmp []byte
 	var tmp []byte
 	err := row.Scan(&tmp)
 	err := row.Scan(&tmp)
 	if nil != err || 0 == len(tmp) {
 	if nil != err || 0 == len(tmp) {
-		return NewErrorWithCode(NOT_FOUND, fmt.Sprintf("%s is not found", key))
+		return common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("%s is not found", key))
 	}
 	}
 	sql = fmt.Sprintf("DELETE FROM %s WHERE key='%s';", m.table, key)
 	sql = fmt.Sprintf("DELETE FROM %s WHERE key='%s';", m.table, key)
 	_, err = m.db.Exec(sql)
 	_, err = m.db.Exec(sql)

+ 101 - 0
common/kv/sqliteKV_test.go

@@ -0,0 +1,101 @@
+package kv
+
+import (
+	"os"
+	"path"
+	"path/filepath"
+	"reflect"
+	"testing"
+)
+
+func TestSqliteKVStore_Funcs(t *testing.T) {
+	abs, _ := filepath.Abs("test")
+	if f, _ := os.Stat(abs); f != nil {
+		os.Remove(abs)
+	}
+
+	ks := GetSqliteKVStore(abs)
+	if e := ks.Open(); e != nil {
+		t.Errorf("Failed to open data %s.", e)
+	}
+
+	if err := ks.Setnx("foo", "bar"); nil != err {
+		t.Error(err)
+	}
+
+	var v string
+	if ok, _ := ks.Get("foo", &v); ok {
+		if !reflect.DeepEqual("bar", v) {
+			t.Error("expect:bar", "get:", v)
+		}
+	} else {
+		t.Errorf("Should not find the foo key.")
+	}
+
+	if err := ks.Setnx("foo1", "bar1"); nil != err {
+		t.Error(err)
+	}
+
+	if err := ks.Set("foo1", "bar2"); nil != err {
+		t.Error(err)
+	}
+
+	var v1 string
+	if ok, _ := ks.Get("foo1", &v1); ok {
+		if !reflect.DeepEqual("bar2", v1) {
+			t.Error("expect:bar2", "get:", v1)
+		}
+	} else {
+		t.Errorf("Should not find the foo1 key.")
+	}
+
+	if keys, e1 := ks.Keys(); e1 != nil {
+		t.Errorf("Failed to get value: %s.", e1)
+	} else {
+		if !reflect.DeepEqual(2, len(keys)) {
+			t.Error("expect:2", "get:", len(keys))
+		}
+	}
+
+	if e2 := ks.Close(); e2 != nil {
+		t.Errorf("Failed to close data: %s.", e2)
+	}
+
+	if err := ks.Open(); nil != err {
+		t.Error(err)
+	}
+
+	var v2 string
+	if ok, _ := ks.Get("foo", &v2); ok {
+		if !reflect.DeepEqual("bar", v2) {
+			t.Error("expect:bar", "get:", v)
+		}
+	} else {
+		t.Errorf("Should not find the foo key.")
+	}
+
+	if err := ks.Delete("foo1"); nil != err {
+		t.Error(err)
+	}
+
+	if keys, e1 := ks.Keys(); e1 != nil {
+		t.Errorf("Failed to get value: %s.", e1)
+	} else {
+		reflect.DeepEqual(1, len(keys))
+	}
+
+	if err := ks.Clean(); nil != err {
+		t.Error(err)
+	}
+
+	if keys, e1 := ks.Keys(); e1 != nil {
+		t.Errorf("Failed to get value: %s.", e1)
+	} else {
+		reflect.DeepEqual(0, len(keys))
+	}
+
+	dir, _ := filepath.Split(abs)
+	abs = path.Join(dir, "sqliteKV.db")
+	os.Remove(abs)
+
+}

+ 0 - 95
common/util_test.go

@@ -1,106 +1,11 @@
 package common
 package common
 
 
 import (
 import (
-	"os"
-	"path"
-	"path/filepath"
 	"reflect"
 	"reflect"
 	"strings"
 	"strings"
 	"testing"
 	"testing"
 )
 )
 
 
-func TestSqliteKVStore_Funcs(t *testing.T) {
-	abs, _ := filepath.Abs("test")
-	if f, _ := os.Stat(abs); f != nil {
-		os.Remove(abs)
-	}
-
-	ks := GetSqliteKVStore(abs)
-	if e := ks.Open(); e != nil {
-		t.Errorf("Failed to open data %s.", e)
-	}
-
-	if err := ks.Setnx("foo", "bar"); nil != err {
-		t.Error(err)
-	}
-
-	var v string
-	if ok, _ := ks.Get("foo", &v); ok {
-		if !reflect.DeepEqual("bar", v) {
-			t.Error("expect:bar", "get:", v)
-		}
-	} else {
-		t.Errorf("Should not find the foo key.")
-	}
-
-	if err := ks.Setnx("foo1", "bar1"); nil != err {
-		t.Error(err)
-	}
-
-	if err := ks.Set("foo1", "bar2"); nil != err {
-		t.Error(err)
-	}
-
-	var v1 string
-	if ok, _ := ks.Get("foo1", &v1); ok {
-		if !reflect.DeepEqual("bar2", v1) {
-			t.Error("expect:bar2", "get:", v1)
-		}
-	} else {
-		t.Errorf("Should not find the foo1 key.")
-	}
-
-	if keys, e1 := ks.Keys(); e1 != nil {
-		t.Errorf("Failed to get value: %s.", e1)
-	} else {
-		if !reflect.DeepEqual(2, len(keys)) {
-			t.Error("expect:2", "get:", len(keys))
-		}
-	}
-
-	if e2 := ks.Close(); e2 != nil {
-		t.Errorf("Failed to close data: %s.", e2)
-	}
-
-	if err := ks.Open(); nil != err {
-		t.Error(err)
-	}
-
-	var v2 string
-	if ok, _ := ks.Get("foo", &v2); ok {
-		if !reflect.DeepEqual("bar", v2) {
-			t.Error("expect:bar", "get:", v)
-		}
-	} else {
-		t.Errorf("Should not find the foo key.")
-	}
-
-	if err := ks.Delete("foo1"); nil != err {
-		t.Error(err)
-	}
-
-	if keys, e1 := ks.Keys(); e1 != nil {
-		t.Errorf("Failed to get value: %s.", e1)
-	} else {
-		reflect.DeepEqual(1, len(keys))
-	}
-
-	if err := ks.Clean(); nil != err {
-		t.Error(err)
-	}
-
-	if keys, e1 := ks.Keys(); e1 != nil {
-		t.Errorf("Failed to get value: %s.", e1)
-	} else {
-		reflect.DeepEqual(0, len(keys))
-	}
-
-	dir, _ := filepath.Split(abs)
-	abs = path.Join(dir, "sqliteKV.db")
-	os.Remove(abs)
-
-}
-
 func TestMapConvert_Funcs(t *testing.T) {
 func TestMapConvert_Funcs(t *testing.T) {
 	source := map[interface{}]interface{}{
 	source := map[interface{}]interface{}{
 		"QUERY_TABLE": "VBAP",
 		"QUERY_TABLE": "VBAP",

+ 3 - 2
plugins/manager.go

@@ -7,6 +7,7 @@ import (
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/common/kv"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
 	"io"
 	"io"
 	"io/ioutil"
 	"io/ioutil"
@@ -281,7 +282,7 @@ type Manager struct {
 	pluginDir string
 	pluginDir string
 	etcDir    string
 	etcDir    string
 	registry  *Registry
 	registry  *Registry
-	db        common.KeyValue
+	db        kv.KeyValue
 }
 }
 
 
 func NewPluginManager() (*Manager, error) {
 func NewPluginManager() (*Manager, error) {
@@ -302,7 +303,7 @@ func NewPluginManager() (*Manager, error) {
 			outerErr = fmt.Errorf("cannot find db folder: %s", err)
 			outerErr = fmt.Errorf("cannot find db folder: %s", err)
 			return
 			return
 		}
 		}
-		db := common.GetSqliteKVStore(path.Join(dbDir, "pluginFuncs"))
+		db := kv.GetDefaultKVStore(path.Join(dbDir, "pluginFuncs"))
 		err = db.Open()
 		err = db.Open()
 		if err != nil {
 		if err != nil {
 			outerErr = fmt.Errorf("error when opening db: %v.", err)
 			outerErr = fmt.Errorf("error when opening db: %v.", err)

+ 2 - 2
tools/migration/util/migration.go

@@ -2,7 +2,7 @@ package util
 
 
 import (
 import (
 	"fmt"
 	"fmt"
-	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/common/kv"
 	"github.com/patrickmn/go-cache"
 	"github.com/patrickmn/go-cache"
 	"io/ioutil"
 	"io/ioutil"
 	"os"
 	"os"
@@ -22,7 +22,7 @@ func migration(dir string) error {
 		return err
 		return err
 	}
 	}
 
 
-	store := common.GetSqliteKVStore(dir)
+	store := kv.GetDefaultKVStore(dir)
 	if err := store.Open(); nil != err {
 	if err := store.Open(); nil != err {
 		return err
 		return err
 	}
 	}

+ 2 - 2
tools/migration/util/migration_test.go

@@ -1,7 +1,7 @@
 package util
 package util
 
 
 import (
 import (
-	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/common/kv"
 	"github.com/patrickmn/go-cache"
 	"github.com/patrickmn/go-cache"
 	"os"
 	"os"
 	"path"
 	"path"
@@ -47,7 +47,7 @@ func TestDataMigration(t *testing.T) {
 		return
 		return
 	}
 	}
 
 
-	store := common.GetSqliteKVStore(dir)
+	store := kv.GetDefaultKVStore(dir)
 	if err := store.Open(); nil != err {
 	if err := store.Open(); nil != err {
 		t.Error(err)
 		t.Error(err)
 		return
 		return

+ 6 - 5
xsql/processors/xsql_processor.go

@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/common/kv"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream"
 	"github.com/emqx/kuiper/xstream"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
@@ -18,13 +19,13 @@ import (
 var log = common.Log
 var log = common.Log
 
 
 type StreamProcessor struct {
 type StreamProcessor struct {
-	db common.KeyValue
+	db kv.KeyValue
 }
 }
 
 
 //@params d : the directory of the DB to save the stream info
 //@params d : the directory of the DB to save the stream info
 func NewStreamProcessor(d string) *StreamProcessor {
 func NewStreamProcessor(d string) *StreamProcessor {
 	processor := &StreamProcessor{
 	processor := &StreamProcessor{
-		db: common.GetSqliteKVStore(d),
+		db: kv.GetDefaultKVStore(d),
 	}
 	}
 	return processor
 	return processor
 }
 }
@@ -208,13 +209,13 @@ func (p *StreamProcessor) DropStream(name string) (string, error) {
 }
 }
 
 
 type RuleProcessor struct {
 type RuleProcessor struct {
-	db        common.KeyValue
+	db        kv.KeyValue
 	rootDbDir string
 	rootDbDir string
 }
 }
 
 
 func NewRuleProcessor(d string) *RuleProcessor {
 func NewRuleProcessor(d string) *RuleProcessor {
 	processor := &RuleProcessor{
 	processor := &RuleProcessor{
-		db:        common.GetSqliteKVStore(path.Join(d, "rule")),
+		db:        kv.GetDefaultKVStore(path.Join(d, "rule")),
 		rootDbDir: d,
 		rootDbDir: d,
 	}
 	}
 	return processor
 	return processor
@@ -456,7 +457,7 @@ func cleanSinkCache(rule *api.Rule) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	store := common.GetSqliteKVStore(path.Join(dbDir, "sink"))
+	store := kv.GetDefaultKVStore(path.Join(dbDir, "sink"))
 	err = store.Open()
 	err = store.Open()
 	if err != nil {
 	if err != nil {
 		return err
 		return err

+ 3 - 2
xstream/nodes/sink_cache.go

@@ -4,6 +4,7 @@ import (
 	"encoding/gob"
 	"encoding/gob"
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/common/kv"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/checkpoints"
 	"github.com/emqx/kuiper/xstream/checkpoints"
 	"io"
 	"io"
@@ -65,7 +66,7 @@ type Cache struct {
 	changed bool
 	changed bool
 	//serialize
 	//serialize
 	key   string //the key for current cache
 	key   string //the key for current cache
-	store common.KeyValue
+	store kv.KeyValue
 }
 }
 
 
 func NewTimebasedCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache {
 func NewTimebasedCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache {
@@ -90,7 +91,7 @@ func (c *Cache) initStore(ctx api.StreamContext) {
 	if err != nil {
 	if err != nil {
 		c.drainError(err)
 		c.drainError(err)
 	}
 	}
-	c.store = common.GetSqliteKVStore(path.Join(dbDir, "sink", ctx.GetRuleId()))
+	c.store = kv.GetDefaultKVStore(path.Join(dbDir, "sink", ctx.GetRuleId()))
 	c.key = ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
 	c.key = ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
 	logger.Debugf("cache saved to key %s", c.key)
 	logger.Debugf("cache saved to key %s", c.key)
 	//load cache
 	//load cache

+ 4 - 3
xstream/planner/planner.go

@@ -3,6 +3,7 @@ package planner
 import (
 import (
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/common/kv"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream"
 	"github.com/emqx/kuiper/xstream"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
@@ -33,7 +34,7 @@ func PlanWithSourcesAndSinks(rule *api.Rule, storePath string, sources []*nodes.
 	if rule.Options.SendMetaToSink && (len(streamsFromStmt) > 1 || stmt.Dimensions != nil) {
 	if rule.Options.SendMetaToSink && (len(streamsFromStmt) > 1 || stmt.Dimensions != nil) {
 		return nil, fmt.Errorf("Invalid option sendMetaToSink, it can not be applied to window")
 		return nil, fmt.Errorf("Invalid option sendMetaToSink, it can not be applied to window")
 	}
 	}
-	store := common.GetSqliteKVStore(path.Join(storePath, "stream"))
+	store := kv.GetDefaultKVStore(path.Join(storePath, "stream"))
 	err = store.Open()
 	err = store.Open()
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -162,7 +163,7 @@ func buildOps(lp LogicalPlan, tp *xstream.TopologyNew, options *api.RuleOption,
 	return op, newIndex, nil
 	return op, newIndex, nil
 }
 }
 
 
-func getStream(m common.KeyValue, name string) (stmt *xsql.StreamStmt, err error) {
+func getStream(m kv.KeyValue, name string) (stmt *xsql.StreamStmt, err error) {
 	var s string
 	var s string
 	f, err := m.Get(name, &s)
 	f, err := m.Get(name, &s)
 	if !f || err != nil {
 	if !f || err != nil {
@@ -177,7 +178,7 @@ func getStream(m common.KeyValue, name string) (stmt *xsql.StreamStmt, err error
 	return
 	return
 }
 }
 
 
-func createLogicalPlan(stmt *xsql.SelectStatement, opt *api.RuleOption, store common.KeyValue) (LogicalPlan, error) {
+func createLogicalPlan(stmt *xsql.SelectStatement, opt *api.RuleOption, store kv.KeyValue) (LogicalPlan, error) {
 	streamsFromStmt := xsql.GetStreams(stmt)
 	streamsFromStmt := xsql.GetStreams(stmt)
 	dimensions := stmt.Dimensions
 	dimensions := stmt.Dimensions
 	var (
 	var (

+ 2 - 1
xstream/planner/planner_test.go

@@ -3,6 +3,7 @@ package planner
 import (
 import (
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/common/kv"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
 	"path"
 	"path"
@@ -26,7 +27,7 @@ func getDbDir() string {
 }
 }
 
 
 func Test_createLogicalPlan(t *testing.T) {
 func Test_createLogicalPlan(t *testing.T) {
-	store := common.GetSqliteKVStore(path.Join(DbDir, "stream"))
+	store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
 	err := store.Open()
 	err := store.Open()
 	if err != nil {
 	if err != nil {
 		t.Error(err)
 		t.Error(err)

+ 3 - 2
xstream/states/kv_store.go

@@ -4,6 +4,7 @@ import (
 	"encoding/gob"
 	"encoding/gob"
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/common/kv"
 	"github.com/emqx/kuiper/xstream/checkpoints"
 	"github.com/emqx/kuiper/xstream/checkpoints"
 	"path"
 	"path"
 	"sync"
 	"sync"
@@ -20,7 +21,7 @@ func init() {
 ***   { "checkpoint1", "checkpoint2" ... "checkpointn" : The complete or incomplete snapshot
 ***   { "checkpoint1", "checkpoint2" ... "checkpointn" : The complete or incomplete snapshot
  */
  */
 type KVStore struct {
 type KVStore struct {
-	db          common.KeyValue
+	db          kv.KeyValue
 	mapStore    *sync.Map //The current root store of a rule
 	mapStore    *sync.Map //The current root store of a rule
 	checkpoints []int64
 	checkpoints []int64
 	max         int
 	max         int
@@ -33,7 +34,7 @@ type KVStore struct {
 //Assume each operator only has one instance
 //Assume each operator only has one instance
 func getKVStore(ruleId string) (*KVStore, error) {
 func getKVStore(ruleId string) (*KVStore, error) {
 	dr, _ := common.GetDataLoc()
 	dr, _ := common.GetDataLoc()
-	db := common.GetSqliteKVStore(path.Join(dr, "checkpoints", ruleId))
+	db := kv.GetDefaultKVStore(path.Join(dr, "checkpoints", ruleId))
 	s := &KVStore{db: db, max: 3, mapStore: &sync.Map{}}
 	s := &KVStore{db: db, max: 3, mapStore: &sync.Map{}}
 	//read data from badger db
 	//read data from badger db
 	if err := s.restore(); err != nil {
 	if err := s.restore(); err != nil {