Browse Source

feat(checkpoint): checkpoint controllers

ngjaying 4 years ago
parent
commit
d3298cb225

+ 8 - 0
xsql/ast.go

@@ -1778,3 +1778,11 @@ func HasNoAggFuncs(node Node) bool {
 	})
 	})
 	return r
 	return r
 }
 }
+
+// Qos is the delivery guarantee level; the checkpoints package uses it to
+// decide which barrier handler an operator gets.
+type Qos int
+
+const (
+	// AtMostOnce is the zero value; createBarrierHandler returns no handler for it.
+	AtMostOnce Qos = iota
+	// AtLeastOnce selects a handler that only tracks barriers.
+	AtLeastOnce
+	// ExactlyOnce selects a handler that aligns barriers across inputs.
+	ExactlyOnce
+)

+ 2 - 0
xsql/processors/extension_test.go

@@ -1,3 +1,5 @@
+// +build !windows
+
 package processors
 package processors
 
 
 import (
 import (

+ 1 - 2
xsql/processors/xsql_processor.go

@@ -10,7 +10,6 @@ import (
 	"github.com/emqx/kuiper/xstream"
 	"github.com/emqx/kuiper/xstream"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/nodes"
 	"github.com/emqx/kuiper/xstream/nodes"
-	"github.com/emqx/kuiper/xstream/operators"
 	"path"
 	"path"
 	"strings"
 	"strings"
 )
 )
@@ -488,7 +487,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			if dimensions != nil {
 			if dimensions != nil {
 				w = dimensions.GetWindow()
 				w = dimensions.GetWindow()
 				if w != nil {
 				if w != nil {
-					wop, err := operators.NewWindowOp("window", w, isEventTime, lateTol, streamsFromStmt, bufferLength)
+					wop, err := nodes.NewWindowOp("window", w, isEventTime, lateTol, streamsFromStmt, bufferLength)
 					if err != nil {
 					if err != nil {
 						return nil, nil, err
 						return nil, nil, err
 					}
 					}

+ 1 - 0
xstream/api/stream.go

@@ -102,6 +102,7 @@ type StreamContext interface {
 	PutState(key string, value interface{}) error
 	PutState(key string, value interface{}) error
 	GetState(key string) (interface{}, error)
 	GetState(key string) (interface{}, error)
 	DeleteState(key string) error
 	DeleteState(key string) error
+	SaveState(checkpointId int64) error
 }
 }
 
 
 type Operator interface {
 type Operator interface {

+ 168 - 0
xstream/checkpoints/barrier_handler.go

@@ -0,0 +1,168 @@
+package checkpoints
+
+import "github.com/emqx/kuiper/xstream/api"
+
+// BarrierHandler is consulted by a node for every incoming item so that
+// checkpoint barriers can be intercepted before normal processing.
+type BarrierHandler interface {
+	Process(data *BufferOrEvent, ctx api.StreamContext) bool // Returns true if data is a barrier, otherwise false
+	SetOutput(chan<- *BufferOrEvent)                         // Channel used to replay items held back while an input was blocked
+}
+
+// BarrierTracker is the barrier handler for qos 1: it only counts barriers
+// per checkpoint and never blocks or buffers data.
+type BarrierTracker struct {
+	// responder is triggered once a checkpoint has collected all its barriers.
+	responder          Responder
+	// inputCount is the number of barriers expected per checkpoint.
+	inputCount         int
+	// pendingCheckpoints maps a checkpoint id to the barriers seen so far.
+	pendingCheckpoints map[int64]int
+}
+
+// NewBarrierTracker returns a tracker that triggers responder after
+// inputCount barriers of the same checkpoint have arrived.
+func NewBarrierTracker(responder Responder, inputCount int) *BarrierTracker {
+	tracker := new(BarrierTracker)
+	tracker.responder = responder
+	tracker.inputCount = inputCount
+	tracker.pendingCheckpoints = map[int64]int{}
+	return tracker
+}
+
+// Process reports whether data carries a barrier. Barriers are counted via
+// processBarrier; any other payload is left untouched for the caller.
+func (h *BarrierTracker) Process(data *BufferOrEvent, ctx api.StreamContext) bool {
+	barrier, isBarrier := data.Data.(*Barrier)
+	if !isBarrier {
+		return false
+	}
+	h.processBarrier(barrier, ctx)
+	return true
+}
+
+// SetOutput is a no-op; it exists only to satisfy BarrierHandler because the
+// tracker never holds data back.
+func (h *BarrierTracker) SetOutput(output chan<- *BufferOrEvent) {
+	//do nothing, does not need it
+}
+
+// processBarrier counts one barrier for its checkpoint. When the count
+// reaches inputCount the responder is triggered, and that checkpoint plus
+// every older pending one is forgotten. With a single input the responder is
+// triggered immediately.
+func (h *BarrierTracker) processBarrier(b *Barrier, _ api.StreamContext) {
+	if h.inputCount == 1 {
+		h.responder.TriggerCheckpoint(b.CheckpointId)
+		return
+	}
+	seen, tracked := h.pendingCheckpoints[b.CheckpointId]
+	if !tracked {
+		// First barrier of this checkpoint.
+		h.pendingCheckpoints[b.CheckpointId] = 1
+		return
+	}
+	seen++
+	if seen != h.inputCount {
+		h.pendingCheckpoints[b.CheckpointId] = seen
+		return
+	}
+	h.responder.TriggerCheckpoint(b.CheckpointId)
+	delete(h.pendingCheckpoints, b.CheckpointId)
+	// Older checkpoints are superseded by the one just triggered.
+	for id := range h.pendingCheckpoints {
+		if id < b.CheckpointId {
+			delete(h.pendingCheckpoints, id)
+		}
+	}
+}
+
+// BarrierAligner is the barrier handler for qos 2: once a barrier arrives
+// from an input, further data from that input is buffered until barriers of
+// the same checkpoint have been received from all inputs.
+type BarrierAligner struct {
+	// responder is triggered when the alignment completes.
+	responder           Responder
+	// inputCount is the number of inputs that must deliver a barrier.
+	inputCount          int
+	// currentCheckpointId is the checkpoint being aligned (or the last one seen).
+	currentCheckpointId int64
+	// output receives the buffered items after the alignment completes.
+	output              chan<- *BufferOrEvent
+	// blockedChannels holds the inputs, keyed by barrier OpId, that already sent a barrier.
+	blockedChannels     map[string]bool
+	// buffer holds data received from blocked inputs.
+	buffer              []*BufferOrEvent
+}
+
+// NewBarrierAligner returns an aligner with no blocked inputs. The output
+// channel is not set here; SetOutput has to be called separately.
+func NewBarrierAligner(responder Responder, inputCount int) *BarrierAligner {
+	return &BarrierAligner{
+		blockedChannels: map[string]bool{},
+		inputCount:      inputCount,
+		responder:       responder,
+	}
+}
+
+// Process returns true when data has been consumed by the aligner, either
+// because it is a barrier or because it came from a blocked input and was
+// buffered. Items already marked Processed are never consumed again.
+func (h *BarrierAligner) Process(data *BufferOrEvent, ctx api.StreamContext) bool {
+	if data.Processed {
+		return false
+	}
+	if barrier, isBarrier := data.Data.(*Barrier); isBarrier {
+		h.processBarrier(barrier, ctx)
+		return true
+	}
+	// With a single input there is nothing to align, so nothing is buffered.
+	if h.inputCount <= 1 {
+		return false
+	}
+	if _, blocked := h.blockedChannels[data.Channel]; !blocked {
+		return false
+	}
+	// Mark the item so that it passes straight through when it is replayed.
+	data.Processed = true
+	h.buffer = append(h.buffer, data)
+	return true
+}
+
+// processBarrier advances the alignment for one incoming barrier.
+//
+// With a single input, a barrier newer than the current checkpoint triggers
+// the responder right away. With several inputs:
+//   - a barrier older than the current checkpoint is ignored;
+//   - a barrier for the current checkpoint marks its input as blocked, but
+//     only while an alignment is in progress;
+//   - a newer barrier starts a new alignment, abandoning an unfinished one.
+//
+// Once every input is blocked the blocks are released, the responder is
+// triggered and the buffered items are replayed to the output channel.
+func (h *BarrierAligner) processBarrier(b *Barrier, ctx api.StreamContext) {
+	logger := ctx.GetLogger()
+	if h.inputCount == 1 {
+		if b.CheckpointId > h.currentCheckpointId {
+			h.currentCheckpointId = b.CheckpointId
+			h.responder.TriggerCheckpoint(b.CheckpointId)
+		}
+		return
+	}
+	aligning := len(h.blockedChannels) > 0
+	switch {
+	case b.CheckpointId < h.currentCheckpointId:
+		return
+	case b.CheckpointId == h.currentCheckpointId:
+		if !aligning {
+			return
+		}
+		h.onBarrier(b.OpId, ctx)
+	default:
+		if aligning {
+			logger.Infof("Received checkpoint barrier for checkpoint %d before complete current checkpoint %d. Skipping current checkpoint.", b.CheckpointId, h.currentCheckpointId)
+			//TODO Abort checkpoint
+
+			h.releaseBlocksAndResetBarriers()
+		}
+		h.beginNewAlignment(b, ctx)
+	}
+	if len(h.blockedChannels) != h.inputCount {
+		return
+	}
+	logger.Debugf("Received all barriers, triggering checkpoint %d", b.CheckpointId)
+	h.releaseBlocksAndResetBarriers()
+	h.responder.TriggerCheckpoint(b.CheckpointId)
+
+	// Detach the buffer before replaying it so that new items are appended to
+	// a fresh slice rather than the one the goroutine below is reading.
+	held := h.buffer
+	h.buffer = nil
+	if len(held) == 0 {
+		// Nothing was held back; do not start a goroutine for an empty replay.
+		return
+	}
+	// Replay asynchronously because sending to the output channel may block.
+	go func() {
+		for _, d := range held {
+			h.output <- d
+		}
+	}()
+}
+
+// onBarrier marks the input called name as blocked; repeated barriers from
+// the same input are ignored.
+func (h *BarrierAligner) onBarrier(name string, ctx api.StreamContext) {
+	logger := ctx.GetLogger()
+	if _, blocked := h.blockedChannels[name]; blocked {
+		return
+	}
+	h.blockedChannels[name] = true
+	logger.Debugf("Received barrier from channel %s", name)
+}
+
+// SetOutput sets the channel to which buffered items are replayed after an
+// alignment completes.
+func (h *BarrierAligner) SetOutput(output chan<- *BufferOrEvent) {
+	h.output = output
+}
+
+// releaseBlocksAndResetBarriers unblocks every input by replacing the set of
+// blocked channels with an empty one. The data buffer is not touched here.
+func (h *BarrierAligner) releaseBlocksAndResetBarriers() {
+	h.blockedChannels = make(map[string]bool)
+}
+
+// beginNewAlignment makes the barrier's checkpoint the current one and
+// blocks the input the barrier came from.
+func (h *BarrierAligner) beginNewAlignment(barrier *Barrier, ctx api.StreamContext) {
+	log := ctx.GetLogger()
+	id := barrier.CheckpointId
+	h.currentCheckpointId = id
+	h.onBarrier(barrier.OpId, ctx)
+	log.Debugf("Starting stream alignment for checkpoint %d", id)
+}

+ 255 - 0
xstream/checkpoints/coordinator.go

@@ -0,0 +1,255 @@
+package checkpoints
+
+import (
+	"github.com/benbjohnson/clock"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream/api"
+	"time"
+)
+
+// pendingCheckpoint tracks a checkpoint that has been triggered but not yet
+// acknowledged by every task.
+type pendingCheckpoint struct {
+	checkpointId   int64
+	// isDiscarded is set once the checkpoint is cancelled or superseded; acks are rejected afterwards.
+	isDiscarded    bool
+	// notYetAckTasks is the set of task names that still have to acknowledge.
+	notYetAckTasks map[string]bool
+}
+
+// newPendingCheckpoint creates a pending checkpoint that waits for an ack
+// from each of the given tasks, identified by name.
+func newPendingCheckpoint(checkpointId int64, tasksToWaitFor []Responder) *pendingCheckpoint {
+	waiting := make(map[string]bool)
+	for i := range tasksToWaitFor {
+		waiting[tasksToWaitFor[i].GetName()] = true
+	}
+	return &pendingCheckpoint{
+		checkpointId:   checkpointId,
+		notYetAckTasks: waiting,
+	}
+}
+
+// ack records the acknowledgement of task opId. It returns false, without
+// recording anything, when the checkpoint has already been discarded.
+func (c *pendingCheckpoint) ack(opId string) bool {
+	if !c.isDiscarded {
+		delete(c.notYetAckTasks, opId)
+		//TODO serialize state
+		return true
+	}
+	return false
+}
+
+// isFullyAck reports whether no task is left to acknowledge.
+func (c *pendingCheckpoint) isFullyAck() bool {
+	return len(c.notYetAckTasks) == 0
+}
+
+// finalize turns the pending checkpoint into its completed form, which only
+// carries the checkpoint id.
+func (c *pendingCheckpoint) finalize() *completedCheckpoint {
+	return &completedCheckpoint{checkpointId: c.checkpointId}
+}
+
+// dispose marks the checkpoint as discarded so that later acks are rejected.
+// The releaseState argument is currently unused.
+func (c *pendingCheckpoint) dispose(releaseState bool) {
+	c.isDiscarded = true
+}
+
+// completedCheckpoint identifies a checkpoint that every task acknowledged.
+type completedCheckpoint struct {
+	checkpointId int64
+}
+
+// checkpointStore keeps the most recent completed checkpoints in memory,
+// oldest first.
+type checkpointStore struct {
+	// maxNum is the number of checkpoints retained; add evicts the oldest beyond it.
+	maxNum      int
+	checkpoints []*completedCheckpoint
+}
+
+// add appends c and drops the oldest entry when the store then holds more
+// than maxNum checkpoints.
+func (s *checkpointStore) add(c *completedCheckpoint) {
+	kept := append(s.checkpoints, c)
+	if len(kept) > s.maxNum {
+		kept = kept[1:]
+	}
+	s.checkpoints = kept
+}
+
+// getLatest returns the most recently added checkpoint, or nil when the
+// store is empty.
+func (s *checkpointStore) getLatest() *completedCheckpoint {
+	n := len(s.checkpoints)
+	if n == 0 {
+		return nil
+	}
+	return s.checkpoints[n-1]
+}
+
+// Coordinator schedules checkpoints for one rule: on every tick it asks the
+// source tasks to emit a barrier, then collects the acknowledgements that
+// all tasks send back over the signal channel.
+type Coordinator struct {
+	// tasksToTrigger are the responders asked to start a checkpoint (the sources).
+	tasksToTrigger          []Responder
+	// tasksToWaitFor are all responders that must acknowledge a checkpoint.
+	tasksToWaitFor          []Responder
+	pendingCheckpoints      map[int64]*pendingCheckpoint
+	completedCheckpoints    *checkpointStore
+	ruleId                  string
+	// baseInterval is passed to common.GetTicker to schedule checkpoints.
+	baseInterval            int
+	// timeout bounds how long a triggered checkpoint may stay pending.
+	timeout                 int
+	advanceToEndOfEventTime bool
+	ticker                  *clock.Ticker //For processing time only
+	// signal carries STOP, ACK and DEC messages to the scheduler goroutine.
+	signal                  chan *Signal
+	store                   Store
+	ctx                     api.StreamContext
+}
+
+// NewCoordinator wires the tasks of a rule into the checkpoint mechanism.
+//
+// Every task gets a ResponderExecutor that reports over a shared signal
+// channel. Sources are the tasks asked to trigger a checkpoint. Operators
+// receive a barrier handler chosen by qos, while sinks always receive a
+// BarrierTracker. All tasks have to acknowledge each checkpoint.
+func NewCoordinator(ruleId string, sources []StreamTask, operators []NonSourceTask, sinks []NonSourceTask, qos xsql.Qos, store Store, interval int, ctx api.StreamContext) *Coordinator {
+	signal := make(chan *Signal, 1024)
+	var waitFor, toTrigger []Responder
+	for _, src := range sources {
+		executor := NewResponderExecutor(signal, src)
+		waitFor = append(waitFor, executor)
+		toTrigger = append(toTrigger, executor)
+	}
+	for _, op := range operators {
+		executor := NewResponderExecutor(signal, op)
+		op.InitCheckpoint(createBarrierHandler(executor, op.GetInputCount(), qos), qos)
+		waitFor = append(waitFor, executor)
+	}
+	for _, sink := range sinks {
+		executor := NewResponderExecutor(signal, sink)
+		sink.InitCheckpoint(NewBarrierTracker(executor, sink.GetInputCount()), qos)
+		waitFor = append(waitFor, executor)
+	}
+	// Fall back to the default interval when none is configured.
+	// NOTE(review): the previous comment said "5 minutes by default"; confirm
+	// the unit expected by common.GetTicker, since 5000 looks like 5 seconds
+	// if it is milliseconds.
+	if interval <= 0 {
+		interval = 5000
+	}
+	coordinator := &Coordinator{
+		tasksToTrigger:       toTrigger,
+		tasksToWaitFor:       waitFor,
+		pendingCheckpoints:   make(map[int64]*pendingCheckpoint),
+		completedCheckpoints: &checkpointStore{maxNum: 3},
+		ruleId:               ruleId,
+		signal:               signal,
+		baseInterval:         interval,
+		timeout:              200000,
+		store:                store,
+		ctx:                  ctx,
+	}
+	return coordinator
+}
+
+// createBarrierHandler picks the barrier handler for an operator: a tracker
+// for AtLeastOnce, an aligner for ExactlyOnce, and nil for any other level.
+func createBarrierHandler(re Responder, inputCount int, qos xsql.Qos) BarrierHandler {
+	switch qos {
+	case xsql.AtLeastOnce:
+		return NewBarrierTracker(re, inputCount)
+	case xsql.ExactlyOnce:
+		return NewBarrierAligner(re, inputCount)
+	default:
+		return nil
+	}
+}
+
+// Activate (re)starts the checkpoint ticker and launches the scheduler
+// goroutine. The goroutine runs until a STOP signal arrives or the context
+// is done, and it is the only place that touches pendingCheckpoints:
+//   - on every tick it registers a pending checkpoint and asks each source
+//     to trigger it;
+//   - ACK removes the sender from the pending checkpoint and completes the
+//     checkpoint once every task has acknowledged;
+//   - DEC cancels the checkpoint.
+//
+// It always returns nil.
+func (c *Coordinator) Activate() error {
+	logger := c.ctx.GetLogger()
+	if c.ticker != nil {
+		c.ticker.Stop()
+	}
+	c.ticker = common.GetTicker(c.baseInterval)
+	tc := c.ticker.C
+	// requestCancel asks the scheduler goroutine to drop a checkpoint. The
+	// trigger goroutines must not call c.cancel themselves: it mutates
+	// pendingCheckpoints, which the scheduler goroutine reads and writes
+	// concurrently, and Go maps are not safe for concurrent use.
+	requestCancel := func(checkpointId int64, opId string) {
+		dec := &Signal{
+			Message: DEC,
+			Barrier: Barrier{CheckpointId: checkpointId, OpId: opId},
+		}
+		select {
+		case c.signal <- dec:
+		case <-c.ctx.Done():
+		}
+	}
+	go func() {
+		for {
+			select {
+			case <-tc:
+				//trigger checkpoint
+				//TODO pose max attempt and min pause check for consequent pendingCheckpoints
+
+				// TODO Check if all tasks are running
+
+				//Create a pending checkpoint
+				checkpointId := common.GetNowInMilli()
+				checkpoint := newPendingCheckpoint(checkpointId, c.tasksToWaitFor)
+				logger.Debugf("Create checkpoint %d", checkpointId)
+				c.pendingCheckpoints[checkpointId] = checkpoint
+				//Let the sources send out a barrier
+				for _, r := range c.tasksToTrigger {
+					// Pass the source as an argument so that each goroutine
+					// works on its own source rather than the shared loop variable.
+					go func(src Responder) {
+						if err := src.TriggerCheckpoint(checkpointId); err != nil {
+							logger.Infof("Fail to trigger checkpoint for source %s with error %v", src.GetName(), err)
+							requestCancel(checkpointId, src.GetName())
+							return
+						}
+						// Drop the checkpoint if it is still pending after the timeout.
+						// NOTE(review): the timeout is interpreted as microseconds here;
+						// confirm that milliseconds were not intended.
+						timer := time.NewTimer(time.Duration(c.timeout) * time.Microsecond)
+						defer timer.Stop()
+						select {
+						case <-timer.C:
+							requestCancel(checkpointId, src.GetName())
+						case <-c.ctx.Done():
+						}
+					}(r)
+				}
+			case s := <-c.signal:
+				switch s.Message {
+				case STOP:
+					logger.Debug("Stop checkpoint scheduler")
+					if c.ticker != nil {
+						c.ticker.Stop()
+					}
+					return
+				case ACK:
+					logger.Debugf("Receive ack from %s for checkpoint %d", s.OpId, s.CheckpointId)
+					if checkpoint, ok := c.pendingCheckpoints[s.CheckpointId]; ok {
+						checkpoint.ack(s.OpId)
+						if checkpoint.isFullyAck() {
+							c.complete(s.CheckpointId)
+						}
+					} else {
+						logger.Debugf("Receive ack from %s for non existing checkpoint %d", s.OpId, s.CheckpointId)
+					}
+				case DEC:
+					logger.Debugf("Receive dec from %s for checkpoint %d", s.OpId, s.CheckpointId)
+					c.cancel(s.CheckpointId)
+				}
+			case <-c.ctx.Done():
+				logger.Infoln("Cancelling coordinator....")
+				if c.ticker != nil {
+					c.ticker.Stop()
+				}
+				return
+			}
+		}
+	}()
+	return nil
+}
+
+// Deactivate stops the ticker and posts a STOP signal, which makes the
+// scheduler goroutine started by Activate return. It always returns nil.
+func (c *Coordinator) Deactivate() error {
+	if c.ticker != nil {
+		c.ticker.Stop()
+	}
+	c.signal <- &Signal{Message: STOP}
+	return nil
+}
+
+// cancel removes a pending checkpoint and marks it discarded. Cancelling an
+// unknown checkpoint id only produces a debug log entry.
+func (c *Coordinator) cancel(checkpointId int64) {
+	logger := c.ctx.GetLogger()
+	pending, found := c.pendingCheckpoints[checkpointId]
+	if !found {
+		logger.Debugf("Cancel for non existing checkpoint %d. Just ignored", checkpointId)
+		return
+	}
+	delete(c.pendingCheckpoints, checkpointId)
+	pending.dispose(true)
+}
+
+// complete persists a fully acknowledged checkpoint through the store and
+// records it as completed. Every pending checkpoint with a smaller id is
+// then discarded. If the store fails, the checkpoint stays pending.
+func (c *Coordinator) complete(checkpointId int64) {
+	logger := c.ctx.GetLogger()
+
+	pending, found := c.pendingCheckpoints[checkpointId]
+	if !found {
+		logger.Infof("Cannot find checkpoint %d to complete", checkpointId)
+		return
+	}
+	if err := c.store.SaveCheckpoint(checkpointId); err != nil {
+		logger.Infof("Cannot save checkpoint %d due to storage error: %v", checkpointId, err)
+		//TODO handle checkpoint error
+		return
+	}
+	c.completedCheckpoints.add(pending.finalize())
+	delete(c.pendingCheckpoints, checkpointId)
+	//Drop the previous pendingCheckpoints
+	for id, older := range c.pendingCheckpoints {
+		if id >= checkpointId {
+			continue
+		}
+		//TODO revisit how to abort a checkpoint, discard callback
+		older.isDiscarded = true
+		delete(c.pendingCheckpoints, id)
+	}
+	logger.Debugf("Totally complete checkpoint %d", checkpointId)
+}
+
+// GetCompleteCount returns the number of completed checkpoints currently
+// retained. For testing.
+func (c *Coordinator) GetCompleteCount() int {
+	return len(c.completedCheckpoints.checkpoints)
+}
+
+// GetLatest returns the id of the most recently completed checkpoint, or 0
+// when no checkpoint has completed yet.
+func (c *Coordinator) GetLatest() int64 {
+	latest := c.completedCheckpoints.getLatest()
+	if latest == nil {
+		// getLatest returns nil for an empty store; avoid dereferencing it.
+		return 0
+	}
+	return latest.checkpointId
+}

+ 44 - 0
xstream/checkpoints/def.go

@@ -0,0 +1,44 @@
+package checkpoints
+
+import (
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream/api"
+)
+
+// StreamTask is what the checkpoint mechanism needs from any task: a way to
+// broadcast data downstream, the task name and its stream context.
+type StreamTask interface {
+	Broadcast(data interface{}) error
+	GetName() string
+	GetStreamContext() api.StreamContext
+}
+
+// NonSourceTask is a StreamTask that has inputs: it exposes its input count
+// and can be given a barrier handler together with the qos level.
+type NonSourceTask interface {
+	StreamTask
+	GetInputCount() int
+	AddInputCount()
+
+	InitCheckpoint(BarrierHandler, xsql.Qos)
+}
+
+// BufferOrEvent is the envelope passed to barrier handlers.
+type BufferOrEvent struct {
+	// Data is either a *Barrier or a regular payload.
+	Data      interface{}
+	// Channel names the input the item arrived from.
+	Channel   string
+	// Processed is set by BarrierAligner when it buffers the item, so the item is not buffered again on replay.
+	Processed bool
+}
+
+// Message is the kind of a Signal sent to the coordinator.
+type Message int
+
+const (
+	// STOP makes the coordinator's scheduler goroutine exit.
+	STOP Message = iota
+	// ACK reports that a task saved its state for a checkpoint.
+	ACK
+	// DEC reports that a task failed to save its state; the checkpoint is cancelled.
+	DEC
+)
+
+// Signal is a message for the coordinator. The embedded Barrier identifies
+// the checkpoint and the task the message refers to.
+type Signal struct {
+	Message Message
+	Barrier
+}
+
+// Barrier is the checkpoint marker that tasks broadcast downstream.
+type Barrier struct {
+	CheckpointId int64
+	// OpId is the name of the task that emitted the barrier.
+	OpId         string
+}

+ 53 - 0
xstream/checkpoints/responder.go

@@ -0,0 +1,53 @@
+package checkpoints
+
+// Responder is a named participant that can be asked to take part in a
+// checkpoint.
+type Responder interface {
+	TriggerCheckpoint(checkpointId int64) error
+	GetName() string
+}
+
+// ResponderExecutor is the Responder for a single task.
+type ResponderExecutor struct {
+	// responder is the channel on which the ACK or DEC signal is reported.
+	responder chan<- *Signal
+	task      StreamTask
+}
+
+// NewResponderExecutor binds task to the channel on which its checkpoint
+// results are reported.
+func NewResponderExecutor(responder chan<- *Signal, task StreamTask) *ResponderExecutor {
+	executor := new(ResponderExecutor)
+	executor.responder = responder
+	executor.task = task
+	return executor
+}
+
+// GetName returns the name of the underlying task.
+func (re *ResponderExecutor) GetName() string {
+	return re.task.GetName()
+}
+
+// TriggerCheckpoint runs one checkpoint step for the task: it broadcasts a
+// barrier carrying the checkpoint id downstream, then saves the task state
+// in the background and reports ACK on success or DEC on failure.
+//
+// The error of a failed broadcast is returned; in that case no state is
+// saved and no signal is sent.
+func (re *ResponderExecutor) TriggerCheckpoint(checkpointId int64) error {
+	ctx := re.task.GetStreamContext()
+	logger := ctx.GetLogger()
+	name := re.GetName()
+	logger.Debugf("Starting checkpoint %d on task %s", checkpointId, name)
+	//create
+	barrier := &Barrier{
+		CheckpointId: checkpointId,
+		OpId:         name,
+	}
+	//broadcast barrier
+	if err := re.task.Broadcast(barrier); err != nil {
+		// Downstream tasks never saw the barrier, so acknowledging this
+		// checkpoint would be meaningless.
+		return err
+	}
+	//Save key state to the global state
+	go func() {
+		state := ACK
+		err := ctx.SaveState(checkpointId)
+		if err != nil {
+			logger.Infof("save checkpoint error %s", err)
+			state = DEC
+		}
+
+		signal := &Signal{
+			Message: state,
+			Barrier: Barrier{CheckpointId: checkpointId, OpId: name},
+		}
+		re.responder <- signal
+		logger.Debugf("Complete checkpoint %d on task %s", checkpointId, name)
+	}()
+	return nil
+}

+ 50 - 0
xstream/checkpoints/store.go

@@ -0,0 +1,50 @@
+package checkpoints
+
+import (
+	"encoding/gob"
+	"github.com/emqx/kuiper/common"
+	"sync"
+)
+
+// init registers map[string]interface{} with gob so that values of this
+// type can be encoded inside interface values.
+func init() {
+	gob.Register(map[string]interface{}{})
+}
+
+// Store is the manager for checkpoint storage. Right now it only supports
+// storing in badgerDB.
+type Store interface {
+	SaveState(checkpointId int64, opId string, state map[string]interface{}) error
+	RestoreState(opId string) map[string]interface{} //Get the state of an op, should only be used in initialization
+	SaveCheckpoint(checkpointId int64) error         //Save the whole checkpoint state into storage like badger
+}
+
+// KVStore is the Store implementation backed by a common.KeyValue database.
+// Its methods are still stubs.
+type KVStore struct {
+	db          common.KeyValue
+	mapStore    *sync.Map
+	checkpoints []int64
+	// max is set to 3 by GetKVStore; presumably the number of checkpoints to retain (TODO confirm).
+	max         int
+}
+
+// GetKVStore creates the checkpoint store of a rule, backed by the simple KV
+// store named "checkpoint/<ruleId>" (on disk under ./data/checkpoint/$ruleId
+// according to the original note).
+//
+// Intended layout, per the original design notes:
+//   - a queue of completed checkpoint ids;
+//   - a map from checkpoint id to a gob-serialized snapshot, where a
+//     snapshot maps an opId to that operator's state map.
+//
+// Each operator is assumed to have a single instance.
+func GetKVStore(ruleId string) *KVStore {
+	return &KVStore{
+		db:       common.GetSimpleKVStore("checkpoint/" + ruleId),
+		mapStore: new(sync.Map),
+		max:      3,
+	}
+}
+
+// SaveState is not implemented yet; it ignores its arguments and returns nil.
+func (s *KVStore) SaveState(checkpointId int64, opId string, state map[string]interface{}) error {
+	return nil
+}
+
+// RestoreState is not implemented yet; it always returns nil.
+func (s *KVStore) RestoreState(opId string) map[string]interface{} {
+	return nil
+}
+
+// SaveCheckpoint is not implemented yet; it always returns nil.
+func (s *KVStore) SaveCheckpoint(checkpointId int64) error {
+	return nil
+}

+ 6 - 0
xstream/contexts/default.go

@@ -2,6 +2,7 @@ package contexts
 
 
 import (
 import (
 	"context"
 	"context"
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/states"
 	"github.com/emqx/kuiper/xstream/states"
@@ -134,3 +135,8 @@ func (c *DefaultContext) GetState(key string) (interface{}, error) {
 func (c *DefaultContext) DeleteState(key string) error {
 func (c *DefaultContext) DeleteState(key string) error {
 	return c.state.DeleteState(key)
 	return c.state.DeleteState(key)
 }
 }
+
+// SaveState is a placeholder: it only prints the checkpoint id to standard
+// output and returns nil. No state is persisted yet.
+func (c *DefaultContext) SaveState(checkpointId int64) error {
+	fmt.Printf("placeholder for context save state for checkpoint %d", checkpointId)
+	return nil
+}

+ 8 - 8
xstream/funcs.go

@@ -4,7 +4,7 @@ import (
 	"context"
 	"context"
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
-	"github.com/emqx/kuiper/xstream/operators"
+	"github.com/emqx/kuiper/xstream/nodes"
 	"reflect"
 	"reflect"
 )
 )
 
 
@@ -22,7 +22,7 @@ const (
 //   func(T) R
 //   func(T) R
 //   where T is the type of incoming item
 //   where T is the type of incoming item
 //   R the type of returned processed item
 //   R the type of returned processed item
-func ProcessFunc(f interface{}) (operators.UnFunc, error) {
+func ProcessFunc(f interface{}) (nodes.UnFunc, error) {
 	fntype := reflect.TypeOf(f)
 	fntype := reflect.TypeOf(f)
 
 
 	funcForm, err := isUnaryFuncForm(fntype)
 	funcForm, err := isUnaryFuncForm(fntype)
@@ -35,7 +35,7 @@ func ProcessFunc(f interface{}) (operators.UnFunc, error) {
 
 
 	fnval := reflect.ValueOf(f)
 	fnval := reflect.ValueOf(f)
 
 
-	return operators.UnFunc(func(ctx api.StreamContext, data interface{}) interface{} {
+	return nodes.UnFunc(func(ctx api.StreamContext, data interface{}) interface{} {
 		result := callOpFunc(fnval, ctx, data, funcForm)
 		result := callOpFunc(fnval, ctx, data, funcForm)
 		return result.Interface()
 		return result.Interface()
 	}), nil
 	}), nil
@@ -47,7 +47,7 @@ func ProcessFunc(f interface{}) (operators.UnFunc, error) {
 //   func(T)bool - where T is the type of incoming data item, bool is the value of the predicate
 //   func(T)bool - where T is the type of incoming data item, bool is the value of the predicate
 // When the user-defined function returns false, the current processed data item will not
 // When the user-defined function returns false, the current processed data item will not
 // be placed in the downstream processing.
 // be placed in the downstream processing.
-func FilterFunc(f interface{}) (operators.UnFunc, error) {
+func FilterFunc(f interface{}) (nodes.UnFunc, error) {
 	fntype := reflect.TypeOf(f)
 	fntype := reflect.TypeOf(f)
 
 
 	funcForm, err := isUnaryFuncForm(fntype)
 	funcForm, err := isUnaryFuncForm(fntype)
@@ -64,7 +64,7 @@ func FilterFunc(f interface{}) (operators.UnFunc, error) {
 	}
 	}
 
 
 	fnval := reflect.ValueOf(f)
 	fnval := reflect.ValueOf(f)
-	return operators.UnFunc(func(ctx api.StreamContext, data interface{}) interface{} {
+	return nodes.UnFunc(func(ctx api.StreamContext, data interface{}) interface{} {
 		result := callOpFunc(fnval, ctx, data, funcForm)
 		result := callOpFunc(fnval, ctx, data, funcForm)
 		predicate := result.Bool()
 		predicate := result.Bool()
 		if !predicate {
 		if !predicate {
@@ -78,7 +78,7 @@ func FilterFunc(f interface{}) (operators.UnFunc, error) {
 // maps, one-to-one, the incomfing value to a new value.  The user-defined function
 // maps, one-to-one, the incomfing value to a new value.  The user-defined function
 // must be of type:
 // must be of type:
 //   func(T) R - where T is the incoming item, R is the type of the returned mapped item
 //   func(T) R - where T is the incoming item, R is the type of the returned mapped item
-func MapFunc(f interface{}) (operators.UnFunc, error) {
+func MapFunc(f interface{}) (nodes.UnFunc, error) {
 	return ProcessFunc(f)
 	return ProcessFunc(f)
 }
 }
 
 
@@ -88,7 +88,7 @@ func MapFunc(f interface{}) (operators.UnFunc, error) {
 //   func (T) R - where R is the original item, R is a slice of decostructed items
 //   func (T) R - where R is the original item, R is a slice of decostructed items
 // The slice returned should be restreamed by placing each item onto the stream for
 // The slice returned should be restreamed by placing each item onto the stream for
 // downstream processing.
 // downstream processing.
-func FlatMapFunc(f interface{}) (operators.UnFunc, error) {
+func FlatMapFunc(f interface{}) (nodes.UnFunc, error) {
 	fntype := reflect.TypeOf(f)
 	fntype := reflect.TypeOf(f)
 
 
 	funcForm, err := isUnaryFuncForm(fntype)
 	funcForm, err := isUnaryFuncForm(fntype)
@@ -104,7 +104,7 @@ func FlatMapFunc(f interface{}) (operators.UnFunc, error) {
 	}
 	}
 
 
 	fnval := reflect.ValueOf(f)
 	fnval := reflect.ValueOf(f)
-	return operators.UnFunc(func(ctx api.StreamContext, data interface{}) interface{} {
+	return nodes.UnFunc(func(ctx api.StreamContext, data interface{}) interface{} {
 		result := callOpFunc(fnval, ctx, data, funcForm)
 		result := callOpFunc(fnval, ctx, data, funcForm)
 		return result.Interface()
 		return result.Interface()
 	}), nil
 	}), nil

+ 0 - 22
xstream/nodes/common_func.go

@@ -1,22 +0,0 @@
-package nodes
-
-import (
-	"github.com/emqx/kuiper/xstream/api"
-	"sync"
-)
-
-//Blocking broadcast
-func Broadcast(outputs map[string]chan<- interface{}, val interface{}, ctx api.StreamContext) {
-	logger := ctx.GetLogger()
-	var wg sync.WaitGroup
-	wg.Add(len(outputs))
-	for n, out := range outputs {
-		go func(output chan<- interface{}) {
-			output <- val
-			wg.Done()
-			logger.Debugf("broadcast from %s to %s done", ctx.GetOpId(), n)
-		}(out)
-	}
-	logger.Debugf("broadcasting from %s", ctx.GetOpId())
-	wg.Wait()
-}

+ 125 - 0
xstream/nodes/node.go

@@ -0,0 +1,125 @@
+package nodes
+
+import (
+	"fmt"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream/api"
+	"github.com/emqx/kuiper/xstream/checkpoints"
+	"sync"
+)
+
+// OperatorNode is an api.Operator that can also take part in checkpointing:
+// it can broadcast to its outputs, exposes its stream context and input
+// count, and accepts a barrier handler.
+type OperatorNode interface {
+	api.Operator
+	Broadcast(data interface{}) error
+	GetStreamContext() api.StreamContext
+	GetInputCount() int
+	AddInputCount()
+	InitCheckpoint(checkpoints.BarrierHandler, xsql.Qos)
+}
+
+// defaultNode holds the state shared by all node kinds and is meant to be
+// embedded.
+type defaultNode struct {
+	name         string
+	// outputs maps an output name to the channel Broadcast writes to.
+	outputs      map[string]chan<- interface{}
+	concurrency  int
+	statManagers []StatManager
+	// ctx is read by Broadcast and GetStreamContext; it is not set by the constructors visible here.
+	ctx          api.StreamContext
+}
+
+// AddOutput registers output under name. It returns an error when an output
+// with that name is already registered.
+func (o *defaultNode) AddOutput(output chan<- interface{}, name string) error {
+	if _, exists := o.outputs[name]; exists {
+		return fmt.Errorf("fail to add output %s, node %s already has an output of the same name", name, o.name)
+	}
+	o.outputs[name] = output
+	return nil
+}
+
+// GetName returns the node name.
+func (o *defaultNode) GetName() string {
+	return o.name
+}
+
+// SetConcurrency sets the concurrency level of the node; values below 1 are
+// raised to 1.
+func (o *defaultNode) SetConcurrency(concurr int) {
+	if concurr < 1 {
+		concurr = 1
+	}
+	o.concurrency = concurr
+}
+
+// GetMetrics collects the metrics of every stat manager, one entry per
+// manager. The result is nil when the node has no stat managers.
+func (o *defaultNode) GetMetrics() (result [][]interface{}) {
+	for i := range o.statManagers {
+		metrics := o.statManagers[i].GetMetrics()
+		result = append(result, metrics)
+	}
+	return result
+}
+
+// Broadcast sends val to every output and blocks until all outputs have
+// accepted it. One goroutine is used per output so that a slow output does
+// not delay delivery to the others. It always returns nil.
+func (o *defaultNode) Broadcast(val interface{}) error {
+	logger := o.ctx.GetLogger()
+	var wg sync.WaitGroup
+	wg.Add(len(o.outputs))
+	for n, out := range o.outputs {
+		// The output name is passed as an argument: capturing the loop
+		// variable could log the name of a different output.
+		go func(name string, output chan<- interface{}) {
+			output <- val
+			wg.Done()
+			logger.Debugf("broadcast from %s to %s done", o.ctx.GetOpId(), name)
+		}(n, out)
+	}
+	logger.Debugf("broadcasting from %s", o.ctx.GetOpId())
+	wg.Wait()
+	return nil
+}
+
+// GetStreamContext returns the stream context stored on the node.
+func (o *defaultNode) GetStreamContext() api.StreamContext {
+	return o.ctx
+}
+
+// defaultSinkNode extends defaultNode with what a node that receives data
+// needs: an input channel and the checkpoint settings.
+type defaultSinkNode struct {
+	*defaultNode
+	input          chan interface{}
+	// barrierHandler and qos are set by InitCheckpoint.
+	barrierHandler checkpoints.BarrierHandler
+	inputCount     int
+	qos            xsql.Qos
+}
+
+// GetInput returns the send side of the input channel and the node name.
+func (o *defaultSinkNode) GetInput() (chan<- interface{}, string) {
+	return o.input, o.name
+}
+
+// GetInputCount returns how many times AddInputCount has been called.
+func (o *defaultSinkNode) GetInputCount() int {
+	return o.inputCount
+}
+
+// AddInputCount increments the input count by one.
+func (o *defaultSinkNode) AddInputCount() {
+	o.inputCount++
+}
+
+// InitCheckpoint stores the barrier handler and qos level that preprocess
+// and Broadcast consult.
+func (o *defaultSinkNode) InitCheckpoint(bh checkpoints.BarrierHandler, q xsql.Qos) {
+	o.barrierHandler = bh
+	o.qos = q
+}
+
+// preprocess unwraps an incoming item and lets the barrier handler
+// intercept it. It returns the payload to process and whether the item has
+// already been consumed.
+//
+// Below AtLeastOnce the item is returned unchanged. Otherwise an item
+// wrapped in a BufferOrEvent goes to the barrier handler: if the handler
+// consumed it (it is a barrier, or it was buffered by the aligner to be
+// written back to the channel later) the result is (nil, true); if not, the
+// wrapped payload is returned with false. Items that are not wrapped are
+// passed through unchanged instead of causing a panic.
+func (o *defaultSinkNode) preprocess(data interface{}) (interface{}, bool) {
+	if o.qos < xsql.AtLeastOnce {
+		return data, false
+	}
+	b, ok := data.(*checkpoints.BufferOrEvent)
+	if !ok {
+		return data, false
+	}
+	if o.barrierHandler.Process(b, o.ctx) {
+		return nil, true
+	}
+	return b.Data, false
+}
+
+// Broadcast sends val to all outputs. From AtLeastOnce upwards the value is
+// first wrapped in a BufferOrEvent tagged with this node's name, so that
+// downstream barrier handlers can tell which input it came from.
+func (o *defaultSinkNode) Broadcast(val interface{}) error {
+	payload := val
+	if o.qos >= xsql.AtLeastOnce {
+		payload = &checkpoints.BufferOrEvent{
+			Data:    val,
+			Channel: o.name,
+		}
+	}
+	return o.defaultNode.Broadcast(payload)
+}

+ 19 - 53
xstream/operators/operations.go

@@ -1,10 +1,9 @@
-package operators
+package nodes
 
 
 import (
 import (
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
-	"github.com/emqx/kuiper/xstream/nodes"
 	"sync"
 	"sync"
 )
 )
 
 
@@ -22,30 +21,24 @@ func (f UnFunc) Apply(ctx api.StreamContext, data interface{}) interface{} {
 }
 }
 
 
 type UnaryOperator struct {
 type UnaryOperator struct {
-	op           UnOperation
-	concurrency  int
-	input        chan interface{}
-	outputs      map[string]chan<- interface{}
-	mutex        sync.RWMutex
-	cancelled    bool
-	name         string
-	statManagers []nodes.StatManager
+	*defaultSinkNode
+	op        UnOperation
+	mutex     sync.RWMutex
+	cancelled bool
 }
 }
 
 
 // NewUnary creates *UnaryOperator value
 // NewUnary creates *UnaryOperator value
 func New(name string, bufferLength int) *UnaryOperator {
 func New(name string, bufferLength int) *UnaryOperator {
-	// extract logger
-	o := new(UnaryOperator)
-
-	o.concurrency = 1
-	o.input = make(chan interface{}, bufferLength)
-	o.outputs = make(map[string]chan<- interface{})
-	o.name = name
-	return o
-}
-
-func (o *UnaryOperator) GetName() string {
-	return o.name
+	return &UnaryOperator{
+		defaultSinkNode: &defaultSinkNode{
+			input: make(chan interface{}, bufferLength),
+			defaultNode: &defaultNode{
+				name:        name,
+				outputs:     make(map[string]chan<- interface{}),
+				concurrency: 1,
+			},
+		},
+	}
 }
 }
 
 
 // SetOperation sets the executor operation
 // SetOperation sets the executor operation
@@ -53,29 +46,9 @@ func (o *UnaryOperator) SetOperation(op UnOperation) {
 	o.op = op
 	o.op = op
 }
 }
 
 
-// SetConcurrency sets the concurrency level for the operation
-func (o *UnaryOperator) SetConcurrency(concurr int) {
-	o.concurrency = concurr
-	if o.concurrency < 1 {
-		o.concurrency = 1
-	}
-}
-
-func (o *UnaryOperator) AddOutput(output chan<- interface{}, name string) error {
-	if _, ok := o.outputs[name]; !ok {
-		o.outputs[name] = output
-	} else {
-		return fmt.Errorf("fail to add output %s, operator %s already has an output of the same name", name, o.name)
-	}
-	return nil
-}
-
-func (o *UnaryOperator) GetInput() (chan<- interface{}, string) {
-	return o.input, o.name
-}
-
 // Exec is the entry point for the executor
 // Exec is the entry point for the executor
 func (o *UnaryOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
 func (o *UnaryOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
+	o.ctx = ctx
 	log := ctx.GetLogger()
 	log := ctx.GetLogger()
 	log.Debugf("Unary operator %s is started", o.name)
 	log.Debugf("Unary operator %s is started", o.name)
 
 
@@ -110,7 +83,7 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 		cancel()
 		cancel()
 	}()
 	}()
 
 
-	stats, err := nodes.NewStatManager("op", ctx)
+	stats, err := NewStatManager("op", ctx)
 	if err != nil {
 	if err != nil {
 		select {
 		select {
 		case errCh <- err:
 		case errCh <- err:
@@ -141,12 +114,12 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 				continue
 				continue
 			case error:
 			case error:
 				logger.Errorf("Operation %s error: %s", ctx.GetOpId(), val)
 				logger.Errorf("Operation %s error: %s", ctx.GetOpId(), val)
-				nodes.Broadcast(o.outputs, val, ctx)
+				o.Broadcast(val)
 				stats.IncTotalExceptions()
 				stats.IncTotalExceptions()
 				continue
 				continue
 			default:
 			default:
 				stats.ProcessTimeEnd()
 				stats.ProcessTimeEnd()
-				nodes.Broadcast(o.outputs, val, ctx)
+				o.Broadcast(val)
 				stats.IncTotalRecordsOut()
 				stats.IncTotalRecordsOut()
 				stats.SetBufferLength(int64(len(o.input)))
 				stats.SetBufferLength(int64(len(o.input)))
 			}
 			}
@@ -161,10 +134,3 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 		}
 		}
 	}
 	}
 }
 }
-
-func (m *UnaryOperator) GetMetrics() (result [][]interface{}) {
-	for _, stats := range m.statManagers {
-		result = append(result, stats.GetMetrics())
-	}
-	return result
-}

+ 31 - 32
xstream/nodes/sink_node.go

@@ -15,19 +15,15 @@ import (
 )
 )
 
 
 type SinkNode struct {
 type SinkNode struct {
+	*defaultSinkNode
 	//static
 	//static
-	input    chan interface{}
-	name     string
 	sinkType string
 	sinkType string
 	mutex    sync.RWMutex
 	mutex    sync.RWMutex
 	//configs (also static for sinks)
 	//configs (also static for sinks)
-	concurrency int
-	options     map[string]interface{}
-	isMock      bool
+	options map[string]interface{}
+	isMock  bool
 	//states varies after restart
 	//states varies after restart
-	ctx          api.StreamContext
-	statManagers []StatManager
-	sinks        []api.Sink
+	sinks []api.Sink
 }
 }
 
 
 func NewSinkNode(name string, sinkType string, props map[string]interface{}) *SinkNode {
 func NewSinkNode(name string, sinkType string, props map[string]interface{}) *SinkNode {
@@ -40,25 +36,33 @@ func NewSinkNode(name string, sinkType string, props map[string]interface{}) *Si
 		}
 		}
 	}
 	}
 	return &SinkNode{
 	return &SinkNode{
-		input:       make(chan interface{}, bufferLength),
-		name:        name,
-		sinkType:    sinkType,
-		options:     props,
-		concurrency: 1,
-		ctx:         nil,
+		defaultSinkNode: &defaultSinkNode{
+			input: make(chan interface{}, bufferLength),
+			defaultNode: &defaultNode{
+				name:        name,
+				concurrency: 1,
+				ctx:         nil,
+			},
+		},
+		sinkType: sinkType,
+		options:  props,
 	}
 	}
 }
 }
 
 
 //Only for mock sink, do not use it in production
 //Only for mock sink, do not use it in production
 func NewSinkNodeWithSink(name string, sink api.Sink, props map[string]interface{}) *SinkNode {
 func NewSinkNodeWithSink(name string, sink api.Sink, props map[string]interface{}) *SinkNode {
 	return &SinkNode{
 	return &SinkNode{
-		input:       make(chan interface{}, 1024),
-		name:        name,
-		sinks:       []api.Sink{sink},
-		options:     props,
-		concurrency: 1,
-		ctx:         nil,
-		isMock:      true,
+		defaultSinkNode: &defaultSinkNode{
+			input: make(chan interface{}, 1024),
+			defaultNode: &defaultNode{
+				name:        name,
+				concurrency: 1,
+				ctx:         nil,
+			},
+		},
+		sinks:   []api.Sink{sink},
+		options: props,
+		isMock:  true,
 	}
 	}
 }
 }
 
 
@@ -327,19 +331,14 @@ func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
 	return s, nil
 	return s, nil
 }
 }
 
 
-func (m *SinkNode) GetName() string {
-	return m.name
+//Override defaultNode
+func (m *SinkNode) AddOutput(output chan<- interface{}, name string) error {
+	return fmt.Errorf("fail to add output %s, sink %s cannot add output", name, m.name)
 }
 }
 
 
-func (m *SinkNode) GetInput() (chan<- interface{}, string) {
-	return m.input, m.name
-}
-
-func (m *SinkNode) GetMetrics() (result [][]interface{}) {
-	for _, stats := range m.statManagers {
-		result = append(result, stats.GetMetrics())
-	}
-	return result
+//Override defaultNode
+func (m *SinkNode) Broadcast(val interface{}) error {
+	return fmt.Errorf("sink %s cannot add broadcast", m.name)
 }
 }
 
 
 func (m *SinkNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
 func (m *SinkNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {

+ 22 - 45
xstream/nodes/source_node.go

@@ -1,7 +1,6 @@
 package nodes
 package nodes
 
 
 import (
 import (
-	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xsql"
@@ -12,17 +11,13 @@ import (
 )
 )
 
 
 type SourceNode struct {
 type SourceNode struct {
-	sourceType  string
-	outs        map[string]chan<- interface{}
-	name        string
-	ctx         api.StreamContext
-	options     map[string]string
-	concurrency int
-	isMock      bool
+	*defaultNode
+	sourceType string
+	options    map[string]string
+	isMock     bool
 
 
-	mutex        sync.RWMutex
-	sources      []api.Source
-	statManagers []StatManager
+	mutex   sync.RWMutex
+	sources []api.Source
 }
 }
 
 
 func NewSourceNode(name string, options map[string]string) *SourceNode {
 func NewSourceNode(name string, options map[string]string) *SourceNode {
@@ -31,25 +26,27 @@ func NewSourceNode(name string, options map[string]string) *SourceNode {
 		t = "mqtt"
 		t = "mqtt"
 	}
 	}
 	return &SourceNode{
 	return &SourceNode{
-		sourceType:  t,
-		outs:        make(map[string]chan<- interface{}),
-		name:        name,
-		options:     options,
-		ctx:         nil,
-		concurrency: 1,
+		sourceType: t,
+		defaultNode: &defaultNode{
+			name:        name,
+			outputs:     make(map[string]chan<- interface{}),
+			concurrency: 1,
+		},
+		options: options,
 	}
 	}
 }
 }
 
 
 //Only for mock source, do not use it in production
 //Only for mock source, do not use it in production
 func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
 func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
 	return &SourceNode{
 	return &SourceNode{
-		sources:     []api.Source{source},
-		outs:        make(map[string]chan<- interface{}),
-		name:        name,
-		options:     options,
-		ctx:         nil,
-		concurrency: 1,
-		isMock:      true,
+		sources: []api.Source{source},
+		defaultNode: &defaultNode{
+			name:        name,
+			outputs:     make(map[string]chan<- interface{}),
+			concurrency: 1,
+		},
+		options: options,
+		isMock:  true,
 	}
 	}
 }
 }
 
 
@@ -128,7 +125,7 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 						tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: common.GetNowInMilli(), Metadata: data.Meta()}
 						tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: common.GetNowInMilli(), Metadata: data.Meta()}
 						stats.ProcessTimeEnd()
 						stats.ProcessTimeEnd()
 						//blocking
 						//blocking
-						Broadcast(m.outs, tuple, ctx)
+						m.Broadcast(tuple)
 						stats.IncTotalRecordsOut()
 						stats.IncTotalRecordsOut()
 						stats.SetBufferLength(int64(buffer.GetLength()))
 						stats.SetBufferLength(int64(buffer.GetLength()))
 						logger.Debugf("%s consume data %v complete", m.name, tuple)
 						logger.Debugf("%s consume data %v complete", m.name, tuple)
@@ -219,23 +216,3 @@ func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
 	logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
 	logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
 	return props
 	return props
 }
 }
-
-func (m *SourceNode) GetName() string {
-	return m.name
-}
-
-func (m *SourceNode) AddOutput(output chan<- interface{}, name string) (err error) {
-	if _, ok := m.outs[name]; !ok {
-		m.outs[name] = output
-	} else {
-		return fmt.Errorf("fail to add output %s, stream node %s already has an output of the same name", name, m.name)
-	}
-	return nil
-}
-
-func (m *SourceNode) GetMetrics() (result [][]interface{}) {
-	for _, stats := range m.statManagers {
-		result = append(result, stats.GetMetrics())
-	}
-	return result
-}

+ 3 - 4
xstream/operators/watermark.go

@@ -1,4 +1,4 @@
-package operators
+package nodes
 
 
 import (
 import (
 	"context"
 	"context"
@@ -7,7 +7,6 @@ import (
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
-	"github.com/emqx/kuiper/xstream/nodes"
 	"math"
 	"math"
 	"sort"
 	"sort"
 	"time"
 	"time"
@@ -208,7 +207,7 @@ func (o *WindowOperator) execEventWindow(ctx api.StreamContext, errCh chan<- err
 			switch d := item.(type) {
 			switch d := item.(type) {
 			case error:
 			case error:
 				o.statManager.IncTotalRecordsIn()
 				o.statManager.IncTotalRecordsIn()
-				nodes.Broadcast(o.outputs, d, ctx)
+				o.Broadcast(d)
 				o.statManager.IncTotalExceptions()
 				o.statManager.IncTotalExceptions()
 			case xsql.Event:
 			case xsql.Event:
 				if d.IsWatermark() {
 				if d.IsWatermark() {
@@ -242,7 +241,7 @@ func (o *WindowOperator) execEventWindow(ctx api.StreamContext, errCh chan<- err
 				o.statManager.ProcessTimeEnd()
 				o.statManager.ProcessTimeEnd()
 			default:
 			default:
 				o.statManager.IncTotalRecordsIn()
 				o.statManager.IncTotalRecordsIn()
-				nodes.Broadcast(o.outputs, fmt.Errorf("run Window error: expect xsql.Event type but got %[1]T(%[1]v)", d), ctx)
+				o.Broadcast(fmt.Errorf("run Window error: expect xsql.Event type but got %[1]T(%[1]v)", d))
 				o.statManager.IncTotalExceptions()
 				o.statManager.IncTotalExceptions()
 			}
 			}
 		// is cancelling
 		// is cancelling

+ 16 - 31
xstream/operators/window_op.go

@@ -1,4 +1,4 @@
-package operators
+package nodes
 
 
 import (
 import (
 	"fmt"
 	"fmt"
@@ -6,7 +6,6 @@ import (
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
-	"github.com/emqx/kuiper/xstream/nodes"
 	"math"
 	"math"
 	"time"
 	"time"
 )
 )
@@ -18,15 +17,13 @@ type WindowConfig struct {
 }
 }
 
 
 type WindowOperator struct {
 type WindowOperator struct {
-	input              chan interface{}
-	outputs            map[string]chan<- interface{}
-	name               string
+	*defaultSinkNode
 	ticker             *clock.Ticker //For processing time only
 	ticker             *clock.Ticker //For processing time only
 	window             *WindowConfig
 	window             *WindowConfig
 	interval           int
 	interval           int
 	triggerTime        int64
 	triggerTime        int64
 	isEventTime        bool
 	isEventTime        bool
-	statManager        nodes.StatManager
+	statManager        StatManager
 	watermarkGenerator *WatermarkGenerator //For event time only
 	watermarkGenerator *WatermarkGenerator //For event time only
 	msgCount           int
 	msgCount           int
 }
 }
@@ -34,9 +31,13 @@ type WindowOperator struct {
 func NewWindowOp(name string, w *xsql.Window, isEventTime bool, lateTolerance int64, streams []string, bufferLength int) (*WindowOperator, error) {
 func NewWindowOp(name string, w *xsql.Window, isEventTime bool, lateTolerance int64, streams []string, bufferLength int) (*WindowOperator, error) {
 	o := new(WindowOperator)
 	o := new(WindowOperator)
 
 
-	o.input = make(chan interface{}, bufferLength)
-	o.outputs = make(map[string]chan<- interface{})
-	o.name = name
+	o.defaultSinkNode = &defaultSinkNode{
+		input: make(chan interface{}, bufferLength),
+		defaultNode: &defaultNode{
+			outputs: make(map[string]chan<- interface{}),
+			name:    name,
+		},
+	}
 	o.isEventTime = isEventTime
 	o.isEventTime = isEventTime
 	if w != nil {
 	if w != nil {
 		o.window = &WindowConfig{
 		o.window = &WindowConfig{
@@ -85,27 +86,11 @@ func NewWindowOp(name string, w *xsql.Window, isEventTime bool, lateTolerance in
 	return o, nil
 	return o, nil
 }
 }
 
 
-func (o *WindowOperator) GetName() string {
-	return o.name
-}
-
-func (o *WindowOperator) AddOutput(output chan<- interface{}, name string) error {
-	if _, ok := o.outputs[name]; !ok {
-		o.outputs[name] = output
-	} else {
-		return fmt.Errorf("fail to add output %s, operator %s already has an output of the same name", name, o.name)
-	}
-	return nil
-}
-
-func (o *WindowOperator) GetInput() (chan<- interface{}, string) {
-	return o.input, o.name
-}
-
 // Exec is the entry point for the executor
 // Exec is the entry point for the executor
 // input: *xsql.Tuple from preprocessor
 // input: *xsql.Tuple from preprocessor
 // output: xsql.WindowTuplesSet
 // output: xsql.WindowTuplesSet
 func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
 func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
+	o.ctx = ctx
 	log := ctx.GetLogger()
 	log := ctx.GetLogger()
 	log.Debugf("Window operator %s is started", o.name)
 	log.Debugf("Window operator %s is started", o.name)
 
 
@@ -113,7 +98,7 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
 		go func() { errCh <- fmt.Errorf("no output channel found") }()
 		go func() { errCh <- fmt.Errorf("no output channel found") }()
 		return
 		return
 	}
 	}
-	stats, err := nodes.NewStatManager("op", ctx)
+	stats, err := NewStatManager("op", ctx)
 	if err != nil {
 	if err != nil {
 		go func() { errCh <- err }()
 		go func() { errCh <- err }()
 		return
 		return
@@ -151,7 +136,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 			}
 			}
 			switch d := item.(type) {
 			switch d := item.(type) {
 			case error:
 			case error:
-				nodes.Broadcast(o.outputs, d, ctx)
+				o.Broadcast(d)
 				o.statManager.IncTotalExceptions()
 				o.statManager.IncTotalExceptions()
 			case *xsql.Tuple:
 			case *xsql.Tuple:
 				log.Debugf("Event window receive tuple %s", d.Message)
 				log.Debugf("Event window receive tuple %s", d.Message)
@@ -187,7 +172,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 							tsets := tl.nextCountWindow()
 							tsets := tl.nextCountWindow()
 							log.Debugf("Sent: %v", tsets)
 							log.Debugf("Sent: %v", tsets)
 							//blocking if one of the channel is full
 							//blocking if one of the channel is full
-							nodes.Broadcast(o.outputs, tsets, ctx)
+							o.Broadcast(tsets)
 							o.statManager.IncTotalRecordsOut()
 							o.statManager.IncTotalRecordsOut()
 						}
 						}
 						inputs = tl.getRestTuples()
 						inputs = tl.getRestTuples()
@@ -196,7 +181,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 				o.statManager.ProcessTimeEnd()
 				o.statManager.ProcessTimeEnd()
 				o.statManager.SetBufferLength(int64(len(o.input)))
 				o.statManager.SetBufferLength(int64(len(o.input)))
 			default:
 			default:
-				nodes.Broadcast(o.outputs, fmt.Errorf("run Window error: expect xsql.Tuple type but got %[1]T(%[1]v)", d), ctx)
+				o.Broadcast(fmt.Errorf("run Window error: expect xsql.Tuple type but got %[1]T(%[1]v)", d))
 				o.statManager.IncTotalExceptions()
 				o.statManager.IncTotalExceptions()
 			}
 			}
 		case now := <-c:
 		case now := <-c:
@@ -325,7 +310,7 @@ func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.S
 		}
 		}
 		log.Debugf("Sent: %v", results)
 		log.Debugf("Sent: %v", results)
 		//blocking if one of the channel is full
 		//blocking if one of the channel is full
-		nodes.Broadcast(o.outputs, results, ctx)
+		o.Broadcast(results)
 		triggered = true
 		triggered = true
 		o.statManager.IncTotalRecordsOut()
 		o.statManager.IncTotalRecordsOut()
 		log.Debugf("done scan")
 		log.Debugf("done scan")

+ 1 - 1
xstream/operators/window_op_test.go

@@ -1,4 +1,4 @@
-package operators
+package nodes
 
 
 import (
 import (
 	"fmt"
 	"fmt"

+ 56 - 11
xstream/streams.go

@@ -3,27 +3,40 @@ package xstream
 import (
 import (
 	"context"
 	"context"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
+	"github.com/emqx/kuiper/xstream/checkpoints"
 	"github.com/emqx/kuiper/xstream/contexts"
 	"github.com/emqx/kuiper/xstream/contexts"
 	"github.com/emqx/kuiper/xstream/nodes"
 	"github.com/emqx/kuiper/xstream/nodes"
-	"github.com/emqx/kuiper/xstream/operators"
 	"strconv"
 	"strconv"
 )
 )
 
 
 type TopologyNew struct {
 type TopologyNew struct {
-	sources []*nodes.SourceNode
-	sinks   []*nodes.SinkNode
-	ctx     api.StreamContext
-	cancel  context.CancelFunc
-	drain   chan error
-	ops     []api.Operator
-	name    string
+	sources            []*nodes.SourceNode
+	sinks              []*nodes.SinkNode
+	ctx                api.StreamContext
+	cancel             context.CancelFunc
+	drain              chan error
+	ops                []nodes.OperatorNode
+	name               string
+	qos                xsql.Qos
+	checkpointInterval int
+	store              checkpoints.Store
+	coordinator        *checkpoints.Coordinator
 }
 }
 
 
 func NewWithName(name string) *TopologyNew {
 func NewWithName(name string) *TopologyNew {
+	return NewWithNameAndQos(name, xsql.AtMostOnce)
+}
+
+func NewWithNameAndQos(name string, qos xsql.Qos) *TopologyNew {
 	tp := &TopologyNew{
 	tp := &TopologyNew{
 		name:  name,
 		name:  name,
 		drain: make(chan error),
 		drain: make(chan error),
+		qos:   qos,
+	}
+	if qos >= xsql.AtLeastOnce {
+		tp.store = checkpoints.GetKVStore(name)
 	}
 	}
 	return tp
 	return tp
 }
 }
@@ -44,21 +57,23 @@ func (s *TopologyNew) AddSrc(src *nodes.SourceNode) *TopologyNew {
 func (s *TopologyNew) AddSink(inputs []api.Emitter, snk *nodes.SinkNode) *TopologyNew {
 func (s *TopologyNew) AddSink(inputs []api.Emitter, snk *nodes.SinkNode) *TopologyNew {
 	for _, input := range inputs {
 	for _, input := range inputs {
 		input.AddOutput(snk.GetInput())
 		input.AddOutput(snk.GetInput())
+		snk.AddInputCount()
 	}
 	}
 	s.sinks = append(s.sinks, snk)
 	s.sinks = append(s.sinks, snk)
 	return s
 	return s
 }
 }
 
 
-func (s *TopologyNew) AddOperator(inputs []api.Emitter, operator api.Operator) *TopologyNew {
+func (s *TopologyNew) AddOperator(inputs []api.Emitter, operator nodes.OperatorNode) *TopologyNew {
 	for _, input := range inputs {
 	for _, input := range inputs {
 		input.AddOutput(operator.GetInput())
 		input.AddOutput(operator.GetInput())
+		operator.AddInputCount()
 	}
 	}
 	s.ops = append(s.ops, operator)
 	s.ops = append(s.ops, operator)
 	return s
 	return s
 }
 }
 
 
-func Transform(op operators.UnOperation, name string, bufferLength int) *operators.UnaryOperator {
-	operator := operators.New(name, bufferLength)
+func Transform(op nodes.UnOperation, name string, bufferLength int) *nodes.UnaryOperator {
+	operator := nodes.New(name, bufferLength)
 	operator.SetOperation(op)
 	operator.SetOperation(op)
 	return operator
 	return operator
 }
 }
@@ -85,6 +100,7 @@ func (s *TopologyNew) Open() <-chan error {
 		return s.drain
 		return s.drain
 	}
 	}
 	s.prepareContext() // ensure context is set
 	s.prepareContext() // ensure context is set
+	s.enableCheckpoint()
 	log := s.ctx.GetLogger()
 	log := s.ctx.GetLogger()
 	log.Infoln("Opening stream")
 	log.Infoln("Opening stream")
 	// open stream
 	// open stream
@@ -103,11 +119,40 @@ func (s *TopologyNew) Open() <-chan error {
 		for _, node := range s.sources {
 		for _, node := range s.sources {
 			node.Open(s.ctx.WithMeta(s.name, node.GetName()), s.drain)
 			node.Open(s.ctx.WithMeta(s.name, node.GetName()), s.drain)
 		}
 		}
+
+		// activate checkpoint
+		if s.coordinator != nil {
+			s.coordinator.Activate()
+		}
 	}()
 	}()
 
 
 	return s.drain
 	return s.drain
 }
 }
 
 
+func (s *TopologyNew) enableCheckpoint() error {
+	if s.qos >= xsql.AtLeastOnce {
+		var sources []checkpoints.StreamTask
+		for _, r := range s.sources {
+			sources = append(sources, r)
+		}
+		var ops []checkpoints.NonSourceTask
+		for _, r := range s.ops {
+			ops = append(ops, r)
+		}
+		var sinks []checkpoints.NonSourceTask
+		for _, r := range s.sinks {
+			sinks = append(sinks, r)
+		}
+		c := checkpoints.NewCoordinator(s.name, sources, ops, sinks, s.qos, s.store, s.checkpointInterval, s.ctx)
+		s.coordinator = c
+	}
+	return nil
+}
+
+func (s *TopologyNew) GetCoordinator() *checkpoints.Coordinator {
+	return s.coordinator
+}
+
 func (s *TopologyNew) GetMetrics() (keys []string, values []interface{}) {
 func (s *TopologyNew) GetMetrics() (keys []string, values []interface{}) {
 	for _, node := range s.sources {
 	for _, node := range s.sources {
 		for ins, metrics := range node.GetMetrics() {
 		for ins, metrics := range node.GetMetrics() {