Browse Source

Proposal(kv) Redis as KV Storage * implemented redis as storage
* refactored current sqlite storage as option
* kv can be exntended by another type of storage

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Proposal(kv) Redis as KV Storage * implemented redis as storage * checkpoint sql functionality refactored
* checkpoint can use redis (sorted set)

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Proposal(kv) Redis as KV Storage * implemented redis as storage* refactoring and fixes

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Proposal(kv) Redis as KV Storage * implemented redis as storage* refactoring and fixes * tests refactored

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Proposal(kv) Redis as KV Storage * implemented redis as storage* refactoring and fixes
* refactoring of tests

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

fix(sql) default store is sqlite db

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

fix(config) added store configuration to kuiper.yaml definiton in chart

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

fix(formatting) go fmt applied

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Refactoring of codebase

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Formatting applied

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Fix of the kv store test

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Removed typo

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Proposal(kv) Redis as KV Storage * implemented redis as storage
* refactored current sqlite storage as option
* kv can be exntended by another type of storage

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Proposal(kv) Redis as KV Storage * implemented redis as storage * checkpoint sql functionality refactored
* checkpoint can use redis (sorted set)

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Proposal(kv) Redis as KV Storage * implemented redis as storage* refactoring and fixes

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Proposal(kv) Redis as KV Storage * implemented redis as storage* refactoring and fixes * tests refactored

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Proposal(kv) Redis as KV Storage * implemented redis as storage* refactoring and fixes
* refactoring of tests

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

fix(sql) default store is sqlite db

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

fix(config) added store configuration to kuiper.yaml definiton in chart

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

fix(formatting) go fmt applied

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Refactoring of codebase

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Formatting applied

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Removed typo

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Added default empty fields

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Indentation fix

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Small refatoring of comments and imports

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Doc update

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Fixed pool connection initialization

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Fix of handling keys in redis

Signed-off-by: Robert Wadowski <robert.wadowski1@gmail.com>

Robert Wadowski 3 years ago
parent
commit
078dbfc2f6
53 changed files with 1852 additions and 479 deletions
  1. 2 0
      .gitignore
  2. 13 0
      deploy/chart/kuiper/values.yaml
  3. 34 0
      docs/en_US/operation/configuration_file.md
  4. 15 0
      etc/kuiper.yaml
  5. 2 0
      go.mod
  6. 12 0
      go.sum
  7. 14 1
      internal/conf/conf.go
  8. 26 0
      internal/pkg/db/config.go
  9. 49 0
      internal/pkg/db/database.go
  10. 22 0
      internal/pkg/db/redis/config.go
  11. 121 0
      internal/pkg/db/redis/redis.go
  12. 2 4
      internal/pkg/sqlkv/database.go
  13. 20 0
      internal/pkg/db/sql/sqlite/config.go
  14. 14 9
      internal/pkg/sqlkv/sqliteDatabase.go
  15. 0 70
      internal/pkg/sqlkv/kvStore.go
  16. 0 107
      internal/pkg/sqlkv/sqlKv_test.go
  17. 42 0
      internal/pkg/store/builder.go
  18. 30 0
      internal/pkg/store/encoding/encoding.go
  19. 152 0
      internal/pkg/store/redis/redisKv.go
  20. 34 0
      internal/pkg/store/redis/redisStoreBuilder.go
  21. 83 0
      internal/pkg/store/redisKv_test.go
  22. 7 15
      internal/pkg/sqlkv/sqlKv.go
  23. 34 0
      internal/pkg/store/sql/sqlStoreBuilder.go
  24. 100 0
      internal/pkg/store/sqlKv_test.go
  25. 84 0
      internal/pkg/store/test/common/test.go
  26. 42 0
      internal/pkg/ts/builder.go
  27. 188 0
      internal/pkg/ts/redis/redisTs.go
  28. 34 0
      internal/pkg/ts/redis/redisTsBuilder.go
  29. 93 0
      internal/pkg/ts/redisTs_test.go
  30. 153 0
      internal/pkg/ts/sql/sqlTs.go
  31. 34 0
      internal/pkg/ts/sql/sqlTsBuilder.go
  32. 114 0
      internal/pkg/ts/sqlTs_test.go
  33. 51 47
      internal/pkg/tskv/sqlite_test.go
  34. 0 169
      internal/pkg/tskv/sqlite.go
  35. 5 4
      internal/plugin/manager.go
  36. 5 6
      internal/processor/rule.go
  37. 3 3
      internal/processor/stream.go
  38. 2 12
      internal/server/server.go
  39. 5 5
      internal/service/manager.go
  40. 3 5
      internal/testx/testUtil.go
  41. 5 0
      internal/topo/context/default_test.go
  42. 3 3
      internal/topo/node/sink_cache.go
  43. 2 2
      internal/topo/planner/analyzer.go
  44. 3 3
      internal/topo/planner/analyzer_test.go
  45. 3 3
      internal/topo/planner/planner.go
  46. 3 3
      internal/topo/planner/planner_test.go
  47. 4 3
      internal/topo/state/kv_store.go
  48. 5 0
      internal/topo/state/kv_store_test.go
  49. 3 3
      internal/xsql/stmtx.go
  50. 71 0
      pkg/kv/setup.go
  51. 103 0
      pkg/kv/stores.go
  52. 1 1
      pkg/kv/kv.go
  53. 2 1
      internal/pkg/tskv/tskv.go

+ 2 - 0
.gitignore

@@ -34,3 +34,5 @@ corss_build_for_rpm.tar
 
 *.swp
 *.history
+
+*/**/*.db

+ 13 - 0
deploy/chart/kuiper/values.yaml

@@ -96,6 +96,19 @@ kuiperConfig:
       cacheThreshold: 10
       cacheTriggerCount: 15
       disableCache: true
+    store:
+      type: sqlite
+      redis:
+        host: localhost
+        port: 6379
+        password: kuiper
+        #Timeout in ms
+        timeout: 1000
+      sqlite:
+        #Sqlite absolute database path, if left empty default directory of the application will be used
+        path: ""
+        #Sqlite file name, if left empty name of db will be sqliteKV.db and tskv.db respectively
+        name: ""
   "mqtt_source.yaml":
     #Global MQTT configurations
     default:

+ 34 - 0
docs/en_US/operation/configuration_file.md

@@ -138,3 +138,37 @@ The content of the page should be similar as below.
   disableCache: false
 ```
 
+## Store configurations
+There is possibility to configure storage of state for application. Default storage layer is sqlite database. There is option to set redis as storage.
+In order to use redis as store type property must be changed into redis value.
+### Sqlite
+    
+It has properties
+* path - if left empty database will be created in the data directory of the kuiper base key
+* name - name of database file - if left empty it will be `sqliteKV.db`
+ 
+### Redis
+
+It has properties
+* host     - host of redis
+* port     - port of redis
+* password - password used for auth in redis, if left empty auth won't be used
+* timeout  - timeout fo connection
+
+### Config
+```yaml
+    store:
+      #Type of store that will be used for keeping state of the application
+      type: sqlite
+      redis:
+        host: localhost
+        port: 6379
+        password: kuiper
+        #Timeout in ms
+        timeout: 1000
+      sqlite:
+        #Sqlite absolute database path, if left empty default directory of the application will be used
+        path:
+        #Sqlite file name, if left empty name of db will be sqliteKV.db
+        name:
+```

+ 15 - 0
etc/kuiper.yaml

@@ -52,3 +52,18 @@ sink:
 
   # Control to disable cache or not. If it's set to true, then the cache will be disabled, otherwise, it will be enabled.
   disableCache: true
+
+store:
+  #Type of store that will be used for keeping state of the application
+  type: sqlite
+  redis:
+    host: localhost
+    port: 6379
+    password: kuiper
+    #Timeout in ms
+    timeout: 1000
+  sqlite:
+    #Sqlite absolute database path, if left empty default directory of the application will be used
+    path:
+    #Sqlite file name, if left empty name of db will be sqliteKV.db
+    name:

+ 2 - 0
go.mod

@@ -4,6 +4,7 @@ require (
 	github.com/Masterminds/sprig/v3 v3.2.1
 	github.com/PaesslerAG/gval v1.0.0
 	github.com/PaesslerAG/jsonpath v0.1.1
+	github.com/alicebob/miniredis/v2 v2.15.1
 	github.com/benbjohnson/clock v1.0.0
 	github.com/eclipse/paho.mqtt.golang v1.3.5
 	github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.0
@@ -12,6 +13,7 @@ require (
 	github.com/gdexlab/go-render v1.0.1
 	github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
 	github.com/golang/protobuf v1.5.0
+	github.com/gomodule/redigo v2.0.0+incompatible
 	github.com/google/uuid v1.2.0
 	github.com/gorilla/handlers v1.4.2
 	github.com/gorilla/mux v1.7.3

+ 12 - 0
go.sum

@@ -15,6 +15,10 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
+github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
+github.com/alicebob/miniredis/v2 v2.15.1 h1:Fw+ixAJPmKhCLBqDwHlTDqxUxp0xjEwXczEpt1B6r7k=
+github.com/alicebob/miniredis/v2 v2.15.1/go.mod h1:gquAfGbzn92jvtrSC69+6zZnwSODVXVpYDRaGhWaL6I=
 github.com/benbjohnson/clock v1.0.0 h1:78Jk/r6m4wCi6sndMpty7A//t4dw/RW5fV4ZgDVfX1w=
 github.com/benbjohnson/clock v1.0.0/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -24,6 +28,9 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/cespare/xxhash/v2 v2.1.0 h1:yTUvW7Vhb89inJ+8irsUqiWjh8iT6sQPZiQzI6ReGkA=
 github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM=
+github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
+github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
+github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
 github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk=
@@ -80,6 +87,8 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
 github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4=
 github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
+github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -222,6 +231,8 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
 github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg=
+github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@@ -261,6 +272,7 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

+ 14 - 1
internal/conf/conf.go

@@ -73,7 +73,20 @@ type KuiperConf struct {
 	Sink struct {
 		CacheThreshold    int  `yaml:"cacheThreshold"`
 		CacheTriggerCount int  `yaml:"cacheTriggerCount"`
-		DisableCache      bool `yaml:"disableCache""`
+		DisableCache      bool `yaml:"disableCache"`
+	}
+	Store struct {
+		Type  string `yaml:"type"`
+		Redis struct {
+			Host     string `yaml:"host"`
+			Port     int    `yaml:"port"`
+			Password string `yaml:"password"`
+			Timeout  int    `yaml:"timeout"`
+		}
+		Sqlite struct {
+			Path string `yaml:"path"`
+			Name string `yaml:"name"`
+		}
 	}
 }
 

+ 26 - 0
internal/pkg/db/config.go

@@ -0,0 +1,26 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package db
+
+import (
+	"github.com/lf-edge/ekuiper/internal/pkg/db/redis"
+	"github.com/lf-edge/ekuiper/internal/pkg/db/sql/sqlite"
+)
+
+type Config struct {
+	Type   string
+	Redis  redis.Config
+	Sqlite sqlite.Config
+}

+ 49 - 0
internal/pkg/db/database.go

@@ -0,0 +1,49 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package db
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/pkg/db/redis"
+	"github.com/lf-edge/ekuiper/internal/pkg/db/sql/sqlite"
+)
+
+type Database interface {
+	Connect() error
+	Disconnect() error
+}
+
+func CreateDatabase(conf Config) (error, Database) {
+	var db Database
+	var err error
+	databaseType := conf.Type
+	switch databaseType {
+	case "redis":
+		r := redis.NewRedisFromConf(conf.Redis)
+		db = &r
+	case "sqlite":
+		err, db = sqlite.NewSqliteDatabase(conf.Sqlite)
+		if err != nil {
+			return err, nil
+		}
+	default:
+		return fmt.Errorf("unrecognized database type - %s", databaseType), nil
+	}
+	err = db.Connect()
+	if err != nil {
+		return err, nil
+	}
+	return nil, db
+}

+ 22 - 0
internal/pkg/db/redis/config.go

@@ -0,0 +1,22 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package redis
+
+type Config struct {
+	Host     string
+	Port     int
+	Password string
+	Timeout  int
+}

+ 121 - 0
internal/pkg/db/redis/redis.go

@@ -0,0 +1,121 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package redis
+
+import (
+	"fmt"
+	"github.com/gomodule/redigo/redis"
+	"sync"
+	"time"
+)
+
+type Instance struct {
+	ConnectionString string
+	pool             *redis.Pool
+	mu               *sync.Mutex
+	config           Config
+}
+
+func NewRedisFromConf(conf Config) Instance {
+	host := conf.Host
+	port := conf.Port
+	return Instance{
+		ConnectionString: connectionString(host, port),
+		pool:             nil,
+		mu:               &sync.Mutex{},
+		config:           conf,
+	}
+}
+
+func NewRedis(host string, port int) Instance {
+	return Instance{
+		ConnectionString: connectionString(host, port),
+		pool:             nil,
+		mu:               &sync.Mutex{},
+	}
+}
+
+func (r *Instance) Connect() error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.ConnectionString == "" {
+		return fmt.Errorf("connection string for redis not initalized")
+	}
+	err, pool := r.connectRedis()
+	if err != nil {
+		return err
+	}
+	conn := pool.Get()
+	defer conn.Close()
+	reply, err := conn.Do("PING")
+	if err != nil {
+		return err
+	}
+	response, err := redis.String(reply, err)
+	if err != nil {
+		return err
+	}
+	if response != "PONG" {
+		return fmt.Errorf("failed to connect to redis")
+	}
+	r.pool = pool
+	return nil
+}
+
+func (r *Instance) connectRedis() (error, *redis.Pool) {
+	opts := []redis.DialOption{
+		redis.DialConnectTimeout(time.Duration(r.config.Timeout) * time.Millisecond),
+	}
+	if r.config.Password != "" {
+		opts = append(opts, redis.DialPassword(r.config.Password))
+	}
+	dialFunction := func() (redis.Conn, error) {
+		conn, err := redis.Dial("tcp", r.ConnectionString, opts...)
+		if err == nil {
+			_, err = conn.Do("PING")
+			if err == nil {
+				return conn, nil
+			}
+		}
+		return nil, fmt.Errorf("could not dial redis: %s", err)
+	}
+	pool := &redis.Pool{
+		IdleTimeout: 0,
+		MaxIdle:     10,
+		Dial:        dialFunction,
+	}
+	return nil, pool
+}
+
+func connectionString(host string, port int) string {
+	return fmt.Sprintf("%s:%d", host, port)
+}
+
+func (r *Instance) Disconnect() error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r == nil {
+		return nil
+	}
+	err := r.pool.Close()
+	r.pool = nil
+	return err
+}
+
+func (r *Instance) Apply(f func(conn redis.Conn) error) error {
+	connection := r.pool.Get()
+	defer connection.Close()
+	return f(connection)
+}

+ 2 - 4
internal/pkg/sqlkv/database.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021 INTECH Process Automation Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,12 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package sqlkv
+package sql
 
 import "database/sql"
 
 type Database interface {
-	Connect() error
-	Disconnect() error
 	Apply(f func(db *sql.DB) error) error
 }

+ 20 - 0
internal/pkg/db/sql/sqlite/config.go

@@ -0,0 +1,20 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite
+
+type Config struct {
+	Path string
+	Name string
+}

+ 14 - 9
internal/pkg/sqlkv/sqliteDatabase.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021 INTECH Process Automation Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package sqlkv
+package sqlite
 
 import (
 	"database/sql"
@@ -23,25 +23,30 @@ import (
 	"sync"
 )
 
-type SqliteDatabase struct {
+type Database struct {
 	db   *sql.DB
 	Path string
 	mu   sync.Mutex
 }
 
-func newSqliteDatabase(dir string) (error, *SqliteDatabase) {
+func NewSqliteDatabase(conf Config) (error, *Database) {
+	dir := conf.Path
+	name := "sqliteKV.db"
+	if conf.Name != "" {
+		name = conf.Name
+	}
 	if _, err := os.Stat(dir); os.IsNotExist(err) {
 		os.MkdirAll(dir, os.ModePerm)
 	}
-	dbPath := path.Join(dir, "sqliteKV.db")
-	return nil, &SqliteDatabase{
+	dbPath := path.Join(dir, name)
+	return nil, &Database{
 		db:   nil,
 		Path: dbPath,
 		mu:   sync.Mutex{},
 	}
 }
 
-func (d *SqliteDatabase) Connect() error {
+func (d *Database) Connect() error {
 	db, err := sql.Open("sqlite3", connectionString(d.Path))
 	if err != nil {
 		return err
@@ -57,12 +62,12 @@ func connectionString(dpath string) string {
 	return fmt.Sprintf("file:%s?cache=shared", dpath)
 }
 
-func (d *SqliteDatabase) Disconnect() error {
+func (d *Database) Disconnect() error {
 	err := d.db.Close()
 	return err
 }
 
-func (d *SqliteDatabase) Apply(f func(db *sql.DB) error) error {
+func (d *Database) Apply(f func(db *sql.DB) error) error {
 	d.mu.Lock()
 	err := f(d.db)
 	d.mu.Unlock()

+ 0 - 70
internal/pkg/sqlkv/kvStore.go

@@ -1,70 +0,0 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package sqlkv
-
-import (
-	"github.com/lf-edge/ekuiper/pkg/kv"
-	"sync"
-)
-
-type kvstores struct {
-	stores map[string]kv.KeyValue
-	mu     sync.Mutex
-}
-
-var stores = kvstores{
-	stores: make(map[string]kv.KeyValue),
-	mu:     sync.Mutex{},
-}
-
-var database Database
-
-func Setup(dataDir string) error {
-	err, db := newSqliteDatabase(dataDir)
-	if err != nil {
-		return err
-	}
-	err = db.Connect()
-	if err != nil {
-		return err
-	}
-	database = db
-	return nil
-}
-
-func Close() {
-	if database != nil {
-		database.Disconnect()
-		database = nil
-	}
-}
-
-func (s *kvstores) get(table string) (kv.KeyValue, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	if store, contains := s.stores[table]; contains {
-		return store, nil
-	}
-	err, store := CreateSqlKvStore(database, table)
-	if err != nil {
-		return nil, err
-	}
-	s.stores[table] = store
-	return store, nil
-}
-
-func GetKVStore(table string) (kv.KeyValue, error) {
-	return stores.get(table)
-}

+ 0 - 107
internal/pkg/sqlkv/sqlKv_test.go

@@ -1,107 +0,0 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package sqlkv
-
-import (
-	"os"
-	"path"
-	"path/filepath"
-	"reflect"
-	"testing"
-)
-
-func TestSqlKVStore_Funcs(t *testing.T) {
-	abs, _ := filepath.Abs("test")
-	if f, _ := os.Stat(abs); f != nil {
-		os.Remove(abs)
-	}
-	_, db := newSqliteDatabase(abs)
-	db.Connect()
-	database = db
-
-	ks, _ := GetKVStore("test")
-	if err := ks.Setnx("foo", "bar"); nil != err {
-		t.Error(err)
-	}
-
-	var v string
-	if ok, _ := ks.Get("foo", &v); ok {
-		if !reflect.DeepEqual("bar", v) {
-			t.Error("expect:bar", "get:", v)
-		}
-	} else {
-		t.Errorf("Should not find the foo key.")
-	}
-
-	if err := ks.Setnx("foo1", "bar1"); nil != err {
-		t.Error(err)
-	}
-
-	if err := ks.Set("foo1", "bar2"); nil != err {
-		t.Error(err)
-	}
-
-	var v1 string
-	if ok, _ := ks.Get("foo1", &v1); ok {
-		if !reflect.DeepEqual("bar2", v1) {
-			t.Error("expect:bar2", "get:", v1)
-		}
-	} else {
-		t.Errorf("Should not find the foo1 key.")
-	}
-
-	if keys, e1 := ks.Keys(); e1 != nil {
-		t.Errorf("Failed to get value: %s.", e1)
-	} else {
-		if !reflect.DeepEqual(2, len(keys)) {
-			t.Error("expect:2", "get:", len(keys))
-		}
-	}
-
-	var v2 string
-	if ok, _ := ks.Get("foo", &v2); ok {
-		if !reflect.DeepEqual("bar", v2) {
-			t.Error("expect:bar", "get:", v)
-		}
-	} else {
-		t.Errorf("Should not find the foo key.")
-	}
-
-	if err := ks.Delete("foo1"); nil != err {
-		t.Error(err)
-	}
-
-	if keys, e1 := ks.Keys(); e1 != nil {
-		t.Errorf("Failed to get value: %s.", e1)
-	} else {
-		reflect.DeepEqual(1, len(keys))
-	}
-
-	if err := ks.Clean(); nil != err {
-		t.Error(err)
-	}
-
-	if keys, e1 := ks.Keys(); e1 != nil {
-		t.Errorf("Failed to get value: %s.", e1)
-	} else {
-		reflect.DeepEqual(0, len(keys))
-	}
-
-	database.Disconnect()
-	dir, _ := filepath.Split(abs)
-	abs = path.Join(dir, "sqliteKV.db")
-	os.Remove(abs)
-
-}

+ 42 - 0
internal/pkg/store/builder.go

@@ -0,0 +1,42 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package store
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/pkg/db"
+	"github.com/lf-edge/ekuiper/internal/pkg/db/redis"
+	"github.com/lf-edge/ekuiper/internal/pkg/db/sql"
+	rb "github.com/lf-edge/ekuiper/internal/pkg/store/redis"
+	sb "github.com/lf-edge/ekuiper/internal/pkg/store/sql"
+	st "github.com/lf-edge/ekuiper/pkg/kv/stores"
+)
+
+type Builder interface {
+	CreateStore(table string) (error, st.KeyValue)
+}
+
+func CreateStoreBuilder(database db.Database) (error, Builder) {
+	switch database.(type) {
+	case *redis.Instance:
+		d := *database.(*redis.Instance)
+		return nil, rb.NewStoreBuilder(d)
+	case sql.Database:
+		d := database.(sql.Database)
+		return nil, sb.NewStoreBuilder(d)
+	default:
+		return fmt.Errorf("unrecognized database type"), nil
+	}
+}

+ 30 - 0
internal/pkg/store/encoding/encoding.go

@@ -0,0 +1,30 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package encoding
+
+import (
+	"bytes"
+	"encoding/gob"
+)
+
+func Encode(value interface{}) (error, []byte) {
+	var buff bytes.Buffer
+	gob.Register(value)
+	enc := gob.NewEncoder(&buff)
+	if err := enc.Encode(value); err != nil {
+		return err, nil
+	}
+	return nil, buff.Bytes()
+}

+ 152 - 0
internal/pkg/store/redis/redisKv.go

@@ -0,0 +1,152 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package redis
+
+import (
+	"bytes"
+	"encoding/gob"
+	"fmt"
+	"github.com/gomodule/redigo/redis"
+	dbRedis "github.com/lf-edge/ekuiper/internal/pkg/db/redis"
+	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
+	"strings"
+)
+
+const KvPrefix = "KV:STORE"
+
+type redisKvStore struct {
+	database  dbRedis.Instance
+	table     string
+	keyPrefix string
+}
+
+func CreateRedisKvStore(redis dbRedis.Instance, table string) (error, *redisKvStore) {
+	store := &redisKvStore{
+		database:  redis,
+		table:     table,
+		keyPrefix: fmt.Sprintf("%s:%s", KvPrefix, table),
+	}
+	return nil, store
+}
+
+func (kv redisKvStore) Setnx(key string, value interface{}) error {
+	return kv.database.Apply(func(conn redis.Conn) error {
+		err, b := kvEncoding.Encode(value)
+		if nil != err {
+			return err
+		}
+		tKey := kv.tableKey(key)
+		reply, err := conn.Do("SETNX", tKey, b)
+		if err != nil {
+			return err
+		}
+		code, err := redis.Int(reply, err)
+		if code == 0 {
+			return fmt.Errorf("item %s already exists under %s key because of %s", key, tKey, err)
+		}
+		return nil
+	})
+}
+
+func (kv redisKvStore) Set(key string, value interface{}) error {
+	err, b := kvEncoding.Encode(value)
+	if nil != err {
+		return err
+	}
+	err = kv.database.Apply(func(conn redis.Conn) error {
+		tKey := kv.tableKey(key)
+		reply, err := conn.Do("SET", tKey, b)
+		code, err := redis.String(reply, err)
+		if err != nil {
+			return err
+		}
+		if code != "OK" {
+			return fmt.Errorf("item %s (under key %s) not set because of %s", key, tKey, err)
+		}
+		return nil
+	})
+	return err
+}
+
+func (kv redisKvStore) Get(key string, value interface{}) (bool, error) {
+	result := false
+	err := kv.database.Apply(func(conn redis.Conn) error {
+		tKey := kv.tableKey(key)
+		reply, err := conn.Do("GET", tKey)
+		if err != nil {
+			return err
+		}
+		buff, err := redis.Bytes(reply, err)
+		if err != nil {
+			result = false
+			return nil
+		}
+		dec := gob.NewDecoder(bytes.NewBuffer(buff))
+		if err := dec.Decode(value); err != nil {
+			return err
+		}
+		result = true
+		return nil
+	})
+	return result, err
+}
+
+func (kv redisKvStore) Delete(key string) error {
+	return kv.database.Apply(func(conn redis.Conn) error {
+		tKey := kv.tableKey(key)
+		_, err := conn.Do("DEL", tKey)
+		return err
+	})
+}
+
+func (kv redisKvStore) Keys() ([]string, error) {
+	keys := make([]string, 0)
+	err := kv.database.Apply(func(conn redis.Conn) error {
+		pattern := fmt.Sprintf("%s:*", kv.keyPrefix)
+		reply, err := conn.Do("KEYS", pattern)
+		keys, err = redis.Strings(reply, err)
+		return err
+	})
+	result := make([]string, 0)
+	for _, k := range keys {
+		result = append(result, kv.trimPrefix(k))
+	}
+	return result, err
+}
+
+func (kv redisKvStore) Clean() error {
+	keys, err := kv.Keys()
+	if err != nil {
+		return err
+	}
+	keysToRemove := make([]interface{}, len(keys))
+	for i, v := range keysToRemove {
+		keysToRemove[i] = v
+	}
+	err = kv.database.Apply(func(conn redis.Conn) error {
+		_, err := conn.Do("DEL", keysToRemove...)
+		return err
+	})
+	return err
+}
+
+func (kv redisKvStore) tableKey(key string) string {
+	return fmt.Sprintf("%s:%s:%s", KvPrefix, kv.table, key)
+}
+
+func (kv redisKvStore) trimPrefix(fullKey string) string {
+	prefixToTrim := fmt.Sprintf("%s:%s:", KvPrefix, kv.table)
+	return strings.TrimPrefix(fullKey, prefixToTrim)
+}

+ 34 - 0
internal/pkg/store/redis/redisStoreBuilder.go

@@ -0,0 +1,34 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package redis
+
+import (
+	"github.com/lf-edge/ekuiper/internal/pkg/db/redis"
+	"github.com/lf-edge/ekuiper/pkg/kv/stores"
+)
+
+type StoreBuilder struct {
+	database redis.Instance
+}
+
+func NewStoreBuilder(redis redis.Instance) StoreBuilder {
+	return StoreBuilder{
+		database: redis,
+	}
+}
+
+func (b StoreBuilder) CreateStore(table string) (error, stores.KeyValue) {
+	return CreateRedisKvStore(b.database, table)
+}

+ 83 - 0
internal/pkg/store/redisKv_test.go

@@ -0,0 +1,83 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package store
+
+import (
+	"github.com/alicebob/miniredis/v2"
+	"github.com/lf-edge/ekuiper/internal/pkg/db/redis"
+	rb "github.com/lf-edge/ekuiper/internal/pkg/store/redis"
+	"github.com/lf-edge/ekuiper/internal/pkg/store/test/common"
+	st "github.com/lf-edge/ekuiper/pkg/kv/stores"
+	"strconv"
+	"testing"
+)
+
+func TestRedisKvSetnx(t *testing.T) {
+	ks, db, minRedis := setupRedisKv()
+	defer cleanRedisKv(db, minRedis)
+	common.TestKvSetnx(ks, t)
+}
+
+func TestRedisKvSet(t *testing.T) {
+	ks, db, minRedis := setupRedisKv()
+	defer cleanRedisKv(db, minRedis)
+	common.TestKvSet(ks, t)
+}
+
+func TestRedisKvGet(t *testing.T) {
+	ks, db, minRedis := setupRedisKv()
+	defer cleanRedisKv(db, minRedis)
+	common.TestKvGet(ks, t)
+}
+
+func TestRedisKvKeys(t *testing.T) {
+	ks, db, minRedis := setupRedisKv()
+	defer cleanRedisKv(db, minRedis)
+
+	length := 10
+	common.TestKvKeys(length, ks, t)
+}
+
+func setupRedisKv() (st.KeyValue, *redis.Instance, *miniredis.Miniredis) {
+	minRedis, err := miniredis.Run()
+	if err != nil {
+		panic(err)
+	}
+	redisDB := redis.NewRedis("localhost", stringToInt(minRedis.Port()))
+	err = redisDB.Connect()
+	if err != nil {
+		panic(err)
+	}
+	builder := rb.NewStoreBuilder(redisDB)
+	var ks st.KeyValue
+	err, ks = builder.CreateStore("test")
+	if err != nil {
+		panic(err)
+	}
+	return ks, &redisDB, minRedis
+}
+
+func cleanRedisKv(instance *redis.Instance, minRedis *miniredis.Miniredis) {
+	instance.Disconnect()
+	minRedis.Close()
+}
+
+func stringToInt(svalue string) int {
+	ivalue, err := strconv.Atoi(svalue)
+	if err != nil {
+		panic(err)
+	}
+	return ivalue
+}

+ 7 - 15
internal/pkg/sqlkv/sqlKv.go

@@ -12,23 +12,25 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package sqlkv
+package sql
 
 import (
 	"bytes"
 	"database/sql"
 	"encoding/gob"
 	"fmt"
+	dbSql "github.com/lf-edge/ekuiper/internal/pkg/db/sql"
+	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"strings"
 )
 
 type sqlKvStore struct {
-	database Database
+	database dbSql.Database
 	table    string
 }
 
-func CreateSqlKvStore(database Database, table string) (error, *sqlKvStore) {
+func createSqlKvStore(database dbSql.Database, table string) (error, *sqlKvStore) {
 	store := &sqlKvStore{
 		database: database,
 		table:    table,
@@ -44,19 +46,9 @@ func CreateSqlKvStore(database Database, table string) (error, *sqlKvStore) {
 	return nil, store
 }
 
-func encode(value interface{}) ([]byte, error) {
-	var buf bytes.Buffer
-	gob.Register(value)
-	enc := gob.NewEncoder(&buf)
-	if err := enc.Encode(value); err != nil {
-		return nil, err
-	}
-	return buf.Bytes(), nil
-}
-
 func (kv *sqlKvStore) Setnx(key string, value interface{}) error {
 	return kv.database.Apply(func(db *sql.DB) error {
-		b, err := encode(value)
+		err, b := kvEncoding.Encode(value)
 		if nil != err {
 			return err
 		}
@@ -74,7 +66,7 @@ func (kv *sqlKvStore) Setnx(key string, value interface{}) error {
 }
 
 func (kv *sqlKvStore) Set(key string, value interface{}) error {
-	b, err := encode(value)
+	err, b := kvEncoding.Encode(value)
 	if nil != err {
 		return err
 	}

+ 34 - 0
internal/pkg/store/sql/sqlStoreBuilder.go

@@ -0,0 +1,34 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+	"github.com/lf-edge/ekuiper/internal/pkg/db/sql"
+	"github.com/lf-edge/ekuiper/pkg/kv/stores"
+)
+
+type StoreBuilder struct {
+	database sql.Database
+}
+
+func NewStoreBuilder(d sql.Database) StoreBuilder {
+	return StoreBuilder{
+		database: d,
+	}
+}
+
+func (b StoreBuilder) CreateStore(table string) (error, stores.KeyValue) {
+	return createSqlKvStore(b.database, table)
+}

+ 100 - 0
internal/pkg/store/sqlKv_test.go

@@ -0,0 +1,100 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package store
+
+import (
+	"github.com/lf-edge/ekuiper/internal/pkg/db/sql/sqlite"
+	sb "github.com/lf-edge/ekuiper/internal/pkg/store/sql"
+	"github.com/lf-edge/ekuiper/internal/pkg/store/test/common"
+	st "github.com/lf-edge/ekuiper/pkg/kv/stores"
+	"os"
+	"path"
+	"path/filepath"
+	"testing"
+)
+
+const DbName = "sqliteKV.db"
+const Table = "test"
+
+func TestSqlKvSetnx(t *testing.T) {
+	ks, db, abs := setupSqlKv()
+	defer cleanSqlKv(db, abs)
+	common.TestKvSetnx(ks, t)
+}
+
+func TestSqlKvSet(t *testing.T) {
+	ks, db, abs := setupSqlKv()
+	defer cleanSqlKv(db, abs)
+	common.TestKvSet(ks, t)
+}
+
+func TestSqlKvGet(t *testing.T) {
+	ks, db, abs := setupSqlKv()
+	defer cleanSqlKv(db, abs)
+	common.TestKvGet(ks, t)
+}
+
+func TestSqlKvKeys(t *testing.T) {
+	ks, db, abs := setupSqlKv()
+	defer cleanSqlKv(db, abs)
+
+	length := 10
+	common.TestKvKeys(length, ks, t)
+}
+
+func deleteIfExists(abs string) error {
+	absPath := path.Join(abs, DbName)
+	if f, _ := os.Stat(absPath); f != nil {
+		return os.Remove(absPath)
+	}
+	return nil
+}
+
+func setupSqlKv() (st.KeyValue, *sqlite.Database, string) {
+	absPath, err := filepath.Abs("test")
+	if err != nil {
+		panic(err)
+	}
+	err = deleteIfExists(absPath)
+	if err != nil {
+		panic(err)
+	}
+	config := sqlite.Config{
+		Path: absPath,
+		Name: DbName,
+	}
+	_, db := sqlite.NewSqliteDatabase(config)
+	err = db.Connect()
+	if err != nil {
+		panic(err)
+	}
+
+	builder := sb.NewStoreBuilder(db)
+	var store st.KeyValue
+	err, store = builder.CreateStore(Table)
+	if err != nil {
+		panic(err)
+	}
+	return store, db, absPath
+}
+
+func cleanSqlKv(db *sqlite.Database, abs string) {
+	if err := db.Disconnect(); err != nil {
+		panic(err)
+	}
+	if err := deleteIfExists(abs); err != nil {
+		panic(err)
+	}
+}

+ 84 - 0
internal/pkg/store/test/common/test.go

@@ -0,0 +1,84 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package common
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/kv/stores"
+	"reflect"
+	"testing"
+)
+
+func TestKvSetnx(ks stores.KeyValue, t *testing.T) {
+
+	if err := ks.Setnx("foo", "bar"); nil != err {
+		t.Error(err)
+	}
+
+	if err := ks.Setnx("foo", "bar1"); nil == err {
+		t.Errorf("Can't overwrite an existing intem")
+	}
+}
+
+func TestKvSet(ks stores.KeyValue, t *testing.T) {
+
+	if err := ks.Set("foo", "bar"); nil != err {
+		t.Error(err)
+	}
+
+	if err := ks.Set("foo", "bar1"); nil != err {
+		t.Errorf("Set should overwrite an existing record")
+	}
+}
+
+func TestKvGet(ks stores.KeyValue, t *testing.T) {
+
+	if err := ks.Setnx("foo", "bar"); nil != err {
+		t.Error(err)
+	}
+
+	var v string
+	if ok, _ := ks.Get("foo", &v); ok {
+		if !reflect.DeepEqual("bar", v) {
+			t.Error("expect:bar", "get:", v)
+		}
+	} else {
+		t.Errorf("Should find the foo key")
+	}
+}
+
+func TestKvKeys(length int, ks stores.KeyValue, t *testing.T) {
+
+	expected := make([]string, 0)
+	for i := 0; i < length; i++ {
+		key := fmt.Sprintf("key-%d", i)
+		value := fmt.Sprintf("value-%d", i)
+		if err := ks.Setnx(key, value); err != nil {
+			t.Errorf("It should be set")
+		}
+		expected = append(expected, key)
+	}
+
+	var keys []string
+	var err error
+	if keys, err = ks.Keys(); err != nil {
+		t.Errorf("Failed to get value: %s.", err)
+	} else if !reflect.DeepEqual(length, len(keys)) {
+		t.Errorf("expect: %d, got: %d", length, len(keys))
+	}
+	if !reflect.DeepEqual(keys, expected) {
+		t.Errorf("Keys do not match expected %s != %s", keys, expected)
+	}
+}

+ 42 - 0
internal/pkg/ts/builder.go

@@ -0,0 +1,42 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ts
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/pkg/db"
+	"github.com/lf-edge/ekuiper/internal/pkg/db/redis"
+	"github.com/lf-edge/ekuiper/internal/pkg/db/sql"
+	rb "github.com/lf-edge/ekuiper/internal/pkg/ts/redis"
+	sb "github.com/lf-edge/ekuiper/internal/pkg/ts/sql"
+	"github.com/lf-edge/ekuiper/pkg/kv/stores"
+)
+
+type Builder interface {
+	CreateTs(table string) (error, stores.Tskv)
+}
+
+func CreateTsBuilder(database db.Database) (error, Builder) {
+	switch database.(type) {
+	case *redis.Instance:
+		d := *database.(*redis.Instance)
+		return nil, rb.NewTsBuilder(d)
+	case sql.Database:
+		d := database.(sql.Database)
+		return nil, sb.NewTsBuilder(d)
+	default:
+		return fmt.Errorf("unrecognized database type"), nil
+	}
+}

+ 188 - 0
internal/pkg/ts/redis/redisTs.go

@@ -0,0 +1,188 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package redis
+
+import (
+	"bytes"
+	"encoding/gob"
+	"fmt"
+	"github.com/gomodule/redigo/redis"
+	dbRedis "github.com/lf-edge/ekuiper/internal/pkg/db/redis"
+	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
+	"strconv"
+)
+
+const (
+	TsPrefix             = "KV:TS"
+	AddToSortedSet       = "ZADD"
+	ReversedRangeByScore = "ZREVRANGEBYSCORE"
+	RemoveRangeByScore   = "ZREMRANGEBYSCORE"
+	Delete               = "DEL"
+	ReversedRange        = "ZREVRANGE"
+)
+
+type ts struct {
+	redis dbRedis.Instance
+	table string
+	last  int64
+	key   string
+}
+
+func init() {
+	gob.Register(make(map[string]interface{}))
+}
+
+func createRedisTs(redis dbRedis.Instance, table string) (error, *ts) {
+	key := fmt.Sprintf("%s:%s", TsPrefix, table)
+	err, lastTs := getLast(redis, key)
+	if err != nil {
+		return err, nil
+	}
+	s := &ts{
+		redis: redis,
+		table: table,
+		last:  lastTs,
+		key:   key,
+	}
+	return nil, s
+}
+
+func (t *ts) Set(key int64, value interface{}) (bool, error) {
+	if key <= t.last {
+		return false, nil
+	}
+	err, b := kvEncoding.Encode(value)
+	if err != nil {
+		return false, err
+	}
+	err = t.redis.Apply(func(conn redis.Conn) error {
+		reply, err := conn.Do(AddToSortedSet, t.key, key, b)
+		if err != nil {
+			return err
+		}
+		length, err := redis.Int(reply, err)
+		if err != nil {
+			return err
+		}
+		if length == 0 {
+			return fmt.Errorf("list at %s key should be non empty", t.key)
+		}
+		t.last = key
+		return nil
+	})
+	if err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
+func (t ts) Get(key int64, value interface{}) (bool, error) {
+	err := t.redis.Apply(func(conn redis.Conn) error {
+		reply, err := conn.Do(ReversedRangeByScore, t.key, key, key)
+		if err != nil {
+			return err
+		}
+		var tmp [][]byte
+		tmp, err = redis.ByteSlices(reply, err)
+		if err != nil {
+			return err
+		}
+		if len(tmp) == 0 {
+			return fmt.Errorf("record under %s key and %d score not found", t.key, key)
+		}
+		dec := gob.NewDecoder(bytes.NewBuffer(tmp[0]))
+		err = dec.Decode(value)
+		return err
+	})
+	if err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
+func (t ts) Last(value interface{}) (int64, error) {
+	var last int64 = 0
+	err := t.redis.Apply(func(conn redis.Conn) error {
+		reply, err := conn.Do(ReversedRange, t.key, 0, 0, "WITHSCORES")
+		if err != nil {
+			return err
+		}
+		var tmp [][]byte
+		tmp, err = redis.ByteSlices(reply, err)
+		if err != nil {
+			return err
+		}
+		dec := gob.NewDecoder(bytes.NewBuffer(tmp[0]))
+		if err = dec.Decode(value); err != nil {
+			return err
+		}
+		last, err = strconv.ParseInt(string(tmp[1]), 10, 64)
+		return err
+	})
+	if err != nil {
+		return 0, err
+	}
+	return last, nil
+}
+
+func (t ts) Delete(key int64) error {
+	return t.redis.Apply(func(conn redis.Conn) error {
+		_, err := conn.Do(RemoveRangeByScore, t.key, key, key)
+		return err
+	})
+}
+
+func (t ts) DeleteBefore(key int64) error {
+	return t.redis.Apply(func(conn redis.Conn) error {
+		bound := fmt.Sprintf("(%d", key)
+		_, err := conn.Do(RemoveRangeByScore, t.key, "-INF", bound)
+		return err
+	})
+}
+
+func (t ts) Close() error {
+	return nil
+}
+
+func (t ts) Drop() error {
+	return t.redis.Apply(func(conn redis.Conn) error {
+		_, err := conn.Do(Delete, t.key)
+		return err
+	})
+}
+
+func getLast(db dbRedis.Instance, key string) (error, int64) {
+	var lastTs int64
+	err := db.Apply(func(conn redis.Conn) error {
+		reply, err := conn.Do(ReversedRange, key, 0, 0, "WITHSCORES")
+		if err != nil {
+			return err
+		}
+		var tmp [][]byte
+		tmp, err = redis.ByteSlices(reply, err)
+		if err != nil {
+			return err
+		}
+		if len(tmp) == 0 {
+			return nil
+		}
+		lastTs, err = strconv.ParseInt(string(tmp[1]), 10, 64)
+		return err
+	})
+	if err != nil {
+		return err, 0
+	}
+	return nil, lastTs
+}

+ 34 - 0
internal/pkg/ts/redis/redisTsBuilder.go

@@ -0,0 +1,34 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package redis
+
+import (
+	"github.com/lf-edge/ekuiper/internal/pkg/db/redis"
+	st "github.com/lf-edge/ekuiper/pkg/kv/stores"
+)
+
+type TsBuilder struct {
+	redis redis.Instance
+}
+
+func NewTsBuilder(d redis.Instance) TsBuilder {
+	return TsBuilder{
+		redis: d,
+	}
+}
+
+func (b TsBuilder) CreateTs(table string) (error, st.Tskv) {
+	return createRedisTs(b.redis, table)
+}

+ 93 - 0
internal/pkg/ts/redisTs_test.go

@@ -0,0 +1,93 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ts
+
+import (
+	"github.com/alicebob/miniredis/v2"
+	"github.com/lf-edge/ekuiper/internal/pkg/db/redis"
+	rb "github.com/lf-edge/ekuiper/internal/pkg/ts/redis"
+	"github.com/lf-edge/ekuiper/internal/pkg/ts/test/common"
+	st "github.com/lf-edge/ekuiper/pkg/kv/stores"
+	"strconv"
+	"testing"
+)
+
+func TestRedisTsSet(t *testing.T) {
+	ks, db, minRedis := setupRedisKv()
+	defer cleanRedisKv(db, minRedis)
+
+	common.TestTsSet(ks, t)
+}
+
+func TestRedisTsLast(t *testing.T) {
+	ks, db, minRedis := setupRedisKv()
+	defer cleanRedisKv(db, minRedis)
+
+	common.TestTsLast(ks, t)
+}
+
+func TestRedisTsGet(t *testing.T) {
+	ks, db, minRedis := setupRedisKv()
+	defer cleanRedisKv(db, minRedis)
+
+	common.TestTsGet(ks, t)
+}
+
+func TestRedisTsDelete(t *testing.T) {
+	ks, db, minRedis := setupRedisKv()
+	defer cleanRedisKv(db, minRedis)
+
+	common.TestTsDelete(ks, t)
+}
+
+func TestRedisTsDeleteBefore(t *testing.T) {
+	ks, db, minRedis := setupRedisKv()
+	defer cleanRedisKv(db, minRedis)
+
+	common.TestTsDeleteBefore(ks, t)
+}
+
+func setupRedisKv() (st.Tskv, *redis.Instance, *miniredis.Miniredis) {
+	minRedis, err := miniredis.Run()
+	if err != nil {
+		panic(err)
+	}
+	redisDB := redis.NewRedis("localhost", stringToInt(minRedis.Port()))
+	err = redisDB.Connect()
+	if err != nil {
+		panic(err)
+	}
+
+	builder := rb.NewTsBuilder(redisDB)
+	var ks st.Tskv
+	err, ks = builder.CreateTs("test")
+	if err != nil {
+		panic(err)
+	}
+	return ks, &redisDB, minRedis
+}
+
+func cleanRedisKv(instance *redis.Instance, minRedis *miniredis.Miniredis) {
+	instance.Disconnect()
+	minRedis.Close()
+}
+
+func stringToInt(svalue string) int {
+	ivalue, err := strconv.Atoi(svalue)
+	if err != nil {
+		panic(err)
+	}
+	return ivalue
+}

+ 153 - 0
internal/pkg/ts/sql/sqlTs.go

@@ -0,0 +1,153 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+	"bytes"
+	"database/sql"
+	"encoding/gob"
+	"fmt"
+	dbSql "github.com/lf-edge/ekuiper/internal/pkg/db/sql"
+	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
+)
+
+type ts struct {
+	database dbSql.Database
+	table    string
+	last     int64
+}
+
+func init() {
+	gob.Register(make(map[string]interface{}))
+}
+
+func createSqlTs(database dbSql.Database, table string) (error, *ts) {
+	store := &ts{
+		database: database,
+		table:    table,
+		last:     getLast(database, table),
+	}
+	err := store.database.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS '%s'('key' INTEGER PRIMARY KEY, 'val' BLOB);", table)
+		_, err := db.Exec(query)
+		return err
+	})
+	if err != nil {
+		return err, nil
+	}
+	return nil, store
+}
+
+func (t *ts) Set(key int64, value interface{}) (bool, error) {
+	if key <= t.last {
+		return false, nil
+	}
+	err, b := kvEncoding.Encode(value)
+	if err != nil {
+		return false, err
+	}
+	err = t.database.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("INSERT INTO %s(key,val) values(?,?);", t.table)
+		stmt, err := db.Prepare(query)
+		if err != nil {
+			return err
+		}
+		defer stmt.Close()
+		_, err = stmt.Exec(key, b)
+		if err != nil {
+			return err
+		}
+		t.last = key
+		return nil
+	})
+	if err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
+func (t ts) Get(key int64, value interface{}) (bool, error) {
+	result := false
+	err := t.database.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("SELECT val FROM %s WHERE key=%d;", t.table, key)
+		row := db.QueryRow(query)
+		var tmp []byte
+		switch err := row.Scan(&tmp); err {
+		case sql.ErrNoRows:
+			return nil
+		case nil:
+		default:
+			return err
+		}
+
+		dec := gob.NewDecoder(bytes.NewBuffer(tmp))
+		if err := dec.Decode(value); err != nil {
+			return err
+		}
+		result = true
+		return nil
+	})
+	if err != nil {
+		return false, err
+	}
+	return result, nil
+}
+
+func (t ts) Last(value interface{}) (int64, error) {
+	_, err := t.Get(t.last, value)
+	if err != nil {
+		return 0, err
+	}
+	return t.last, nil
+}
+
+func (t ts) Delete(key int64) error {
+	return t.database.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("DELETE FROM %s WHERE key=%d;", t.table, key)
+		_, err := db.Exec(query)
+		return err
+	})
+}
+
+func (t ts) DeleteBefore(key int64) error {
+	return t.database.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("DELETE FROM %s WHERE key<%d;", t.table, key)
+		_, err := db.Exec(query)
+		return err
+	})
+}
+
+func (t ts) Close() error {
+	return nil
+}
+
+func (t ts) Drop() error {
+	return t.database.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("Drop table %s;", t.table)
+		_, err := db.Exec(query)
+		return err
+	})
+}
+
+func getLast(d dbSql.Database, table string) int64 {
+	var last int64 = 0
+	_ = d.Apply(func(db *sql.DB) error {
+		query := fmt.Sprintf("SELECT key FROM %s Order by key DESC Limit 1;", table)
+		row := db.QueryRow(query)
+		err := row.Scan(&last)
+		return err
+	})
+	return last
+}

+ 34 - 0
internal/pkg/ts/sql/sqlTsBuilder.go

@@ -0,0 +1,34 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+	"github.com/lf-edge/ekuiper/internal/pkg/db/sql"
+	ts2 "github.com/lf-edge/ekuiper/pkg/kv/stores"
+)
+
+type TsBuilder struct {
+	database sql.Database
+}
+
+func NewTsBuilder(d sql.Database) TsBuilder {
+	return TsBuilder{
+		database: d,
+	}
+}
+
+func (b TsBuilder) CreateTs(table string) (error, ts2.Tskv) {
+	return createSqlTs(b.database, table)
+}

+ 114 - 0
internal/pkg/ts/sqlTs_test.go

@@ -0,0 +1,114 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ts
+
+import (
+	"github.com/lf-edge/ekuiper/internal/pkg/db/sql/sqlite"
+	sb "github.com/lf-edge/ekuiper/internal/pkg/ts/sql"
+	"github.com/lf-edge/ekuiper/internal/pkg/ts/test/common"
+	"github.com/lf-edge/ekuiper/pkg/kv/stores"
+	"os"
+	"path"
+	"path/filepath"
+	"testing"
+)
+
+const (
+	DbName = "sqliteTS.db"
+	Table  = "test"
+)
+
+func TestSqlTsSet(t *testing.T) {
+	ks, db, abs := setupSqlKv()
+	defer cleanSqlKv(db, abs)
+
+	common.TestTsSet(ks, t)
+}
+
+func TestSqlTsLast(t *testing.T) {
+	ks, db, abs := setupSqlKv()
+	defer cleanSqlKv(db, abs)
+
+	common.TestTsLast(ks, t)
+}
+
+func TestSqlTsGet(t *testing.T) {
+	ks, db, abs := setupSqlKv()
+	defer cleanSqlKv(db, abs)
+
+	common.TestTsGet(ks, t)
+}
+
+func TestSqlTsDelete(t *testing.T) {
+	ks, db, abs := setupSqlKv()
+	defer cleanSqlKv(db, abs)
+
+	common.TestTsDelete(ks, t)
+}
+
+func TestSqlTsDeleteBefore(t *testing.T) {
+	ks, db, abs := setupSqlKv()
+	defer cleanSqlKv(db, abs)
+
+	common.TestTsDeleteBefore(ks, t)
+}
+
+func deleteIfExists(abs string) error {
+	absPath := path.Join(abs, DbName)
+	if f, _ := os.Stat(absPath); f != nil {
+		return os.Remove(absPath)
+	}
+	return nil
+}
+
+func setupSqlKv() (stores.Tskv, *sqlite.Database, string) {
+	absPath, err := filepath.Abs("test")
+	if err != nil {
+		panic(err)
+	}
+	err = deleteIfExists(absPath)
+	if err != nil {
+		panic(err)
+	}
+	config := sqlite.Config{
+		Path: absPath,
+		Name: DbName,
+	}
+	_, db := sqlite.NewSqliteDatabase(config)
+	err = db.Connect()
+	if err != nil {
+		panic(err)
+	}
+
+	builder := sb.NewTsBuilder(db)
+	if err != nil {
+		panic(err)
+	}
+	var store stores.Tskv
+	err, store = builder.CreateTs(Table)
+	if err != nil {
+		panic(err)
+	}
+	return store, db, absPath
+}
+
+func cleanSqlKv(db *sqlite.Database, abs string) {
+	if err := db.Disconnect(); err != nil {
+		panic(err)
+	}
+	if err := deleteIfExists(abs); err != nil {
+		panic(err)
+	}
+}

+ 51 - 47
internal/pkg/tskv/sqlite_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021 INTECH Process Automation Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,49 +12,31 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package tskv
+package common
 
 import (
+	"github.com/lf-edge/ekuiper/pkg/kv/stores"
 	"reflect"
 	"testing"
 )
 
-func TestSqlite_Funcs(t *testing.T) {
-	ks, e := NewSqlite("test")
-	if e != nil {
-		t.Errorf("Failed to create tskv %s.", e)
-		return
-	}
-
-	if ok, err := ks.Set(1000, "bar1"); nil != err {
-		t.Error(err)
-	} else if !ok {
-		t.Error("should allow key 1000")
-	}
-
-	if ok, err := ks.Set(1500, "bar15"); nil != err {
-		t.Error(err)
-	} else if !ok {
-		t.Error("should allow key 1500")
-	}
+var (
+	Keys   = []int64{1000, 1500, 2000, 3000}
+	Values = []string{"bar1", "bar15", "bar2", "bar3"}
+)
 
-	if ok, err := ks.Set(2000, "bar2"); nil != err {
-		t.Error(err)
-	} else if !ok {
-		t.Error("should allow key 2000")
-	}
-
-	if ok, err := ks.Set(3000, "bar3"); nil != err {
-		t.Error(err)
-	} else if !ok {
-		t.Error("should allow key 3000")
-	}
+func TestTsSet(ks stores.Tskv, t *testing.T) {
+	load(ks, t)
 
 	if ok, err := ks.Set(2500, "bar25"); nil != err {
 		t.Error(err)
 	} else if ok {
-		t.Error("should deny key 2500")
+		t.Errorf("should deny key 2500 while last one is 3000")
 	}
+}
+
+func TestTsLast(ks stores.Tskv, t *testing.T) {
+	load(ks, t)
 
 	var v string
 	if k, err := ks.Last(&v); err != nil {
@@ -62,22 +44,36 @@ func TestSqlite_Funcs(t *testing.T) {
 	} else if k != 3000 || v != "bar3" {
 		t.Errorf("Last expect 3000/bar3 but got %d/%s", k, v)
 	}
+}
+
+func TestTsGet(ks stores.Tskv, t *testing.T) {
+	load(ks, t)
 
-	if ok, _ := ks.Get(2000, &v); ok {
-		if !reflect.DeepEqual("bar2", v) {
-			t.Error("expect:bar", "get:", v)
+	var value string
+	if ok, _ := ks.Get(2000, &value); ok {
+		if !reflect.DeepEqual("bar2", value) {
+			t.Error("expect:bar", "get:", value)
 		}
 	} else {
 		t.Errorf("Should find key 2000.")
 	}
+}
+
+func TestTsDelete(ks stores.Tskv, t *testing.T) {
+	load(ks, t)
 
 	if err := ks.Delete(1500); nil != err {
 		t.Error(err)
 	}
 
-	if ok, _ := ks.Get(1500, &v); ok {
+	var value string
+	if ok, _ := ks.Get(1500, &value); ok {
 		t.Errorf("Should not find deleted key 1500.")
 	}
+}
+
+func TestTsDeleteBefore(ks stores.Tskv, t *testing.T) {
+	load(ks, t)
 
 	if ok, err := ks.Set(3500, "bar35"); nil != err {
 		t.Error(err)
@@ -89,31 +85,39 @@ func TestSqlite_Funcs(t *testing.T) {
 		t.Error(err)
 	}
 
-	if ok, _ := ks.Get(1000, &v); ok {
+	var value string
+	if ok, _ := ks.Get(1000, &value); ok {
 		t.Errorf("Should not find deleted key 1000.")
 	}
-
-	if ok, _ := ks.Get(2000, &v); ok {
+	if ok, _ := ks.Get(2000, &value); ok {
 		t.Errorf("Should not find deleted key 2000.")
 	}
 
-	if ok, _ := ks.Get(3000, &v); ok {
-		if !reflect.DeepEqual("bar3", v) {
-			t.Error("expect:bar3", "get:", v)
+	if ok, _ := ks.Get(3000, &value); ok {
+		if !reflect.DeepEqual("bar3", value) {
+			t.Error("expect:bar3", "get:", value)
 		}
 	} else {
 		t.Errorf("Should find key 3000.")
 	}
 
-	if ok, _ := ks.Get(3500, &v); ok {
-		if !reflect.DeepEqual("bar35", v) {
-			t.Error("expect:bar35", "get:", v)
+	if ok, _ := ks.Get(3500, &value); ok {
+		if !reflect.DeepEqual("bar35", value) {
+			t.Error("expect:bar35", "get:", value)
 		}
 	} else {
 		t.Errorf("Should find key 3500.")
 	}
+}
 
-	if err := ks.Drop(); err != nil {
-		t.Error(err)
+func load(ks stores.Tskv, t *testing.T) {
+	for i := 0; i < len(Keys); i++ {
+		k := Keys[i]
+		v := Values[i]
+		if ok, err := ks.Set(k, v); nil != err {
+			t.Error(err)
+		} else if !ok {
+			t.Errorf("should allow key %d", k)
+		}
 	}
 }

+ 0 - 169
internal/pkg/tskv/sqlite.go

@@ -1,169 +0,0 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package tskv
-
-import (
-	"bytes"
-	"database/sql"
-	"encoding/gob"
-	"fmt"
-	"github.com/lf-edge/ekuiper/internal/conf"
-	_ "github.com/mattn/go-sqlite3"
-	"path"
-	"sync"
-)
-
-// All TSKV instances share ONE database with different tables
-var (
-	db   *sql.DB
-	once sync.Once
-)
-
-// SqliteTskv All TSKV instances share the same database but with different tables
-// Each table must have ONLY ONE instance
-type SqliteTskv struct {
-	table string
-	// only append key bigger than the latest key inside; ONLY check in the instance itself
-	last int64
-}
-
-func init() {
-	gob.Register(make(map[string]interface{}))
-}
-
-func NewSqlite(table string) (*SqliteTskv, error) {
-	var outerError error
-	once.Do(func() {
-		d, err := conf.GetDataLoc()
-		if err != nil {
-			outerError = err
-			return
-		}
-		db, outerError = sql.Open("sqlite3", path.Join(d, "tskv.db"))
-	})
-	if outerError != nil {
-		return nil, outerError
-	}
-	if db == nil {
-		return nil, fmt.Errorf("cannot initiate sqlite db, please restart")
-	}
-	sqlStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS '%s'('key' INTEGER PRIMARY KEY, 'val' BLOB);", table)
-	_, outerError = db.Exec(sqlStr)
-	if outerError != nil {
-		return nil, fmt.Errorf("cannot create table: %v", outerError)
-	}
-	return &SqliteTskv{
-		table: table,
-		last:  last(table),
-	}, nil
-}
-
-func (m *SqliteTskv) Set(key int64, value interface{}) (bool, error) {
-	if key > m.last {
-		b, err := m.encode(value)
-		if err != nil {
-			return false, err
-		}
-		sqlStr := fmt.Sprintf("INSERT INTO %s(key,val) values(?,?);", m.table)
-		stmt, err := db.Prepare(sqlStr)
-		if err != nil {
-			return false, err
-		}
-		defer stmt.Close()
-		_, err = stmt.Exec(key, b)
-		if err != nil {
-			return false, err
-		} else {
-			m.last = key
-			return true, nil
-		}
-	} else {
-		return false, nil
-	}
-}
-
-func (m *SqliteTskv) Get(key int64, value interface{}) (bool, error) {
-	sqlStr := fmt.Sprintf("SELECT val FROM %s WHERE key=%d;", m.table, key)
-	row := db.QueryRow(sqlStr)
-	var tmp []byte
-	switch err := row.Scan(&tmp); err {
-	case sql.ErrNoRows:
-		return false, nil
-	case nil:
-		// do nothing, continue processing
-	default:
-		return false, err
-	}
-
-	dec := gob.NewDecoder(bytes.NewBuffer(tmp))
-	if err := dec.Decode(value); err != nil {
-		return false, err
-	}
-	return true, nil
-}
-
-func (m *SqliteTskv) Last(value interface{}) (int64, error) {
-	_, err := m.Get(m.last, value)
-	if err != nil {
-		return 0, err
-	}
-	return m.last, nil
-}
-
-func (m *SqliteTskv) Delete(k int64) error {
-	sqlStr := fmt.Sprintf("DELETE FROM %s WHERE key=%d;", m.table, k)
-	_, err := db.Exec(sqlStr)
-	return err
-}
-
-func (m *SqliteTskv) DeleteBefore(k int64) error {
-	sqlStr := fmt.Sprintf("DELETE FROM %s WHERE key<%d;", m.table, k)
-	_, err := db.Exec(sqlStr)
-	return err
-}
-
-func (m *SqliteTskv) Close() error {
-	return nil
-}
-
-func (m *SqliteTskv) Drop() error {
-	sqlStr := fmt.Sprintf("Drop table %s;", m.table)
-	_, err := db.Exec(sqlStr)
-	return err
-}
-
-func (m *SqliteTskv) encode(value interface{}) ([]byte, error) {
-	var buf bytes.Buffer
-	gob.Register(value)
-	enc := gob.NewEncoder(&buf)
-	if err := enc.Encode(value); err != nil {
-		return nil, err
-	}
-	return buf.Bytes(), nil
-}
-
-func last(table string) int64 {
-	sqlStr := fmt.Sprintf("SELECT key FROM %s Order by key DESC Limit 1;", table)
-	row := db.QueryRow(sqlStr)
-	var tmp int64
-	switch err := row.Scan(&tmp); err {
-	case sql.ErrNoRows:
-		return 0
-	case nil:
-		return tmp
-	default:
-		return 0
-	}
-}

+ 5 - 4
internal/plugin/manager.go

@@ -22,10 +22,11 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/filex"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
-	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
+	"github.com/lf-edge/ekuiper/pkg/kv"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
-	"github.com/lf-edge/ekuiper/pkg/kv"
+	"github.com/lf-edge/ekuiper/pkg/kv/stores"
 	"io/ioutil"
 	"os"
 	"os/exec"
@@ -279,7 +280,7 @@ type Manager struct {
 	pluginDir string
 	etcDir    string
 	registry  *Registry
-	db        kv.KeyValue
+	db        stores.KeyValue
 }
 
 func NewPluginManager() (*Manager, error) {
@@ -295,7 +296,7 @@ func NewPluginManager() (*Manager, error) {
 			outerErr = fmt.Errorf("cannot find etc folder: %s", err)
 			return
 		}
-		db, err := sqlkv.GetKVStore("pluginFuncs")
+		err, db := kv.GetKV("pluginFuncs")
 		if err != nil {
 			outerErr = fmt.Errorf("error when opening db: %v.", err)
 		}

+ 5 - 6
internal/processor/rule.go

@@ -19,8 +19,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
-	"github.com/lf-edge/ekuiper/internal/pkg/tskv"
 	"github.com/lf-edge/ekuiper/internal/topo"
 	"github.com/lf-edge/ekuiper/internal/topo/node"
 	"github.com/lf-edge/ekuiper/internal/topo/planner"
@@ -29,14 +27,15 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/kv"
+	"github.com/lf-edge/ekuiper/pkg/kv/stores"
 )
 
 type RuleProcessor struct {
-	db kv.KeyValue
+	db stores.KeyValue
 }
 
 func NewRuleProcessor() *RuleProcessor {
-	db, err := sqlkv.GetKVStore("rule")
+	err, db := kv.GetKV("rule")
 	if err != nil {
 		panic(fmt.Sprintf("Can not initalize store for the rule processor at path 'rule': %v", err))
 	}
@@ -234,7 +233,7 @@ func (p *RuleProcessor) ExecDrop(name string) (string, error) {
 }
 
 func cleanCheckpoint(name string) error {
-	db, err := tskv.NewSqlite(name)
+	err, db := kv.GetTS(name)
 	if err != nil {
 		return err
 	}
@@ -242,7 +241,7 @@ func cleanCheckpoint(name string) error {
 }
 
 func cleanSinkCache(rule *api.Rule) error {
-	store, err := sqlkv.GetKVStore("sink")
+	err, store := kv.GetKV("sink")
 	if err != nil {
 		return err
 	}

+ 3 - 3
internal/processor/stream.go

@@ -19,11 +19,11 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/kv"
+	"github.com/lf-edge/ekuiper/pkg/kv/stores"
 	"strings"
 )
 
@@ -32,11 +32,11 @@ var (
 )
 
 type StreamProcessor struct {
-	db kv.KeyValue
+	db stores.KeyValue
 }
 
 func NewStreamProcessor() *StreamProcessor {
-	db, err := sqlkv.GetKVStore("stream")
+	err, db := kv.GetKV("stream")
 	if err != nil {
 		panic(fmt.Sprintf("Can not initalize store for the stream processor at path 'stream': %v", err))
 	}

+ 2 - 12
internal/server/server.go

@@ -16,11 +16,11 @@ package server
 
 import (
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/internal/plugin"
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/service"
 	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/kv"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 
 	"context"
@@ -34,7 +34,6 @@ import (
 )
 
 var (
-	dataDir         string
 	logger          = conf.Log
 	startTimeStamp  int64
 	version         = ""
@@ -50,15 +49,7 @@ func StartUp(Version, LoadFileType string) {
 	startTimeStamp = time.Now().Unix()
 	conf.InitConf()
 
-	dr, err := conf.GetDataLoc()
-	if err != nil {
-		panic(err)
-	} else {
-		logger.Infof("db location is %s", dr)
-		dataDir = dr
-	}
-
-	err = sqlkv.Setup(dataDir)
+	err := kv.SetupWithKuiperConfig(conf.Config)
 	if err != nil {
 		panic(err)
 	}
@@ -185,6 +176,5 @@ func StartUp(Version, LoadFileType string) {
 		logger.Info("prometheus server successfully shutdown.")
 	}
 
-	sqlkv.Close()
 	os.Exit(0)
 }

+ 5 - 5
internal/service/manager.go

@@ -20,9 +20,9 @@ import (
 	kconf "github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/filex"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
-	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/kv"
+	"github.com/lf-edge/ekuiper/pkg/kv/stores"
 	"io/ioutil"
 	"os"
 	"path"
@@ -44,8 +44,8 @@ type Manager struct {
 	functionBuf  *sync.Map
 
 	etcDir     string
-	serviceKV  kv.KeyValue
-	functionKV kv.KeyValue
+	serviceKV  stores.KeyValue
+	functionKV stores.KeyValue
 }
 
 func GetServiceManager() (*Manager, error) {
@@ -60,11 +60,11 @@ func GetServiceManager() (*Manager, error) {
 		if err != nil {
 			return nil, fmt.Errorf("cannot find etc/services folder: %s", err)
 		}
-		sdb, err := sqlkv.GetKVStore("services")
+		err, sdb := kv.GetKV("services")
 		if err != nil {
 			return nil, fmt.Errorf("cannot open service db: %s", err)
 		}
-		fdb, err := sqlkv.GetKVStore("serviceFuncs")
+		err, fdb := kv.GetKV("serviceFuncs")
 		if err != nil {
 			return nil, fmt.Errorf("cannot open function db: %s", err)
 		}

+ 3 - 5
internal/testx/testUtil.go

@@ -16,7 +16,7 @@ package testx
 
 import (
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
+	"github.com/lf-edge/ekuiper/pkg/kv"
 )
 
 // errstring returns the string representation of an error.
@@ -27,12 +27,10 @@ func Errstring(err error) string {
 	return ""
 }
 
-func InitEnv() string {
+func InitEnv() {
 	conf.InitConf()
-	dbDir, err := conf.GetDataLoc()
+	err := kv.SetupDefault()
 	if err != nil {
 		conf.Log.Fatal(err)
 	}
-	sqlkv.Setup(dbDir)
-	return dbDir
 }

+ 5 - 0
internal/topo/context/default_test.go

@@ -18,6 +18,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/kv"
 	"log"
 	"os"
 	"path"
@@ -26,6 +27,10 @@ import (
 )
 
 func TestState(t *testing.T) {
+	err := kv.SetupDefault()
+	if err != nil {
+		t.Error(err)
+	}
 	var (
 		i      = 0
 		ruleId = "testStateRule"

+ 3 - 3
internal/topo/node/sink_cache.go

@@ -18,10 +18,10 @@ import (
 	"encoding/gob"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/kv"
+	"github.com/lf-edge/ekuiper/pkg/kv/stores"
 	"path"
 	"sort"
 	"strconv"
@@ -80,7 +80,7 @@ type Cache struct {
 	changed bool
 	//serialize
 	key   string //the key for current cache
-	store kv.KeyValue
+	store stores.KeyValue
 }
 
 func NewTimebasedCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache {
@@ -101,7 +101,7 @@ func (c *Cache) initStore(ctx api.StreamContext) {
 		Tail: 0,
 	}
 	var err error
-	c.store, err = sqlkv.GetKVStore(path.Join("sink", ctx.GetRuleId()))
+	err, c.store = kv.GetKV(path.Join("sink", ctx.GetRuleId()))
 	if err != nil {
 		c.drainError(err)
 	}

+ 2 - 2
internal/topo/planner/analyzer.go

@@ -18,14 +18,14 @@ import (
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/ast"
-	"github.com/lf-edge/ekuiper/pkg/kv"
+	"github.com/lf-edge/ekuiper/pkg/kv/stores"
 	"strconv"
 	"strings"
 )
 
 // Analyze the select statement by decorating the info from stream statement.
 // Typically, set the correct stream name for fieldRefs
-func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*ast.StreamStmt, error) {
+func decorateStmt(s *ast.SelectStatement, store stores.KeyValue) ([]*ast.StreamStmt, error) {
 	streamsFromStmt := xsql.GetStreams(s)
 	streamStmts := make([]*ast.StreamStmt, len(streamsFromStmt))
 	isSchemaless := false

+ 3 - 3
internal/topo/planner/analyzer_test.go

@@ -17,11 +17,11 @@ package planner
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
+	"github.com/lf-edge/ekuiper/pkg/kv"
 	"reflect"
 	"strings"
 	"testing"
@@ -123,7 +123,7 @@ var tests = []struct {
 }
 
 func Test_validation(t *testing.T) {
-	store, err := sqlkv.GetKVStore("stream")
+	err, store := kv.GetKV("stream")
 	if err != nil {
 		t.Error(err)
 		return
@@ -185,7 +185,7 @@ func Test_validation(t *testing.T) {
 }
 
 func Test_validationSchemaless(t *testing.T) {
-	store, err := sqlkv.GetKVStore("stream")
+	err, store := kv.GetKV("stream")
 	if err != nil {
 		t.Error(err)
 		return

+ 3 - 3
internal/topo/planner/planner.go

@@ -18,7 +18,6 @@ import (
 	"errors"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/internal/topo"
 	"github.com/lf-edge/ekuiper/internal/topo/node"
 	"github.com/lf-edge/ekuiper/internal/topo/operator"
@@ -26,6 +25,7 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/kv"
+	store2 "github.com/lf-edge/ekuiper/pkg/kv/stores"
 )
 
 func Plan(rule *api.Rule) (*topo.Topo, error) {
@@ -49,7 +49,7 @@ func PlanWithSourcesAndSinks(rule *api.Rule, sources []*node.SourceNode, sinks [
 	if rule.Options.SendMetaToSink && (len(streamsFromStmt) > 1 || stmt.Dimensions != nil) {
 		return nil, fmt.Errorf("Invalid option sendMetaToSink, it can not be applied to window")
 	}
-	store, err := sqlkv.GetKVStore("stream")
+	err, store := kv.GetKV("stream")
 	if err != nil {
 		return nil, err
 	}
@@ -199,7 +199,7 @@ func getMockSource(sources []*node.SourceNode, name string) *node.SourceNode {
 	return nil
 }
 
-func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.KeyValue) (LogicalPlan, error) {
+func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store store2.KeyValue) (LogicalPlan, error) {
 
 	dimensions := stmt.Dimensions
 	var (

+ 3 - 3
internal/topo/planner/planner_test.go

@@ -18,11 +18,11 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/gdexlab/go-render/render"
-	"github.com/lf-edge/ekuiper/internal/pkg/sqlkv"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
+	"github.com/lf-edge/ekuiper/pkg/kv"
 	"reflect"
 	"strings"
 	"testing"
@@ -33,7 +33,7 @@ func init() {
 }
 
 func Test_createLogicalPlan(t *testing.T) {
-	store, err := sqlkv.GetKVStore("stream")
+	err, store := kv.GetKV("stream")
 	if err != nil {
 		t.Error(err)
 		return
@@ -1136,7 +1136,7 @@ func Test_createLogicalPlan(t *testing.T) {
 }
 
 func Test_createLogicalPlanSchemaless(t *testing.T) {
-	store, err := sqlkv.GetKVStore("stream")
+	err, store := kv.GetKV("stream")
 	if err != nil {
 		t.Error(err)
 		return

+ 4 - 3
internal/topo/state/kv_store.go

@@ -18,9 +18,10 @@ import (
 	"encoding/gob"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/pkg/tskv"
 	"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/kv"
+	ts "github.com/lf-edge/ekuiper/pkg/kv/stores"
 	"sync"
 )
 
@@ -35,7 +36,7 @@ func init() {
 //  { "checkpoint1", "checkpoint2" ... "checkpointn" : The complete or incomplete snapshot
 //
 type KVStore struct {
-	db          tskv.Tskv
+	db          ts.Tskv
 	mapStore    *sync.Map //The current root store of a rule
 	checkpoints []int64
 	max         int
@@ -48,7 +49,7 @@ type KVStore struct {
 //"$checkpointId":A map with key of checkpoint id and value of snapshot(gob serialized)
 //Assume each operator only has one instance
 func getKVStore(ruleId string) (*KVStore, error) {
-	db, err := tskv.NewSqlite(ruleId)
+	err, db := kv.GetTS(ruleId)
 	if err != nil {
 		return nil, err
 	}

+ 5 - 0
internal/topo/state/kv_store_test.go

@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/kv"
 	"log"
 	"os"
 	"path"
@@ -153,6 +154,10 @@ func TestLifecycle(t *testing.T) {
 	)
 	func() {
 		cleanStateData()
+		err := kv.SetupDefault()
+		if err != nil {
+			t.Error(err)
+		}
 		store, err := getKVStore(ruleId)
 		if err != nil {
 			t.Errorf("Get store for rule %s error: %s", ruleId, err)

+ 3 - 3
internal/xsql/stmtx.go

@@ -19,7 +19,7 @@ import (
 	"fmt"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
-	"github.com/lf-edge/ekuiper/pkg/kv"
+	"github.com/lf-edge/ekuiper/pkg/kv/stores"
 	"strings"
 )
 
@@ -57,7 +57,7 @@ type StreamInfo struct {
 	Statement  string         `json:"statement"`
 }
 
-func GetDataSourceStatement(m kv.KeyValue, name string) (*StreamInfo, error) {
+func GetDataSourceStatement(m stores.KeyValue, name string) (*StreamInfo, error) {
 	var (
 		v  string
 		vs = &StreamInfo{}
@@ -72,7 +72,7 @@ func GetDataSourceStatement(m kv.KeyValue, name string) (*StreamInfo, error) {
 	return nil, errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("%s is not found", name))
 }
 
-func GetDataSource(m kv.KeyValue, name string) (stmt *ast.StreamStmt, err error) {
+func GetDataSource(m stores.KeyValue, name string) (stmt *ast.StreamStmt, err error) {
 	info, err := GetDataSourceStatement(m, name)
 	if err != nil {
 		return nil, err

+ 71 - 0
pkg/kv/setup.go

@@ -0,0 +1,71 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kv
+
+import (
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/db"
+	"github.com/lf-edge/ekuiper/internal/pkg/db/redis"
+	"github.com/lf-edge/ekuiper/internal/pkg/db/sql/sqlite"
+)
+
+func SetupDefault() error {
+	dir, err := conf.GetDataLoc()
+	if err != nil {
+		return err
+	}
+
+	c := db.Config{
+		Type:  "sqlite",
+		Redis: redis.Config{},
+		Sqlite: sqlite.Config{
+			Path: dir,
+			Name: "",
+		},
+	}
+
+	return Setup(c)
+}
+
+func SetupWithKuiperConfig(conf *conf.KuiperConf) error {
+	c := db.Config{
+		Type: conf.Store.Type,
+		Redis: redis.Config{
+			Host:     conf.Store.Redis.Host,
+			Port:     conf.Store.Redis.Port,
+			Password: conf.Store.Redis.Password,
+			Timeout:  conf.Store.Redis.Timeout,
+		},
+		Sqlite: sqlite.Config{
+			Path: conf.Store.Sqlite.Path,
+			Name: conf.Store.Sqlite.Name,
+		},
+	}
+	return Setup(c)
+}
+
+func Setup(config db.Config) error {
+	err, database := db.CreateDatabase(config)
+	if err != nil {
+		return err
+	}
+
+	err = InitGlobalStores(database)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}

+ 103 - 0
pkg/kv/stores.go

@@ -0,0 +1,103 @@
+// Copyright 2021 INTECH Process Automation Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kv
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/pkg/db"
+	"github.com/lf-edge/ekuiper/internal/pkg/store"
+	"github.com/lf-edge/ekuiper/internal/pkg/ts"
+	st "github.com/lf-edge/ekuiper/pkg/kv/stores"
+	"sync"
+)
+
+type stores struct {
+	kv        map[string]st.KeyValue
+	ts        map[string]st.Tskv
+	mu        sync.Mutex
+	kvBuilder store.Builder
+	tsBuilder ts.Builder
+}
+
+func newStores(db db.Database) (error, *stores) {
+	var err error
+	var kvBuilder store.Builder
+	var tsBuilder ts.Builder
+	err, kvBuilder = store.CreateStoreBuilder(db)
+	if err != nil {
+		return err, nil
+	}
+	err, tsBuilder = ts.CreateTsBuilder(db)
+	if err != nil {
+		return err, nil
+	}
+	return nil, &stores{
+		kv:        make(map[string]st.KeyValue),
+		ts:        make(map[string]st.Tskv),
+		mu:        sync.Mutex{},
+		kvBuilder: kvBuilder,
+		tsBuilder: tsBuilder,
+	}
+}
+
+func (s *stores) GetKV(table string) (error, st.KeyValue) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if ks, contains := s.kv[table]; contains {
+		return nil, ks
+	}
+	err, ks := s.kvBuilder.CreateStore(table)
+	if err != nil {
+		return err, nil
+	}
+	s.kv[table] = ks
+	return nil, ks
+}
+
+func (s *stores) GetTS(table string) (error, st.Tskv) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if tts, contains := s.ts[table]; contains {
+		return nil, tts
+	}
+	err, tts := s.tsBuilder.CreateTs(table)
+	if err != nil {
+		return err, nil
+	}
+	s.ts[table] = tts
+	return nil, tts
+}
+
+var globalStores *stores = nil
+
+func InitGlobalStores(db db.Database) error {
+	var err error
+	err, globalStores = newStores(db)
+	return err
+}
+
+func GetKV(table string) (error, st.KeyValue) {
+	if globalStores == nil {
+		return fmt.Errorf("global stores are not initialized"), nil
+	}
+	return globalStores.GetKV(table)
+}
+
+func GetTS(table string) (error, st.Tskv) {
+	if globalStores == nil {
+		return fmt.Errorf("global stores are not initialized"), nil
+	}
+	return globalStores.GetTS(table)
+}

+ 1 - 1
pkg/kv/kv.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package kv
+package stores
 
 type KeyValue interface {
 	// Set key to hold string value if key does not exist otherwise return an error

+ 2 - 1
internal/pkg/tskv/tskv.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package tskv
+package stores
 
 type Tskv interface {
 	Set(k int64, v interface{}) (inserted bool, err error)
@@ -21,4 +21,5 @@ type Tskv interface {
 	Delete(k int64) error
 	DeleteBefore(int64) error
 	Close() error
+	Drop() error
 }