Просмотр исходного кода

refactor(kv): refactor sqlkv and fix test failures

1. Leave only kv interface in pkg and move other sqlkv implementation to internal. So that sqlite dependency is not exported.
2. Refactor some API to move error to the last return value to comply with go convention.
3. Extract kvstore initialization as Setup function so that both server startup and testings can init it by one method call.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 лет назад
Родитель
Сommit
8c658749b8

+ 2 - 2
pkg/kv/database.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package kv
+package sqlkv
 
 import "database/sql"
 
@@ -20,4 +20,4 @@ type Database interface {
 	Connect() error
 	Disconnect() error
 	Apply(f func(db *sql.DB) error) error
-}
+}

+ 70 - 0
internal/pkg/sqlkv/kvStore.go

@@ -0,0 +1,70 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlkv
+
+import (
+	"github.com/lf-edge/ekuiper/pkg/kv"
+	"sync"
+)
+
+type kvstores struct {
+	stores map[string]kv.KeyValue
+	mu     sync.Mutex
+}
+
+var stores = kvstores{
+	stores: make(map[string]kv.KeyValue),
+	mu:     sync.Mutex{},
+}
+
+var database Database
+
+func Setup(dataDir string) error {
+	err, db := newSqliteDatabase(dataDir)
+	if err != nil {
+		return err
+	}
+	err = db.Connect()
+	if err != nil {
+		return err
+	}
+	database = db
+	return nil
+}
+
+func Close() {
+	if database != nil {
+		database.Disconnect()
+		database = nil
+	}
+}
+
+func (s *kvstores) get(table string) (kv.KeyValue, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if store, contains := s.stores[table]; contains {
+		return store, nil
+	}
+	err, store := CreateSqlKvStore(database, table)
+	if err != nil {
+		return nil, err
+	}
+	s.stores[table] = store
+	return store, nil
+}
+
+func GetKVStore(table string) (kv.KeyValue, error) {
+	return stores.get(table)
+}

+ 26 - 26
pkg/kv/sqlKv.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package kv
+package sqlkv
 
 import (
 	"bytes"
@@ -25,15 +25,15 @@ import (
 
 type sqlKvStore struct {
 	database Database
-	table string
+	table    string
 }
 
 func CreateSqlKvStore(database Database, table string) (error, *sqlKvStore) {
 	store := &sqlKvStore{
 		database: database,
-		table: table,
+		table:    table,
 	}
-	err := store.database.Apply(func (db *sql.DB) error {
+	err := store.database.Apply(func(db *sql.DB) error {
 		query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS '%s'('key' VARCHAR(255) PRIMARY KEY, 'val' BLOB);", table)
 		_, err := db.Exec(query)
 		return err
@@ -55,25 +55,21 @@ func encode(value interface{}) ([]byte, error) {
 }
 
 func (kv *sqlKvStore) Setnx(key string, value interface{}) error {
-	return kv.database.Apply(func (db *sql.DB) error {
+	return kv.database.Apply(func(db *sql.DB) error {
 		b, err := encode(value)
 		if nil != err {
 			return err
 		}
-		query := fmt.Sprintf("INSERT INTO %s(key,val) values(?,?);", kv.table)
+		query := fmt.Sprintf("INSERT INTO '%s'(key,val) values(?,?);", kv.table)
 		stmt, err := db.Prepare(query)
 		_, err = stmt.Exec(key, b)
-		if err != nil {
-			used := db.Stats().OpenConnections
-
-			fmt.Println(fmt.Sprintf("Here %d", used))
-			return err
-		}
 		stmt.Close()
-		if nil != err && strings.Contains(err.Error(), "UNIQUE constraint failed") {
-			return fmt.Errorf(`Item %s already exists`, key)
+		if err != nil {
+			if strings.Contains(err.Error(), "UNIQUE constraint failed") {
+				return fmt.Errorf(`Item %s already exists`, key)
+			}
 		}
-		return nil
+		return err
 	})
 }
 
@@ -83,8 +79,11 @@ func (kv *sqlKvStore) Set(key string, value interface{}) error {
 		return err
 	}
 	err = kv.database.Apply(func(db *sql.DB) error {
-		query := fmt.Sprintf("REPLACE INTO %s(key,val) values(?,?);", kv.table)
+		query := fmt.Sprintf("REPLACE INTO '%s'(key,val) values(?,?);", kv.table)
 		stmt, err := db.Prepare(query)
+		if err != nil {
+			return err
+		}
 		_, err = stmt.Exec(key, b)
 		stmt.Close()
 		return err
@@ -94,13 +93,14 @@ func (kv *sqlKvStore) Set(key string, value interface{}) error {
 
 func (kv *sqlKvStore) Get(key string, value interface{}) (bool, error) {
 	result := false
-	err := kv.database.Apply(func (db *sql.DB) error {
-		query := fmt.Sprintf("SELECT val FROM %s WHERE key='%s';", kv.table, key)
+	err := kv.database.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("SELECT val FROM '%s' WHERE key='%s';", kv.table, key)
 		row := db.QueryRow(query)
 		var tmp []byte
 		err := row.Scan(&tmp)
-		if nil != err {
-			return err
+		if err != nil {
+			result = false
+			return nil
 		}
 		dec := gob.NewDecoder(bytes.NewBuffer(tmp))
 		if err := dec.Decode(value); err != nil {
@@ -113,15 +113,15 @@ func (kv *sqlKvStore) Get(key string, value interface{}) (bool, error) {
 }
 
 func (kv *sqlKvStore) Delete(key string) error {
-	return kv.database.Apply(func (db *sql.DB) error {
-		query := fmt.Sprintf("SELECT key FROM %s WHERE key='%s';", kv.table, key)
+	return kv.database.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("SELECT key FROM '%s' WHERE key='%s';", kv.table, key)
 		row := db.QueryRow(query)
 		var tmp []byte
 		err := row.Scan(&tmp)
 		if nil != err || 0 == len(tmp) {
 			return errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("%s is not found", key))
 		}
-		query = fmt.Sprintf("DELETE FROM %s WHERE key='%s';", kv.table, key)
+		query = fmt.Sprintf("DELETE FROM '%s' WHERE key='%s';", kv.table, key)
 		_, err = db.Exec(query)
 		return err
 	})
@@ -130,7 +130,7 @@ func (kv *sqlKvStore) Delete(key string) error {
 func (kv *sqlKvStore) Keys() ([]string, error) {
 	keys := make([]string, 0)
 	err := kv.database.Apply(func(db *sql.DB) error {
-		query := fmt.Sprintf("SELECT key FROM %s", kv.table)
+		query := fmt.Sprintf("SELECT key FROM '%s'", kv.table)
 		row, err := db.Query(query)
 		if nil != err {
 			return err
@@ -151,8 +151,8 @@ func (kv *sqlKvStore) Keys() ([]string, error) {
 }
 
 func (kv *sqlKvStore) Clean() error {
-	return kv.database.Apply(func (db *sql.DB) error {
-		query := fmt.Sprintf("DELETE FROM %s", kv.table)
+	return kv.database.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("DELETE FROM '%s'", kv.table)
 		_, err := db.Exec(query)
 		return err
 	})

+ 6 - 5
pkg/kv/sqlKv_test.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package kv
+package sqlkv
 
 import (
 	"os"
@@ -27,11 +27,11 @@ func TestSqlKVStore_Funcs(t *testing.T) {
 	if f, _ := os.Stat(abs); f != nil {
 		os.Remove(abs)
 	}
-	_, database := NewSqliteDatabase(abs)
-	database.Connect()
-	SetKVStoreDatabase(database)
+	_, db := newSqliteDatabase(abs)
+	db.Connect()
+	database = db
 
-	_, ks := GetKVStore("test")
+	ks, _ := GetKVStore("test")
 	if err := ks.Setnx("foo", "bar"); nil != err {
 		t.Error(err)
 	}
@@ -99,6 +99,7 @@ func TestSqlKVStore_Funcs(t *testing.T) {
 		reflect.DeepEqual(0, len(keys))
 	}
 
+	database.Disconnect()
 	dir, _ := filepath.Split(abs)
 	abs = path.Join(dir, "sqliteKV.db")
 	os.Remove(abs)

+ 7 - 7
pkg/kv/sqliteDatabase.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package kv
+package sqlkv
 
 import (
 	"database/sql"
@@ -24,20 +24,20 @@ import (
 )
 
 type SqliteDatabase struct {
-	db *sql.DB
+	db   *sql.DB
 	Path string
-	mu sync.Mutex
+	mu   sync.Mutex
 }
 
-func NewSqliteDatabase(dir string) (error, *SqliteDatabase) {
+func newSqliteDatabase(dir string) (error, *SqliteDatabase) {
 	if _, err := os.Stat(dir); os.IsNotExist(err) {
 		os.MkdirAll(dir, os.ModePerm)
 	}
 	dbPath := path.Join(dir, "sqliteKV.db")
 	return nil, &SqliteDatabase{
-		db: nil,
+		db:   nil,
 		Path: dbPath,
-		mu: sync.Mutex{},
+		mu:   sync.Mutex{},
 	}
 }
 
@@ -67,4 +67,4 @@ func (d *SqliteDatabase) Apply(f func(db *sql.DB) error) error {
 	err := f(d.db)
 	d.mu.Unlock()
 	return err
-}
+}

+ 2 - 1
internal/plugin/manager.go

@@ -22,6 +22,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/filex"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
+	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/kv"
@@ -294,7 +295,7 @@ func NewPluginManager() (*Manager, error) {
 			outerErr = fmt.Errorf("cannot find etc folder: %s", err)
 			return
 		}
-		err, db := kv.GetKVStore("pluginFuncs")
+		db, err := sqlkv.GetKVStore("pluginFuncs")
 		if err != nil {
 			outerErr = fmt.Errorf("error when opening db: %v.", err)
 		}

+ 2 - 0
internal/plugin/manager_test.go

@@ -17,6 +17,7 @@ package plugin
 import (
 	"errors"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"net/http"
 	"net/http/httptest"
@@ -31,6 +32,7 @@ var manager *Manager
 
 func init() {
 	var err error
+	testx.InitEnv()
 	manager, err = NewPluginManager()
 	if err != nil {
 		panic(err)

+ 8 - 9
internal/processor/rule.go

@@ -19,6 +19,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/internal/pkg/tskv"
 	"github.com/lf-edge/ekuiper/internal/topo"
 	"github.com/lf-edge/ekuiper/internal/topo/node"
@@ -31,18 +32,16 @@ import (
 )
 
 type RuleProcessor struct {
-	db        kv.KeyValue
-	rootDbDir string
+	db kv.KeyValue
 }
 
-func NewRuleProcessor(d string) *RuleProcessor {
-	err, db := kv.GetKVStore("rule")
+func NewRuleProcessor() *RuleProcessor {
+	db, err := sqlkv.GetKVStore("rule")
 	if err != nil {
-		panic(fmt.Sprintf("Can not initalize store for the Rule Processor"))
+		panic(fmt.Sprintf("Can not initalize store for the rule processor at path 'rule': %v", err))
 	}
 	processor := &RuleProcessor{
-		db:        db,
-		rootDbDir: d,
+		db: db,
 	}
 	return processor
 }
@@ -174,7 +173,7 @@ func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error)
 }
 
 func (p *RuleProcessor) ExecQuery(ruleid, sql string) (*topo.Topo, error) {
-	if tp, err := planner.PlanWithSourcesAndSinks(p.getDefaultRule(ruleid, sql), p.rootDbDir, nil, []*node.SinkNode{node.NewSinkNode("sink_memory_log", "logToMemory", nil)}); err != nil {
+	if tp, err := planner.PlanWithSourcesAndSinks(p.getDefaultRule(ruleid, sql), nil, []*node.SinkNode{node.NewSinkNode("sink_memory_log", "logToMemory", nil)}); err != nil {
 		return nil, err
 	} else {
 		go func() {
@@ -243,7 +242,7 @@ func cleanCheckpoint(name string) error {
 }
 
 func cleanSinkCache(rule *api.Rule) error {
-	err, store := kv.GetKVStore("sink")
+	store, err := sqlkv.GetKVStore("sink")
 	if err != nil {
 		return err
 	}

+ 1 - 1
internal/processor/rule_test.go

@@ -142,7 +142,7 @@ func TestRuleActionParse_Apply(t *testing.T) {
 		},
 	}
 
-	p := NewRuleProcessor(DbDir)
+	p := NewRuleProcessor()
 	for i, tt := range tests {
 		r, err := p.getRuleByJson(tt.result.Id, tt.ruleStr)
 		if err != nil {

+ 4 - 4
internal/processor/stream.go

@@ -19,6 +19,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
@@ -34,11 +35,10 @@ type StreamProcessor struct {
 	db kv.KeyValue
 }
 
-//@params d : the directory of the DB to save the stream info
-func NewStreamProcessor(d string) *StreamProcessor {
-	err, db := kv.GetKVStore(d)
+func NewStreamProcessor() *StreamProcessor {
+	db, err := sqlkv.GetKVStore("stream")
 	if err != nil {
-		panic(fmt.Sprintf("Can not initalize store for the stream processor at path %s", d))
+		panic(fmt.Sprintf("Can not initalize store for the stream processor at path 'stream': %v", err))
 	}
 	processor := &StreamProcessor{
 		db: db,

+ 7 - 6
internal/processor/stream_test.go

@@ -21,9 +21,9 @@ import (
 	"testing"
 )
 
-var (
-	DbDir = testx.GetDbDir()
-)
+func init() {
+	testx.InitEnv()
+}
 
 func TestStreamCreateProcessor(t *testing.T) {
 	var tests = []struct {
@@ -101,8 +101,9 @@ func TestStreamCreateProcessor(t *testing.T) {
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 
+	p := NewStreamProcessor()
 	for i, tt := range tests {
-		results, err := NewStreamProcessor("streamTest").ExecStmt(tt.s)
+		results, err := p.ExecStmt(tt.s)
 		if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
 			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.s, tt.err, err)
 		} else if tt.err == "" {
@@ -188,9 +189,9 @@ func TestTableProcessor(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
-
+	p := NewStreamProcessor()
 	for i, tt := range tests {
-		results, err := NewStreamProcessor("streamTest").ExecStmt(tt.s)
+		results, err := p.ExecStmt(tt.s)
 		if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
 			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.s, tt.err, err)
 		} else if tt.err == "" {

+ 1 - 1
internal/server/ruleManager.go

@@ -89,7 +89,7 @@ func createRuleState(rule *api.Rule) (*RuleState, error) {
 		Name: rule.Id,
 	}
 	registry.Store(rule.Id, rs)
-	if tp, err := planner.Plan(rule, dataDir); err != nil {
+	if tp, err := planner.Plan(rule); err != nil {
 		return rs, err
 	} else {
 		rs.Topology = tp

+ 9 - 15
internal/server/server.go

@@ -16,11 +16,11 @@ package server
 
 import (
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/internal/plugin"
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/service"
 	"github.com/lf-edge/ekuiper/internal/xsql"
-	"github.com/lf-edge/ekuiper/pkg/kv"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 
 	"context"
@@ -42,7 +42,6 @@ var (
 	streamProcessor *processor.StreamProcessor
 	pluginManager   *plugin.Manager
 	serviceManager  *service.Manager
-	store			*kv.KeyValue
 )
 
 func StartUp(Version, LoadFileType string) {
@@ -53,31 +52,26 @@ func StartUp(Version, LoadFileType string) {
 
 	dr, err := conf.GetDataLoc()
 	if err != nil {
-		logger.Panic(err)
+		panic(err)
 	} else {
 		logger.Infof("db location is %s", dr)
 		dataDir = dr
 	}
 
-	err, database := kv.NewSqliteDatabase(dataDir)
+	err = sqlkv.Setup(dataDir)
 	if err != nil {
-		logger.Panic(err)
+		panic(err)
 	}
-	err = database.Connect()
-	if err != nil {
-		logger.Panic(err)
-	}
-	kv.SetKVStoreDatabase(database)
 
-	ruleProcessor = processor.NewRuleProcessor(dataDir)
-	streamProcessor = processor.NewStreamProcessor("stream")
+	ruleProcessor = processor.NewRuleProcessor()
+	streamProcessor = processor.NewStreamProcessor()
 	pluginManager, err = plugin.NewPluginManager()
 	if err != nil {
-		logger.Panic(err)
+		panic(err)
 	}
 	serviceManager, err = service.GetServiceManager()
 	if err != nil {
-		logger.Panic(err)
+		panic(err)
 	}
 	xsql.InitFuncRegisters(serviceManager, pluginManager)
 
@@ -191,6 +185,6 @@ func StartUp(Version, LoadFileType string) {
 		logger.Info("prometheus server successfully shutdown.")
 	}
 
-	database.Disconnect()
+	sqlkv.Close()
 	os.Exit(0)
 }

+ 3 - 2
internal/service/manager.go

@@ -20,6 +20,7 @@ import (
 	kconf "github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/filex"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
+	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 	"io/ioutil"
@@ -59,11 +60,11 @@ func GetServiceManager() (*Manager, error) {
 		if err != nil {
 			return nil, fmt.Errorf("cannot find etc/services folder: %s", err)
 		}
-		err, sdb := kv.GetKVStore("services")
+		sdb, err := sqlkv.GetKVStore("services")
 		if err != nil {
 			return nil, fmt.Errorf("cannot open service db: %s", err)
 		}
-		err, fdb := kv.GetKVStore("serviceFuncs")
+		fdb, err := sqlkv.GetKVStore("serviceFuncs")
 		if err != nil {
 			return nil, fmt.Errorf("cannot open function db: %s", err)
 		}

+ 3 - 1
internal/testx/testUtil.go

@@ -16,6 +16,7 @@ package testx
 
 import (
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 )
 
 // errstring returns the string representation of an error.
@@ -26,11 +27,12 @@ func Errstring(err error) string {
 	return ""
 }
 
-func GetDbDir() string {
+func InitEnv() string {
 	conf.InitConf()
 	dbDir, err := conf.GetDataLoc()
 	if err != nil {
 		conf.Log.Fatal(err)
 	}
+	sqlkv.Setup(dbDir)
 	return dbDir
 }

+ 3 - 6
internal/topo/node/sink_cache.go

@@ -18,6 +18,7 @@ import (
 	"encoding/gob"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/kv"
@@ -99,12 +100,8 @@ func (c *Cache) initStore(ctx api.StreamContext) {
 		Data: make(map[int]interface{}),
 		Tail: 0,
 	}
-	dbDir, err := conf.GetDataLoc()
-	logger.Debugf("cache saved to %s", dbDir)
-	if err != nil {
-		c.drainError(err)
-	}
-	err, c.store = kv.GetKVStore(path.Join("sink", ctx.GetRuleId()))
+	var err error
+	c.store, err = sqlkv.GetKVStore(path.Join("sink", ctx.GetRuleId()))
 	if err != nil {
 		c.drainError(err)
 	}

+ 3 - 3
internal/topo/planner/analyzer_test.go

@@ -17,11 +17,11 @@ package planner
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
-	"github.com/lf-edge/ekuiper/pkg/kv"
 	"reflect"
 	"strings"
 	"testing"
@@ -115,7 +115,7 @@ var tests = []struct {
 }
 
 func Test_validation(t *testing.T) {
-	err, store := kv.GetKVStore("stream")
+	store, err := sqlkv.GetKVStore("stream")
 	if err != nil {
 		t.Error(err)
 		return
@@ -176,7 +176,7 @@ func Test_validation(t *testing.T) {
 }
 
 func Test_validationSchemaless(t *testing.T) {
-	err, store := kv.GetKVStore("stream")
+	store, err := sqlkv.GetKVStore("stream")
 	if err != nil {
 		t.Error(err)
 		return

+ 5 - 4
internal/topo/planner/planner.go

@@ -18,6 +18,7 @@ import (
 	"errors"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/internal/topo"
 	"github.com/lf-edge/ekuiper/internal/topo/node"
 	"github.com/lf-edge/ekuiper/internal/topo/operator"
@@ -27,12 +28,12 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/kv"
 )
 
-func Plan(rule *api.Rule, storePath string) (*topo.Topo, error) {
-	return PlanWithSourcesAndSinks(rule, storePath, nil, nil)
+func Plan(rule *api.Rule) (*topo.Topo, error) {
+	return PlanWithSourcesAndSinks(rule, nil, nil)
 }
 
 // For test only
-func PlanWithSourcesAndSinks(rule *api.Rule, storePath string, sources []*node.SourceNode, sinks []*node.SinkNode) (*topo.Topo, error) {
+func PlanWithSourcesAndSinks(rule *api.Rule, sources []*node.SourceNode, sinks []*node.SinkNode) (*topo.Topo, error) {
 	sql := rule.Sql
 
 	conf.Log.Infof("Init rule with options %+v", rule.Options)
@@ -48,7 +49,7 @@ func PlanWithSourcesAndSinks(rule *api.Rule, storePath string, sources []*node.S
 	if rule.Options.SendMetaToSink && (len(streamsFromStmt) > 1 || stmt.Dimensions != nil) {
 		return nil, fmt.Errorf("Invalid option sendMetaToSink, it can not be applied to window")
 	}
-	err, store := kv.GetKVStore("stream")
+	store, err := sqlkv.GetKVStore("stream")
 	if err != nil {
 		return nil, err
 	}

+ 6 - 6
internal/topo/planner/planner_test.go

@@ -18,22 +18,22 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/gdexlab/go-render/render"
+	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
-	"github.com/lf-edge/ekuiper/pkg/kv"
 	"reflect"
 	"strings"
 	"testing"
 )
 
-var (
-	DbDir = testx.GetDbDir()
-)
+func init() {
+	testx.InitEnv()
+}
 
 func Test_createLogicalPlan(t *testing.T) {
-	err, store := kv.GetKVStore("stream")
+	store, err := sqlkv.GetKVStore("stream")
 	if err != nil {
 		t.Error(err)
 		return
@@ -1050,7 +1050,7 @@ func Test_createLogicalPlan(t *testing.T) {
 }
 
 func Test_createLogicalPlanSchemaless(t *testing.T) {
-	err, store := kv.GetKVStore("stream")
+	store, err := sqlkv.GetKVStore("stream")
 	if err != nil {
 		t.Error(err)
 		return

+ 7 - 7
internal/topo/topotest/mock_topo.go

@@ -35,6 +35,10 @@ import (
 	"time"
 )
 
+func init() {
+	testx.InitEnv()
+}
+
 const POSTLEAP = 1000 // Time change after all data sends out
 type RuleTest struct {
 	Name string
@@ -45,10 +49,6 @@ type RuleTest struct {
 	W    int                    // wait time for each data sending, in milli
 }
 
-var (
-	DbDir = testx.GetDbDir()
-)
-
 func compareMetrics(tp *topo.Topo, m map[string]interface{}) (err error) {
 	keys, values := tp.GetMetrics()
 	for k, v := range m {
@@ -251,7 +251,7 @@ func createStream(t *testing.T, tt RuleTest, j int, opt *api.RuleOption, sinkPro
 	}
 	mockSink := mocknode.NewMockSink()
 	sink := node.NewSinkNodeWithSink("mockSink", mockSink, sinkProps)
-	tp, err := planner.PlanWithSourcesAndSinks(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.Name, j), Sql: tt.Sql, Options: opt}, DbDir, sources, []*node.SinkNode{sink})
+	tp, err := planner.PlanWithSourcesAndSinks(&api.Rule{Id: fmt.Sprintf("%s_%d", tt.Name, j), Sql: tt.Sql, Options: opt}, sources, []*node.SinkNode{sink})
 	if err != nil {
 		t.Error(err)
 		return nil, 0, nil, nil, nil
@@ -262,7 +262,7 @@ func createStream(t *testing.T, tt RuleTest, j int, opt *api.RuleOption, sinkPro
 
 // Create or drop streams
 func HandleStream(createOrDrop bool, names []string, t *testing.T) {
-	p := processor.NewStreamProcessor("stream")
+	p := processor.NewStreamProcessor()
 	for _, name := range names {
 		var sql string
 		if createOrDrop {
@@ -444,7 +444,7 @@ func DoCheckpointRuleTest(t *testing.T, tests []RuleCheckpointTest, j int, opt *
 }
 
 func CreateRule(name, sql string) (*api.Rule, error) {
-	p := processor.NewRuleProcessor(DbDir)
+	p := processor.NewRuleProcessor()
 	p.ExecDrop(name)
 	return p.ExecCreate(name, sql)
 }

+ 1 - 1
internal/topo/topotest/plugin_rule_test.go

@@ -81,7 +81,7 @@ func TestExtensions(t *testing.T) {
 			t.Errorf("failed to create rule: %s.", err)
 			continue
 		}
-		tp, err := planner.Plan(rs, DbDir)
+		tp, err := planner.Plan(rs)
 		if err != nil {
 			t.Errorf("fail to init rule: %v", err)
 			continue

+ 0 - 38
pkg/kv/kv.go

@@ -14,22 +14,6 @@
 
 package kv
 
-import (
-	"sync"
-)
-
-type kvstores struct {
-	stores map[string]KeyValue
-	mu	   sync.Mutex
-}
-
-var stores = kvstores{
-	stores: make(map[string]KeyValue),
-	mu: sync.Mutex{},
-}
-
-var database Database
-
 type KeyValue interface {
 	// Set key to hold string value if key does not exist otherwise return an error
 	Setnx(key string, value interface{}) error
@@ -41,25 +25,3 @@ type KeyValue interface {
 	Keys() (keys []string, err error)
 	Clean() error
 }
-
-func SetKVStoreDatabase(d Database) {
-	database = d
-}
-
-func (s *kvstores) get(table string) (error, KeyValue) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	if store, contains := s.stores[table]; contains {
-		return nil, store
-	}
-	err, store := CreateSqlKvStore(database, table)
-	if err != nil {
-		return err, nil
-	}
-	s.stores[table] = store
-	return nil, store
-}
-
-func GetKVStore(table string) (error, KeyValue) {
-	return stores.get(table)
-}

+ 14 - 14
test/select_aggr_rule.jmx

@@ -131,7 +131,7 @@
                 <elementProp name="" elementType="HTTPArgument">
                   <boolProp name="HTTPArgument.always_encode">false</boolProp>
                   <stringProp name="Argument.value">{&#xd;
-&quot;sql&quot; : &quot;create stream demo (temperature float, humidity bigint) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot; )&quot;&#xd;
+&quot;sql&quot; : &quot;create stream ademo (temperature float, humidity bigint) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot; )&quot;&#xd;
 }</stringProp>
                   <stringProp name="Argument.metadata">=</stringProp>
                 </elementProp>
@@ -154,7 +154,7 @@
           <hashTree>
             <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
               <collectionProp name="Asserion.test_strings">
-                <stringProp name="-1754954177">Stream demo is created.</stringProp>
+                <stringProp name="-1754954177">Stream ademo is created.</stringProp>
               </collectionProp>
               <stringProp name="Assertion.custom_message"></stringProp>
               <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
@@ -170,15 +170,15 @@
                 <elementProp name="" elementType="HTTPArgument">
                   <boolProp name="HTTPArgument.always_encode">false</boolProp>
                   <stringProp name="Argument.value">{&#xd;
-  &quot;id&quot;: &quot;rule1&quot;,&#xd;
-  &quot;sql&quot;: &quot;SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \&quot;/\&quot;, 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 5)&quot;,&#xd;
+  &quot;id&quot;: &quot;arule&quot;,&#xd;
+  &quot;sql&quot;: &quot;SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \&quot;/\&quot;, 1) AS device_id FROM ademo GROUP BY device_id, TUMBLINGWINDOW(ss, 5)&quot;,&#xd;
   &quot;actions&quot;: [&#xd;
     {&#xd;
       &quot;mqtt&quot;: {&#xd;
         &quot;server&quot;: &quot;tcp://${mqtt_srv}:1883&quot;,&#xd;
         &quot;topic&quot;: &quot;devices/result&quot;,&#xd;
         &quot;qos&quot;: 1,&#xd;
-        &quot;clientId&quot;: &quot;demo_001&quot;&#xd;
+        &quot;clientId&quot;: &quot;ademo_001&quot;&#xd;
       }&#xd;
     }&#xd;
   ]&#xd;
@@ -204,7 +204,7 @@
           <hashTree>
             <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
               <collectionProp name="Asserion.test_strings">
-                <stringProp name="-2022196798">Rule rule1 was created</stringProp>
+                <stringProp name="-2022196798">Rule arule was created</stringProp>
               </collectionProp>
               <stringProp name="Assertion.custom_message"></stringProp>
               <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
@@ -228,7 +228,7 @@
             <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
             <stringProp name="HTTPSampler.protocol"></stringProp>
             <stringProp name="HTTPSampler.contentEncoding"></stringProp>
-            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.path">/rules/arule/status</stringProp>
             <stringProp name="HTTPSampler.method">GET</stringProp>
             <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
             <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
@@ -240,7 +240,7 @@
           </HTTPSamplerProxy>
           <hashTree>
             <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
-              <stringProp name="JSON_PATH">$.source_demo_0_records_in_total</stringProp>
+              <stringProp name="JSON_PATH">$.source_ademo_0_records_in_total</stringProp>
               <stringProp name="EXPECTED_VALUE">0</stringProp>
               <boolProp name="JSONVALIDATION">true</boolProp>
               <boolProp name="EXPECT_NULL">false</boolProp>
@@ -291,7 +291,7 @@
             <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
             <stringProp name="HTTPSampler.protocol"></stringProp>
             <stringProp name="HTTPSampler.contentEncoding"></stringProp>
-            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.path">/rules/arule/status</stringProp>
             <stringProp name="HTTPSampler.method">GET</stringProp>
             <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
             <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
@@ -303,7 +303,7 @@
           </HTTPSamplerProxy>
           <hashTree>
             <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
-              <stringProp name="JSON_PATH">$.source_demo_0_records_in_total</stringProp>
+              <stringProp name="JSON_PATH">$.source_ademo_0_records_in_total</stringProp>
               <stringProp name="EXPECTED_VALUE">10</stringProp>
               <boolProp name="JSONVALIDATION">true</boolProp>
               <boolProp name="EXPECT_NULL">false</boolProp>
@@ -340,7 +340,7 @@
             <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
             <stringProp name="HTTPSampler.protocol"></stringProp>
             <stringProp name="HTTPSampler.contentEncoding"></stringProp>
-            <stringProp name="HTTPSampler.path">/rules/rule1</stringProp>
+            <stringProp name="HTTPSampler.path">/rules/arule</stringProp>
             <stringProp name="HTTPSampler.method">DELETE</stringProp>
             <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
             <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
@@ -353,7 +353,7 @@
           <hashTree>
             <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
               <collectionProp name="Asserion.test_strings">
-                <stringProp name="717250485">Rule rule1 is dropped.</stringProp>
+                <stringProp name="717250485">Rule arule is dropped.</stringProp>
               </collectionProp>
               <stringProp name="Assertion.custom_message"></stringProp>
               <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
@@ -377,7 +377,7 @@
             <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
             <stringProp name="HTTPSampler.protocol"></stringProp>
             <stringProp name="HTTPSampler.contentEncoding"></stringProp>
-            <stringProp name="HTTPSampler.path">/streams/demo</stringProp>
+            <stringProp name="HTTPSampler.path">/streams/ademo</stringProp>
             <stringProp name="HTTPSampler.method">DELETE</stringProp>
             <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
             <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
@@ -390,7 +390,7 @@
           <hashTree>
             <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
               <collectionProp name="Asserion.test_strings">
-                <stringProp name="287881319">Stream demo is dropped.</stringProp>
+                <stringProp name="287881319">Stream ademo is dropped.</stringProp>
               </collectionProp>
               <stringProp name="Assertion.custom_message"></stringProp>
               <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>