Browse Source

feat(checkpoint): support rewindable source

ngjaying 4 years atrás
parent
commit
f5a308df98
3 changed files with 64 additions and 3 deletions
  1. 5 0
      xstream/api/stream.go
  2. 25 0
      xstream/nodes/source_node.go
  3. 34 3
      xstream/test/mock_source.go

+ 5 - 0
xstream/api/stream.go

@@ -85,6 +85,11 @@ type TopNode interface {
 	GetName() string
 }
 
+type Rewindable interface {
+	GetOffset() (interface{}, error)
+	Rewind(offset interface{}) error
+}
+
 type RuleOption struct {
 	IsEventTime        bool  `json:"isEventTime"`
 	LateTol            int64 `json:"lateTolerance"`

+ 25 - 0
xstream/nodes/source_node.go

@@ -36,6 +36,8 @@ func NewSourceNode(name string, options map[string]string) *SourceNode {
 	}
 }
 
+const OFFSET_KEY = "$$offset"
+
 //Only for mock source, do not use it in production
 func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
 	return &SourceNode{
@@ -105,6 +107,18 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 				m.statManagers = append(m.statManagers, stats)
 				m.mutex.Unlock()
 
+				if rw, ok := source.(api.Rewindable); ok {
+					if offset, err := ctx.GetState(OFFSET_KEY); err != nil {
+						m.drainError(errCh, err, ctx, logger)
+					} else if offset != nil {
+						logger.Infof("Source rewind from %v", offset)
+						err = rw.Rewind(offset)
+						if err != nil {
+							m.drainError(errCh, err, ctx, logger)
+						}
+					}
+				}
+
 				buffer := NewDynamicChannelBuffer()
 				buffer.SetLimit(bl)
 				sourceErrCh := make(chan error)
@@ -128,6 +142,17 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 						m.Broadcast(tuple)
 						stats.IncTotalRecordsOut()
 						stats.SetBufferLength(int64(buffer.GetLength()))
+						if rw, ok := source.(api.Rewindable); ok {
+							if offset, err := rw.GetOffset(); err != nil {
+								m.drainError(errCh, err, ctx, logger)
+							} else {
+								err = ctx.PutState(OFFSET_KEY, offset)
+								if err != nil {
+									m.drainError(errCh, err, ctx, logger)
+								}
+								logger.Debugf("Source save offset %v", offset)
+							}
+						}
 						logger.Debugf("%s consume data %v complete", m.name, tuple)
 					}
 				}

+ 34 - 3
xstream/test/mock_source.go

@@ -1,6 +1,7 @@
 package test
 
 import (
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
@@ -11,6 +12,8 @@ type MockSource struct {
 	data        []*xsql.Tuple
 	done        <-chan int
 	isEventTime bool
+
+	offset int
 }
 
 func NewMockSource(data []*xsql.Tuple, done <-chan int, isEventTime bool) *MockSource {
@@ -25,9 +28,20 @@ func NewMockSource(data []*xsql.Tuple, done <-chan int, isEventTime bool) *MockS
 func (m *MockSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
 	log := ctx.GetLogger()
 	mockClock := GetMockClock()
-	log.Debugln("mock source starts")
-	for _, d := range m.data {
-		<-m.done
+	log.Debugf("mock source starts with offset %d", m.offset)
+	for i, d := range m.data {
+		if i < m.offset {
+			log.Debugf("mock source is skipping %d", i)
+			continue
+		}
+		log.Debugf("mock source is waiting", i)
+		select {
+		case j := <-m.done:
+			log.Debugf("mock source receives data %d", j)
+		case <-ctx.Done():
+			log.Debugf("mock source open DONE")
+			return
+		}
 		log.Debugf("mock source is sending data %s", d)
 		if !m.isEventTime {
 			mockClock.Set(common.TimeFromUnixMilli(d.Timestamp))
@@ -35,11 +49,28 @@ func (m *MockSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple
 			mockClock.Add(1000 * time.Millisecond)
 		}
 		consumer <- api.NewDefaultSourceTuple(d.Message, xsql.Metadata{"topic": "mock"})
+		m.offset = i + 1
 		time.Sleep(1)
 	}
+	log.Debugf("mock source sends out all data")
+}
+
+func (m *MockSource) GetOffset() (interface{}, error) {
+	return m.offset, nil
+}
+
+func (m *MockSource) Rewind(offset interface{}) error {
+	oi, err := common.ToInt(offset)
+	if err != nil {
+		return fmt.Errorf("mock source fails to rewind: %s", err)
+	} else {
+		m.offset = oi
+	}
+	return nil
 }
 
 func (m *MockSource) Close(ctx api.StreamContext) error {
+	m.offset = 0
 	return nil
 }