Forráskód Böngészése

fix,refactor(sqlite) https://github.com/lf-edge/ekuiper/issues/842 fix and refactoring
Fix (and refactoring) related to the too many files open issue - since sqlite does not work well with concurrent writes - kv store was rewritten to use single connection for access to the database,
* introduced database structure which will take away sql database specifics (sqlite in this case)
* fix in topo.go - there was possible scenario that during Open execution of underlying goroutine Cancel was called and store property was set to nil which was leading to null pointer exception

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Robert Wadowski 3 éve
szülő
commit
1925d935bf

+ 6 - 34
internal/plugin/manager.go

@@ -294,17 +294,10 @@ func NewPluginManager() (*Manager, error) {
 			outerErr = fmt.Errorf("cannot find etc folder: %s", err)
 			return
 		}
-		dbDir, err := conf.GetDataLoc()
-		if err != nil {
-			outerErr = fmt.Errorf("cannot find db folder: %s", err)
-			return
-		}
-		db := kv.GetDefaultKVStore(path.Join(dbDir, "pluginFuncs"))
-		err = db.Open()
+		err, db := kv.GetKVStore("pluginFuncs")
 		if err != nil {
 			outerErr = fmt.Errorf("error when opening db: %v.", err)
 		}
-		defer db.Close()
 		plugins := make([]map[string]string, 3)
 		for i := 0; i < 3; i++ {
 			names, err := findAll(PluginType(i), dir)
@@ -400,15 +393,10 @@ func (m *Manager) Register(t PluginType, j Plugin) error {
 	var err error
 	if t == FUNCTION {
 		if len(j.GetSymbols()) > 0 {
-			err = m.db.Open()
-			if err != nil {
-				return err
-			}
 			err = m.db.Set(name, j.GetSymbols())
 			if err != nil {
 				return err
 			}
-			m.db.Close()
 			err = m.registry.StoreSymbols(name, j.GetSymbols())
 		} else {
 			err = m.registry.StoreSymbols(name, []string{name})
@@ -430,16 +418,13 @@ func (m *Manager) Register(t PluginType, j Plugin) error {
 	//unzip and copy to destination
 	unzipFiles, version, err := m.install(t, name, zipPath, shellParas)
 	if err == nil && len(j.GetSymbols()) > 0 {
-		if err = m.db.Open(); err == nil {
-			err = m.db.Set(name, j.GetSymbols())
-		}
+		err = m.db.Set(name, j.GetSymbols())
 	}
 	if err != nil { //Revert for any errors
 		if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
 			os.RemoveAll(unzipFiles[0])
 		}
 		if len(j.GetSymbols()) > 0 {
-			m.db.Close()
 			m.registry.RemoveSymbols(j.GetSymbols())
 		} else {
 			m.registry.RemoveSymbols([]string{name})
@@ -470,11 +455,6 @@ func (m *Manager) RegisterFuncs(name string, functions []string) error {
 	if len(functions) == 0 {
 		return fmt.Errorf("property 'functions' must not be empty")
 	}
-	err := m.db.Open()
-	if err != nil {
-		return err
-	}
-	defer m.db.Close()
 	old := make([]string, 0)
 	if ok, err := m.db.Get(name, &old); err != nil {
 		return err
@@ -483,7 +463,7 @@ func (m *Manager) RegisterFuncs(name string, functions []string) error {
 	} else if !ok {
 		m.registry.RemoveSymbols([]string{name})
 	}
-	err = m.db.Set(name, functions)
+	err := m.db.Set(name, functions)
 	if err != nil {
 		return err
 	}
@@ -518,10 +498,6 @@ func (m *Manager) Delete(t PluginType, name string, stop bool) error {
 		m.uninstalSink(name)
 	case FUNCTION:
 		old := make([]string, 0)
-		err = m.db.Open()
-		if err != nil {
-			return err
-		}
 		if ok, err := m.db.Get(name, &old); err != nil {
 			return err
 		} else if ok {
@@ -533,7 +509,6 @@ func (m *Manager) Delete(t PluginType, name string, stop bool) error {
 		} else if !ok {
 			m.registry.RemoveSymbols([]string{name})
 		}
-		m.db.Close()
 		m.uninstalFunc(name)
 	}
 
@@ -573,12 +548,9 @@ func (m *Manager) Get(t PluginType, name string) (map[string]interface{}, bool)
 			"version": v,
 		}
 		if t == FUNCTION {
-			if err := m.db.Open(); err == nil {
-				l := make([]string, 0)
-				if ok, _ := m.db.Get(name, &l); ok {
-					r["functions"] = l
-				}
-				m.db.Close()
+			l := make([]string, 0)
+			if ok, _ := m.db.Get(name, &l); ok {
+				r["functions"] = l
 			}
 			// ignore the error
 		}

+ 7 - 48
internal/processor/rule.go

@@ -28,7 +28,6 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/kv"
-	"path"
 )
 
 type RuleProcessor struct {
@@ -37,8 +36,12 @@ type RuleProcessor struct {
 }
 
 func NewRuleProcessor(d string) *RuleProcessor {
+	err, db := kv.GetKVStore("rule")
+	if err != nil {
+		panic(fmt.Sprintf("Can not initalize store for the Rule Processor"))
+	}
 	processor := &RuleProcessor{
-		db:        kv.GetDefaultKVStore(path.Join(d, "rule")),
+		db:        db,
 		rootDbDir: d,
 	}
 	return processor
@@ -50,12 +53,6 @@ func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
 		return nil, err
 	}
 
-	err = p.db.Open()
-	if err != nil {
-		return nil, err
-	}
-	defer p.db.Close()
-
 	err = p.db.Setnx(rule.Id, ruleJson)
 	if err != nil {
 		return nil, err
@@ -71,12 +68,6 @@ func (p *RuleProcessor) ExecUpdate(name, ruleJson string) (*api.Rule, error) {
 		return nil, err
 	}
 
-	err = p.db.Open()
-	if err != nil {
-		return nil, err
-	}
-	defer p.db.Close()
-
 	err = p.db.Set(rule.Id, ruleJson)
 	if err != nil {
 		return nil, err
@@ -99,12 +90,6 @@ func (p *RuleProcessor) ExecReplaceRuleState(name string, triggered bool) (err e
 		return fmt.Errorf("Marshal rule %s error : %s.", name, err)
 	}
 
-	err = p.db.Open()
-	if err != nil {
-		return err
-	}
-	defer p.db.Close()
-
 	err = p.db.Set(name, string(ruleJson))
 	if err != nil {
 		return err
@@ -115,11 +100,6 @@ func (p *RuleProcessor) ExecReplaceRuleState(name string, triggered bool) (err e
 }
 
 func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
-	err := p.db.Open()
-	if err != nil {
-		return nil, err
-	}
-	defer p.db.Close()
 	var s1 string
 	f, _ := p.db.Get(name, &s1)
 	if !f {
@@ -214,11 +194,6 @@ func (p *RuleProcessor) ExecQuery(ruleid, sql string) (*topo.Topo, error) {
 }
 
 func (p *RuleProcessor) ExecDesc(name string) (string, error) {
-	err := p.db.Open()
-	if err != nil {
-		return "", err
-	}
-	defer p.db.Close()
 	var s1 string
 	f, _ := p.db.Get(name, &s1)
 	if !f {
@@ -233,20 +208,10 @@ func (p *RuleProcessor) ExecDesc(name string) (string, error) {
 }
 
 func (p *RuleProcessor) GetAllRules() ([]string, error) {
-	err := p.db.Open()
-	if err != nil {
-		return nil, err
-	}
-	defer p.db.Close()
 	return p.db.Keys()
 }
 
 func (p *RuleProcessor) ExecDrop(name string) (string, error) {
-	err := p.db.Open()
-	if err != nil {
-		return "", err
-	}
-	defer p.db.Close()
 	result := fmt.Sprintf("Rule %s is dropped.", name)
 	var ruleJson string
 	if ok, _ := p.db.Get(name, &ruleJson); ok {
@@ -261,7 +226,7 @@ func (p *RuleProcessor) ExecDrop(name string) (string, error) {
 			result = fmt.Sprintf("%s. Clean checkpoint cache faile: %s.", result, err)
 		}
 	}
-	err = p.db.Delete(name)
+	err := p.db.Delete(name)
 	if err != nil {
 		return "", err
 	} else {
@@ -278,16 +243,10 @@ func cleanCheckpoint(name string) error {
 }
 
 func cleanSinkCache(rule *api.Rule) error {
-	dbDir, err := conf.GetDataLoc()
-	if err != nil {
-		return err
-	}
-	store := kv.GetDefaultKVStore(path.Join(dbDir, "sink"))
-	err = store.Open()
+	err, store := kv.GetKVStore("sink")
 	if err != nil {
 		return err
 	}
-	defer store.Close()
 	for d, m := range rule.Actions {
 		con := 1
 		for name, action := range m {

+ 5 - 17
internal/processor/stream.go

@@ -36,8 +36,12 @@ type StreamProcessor struct {
 
 //@params d : the directory of the DB to save the stream info
 func NewStreamProcessor(d string) *StreamProcessor {
+	err, db := kv.GetKVStore(d)
+	if err != nil {
+		panic(fmt.Sprintf("Can not initalize store for the stream processor at path %s", d))
+	}
 	processor := &StreamProcessor{
-		db: kv.GetDefaultKVStore(d),
+		db: db,
 	}
 	return processor
 }
@@ -96,11 +100,6 @@ func (p *StreamProcessor) ExecStmt(statement string) (result []string, err error
 }
 
 func (p *StreamProcessor) execSave(stmt *ast.StreamStmt, statement string, replace bool) error {
-	err := p.db.Open()
-	if err != nil {
-		return fmt.Errorf("error when opening db: %v.", err)
-	}
-	defer p.db.Close()
 	s, err := json.Marshal(xsql.StreamInfo{
 		StreamType: stmt.StreamType,
 		Statement:  statement,
@@ -160,11 +159,6 @@ func (p *StreamProcessor) execShow(st ast.StreamType) ([]string, error) {
 
 func (p *StreamProcessor) ShowStream(st ast.StreamType) ([]string, error) {
 	stt := ast.StreamTypeMap[st]
-	err := p.db.Open()
-	if err != nil {
-		return nil, fmt.Errorf("Show %ss fails, error when opening db: %v.", stt, err)
-	}
-	defer p.db.Close()
 	keys, err := p.db.Keys()
 	if err != nil {
 		return nil, fmt.Errorf("Show %ss fails, error when loading data from db: %v.", stt, err)
@@ -281,17 +275,11 @@ func (p *StreamProcessor) execDrop(stmt ast.NameNode, st ast.StreamType) (string
 }
 
 func (p *StreamProcessor) DropStream(name string, st ast.StreamType) (string, error) {
-	defer p.db.Close()
 	_, err := p.getStream(name, st)
 	if err != nil {
 		return "", err
 	}
 
-	err = p.db.Open()
-	if err != nil {
-		return "", fmt.Errorf("error when opening db: %v", err)
-	}
-	defer p.db.Close()
 	err = p.db.Delete(name)
 	if err != nil {
 		return "", err

+ 2 - 5
internal/processor/stream_test.go

@@ -17,7 +17,6 @@ package processor
 import (
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/testx"
-	"path"
 	"reflect"
 	"testing"
 )
@@ -102,9 +101,8 @@ func TestStreamCreateProcessor(t *testing.T) {
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 
-	streamDB := path.Join(DbDir, "streamTest")
 	for i, tt := range tests {
-		results, err := NewStreamProcessor(streamDB).ExecStmt(tt.s)
+		results, err := NewStreamProcessor("streamTest").ExecStmt(tt.s)
 		if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
 			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.s, tt.err, err)
 		} else if tt.err == "" {
@@ -191,9 +189,8 @@ func TestTableProcessor(t *testing.T) {
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 
-	streamDB := path.Join(testx.GetDbDir(), "streamTest")
 	for i, tt := range tests {
-		results, err := NewStreamProcessor(streamDB).ExecStmt(tt.s)
+		results, err := NewStreamProcessor("streamTest").ExecStmt(tt.s)
 		if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
 			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.s, tt.err, err)
 		} else if tt.err == "" {

+ 14 - 2
internal/server/server.go

@@ -20,6 +20,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/service"
 	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/kv"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 
 	"context"
@@ -28,7 +29,6 @@ import (
 	"net/rpc"
 	"os"
 	"os/signal"
-	"path"
 	"syscall"
 	"time"
 )
@@ -42,6 +42,7 @@ var (
 	streamProcessor *processor.StreamProcessor
 	pluginManager   *plugin.Manager
 	serviceManager  *service.Manager
+	store			*kv.KeyValue
 )
 
 func StartUp(Version, LoadFileType string) {
@@ -58,8 +59,18 @@ func StartUp(Version, LoadFileType string) {
 		dataDir = dr
 	}
 
+	err, database := kv.NewSqliteDatabase(dataDir)
+	if err != nil {
+		logger.Panic(err)
+	}
+	err = database.Connect()
+	if err != nil {
+		logger.Panic(err)
+	}
+	kv.SetKVStoreDatabase(database)
+
 	ruleProcessor = processor.NewRuleProcessor(dataDir)
-	streamProcessor = processor.NewStreamProcessor(path.Join(dataDir, "stream"))
+	streamProcessor = processor.NewStreamProcessor("stream")
 	pluginManager, err = plugin.NewPluginManager()
 	if err != nil {
 		logger.Panic(err)
@@ -180,5 +191,6 @@ func StartUp(Version, LoadFileType string) {
 		logger.Info("prometheus server successfully shutdown.")
 	}
 
+	database.Disconnect()
 	os.Exit(0)
 }

+ 2 - 8
internal/service/manager.go

@@ -59,17 +59,11 @@ func GetServiceManager() (*Manager, error) {
 		if err != nil {
 			return nil, fmt.Errorf("cannot find etc/services folder: %s", err)
 		}
-		dbDir, err := kconf.GetDataLoc()
-		if err != nil {
-			return nil, fmt.Errorf("cannot find db folder: %s", err)
-		}
-		sdb := kv.GetDefaultKVStore(path.Join(dbDir, "services"))
-		fdb := kv.GetDefaultKVStore(path.Join(dbDir, "serviceFuncs"))
-		err = sdb.Open()
+		err, sdb := kv.GetKVStore("services")
 		if err != nil {
 			return nil, fmt.Errorf("cannot open service db: %s", err)
 		}
-		err = fdb.Open()
+		err, fdb := kv.GetKVStore("serviceFuncs")
 		if err != nil {
 			return nil, fmt.Errorf("cannot open function db: %s", err)
 		}

+ 0 - 12
internal/service/manager_test.go

@@ -258,12 +258,6 @@ func TestInitByFiles(t *testing.T) {
 		},
 	}
 
-	err := m.serviceKV.Open()
-	if err != nil {
-		t.Error(err)
-		t.FailNow()
-	}
-	defer m.serviceKV.Close()
 	actualService := &serviceInfo{}
 	ok, err := m.serviceKV.Get(name, actualService)
 	if err != nil {
@@ -278,12 +272,6 @@ func TestInitByFiles(t *testing.T) {
 		t.Errorf("service info mismatch, expect %v but got %v", info, actualService)
 	}
 
-	err = m.functionKV.Open()
-	if err != nil {
-		t.Error(err)
-		t.FailNow()
-	}
-	defer m.functionKV.Close()
 	actualKeys, _ := m.functionKV.Keys()
 	if len(funcs) != len(actualKeys) {
 		t.Errorf("functions info mismatch: expect %d funcs but got %v", len(funcs), actualKeys)

+ 26 - 41
internal/topo/node/sink_cache.go

@@ -21,7 +21,6 @@ import (
 	"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/kv"
-	"io"
 	"path"
 	"sort"
 	"strconv"
@@ -105,7 +104,10 @@ func (c *Cache) initStore(ctx api.StreamContext) {
 	if err != nil {
 		c.drainError(err)
 	}
-	c.store = kv.GetDefaultKVStore(path.Join(dbDir, "sink", ctx.GetRuleId()))
+	err, c.store = kv.GetKVStore(path.Join("sink", ctx.GetRuleId()))
+	if err != nil {
+		c.drainError(err)
+	}
 	c.key = ctx.GetOpId() + strconv.Itoa(ctx.GetInstanceId())
 	logger.Debugf("cache saved to key %s", c.key)
 	//load cache
@@ -172,52 +174,35 @@ func (c *Cache) timebasedRun(ctx api.StreamContext, saveInterval int) {
 
 func (c *Cache) loadCache() error {
 	gob.Register(c.pending)
-	err := c.store.Open()
-	if err != nil && err != io.EOF {
-		return err
-	}
-	defer c.store.Close()
-	if err == nil {
-		mt := new(LinkedQueue)
-		if f, err := c.store.Get(c.key, &mt); f {
-			if nil != err {
-				return fmt.Errorf("load malform cache, found %v(%v)", c.key, mt)
-			}
-			c.pending = mt
-			c.changed = true
-			// To store the keys in slice in sorted order
-			var keys []int
-			for k := range mt.Data {
-				keys = append(keys, k)
-			}
-			sort.Ints(keys)
-			for _, k := range keys {
-				t := &CacheTuple{
-					index: k,
-					data:  mt.Data[k],
-				}
-				c.Out <- t
+	mt := new(LinkedQueue)
+	if f, err := c.store.Get(c.key, &mt); f {
+		if nil != err {
+			return fmt.Errorf("load malform cache, found %v(%v)", c.key, mt)
+		}
+		c.pending = mt
+		c.changed = true
+		// To store the keys in slice in sorted order
+		var keys []int
+		for k := range mt.Data {
+			keys = append(keys, k)
+		}
+		sort.Ints(keys)
+		for _, k := range keys {
+			t := &CacheTuple{
+				index: k,
+				data:  mt.Data[k],
 			}
-			return nil
+			c.Out <- t
 		}
+		return nil
 	}
 	return nil
 }
 
 func (c *Cache) saveCache(logger api.Logger, p *LinkedQueue) error {
-	err := c.store.Open()
-	if err != nil {
-		logger.Errorf("save cache error while opening cache store: %s", err)
-		logger.Infof("clean the cache and reopen")
-		c.store.Close()
-		c.store.Clean()
-		err = c.store.Open()
-		if err != nil {
-			logger.Errorf("save cache error after reset the cache store: %s", err)
-			return err
-		}
-	}
-	defer c.store.Close()
+	logger.Infof("clean the cache and reopen")
+	c.store.Clean()
+
 	return c.store.Set(c.key, p)
 }
 

+ 2 - 7
internal/topo/planner/analyzer_test.go

@@ -22,7 +22,6 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/kv"
-	"path"
 	"reflect"
 	"strings"
 	"testing"
@@ -116,13 +115,11 @@ var tests = []struct {
 }
 
 func Test_validation(t *testing.T) {
-	store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
-	err := store.Open()
+	err, store := kv.GetKVStore("stream")
 	if err != nil {
 		t.Error(err)
 		return
 	}
-	defer store.Close()
 	streamSqls := map[string]string{
 		"src1": `CREATE STREAM src1 (
 					id1 BIGINT,
@@ -179,13 +176,11 @@ func Test_validation(t *testing.T) {
 }
 
 func Test_validationSchemaless(t *testing.T) {
-	store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
-	err := store.Open()
+	err, store := kv.GetKVStore("stream")
 	if err != nil {
 		t.Error(err)
 		return
 	}
-	defer store.Close()
 	streamSqls := map[string]string{
 		"src1": `CREATE STREAM src1 (
 				) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,

+ 1 - 4
internal/topo/planner/planner.go

@@ -25,7 +25,6 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/kv"
-	"path"
 )
 
 func Plan(rule *api.Rule, storePath string) (*topo.Topo, error) {
@@ -49,12 +48,10 @@ func PlanWithSourcesAndSinks(rule *api.Rule, storePath string, sources []*node.S
 	if rule.Options.SendMetaToSink && (len(streamsFromStmt) > 1 || stmt.Dimensions != nil) {
 		return nil, fmt.Errorf("Invalid option sendMetaToSink, it can not be applied to window")
 	}
-	store := kv.GetDefaultKVStore(path.Join(storePath, "stream"))
-	err = store.Open()
+	err, store := kv.GetKVStore("stream")
 	if err != nil {
 		return nil, err
 	}
-	defer store.Close()
 	// Create logical plan and optimize. Logical plans are a linked list
 	lp, err := createLogicalPlan(stmt, rule.Options, store)
 	if err != nil {

+ 2 - 7
internal/topo/planner/planner_test.go

@@ -23,7 +23,6 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/kv"
-	"path"
 	"reflect"
 	"strings"
 	"testing"
@@ -34,13 +33,11 @@ var (
 )
 
 func Test_createLogicalPlan(t *testing.T) {
-	store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
-	err := store.Open()
+	err, store := kv.GetKVStore("stream")
 	if err != nil {
 		t.Error(err)
 		return
 	}
-	defer store.Close()
 	streamSqls := map[string]string{
 		"src1": `CREATE STREAM src1 (
 					id1 BIGINT,
@@ -1053,13 +1050,11 @@ func Test_createLogicalPlan(t *testing.T) {
 }
 
 func Test_createLogicalPlanSchemaless(t *testing.T) {
-	store := kv.GetDefaultKVStore(path.Join(DbDir, "stream"))
-	err := store.Open()
+	err, store := kv.GetKVStore("stream")
 	if err != nil {
 		t.Error(err)
 		return
 	}
-	defer store.Close()
 	streamSqls := map[string]string{
 		"src1": `CREATE STREAM src1 (
 				) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,

+ 6 - 0
internal/topo/topo.go

@@ -24,6 +24,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"strconv"
+	"sync"
 )
 
 type PrintableTopo struct {
@@ -44,6 +45,7 @@ type Topo struct {
 	store              api.Store
 	coordinator        *checkpoint.Coordinator
 	topo               *PrintableTopo
+	mu                 sync.Mutex
 }
 
 func NewWithNameAndQos(name string, qos api.Qos, checkpointInterval int) (*Topo, error) {
@@ -65,6 +67,8 @@ func (s *Topo) GetContext() api.StreamContext {
 
 // Cancel may be called multiple times so must be idempotent
 func (s *Topo) Cancel() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	// completion signal
 	s.drainErr(nil)
 	s.cancel()
@@ -146,6 +150,8 @@ func (s *Topo) Open() <-chan error {
 	log.Infoln("Opening stream")
 	// open stream
 	go func() {
+		s.mu.Lock()
+		defer s.mu.Unlock()
 		var err error
 		if s.store, err = state.CreateStore(s.name, s.qos); err != nil {
 			fmt.Println(err)

+ 1 - 2
internal/topo/topotest/mock_topo.go

@@ -29,7 +29,6 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
-	"path"
 	"reflect"
 	"strings"
 	"testing"
@@ -263,7 +262,7 @@ func createStream(t *testing.T, tt RuleTest, j int, opt *api.RuleOption, sinkPro
 
 // Create or drop streams
 func HandleStream(createOrDrop bool, names []string, t *testing.T) {
-	p := processor.NewStreamProcessor(path.Join(DbDir, "stream"))
+	p := processor.NewStreamProcessor("stream")
 	for _, name := range names {
 		var sql string
 		if createOrDrop {

+ 0 - 5
internal/xsql/stmtx.go

@@ -62,11 +62,6 @@ func GetDataSourceStatement(m kv.KeyValue, name string) (*StreamInfo, error) {
 		v  string
 		vs = &StreamInfo{}
 	)
-	err := m.Open()
-	if err != nil {
-		return nil, fmt.Errorf("error when opening db: %v", err)
-	}
-	defer m.Close()
 	if ok, _ := m.Get(name, &v); ok {
 		if err := json.Unmarshal([]byte(v), vs); err != nil {
 			return nil, fmt.Errorf("error unmarshall %s, the data in db may be corrupted", name)

+ 23 - 0
pkg/kv/database.go

@@ -0,0 +1,23 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kv
+
+import "database/sql"
+
+type Database interface {
+	Connect() error
+	Disconnect() error
+	Apply(f func(db *sql.DB) error) error
+}

+ 36 - 4
pkg/kv/kv.go

@@ -14,9 +14,23 @@
 
 package kv
 
+import (
+	"sync"
+)
+
+type kvstores struct {
+	stores map[string]KeyValue
+	mu	   sync.Mutex
+}
+
+var stores = kvstores{
+	stores: make(map[string]KeyValue),
+	mu: sync.Mutex{},
+}
+
+var database Database
+
 type KeyValue interface {
-	Open() error
-	Close() error
 	// Set key to hold string value if key does not exist otherwise return an error
 	Setnx(key string, value interface{}) error
 	// Set key to hold the string value. If key already holds a value, it is overwritten
@@ -28,6 +42,24 @@ type KeyValue interface {
 	Clean() error
 }
 
-func GetDefaultKVStore(fpath string) (ret KeyValue) {
-	return GetSqliteKVStore(fpath)
+func SetKVStoreDatabase(d Database) {
+	database = d
+}
+
+func (s *kvstores) get(table string) (error, KeyValue) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if store, contains := s.stores[table]; contains {
+		return nil, store
+	}
+	err, store := CreateSqlKvStore(database, table)
+	if err != nil {
+		return err, nil
+	}
+	s.stores[table] = store
+	return nil, store
+}
+
+func GetKVStore(table string) (error, KeyValue) {
+	return stores.get(table)
 }

+ 159 - 0
pkg/kv/sqlKv.go

@@ -0,0 +1,159 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kv
+
+import (
+	"bytes"
+	"database/sql"
+	"encoding/gob"
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
+	"strings"
+)
+
+type sqlKvStore struct {
+	database Database
+	table string
+}
+
+func CreateSqlKvStore(database Database, table string) (error, *sqlKvStore) {
+	store := &sqlKvStore{
+		database: database,
+		table: table,
+	}
+	err := store.database.Apply(func (db *sql.DB) error {
+		query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS '%s'('key' VARCHAR(255) PRIMARY KEY, 'val' BLOB);", table)
+		_, err := db.Exec(query)
+		return err
+	})
+	if err != nil {
+		return err, nil
+	}
+	return nil, store
+}
+
+func encode(value interface{}) ([]byte, error) {
+	var buf bytes.Buffer
+	gob.Register(value)
+	enc := gob.NewEncoder(&buf)
+	if err := enc.Encode(value); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+func (kv *sqlKvStore) Setnx(key string, value interface{}) error {
+	return kv.database.Apply(func (db *sql.DB) error {
+		b, err := encode(value)
+		if nil != err {
+			return err
+		}
+		query := fmt.Sprintf("INSERT INTO %s(key,val) values(?,?);", kv.table)
+		stmt, err := db.Prepare(query)
+		_, err = stmt.Exec(key, b)
+		if err != nil {
+			used := db.Stats().OpenConnections
+
+			fmt.Println(fmt.Sprintf("Here %d", used))
+			return err
+		}
+		stmt.Close()
+		if nil != err && strings.Contains(err.Error(), "UNIQUE constraint failed") {
+			return fmt.Errorf(`Item %s already exists`, key)
+		}
+		return nil
+	})
+}
+
+func (kv *sqlKvStore) Set(key string, value interface{}) error {
+	b, err := encode(value)
+	if nil != err {
+		return err
+	}
+	err = kv.database.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("REPLACE INTO %s(key,val) values(?,?);", kv.table)
+		stmt, err := db.Prepare(query)
+		_, err = stmt.Exec(key, b)
+		stmt.Close()
+		return err
+	})
+	return err
+}
+
+func (kv *sqlKvStore) Get(key string, value interface{}) (bool, error) {
+	result := false
+	err := kv.database.Apply(func (db *sql.DB) error {
+		query := fmt.Sprintf("SELECT val FROM %s WHERE key='%s';", kv.table, key)
+		row := db.QueryRow(query)
+		var tmp []byte
+		err := row.Scan(&tmp)
+		if nil != err {
+			return err
+		}
+		dec := gob.NewDecoder(bytes.NewBuffer(tmp))
+		if err := dec.Decode(value); err != nil {
+			return err
+		}
+		result = true
+		return nil
+	})
+	return result, err
+}
+
+func (kv *sqlKvStore) Delete(key string) error {
+	return kv.database.Apply(func (db *sql.DB) error {
+		query := fmt.Sprintf("SELECT key FROM %s WHERE key='%s';", kv.table, key)
+		row := db.QueryRow(query)
+		var tmp []byte
+		err := row.Scan(&tmp)
+		if nil != err || 0 == len(tmp) {
+			return errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("%s is not found", key))
+		}
+		query = fmt.Sprintf("DELETE FROM %s WHERE key='%s';", kv.table, key)
+		_, err = db.Exec(query)
+		return err
+	})
+}
+
+func (kv *sqlKvStore) Keys() ([]string, error) {
+	keys := make([]string, 0)
+	err := kv.database.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("SELECT key FROM %s", kv.table)
+		row, err := db.Query(query)
+		if nil != err {
+			return err
+		}
+		defer row.Close()
+		for row.Next() {
+			var val string
+			err = row.Scan(&val)
+			if nil != err {
+				return err
+			} else {
+				keys = append(keys, val)
+			}
+		}
+		return nil
+	})
+	return keys, err
+}
+
+func (kv *sqlKvStore) Clean() error {
+	return kv.database.Apply(func (db *sql.DB) error {
+		query := fmt.Sprintf("DELETE FROM %s", kv.table)
+		_, err := db.Exec(query)
+		return err
+	})
+}

+ 5 - 14
pkg/kv/sqliteKV_test.go

@@ -22,17 +22,16 @@ import (
 	"testing"
 )
 
-func TestSqliteKVStore_Funcs(t *testing.T) {
+func TestSqlKVStore_Funcs(t *testing.T) {
 	abs, _ := filepath.Abs("test")
 	if f, _ := os.Stat(abs); f != nil {
 		os.Remove(abs)
 	}
+	_, database := NewSqliteDatabase(abs)
+	database.Connect()
+	SetKVStoreDatabase(database)
 
-	ks := GetSqliteKVStore(abs)
-	if e := ks.Open(); e != nil {
-		t.Errorf("Failed to open data %s.", e)
-	}
-
+	_, ks := GetKVStore("test")
 	if err := ks.Setnx("foo", "bar"); nil != err {
 		t.Error(err)
 	}
@@ -71,14 +70,6 @@ func TestSqliteKVStore_Funcs(t *testing.T) {
 		}
 	}
 
-	if e2 := ks.Close(); e2 != nil {
-		t.Errorf("Failed to close data: %s.", e2)
-	}
-
-	if err := ks.Open(); nil != err {
-		t.Error(err)
-	}
-
 	var v2 string
 	if ok, _ := ks.Get("foo", &v2); ok {
 		if !reflect.DeepEqual("bar", v2) {

+ 70 - 0
pkg/kv/sqliteDatabase.go

@@ -0,0 +1,70 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kv
+
+import (
+	"database/sql"
+	"fmt"
+	_ "github.com/mattn/go-sqlite3"
+	"os"
+	"path"
+	"sync"
+)
+
+type SqliteDatabase struct {
+	db *sql.DB
+	Path string
+	mu sync.Mutex
+}
+
+func NewSqliteDatabase(dir string) (error, *SqliteDatabase) {
+	if _, err := os.Stat(dir); os.IsNotExist(err) {
+		os.MkdirAll(dir, os.ModePerm)
+	}
+	dbPath := path.Join(dir, "sqliteKV.db")
+	return nil, &SqliteDatabase{
+		db: nil,
+		Path: dbPath,
+		mu: sync.Mutex{},
+	}
+}
+
+func (d *SqliteDatabase) Connect() error {
+	db, err := sql.Open("sqlite3", connectionString(d.Path))
+	if err != nil {
+		return err
+	}
+	db.SetMaxIdleConns(1)
+	db.SetMaxOpenConns(1)
+	db.SetConnMaxLifetime(-1)
+	d.db = db
+	return nil
+}
+
+func connectionString(dpath string) string {
+	return fmt.Sprintf("file:%s?cache=shared", dpath)
+}
+
+func (d *SqliteDatabase) Disconnect() error {
+	err := d.db.Close()
+	return err
+}
+
+func (d *SqliteDatabase) Apply(f func(db *sql.DB) error) error {
+	d.mu.Lock()
+	err := f(d.db)
+	d.mu.Unlock()
+	return err
+}

+ 0 - 159
pkg/kv/sqliteKV.go

@@ -1,159 +0,0 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kv
-
-import (
-	"bytes"
-	"database/sql"
-	"encoding/gob"
-	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/errorx"
-	_ "github.com/mattn/go-sqlite3"
-	"os"
-	"path"
-	"path/filepath"
-	"strings"
-)
-
-type SqliteKVStore struct {
-	db    *sql.DB
-	table string
-	path  string
-}
-
-func init() {
-	gob.Register(make(map[string]interface{}))
-}
-
-func GetSqliteKVStore(fpath string) (ret *SqliteKVStore) {
-	dir, file := filepath.Split(fpath)
-	if _, err := os.Stat(dir); os.IsNotExist(err) {
-		os.MkdirAll(dir, os.ModePerm)
-	}
-	ret = new(SqliteKVStore)
-	ret.path = path.Join(dir, "sqliteKV.db")
-	ret.table = file
-	return ret
-}
-
-func (m *SqliteKVStore) Open() error {
-	db, err := sql.Open("sqlite3", m.path)
-	if nil != err {
-		return err
-	}
-	m.db = db
-	sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS '%s'('key' VARCHAR(255) PRIMARY KEY, 'val' BLOB);", m.table)
-	_, err = m.db.Exec(sql)
-	return err
-}
-
-func (m *SqliteKVStore) Close() error {
-	if nil != m.db {
-		return m.db.Close()
-	}
-	return nil
-}
-
-func (m *SqliteKVStore) encode(value interface{}) ([]byte, error) {
-	var buf bytes.Buffer
-	gob.Register(value)
-	enc := gob.NewEncoder(&buf)
-	if err := enc.Encode(value); err != nil {
-		return nil, err
-	}
-	return buf.Bytes(), nil
-}
-
-func (m *SqliteKVStore) Setnx(key string, value interface{}) error {
-	b, err := m.encode(value)
-	if nil != err {
-		return err
-	}
-	sql := fmt.Sprintf("INSERT INTO %s(key,val) values(?,?);", m.table)
-	stmt, err := m.db.Prepare(sql)
-	_, err = stmt.Exec(key, b)
-	stmt.Close()
-	if nil != err && strings.Contains(err.Error(), "UNIQUE constraint failed") {
-		return fmt.Errorf(`Item %s already exists`, key)
-	}
-	return err
-}
-
-func (m *SqliteKVStore) Set(key string, value interface{}) error {
-	b, err := m.encode(value)
-	if nil != err {
-		return err
-	}
-	sql := fmt.Sprintf("REPLACE INTO %s(key,val) values(?,?);", m.table)
-	stmt, err := m.db.Prepare(sql)
-	_, err = stmt.Exec(key, b)
-	stmt.Close()
-	return err
-}
-
-func (m *SqliteKVStore) Get(key string, value interface{}) (bool, error) {
-	sql := fmt.Sprintf("SELECT val FROM %s WHERE key='%s';", m.table, key)
-	row := m.db.QueryRow(sql)
-	var tmp []byte
-	err := row.Scan(&tmp)
-	if nil != err {
-		return false, nil
-	}
-
-	dec := gob.NewDecoder(bytes.NewBuffer(tmp))
-	if err := dec.Decode(value); err != nil {
-		return false, err
-	}
-	return true, nil
-}
-
-func (m *SqliteKVStore) Delete(key string) error {
-	sql := fmt.Sprintf("SELECT key FROM %s WHERE key='%s';", m.table, key)
-	row := m.db.QueryRow(sql)
-	var tmp []byte
-	err := row.Scan(&tmp)
-	if nil != err || 0 == len(tmp) {
-		return errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("%s is not found", key))
-	}
-	sql = fmt.Sprintf("DELETE FROM %s WHERE key='%s';", m.table, key)
-	_, err = m.db.Exec(sql)
-	return err
-}
-
-func (m *SqliteKVStore) Keys() ([]string, error) {
-	keys := make([]string, 0)
-	sql := fmt.Sprintf("SELECT key FROM %s", m.table)
-	row, err := m.db.Query(sql)
-	if nil != err {
-		return nil, err
-	}
-	defer row.Close()
-	for row.Next() {
-		var val string
-		err = row.Scan(&val)
-		if nil != err {
-			return nil, err
-		} else {
-			keys = append(keys, val)
-		}
-	}
-	return keys, nil
-}
-
-func (m *SqliteKVStore) Clean() error {
-	sql := fmt.Sprintf("DELETE FROM %s", m.table)
-	_, err := m.db.Exec(sql)
-	return err
-}

+ 3 - 3
tools/migration/util/migration.go

@@ -36,11 +36,10 @@ func migration(dir string) error {
 		return err
 	}
 
-	store := kv.GetDefaultKVStore(dir)
-	if err := store.Open(); nil != err {
+	err, store := kv.GetKVStore(dir)
+	if err != nil {
 		return err
 	}
-	defer store.Close()
 
 	for _, k := range keys {
 		if value, ok := c.Get(k); !ok {
@@ -74,6 +73,7 @@ func DataMigration(dir string) error {
 			}
 		}
 	}
+	kv.CloseAll()
 	return nil
 }