sqlite.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package tskv
  2. import (
  3. "bytes"
  4. "database/sql"
  5. "encoding/gob"
  6. "fmt"
  7. "github.com/lf-edge/ekuiper/internal/conf"
  8. _ "github.com/mattn/go-sqlite3"
  9. "path"
  10. "sync"
  11. )
  12. // All TSKV instances share ONE database with different tables
  13. var (
  14. db *sql.DB
  15. once sync.Once
  16. )
  17. // SqliteTskv All TSKV instances share the same database but with different tables
  18. // Each table must have ONLY ONE instance
  19. type SqliteTskv struct {
  20. table string
  21. // only append key bigger than the latest key inside; ONLY check in the instance itself
  22. last int64
  23. }
  24. func init() {
  25. gob.Register(make(map[string]interface{}))
  26. }
  27. func NewSqlite(table string) (*SqliteTskv, error) {
  28. var outerError error
  29. once.Do(func() {
  30. d, err := conf.GetDataLoc()
  31. if err != nil {
  32. outerError = err
  33. return
  34. }
  35. db, outerError = sql.Open("sqlite3", path.Join(d, "tskv.db"))
  36. })
  37. if outerError != nil {
  38. return nil, outerError
  39. }
  40. if db == nil {
  41. return nil, fmt.Errorf("cannot initiate sqlite db, please restart")
  42. }
  43. sqlStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS '%s'('key' INTEGER PRIMARY KEY, 'val' BLOB);", table)
  44. _, outerError = db.Exec(sqlStr)
  45. if outerError != nil {
  46. return nil, fmt.Errorf("cannot create table: %v", outerError)
  47. }
  48. return &SqliteTskv{
  49. table: table,
  50. last: last(table),
  51. }, nil
  52. }
  53. func (m *SqliteTskv) Set(key int64, value interface{}) (bool, error) {
  54. if key > m.last {
  55. b, err := m.encode(value)
  56. if err != nil {
  57. return false, err
  58. }
  59. sqlStr := fmt.Sprintf("INSERT INTO %s(key,val) values(?,?);", m.table)
  60. stmt, err := db.Prepare(sqlStr)
  61. if err != nil {
  62. return false, err
  63. }
  64. defer stmt.Close()
  65. _, err = stmt.Exec(key, b)
  66. if err != nil {
  67. return false, err
  68. } else {
  69. m.last = key
  70. return true, nil
  71. }
  72. } else {
  73. return false, nil
  74. }
  75. }
  76. func (m *SqliteTskv) Get(key int64, value interface{}) (bool, error) {
  77. sqlStr := fmt.Sprintf("SELECT val FROM %s WHERE key=%d;", m.table, key)
  78. row := db.QueryRow(sqlStr)
  79. var tmp []byte
  80. switch err := row.Scan(&tmp); err {
  81. case sql.ErrNoRows:
  82. return false, nil
  83. case nil:
  84. // do nothing, continue processing
  85. default:
  86. return false, err
  87. }
  88. dec := gob.NewDecoder(bytes.NewBuffer(tmp))
  89. if err := dec.Decode(value); err != nil {
  90. return false, err
  91. }
  92. return true, nil
  93. }
  94. func (m *SqliteTskv) Last(value interface{}) (int64, error) {
  95. _, err := m.Get(m.last, value)
  96. if err != nil {
  97. return 0, err
  98. }
  99. return m.last, nil
  100. }
  101. func (m *SqliteTskv) Delete(k int64) error {
  102. sqlStr := fmt.Sprintf("DELETE FROM %s WHERE key=%d;", m.table, k)
  103. _, err := db.Exec(sqlStr)
  104. return err
  105. }
  106. func (m *SqliteTskv) DeleteBefore(k int64) error {
  107. sqlStr := fmt.Sprintf("DELETE FROM %s WHERE key<%d;", m.table, k)
  108. _, err := db.Exec(sqlStr)
  109. return err
  110. }
  111. func (m *SqliteTskv) Close() error {
  112. return nil
  113. }
  114. func (m *SqliteTskv) Drop() error {
  115. sqlStr := fmt.Sprintf("Drop table %s;", m.table)
  116. _, err := db.Exec(sqlStr)
  117. return err
  118. }
  119. func (m *SqliteTskv) encode(value interface{}) ([]byte, error) {
  120. var buf bytes.Buffer
  121. gob.Register(value)
  122. enc := gob.NewEncoder(&buf)
  123. if err := enc.Encode(value); err != nil {
  124. return nil, err
  125. }
  126. return buf.Bytes(), nil
  127. }
  128. func last(table string) int64 {
  129. sqlStr := fmt.Sprintf("SELECT key FROM %s Order by key DESC Limit 1;", table)
  130. row := db.QueryRow(sqlStr)
  131. var tmp int64
  132. switch err := row.Scan(&tmp); err {
  133. case sql.ErrNoRows:
  134. return 0
  135. case nil:
  136. return tmp
  137. default:
  138. return 0
  139. }
  140. }