sqlite.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tskv
  15. import (
  16. "bytes"
  17. "database/sql"
  18. "encoding/gob"
  19. "fmt"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. _ "github.com/mattn/go-sqlite3"
  22. "path"
  23. "sync"
  24. )
  25. // All TSKV instances share ONE database with different tables
  26. var (
  27. db *sql.DB
  28. once sync.Once
  29. )
  30. // SqliteTskv All TSKV instances share the same database but with different tables
  31. // Each table must have ONLY ONE instance
  32. type SqliteTskv struct {
  33. table string
  34. // only append key bigger than the latest key inside; ONLY check in the instance itself
  35. last int64
  36. }
  37. func init() {
  38. gob.Register(make(map[string]interface{}))
  39. }
  40. func NewSqlite(table string) (*SqliteTskv, error) {
  41. var outerError error
  42. once.Do(func() {
  43. d, err := conf.GetDataLoc()
  44. if err != nil {
  45. outerError = err
  46. return
  47. }
  48. db, outerError = sql.Open("sqlite3", path.Join(d, "tskv.db"))
  49. })
  50. if outerError != nil {
  51. return nil, outerError
  52. }
  53. if db == nil {
  54. return nil, fmt.Errorf("cannot initiate sqlite db, please restart")
  55. }
  56. sqlStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS '%s'('key' INTEGER PRIMARY KEY, 'val' BLOB);", table)
  57. _, outerError = db.Exec(sqlStr)
  58. if outerError != nil {
  59. return nil, fmt.Errorf("cannot create table: %v", outerError)
  60. }
  61. return &SqliteTskv{
  62. table: table,
  63. last: last(table),
  64. }, nil
  65. }
  66. func (m *SqliteTskv) Set(key int64, value interface{}) (bool, error) {
  67. if key > m.last {
  68. b, err := m.encode(value)
  69. if err != nil {
  70. return false, err
  71. }
  72. sqlStr := fmt.Sprintf("INSERT INTO %s(key,val) values(?,?);", m.table)
  73. stmt, err := db.Prepare(sqlStr)
  74. if err != nil {
  75. return false, err
  76. }
  77. defer stmt.Close()
  78. _, err = stmt.Exec(key, b)
  79. if err != nil {
  80. return false, err
  81. } else {
  82. m.last = key
  83. return true, nil
  84. }
  85. } else {
  86. return false, nil
  87. }
  88. }
  89. func (m *SqliteTskv) Get(key int64, value interface{}) (bool, error) {
  90. sqlStr := fmt.Sprintf("SELECT val FROM %s WHERE key=%d;", m.table, key)
  91. row := db.QueryRow(sqlStr)
  92. var tmp []byte
  93. switch err := row.Scan(&tmp); err {
  94. case sql.ErrNoRows:
  95. return false, nil
  96. case nil:
  97. // do nothing, continue processing
  98. default:
  99. return false, err
  100. }
  101. dec := gob.NewDecoder(bytes.NewBuffer(tmp))
  102. if err := dec.Decode(value); err != nil {
  103. return false, err
  104. }
  105. return true, nil
  106. }
  107. func (m *SqliteTskv) Last(value interface{}) (int64, error) {
  108. _, err := m.Get(m.last, value)
  109. if err != nil {
  110. return 0, err
  111. }
  112. return m.last, nil
  113. }
  114. func (m *SqliteTskv) Delete(k int64) error {
  115. sqlStr := fmt.Sprintf("DELETE FROM %s WHERE key=%d;", m.table, k)
  116. _, err := db.Exec(sqlStr)
  117. return err
  118. }
  119. func (m *SqliteTskv) DeleteBefore(k int64) error {
  120. sqlStr := fmt.Sprintf("DELETE FROM %s WHERE key<%d;", m.table, k)
  121. _, err := db.Exec(sqlStr)
  122. return err
  123. }
  124. func (m *SqliteTskv) Close() error {
  125. return nil
  126. }
  127. func (m *SqliteTskv) Drop() error {
  128. sqlStr := fmt.Sprintf("Drop table %s;", m.table)
  129. _, err := db.Exec(sqlStr)
  130. return err
  131. }
  132. func (m *SqliteTskv) encode(value interface{}) ([]byte, error) {
  133. var buf bytes.Buffer
  134. gob.Register(value)
  135. enc := gob.NewEncoder(&buf)
  136. if err := enc.Encode(value); err != nil {
  137. return nil, err
  138. }
  139. return buf.Bytes(), nil
  140. }
  141. func last(table string) int64 {
  142. sqlStr := fmt.Sprintf("SELECT key FROM %s Order by key DESC Limit 1;", table)
  143. row := db.QueryRow(sqlStr)
  144. var tmp int64
  145. switch err := row.Scan(&tmp); err {
  146. case sql.ErrNoRows:
  147. return 0
  148. case nil:
  149. return tmp
  150. default:
  151. return 0
  152. }
  153. }