
feat(checkpoint): use tskv to only clean db in batch

Signed-off-by: ngjaying <ngjaying@gmail.com>
ngjaying, 3 years ago
parent commit e667096cb6

+ 155 - 0
internal/pkg/tskv/sqlite.go

@@ -0,0 +1,155 @@
+package tskv
+
+import (
+	"bytes"
+	"database/sql"
+	"encoding/gob"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	_ "github.com/mattn/go-sqlite3"
+	"path"
+	"sync"
+)
+
+// All TSKV instances share ONE database with different tables
+var (
+	db   *sql.DB
+	once sync.Once
+)
+
+// SqliteTskv All TSKV instances share the same database but with different tables
+// Each table must have ONLY ONE instance
+type SqliteTskv struct {
+	table string
+	// last is the largest key stored so far; Set only accepts keys greater than it, and the check is per-instance only
+	last int64
+}
+
+func init() {
+	gob.Register(make(map[string]interface{}))
+}
+
+func NewSqlite(table string) (*SqliteTskv, error) {
+	var outerError error
+	once.Do(func() {
+		d, err := conf.GetDataLoc()
+		if err != nil {
+			outerError = err
+			return
+		}
+		db, outerError = sql.Open("sqlite3", path.Join(d, "tskv.db"))
+	})
+	if outerError != nil {
+		return nil, outerError
+	}
+	if db == nil {
+		return nil, fmt.Errorf("cannot initialize sqlite db, please restart")
+	}
+	sqlStr := fmt.Sprintf("CREATE TABLE IF NOT EXISTS '%s'('key' INTEGER PRIMARY KEY, 'val' BLOB);", table)
+	_, outerError = db.Exec(sqlStr)
+	if outerError != nil {
+		return nil, fmt.Errorf("cannot create table: %v", outerError)
+	}
+	return &SqliteTskv{
+		table: table,
+		last:  last(table),
+	}, nil
+}
+
+func (m *SqliteTskv) Set(key int64, value interface{}) (bool, error) {
+	if key > m.last {
+		b, err := m.encode(value)
+		if err != nil {
+			return false, err
+		}
+		sqlStr := fmt.Sprintf("INSERT INTO %s(key,val) values(?,?);", m.table)
+		stmt, err := db.Prepare(sqlStr)
+		if err != nil {
+			return false, err
+		}
+		defer stmt.Close()
+		_, err = stmt.Exec(key, b)
+		if err != nil {
+			return false, err
+		} else {
+			m.last = key
+			return true, nil
+		}
+	} else {
+		return false, nil
+	}
+}
+
+func (m *SqliteTskv) Get(key int64, value interface{}) (bool, error) {
+	sqlStr := fmt.Sprintf("SELECT val FROM %s WHERE key=%d;", m.table, key)
+	row := db.QueryRow(sqlStr)
+	var tmp []byte
+	switch err := row.Scan(&tmp); err {
+	case sql.ErrNoRows:
+		return false, nil
+	case nil:
+		// do nothing, continue processing
+	default:
+		return false, err
+	}
+
+	dec := gob.NewDecoder(bytes.NewBuffer(tmp))
+	if err := dec.Decode(value); err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
+func (m *SqliteTskv) Last(value interface{}) (int64, error) {
+	_, err := m.Get(m.last, value)
+	if err != nil {
+		return 0, err
+	}
+	return m.last, nil
+}
+
+func (m *SqliteTskv) Delete(k int64) error {
+	sqlStr := fmt.Sprintf("DELETE FROM %s WHERE key=%d;", m.table, k)
+	_, err := db.Exec(sqlStr)
+	return err
+}
+
+func (m *SqliteTskv) DeleteBefore(k int64) error {
+	sqlStr := fmt.Sprintf("DELETE FROM %s WHERE key<%d;", m.table, k)
+	_, err := db.Exec(sqlStr)
+	return err
+}
+
+func (m *SqliteTskv) Close() error {
+	return nil
+}
+
+func (m *SqliteTskv) Drop() error {
+	sqlStr := fmt.Sprintf("Drop table %s;", m.table)
+	_, err := db.Exec(sqlStr)
+	return err
+}
+
+func (m *SqliteTskv) encode(value interface{}) ([]byte, error) {
+	var buf bytes.Buffer
+	gob.Register(value)
+	enc := gob.NewEncoder(&buf)
+	if err := enc.Encode(value); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+func last(table string) int64 {
+	sqlStr := fmt.Sprintf("SELECT key FROM %s Order by key DESC Limit 1;", table)
+	row := db.QueryRow(sqlStr)
+	var tmp int64
+	switch err := row.Scan(&tmp); err {
+	case sql.ErrNoRows:
+		return 0
+	case nil:
+		return tmp
+	default:
+		return 0
+	}
+}
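
A minimal usage sketch of the SQLite-backed store above, assuming it is built inside the eKuiper module (the package path is internal) and that conf.GetDataLoc resolves to a writable data directory; the "demo" table name and the stored values are illustrative, and the test file below exercises the same API:

package main

import (
	"fmt"

	"github.com/lf-edge/ekuiper/internal/pkg/tskv"
)

func main() {
	store, err := tskv.NewSqlite("demo") // one table per caller, e.g. per rule
	if err != nil {
		panic(err)
	}
	// Set only accepts keys greater than the latest one seen by this instance.
	_, _ = store.Set(100, map[string]interface{}{"op1": "state1"})
	_, _ = store.Set(200, map[string]interface{}{"op1": "state2"})

	var snapshot map[string]interface{}
	last, _ := store.Last(&snapshot) // 200, with the matching gob-decoded value
	fmt.Println(last, snapshot)

	// Batch-clean everything strictly older than the latest key.
	_ = store.DeleteBefore(last)
}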

+ 105 - 0
internal/pkg/tskv/sqlite_test.go

@@ -0,0 +1,105 @@
+package tskv
+
+import (
+	"reflect"
+	"testing"
+)
+
+func TestSqlite_Funcs(t *testing.T) {
+	ks, e := NewSqlite("test")
+	if e != nil {
+		t.Errorf("Failed to create tskv %s.", e)
+		return
+	}
+
+	if ok, err := ks.Set(1000, "bar1"); nil != err {
+		t.Error(err)
+	} else if !ok {
+		t.Error("should allow key 1000")
+	}
+
+	if ok, err := ks.Set(1500, "bar15"); nil != err {
+		t.Error(err)
+	} else if !ok {
+		t.Error("should allow key 1500")
+	}
+
+	if ok, err := ks.Set(2000, "bar2"); nil != err {
+		t.Error(err)
+	} else if !ok {
+		t.Error("should allow key 2000")
+	}
+
+	if ok, err := ks.Set(3000, "bar3"); nil != err {
+		t.Error(err)
+	} else if !ok {
+		t.Error("should allow key 3000")
+	}
+
+	if ok, err := ks.Set(2500, "bar25"); nil != err {
+		t.Error(err)
+	} else if ok {
+		t.Error("should deny key 2500")
+	}
+
+	var v string
+	if k, err := ks.Last(&v); err != nil {
+		t.Error(err)
+	} else if k != 3000 || v != "bar3" {
+		t.Errorf("Last expect 3000/bar3 but got %d/%s", k, v)
+	}
+
+	if ok, _ := ks.Get(2000, &v); ok {
+		if !reflect.DeepEqual("bar2", v) {
+			t.Error("expect:bar2", "get:", v)
+		}
+	} else {
+		t.Errorf("Should find key 2000.")
+	}
+
+	if err := ks.Delete(1500); nil != err {
+		t.Error(err)
+	}
+
+	if ok, _ := ks.Get(1500, &v); ok {
+		t.Errorf("Should not find deleted key 1500.")
+	}
+
+	if ok, err := ks.Set(3500, "bar35"); nil != err {
+		t.Error(err)
+	} else if !ok {
+		t.Error("should allow key 3500")
+	}
+
+	if err := ks.DeleteBefore(3000); nil != err {
+		t.Error(err)
+	}
+
+	if ok, _ := ks.Get(1000, &v); ok {
+		t.Errorf("Should not find deleted key 1000.")
+	}
+
+	if ok, _ := ks.Get(2000, &v); ok {
+		t.Errorf("Should not find deleted key 2000.")
+	}
+
+	if ok, _ := ks.Get(3000, &v); ok {
+		if !reflect.DeepEqual("bar3", v) {
+			t.Error("expect:bar3", "get:", v)
+		}
+	} else {
+		t.Errorf("Should find key 3000.")
+	}
+
+	if ok, _ := ks.Get(3500, &v); ok {
+		if !reflect.DeepEqual("bar35", v) {
+			t.Error("expect:bar35", "get:", v)
+		}
+	} else {
+		t.Errorf("Should find key 3500.")
+	}
+
+	if err := ks.Drop(); err != nil {
+		t.Error(err)
+	}
+}

+ 10 - 0
internal/pkg/tskv/tskv.go

@@ -0,0 +1,10 @@
+package tskv
+
+type Tskv interface {
+	Set(k int64, v interface{}) (inserted bool, err error)
+	Get(k int64, v interface{}) (found bool, err error)
+	Last(v interface{}) (key int64, err error)
+	Delete(k int64) error
+	DeleteBefore(int64) error
+	Close() error
+}
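
The SqliteTskv implementation above satisfies this contract (its extra Drop method is not part of the interface). A short sketch written against the interface only, where retainLatest is a hypothetical helper mirroring how the checkpoint store uses it:

package tskv

// Compile-time check that the SQLite implementation satisfies the interface.
var _ Tskv = (*SqliteTskv)(nil)

// retainLatest is a hypothetical helper: it loads the newest snapshot and
// batch-deletes every older entry in one call.
func retainLatest(s Tskv) (int64, map[string]interface{}, error) {
	var m map[string]interface{}
	k, err := s.Last(&m)
	if err != nil {
		return 0, nil, err
	}
	return k, m, s.DeleteBefore(k)
}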

+ 14 - 21
internal/topo/checkpoint/coordinator.go

@@ -77,7 +77,7 @@ type Coordinator struct {
 	completedCheckpoints    *checkpointStore
 	ruleId                  string
 	baseInterval            int
-	timeout                 int
+	cleanThreshold          int
 	advanceToEndOfEventTime bool
 	ticker                  *clock.Ticker //For processing time only
 	signal                  chan *Signal
@@ -113,7 +113,7 @@ func NewCoordinator(ruleId string, sources []StreamTask, operators []NonSourceTa
 	}
 	//5 minutes by default
 	if interval <= 0 {
-		interval = 5000
+		interval = 300000
 	}
 	return &Coordinator{
 		tasksToTrigger:     sourceResponders,
@@ -123,12 +123,12 @@ func NewCoordinator(ruleId string, sources []StreamTask, operators []NonSourceTa
 		completedCheckpoints: &checkpointStore{
 			maxNum: 3,
 		},
-		ruleId:       ruleId,
-		signal:       signal,
-		baseInterval: interval,
-		timeout:      200000,
-		store:        store,
-		ctx:          ctx,
+		ruleId:         ruleId,
+		signal:         signal,
+		baseInterval:   interval,
+		store:          store,
+		ctx:            ctx,
+		cleanThreshold: 100,
 	}
 }
 
@@ -152,6 +152,7 @@ func (c *Coordinator) Activate() error {
 	tc := c.ticker.C
 	go func() {
 		c.activated = true
+		toBeClean := 0
 		for {
 			select {
 			case n := <-tc:
@@ -171,22 +172,14 @@ func (c *Coordinator) Activate() error {
 						if err := t.TriggerCheckpoint(checkpointId); err != nil {
 							logger.Infof("Fail to trigger checkpoint for source %s with error %v, cancel it", t.GetName(), err)
 							c.cancel(checkpointId)
-						} else {
-							timeout := conf.GetTicker(c.timeout)
-							select {
-							case <-timeout.C:
-								logger.Debugf("Try to cancel checkpoint %d for timeout", checkpointId)
-								c.cancel(checkpointId)
-							case <-c.ctx.Done():
-								if timeout != nil {
-									timeout.Stop()
-									logger.Infoln("Stop ongoing checkpoint %d", checkpointId)
-									c.cancel(checkpointId)
-								}
-							}
 						}
 					}(r)
 				}
+				toBeClean++
+				if toBeClean >= c.cleanThreshold {
+					c.store.Clean()
+					toBeClean = 0
+				}
 			case s := <-c.signal:
 				switch s.Message {
 				case STOP:
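
The replacement for the removed per-checkpoint timeout logic is a simple batch counter: each checkpoint tick increments toBeClean, and once it reaches cleanThreshold (100 by default) the coordinator calls store.Clean() and resets the counter. A standalone sketch of the same pattern, with illustrative names and a fake store standing in for the real checkpoint store:

package main

import "fmt"

// cleaner is the only capability the loop needs from the checkpoint store.
type cleaner interface {
	Clean() error
}

type fakeStore struct{}

func (fakeStore) Clean() error {
	fmt.Println("batch clean triggered")
	return nil
}

func main() {
	const cleanThreshold = 100
	toBeClean := 0
	var store cleaner = fakeStore{}

	// Stand-in for the coordinator's ticker loop: clean once every 100 ticks.
	for tick := 1; tick <= 250; tick++ {
		toBeClean++
		if toBeClean >= cleanThreshold {
			_ = store.Clean() // fires at ticks 100 and 200
			toBeClean = 0
		}
	}
}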

+ 17 - 35
internal/topo/state/kv_store.go

@@ -4,10 +4,9 @@ import (
 	"encoding/gob"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/pkg/tskv"
 	"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
 	"github.com/lf-edge/ekuiper/pkg/cast"
-	"github.com/lf-edge/ekuiper/pkg/kv"
-	"path"
 	"sync"
 )
 
@@ -22,10 +21,11 @@ func init() {
 //  { "checkpoint1", "checkpoint2" ... "checkpointn" : The complete or incomplete snapshot
 //
 type KVStore struct {
-	db          kv.KeyValue
+	db          tskv.Tskv
 	mapStore    *sync.Map //The current root store of a rule
 	checkpoints []int64
 	max         int
+	ruleId      string
 }
 
 //Store in path ./data/checkpoint/$ruleId
@@ -34,9 +34,11 @@ type KVStore struct {
 //"$checkpointId":A map with key of checkpoint id and value of snapshot(gob serialized)
 //Assume each operator only has one instance
 func getKVStore(ruleId string) (*KVStore, error) {
-	dr, _ := conf.GetDataLoc()
-	db := kv.GetDefaultKVStore(path.Join(dr, ruleId, CheckpointListKey))
-	s := &KVStore{db: db, max: 3, mapStore: &sync.Map{}}
+	db, err := tskv.NewSqlite(ruleId)
+	if err != nil {
+		return nil, err
+	}
+	s := &KVStore{db: db, max: 3, mapStore: &sync.Map{}, ruleId: ruleId}
 	//read data from badger db
 	if err := s.restore(); err != nil {
 		return nil, err
@@ -45,24 +47,13 @@ func getKVStore(ruleId string) (*KVStore, error) {
 }
 
 func (s *KVStore) restore() error {
-	err := s.db.Open()
+	var m map[string]interface{}
+	k, err := s.db.Last(&m)
 	if err != nil {
 		return err
 	}
-	defer s.db.Close()
-
-	var cs []int64
-	if ok, _ := s.db.Get(CheckpointListKey, &cs); ok {
-		s.checkpoints = cs
-		for _, c := range cs {
-			var m map[string]interface{}
-			if ok, _ := s.db.Get(fmt.Sprintf("%d", c), &m); ok {
-				s.mapStore.Store(c, cast.MapToSyncMap(m))
-			} else {
-				return fmt.Errorf("invalid checkpoint data: %v", c)
-			}
-		}
-	}
+	s.checkpoints = []int64{k}
+	s.mapStore.Store(k, cast.MapToSyncMap(m))
 	return nil
 }
 
@@ -89,27 +80,14 @@ func (s *KVStore) SaveCheckpoint(checkpointId int64) error {
 		if m, ok := v.(*sync.Map); !ok {
 			return fmt.Errorf("invalid KVStore for checkpointId %d with value %v: should be *sync.Map type", checkpointId, v)
 		} else {
-			err := s.db.Open()
-			if err != nil {
-				return fmt.Errorf("save checkpoint err: %v", err)
-			}
-			err = s.db.Set(fmt.Sprintf("%d", checkpointId), cast.SyncMapToMap(m))
-			if err != nil {
-				return fmt.Errorf("save checkpoint err: %v", err)
-			}
 			s.checkpoints = append(s.checkpoints, checkpointId)
 			//TODO is the order promised?
 			for len(s.checkpoints) > s.max {
 				cp := s.checkpoints[0]
 				s.checkpoints = s.checkpoints[1:]
 				s.mapStore.Delete(cp)
-				err = s.db.Delete(fmt.Sprintf("%d", cp))
-				if err != nil {
-					fmt.Printf("delete checkpoint %d db store error %s", cp, err)
-				}
 			}
-
-			err = s.db.Set(CheckpointListKey, s.checkpoints)
+			_, err := s.db.Set(checkpointId, cast.SyncMapToMap(m))
 			if err != nil {
 				return fmt.Errorf("save checkpoint err: %v", err)
 			}
@@ -142,3 +120,7 @@ func (s *KVStore) GetOpState(opId string) (*sync.Map, error) {
 	}
 	return &sync.Map{}, nil
 }
+
+func (s *KVStore) Clean() error {
+	return s.db.DeleteBefore(s.checkpoints[0])
+}
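
SaveCheckpoint now trims only the in-memory window (at most max ids), so rows keep accumulating in SQLite until the coordinator's periodic Clean removes everything older than the oldest retained checkpoint in one statement. A rough illustration with made-up checkpoint ids and rule name:

package main

import "fmt"

func main() {
	const max = 3
	var checkpoints []int64

	// SaveCheckpoint appends the id and trims the in-memory window only.
	for _, id := range []int64{1000, 2000, 3000, 4000, 5000} {
		checkpoints = append(checkpoints, id)
		for len(checkpoints) > max {
			checkpoints = checkpoints[1:]
		}
	}

	// checkpoints is now [3000 4000 5000]; Clean() issues a single batch delete:
	fmt.Printf("DELETE FROM 'rule1' WHERE key<%d;\n", checkpoints[0])
}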

+ 8 - 5
internal/topo/state/kv_store_test.go

@@ -138,7 +138,7 @@ func TestLifecycle(t *testing.T) {
 		}
 	)
 	func() {
-		defer cleanStateData()
+		cleanStateData()
 		store, err := getKVStore(ruleId)
 		if err != nil {
 			t.Errorf("Get store for rule %s error: %s", ruleId, err)
@@ -208,14 +208,17 @@ func TestLifecycle(t *testing.T) {
 			return
 		}
 		// compare checkpoints
-		if !reflect.DeepEqual(checkpointIds, store.checkpoints) {
+		if !reflect.DeepEqual(checkpointIds[2:], store.checkpoints) {
 			t.Errorf("%d.Restore checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, checkpointIds, store.checkpoints)
 			return
 		}
 		// compare contents
 		result = mapStoreToMap(store.mapStore)
-		if !reflect.DeepEqual(r, result) {
-			t.Errorf("%d.Restore checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, r, result)
+		last := map[string]interface{}{
+			"3": r["3"],
+		}
+		if !reflect.DeepEqual(last, result) {
+			t.Errorf("%d.Restore checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, last, result)
 			return
 		}
 		ns, err := store.GetOpState(opIds[1])
@@ -251,7 +254,7 @@ func cleanStateData() {
 	if err != nil {
 		log.Panic(err)
 	}
-	c := path.Join(dbDir, CheckpointListKey)
+	c := path.Join(dbDir)
 	err = os.RemoveAll(c)
 	if err != nil {
 		conf.Log.Error(err)

+ 5 - 1
internal/topo/state/memory_store.go

@@ -20,6 +20,10 @@ func (s *MemoryStore) SaveCheckpoint(_ int64) error {
 	return nil
 }
 
-func (s *MemoryStore) GetOpState(opId string) (*sync.Map, error) {
+func (s *MemoryStore) GetOpState(_ string) (*sync.Map, error) {
 	return &sync.Map{}, nil
 }
+
+func (s *MemoryStore) Clean() error {
+	return nil
+}

+ 2 - 1
pkg/api/stream.go

@@ -46,8 +46,9 @@ type Logger interface {
 
 type Store interface {
 	SaveState(checkpointId int64, opId string, state map[string]interface{}) error
-	SaveCheckpoint(checkpointId int64) error //Save the whole checkpoint state into storage like badger
+	SaveCheckpoint(checkpointId int64) error //Save the whole checkpoint state into storage
 	GetOpState(opId string) (*sync.Map, error)
+	Clean() error
 }
 
 type Closable interface {
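
Both existing stores pick up the new Clean method: KVStore performs the real batch delete through tskv, while MemoryStore is a no-op. A hedged conformance sketch, assuming both types already implement the remaining Store methods as the diff implies:

package state

import "github.com/lf-edge/ekuiper/pkg/api"

// Compile-time checks that both store implementations satisfy the extended
// api.Store interface, Clean() included.
var (
	_ api.Store = (*KVStore)(nil)
	_ api.Store = (*MemoryStore)(nil)
)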