
feat(checkpoint): support window state

ngjaying 4 years ago
parent commit 6b72931ada

+ 44 - 40
xsql/processors/checkpoint_test.go

@@ -1,6 +1,7 @@
 package processors
 
 import (
+	"encoding/json"
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
@@ -28,7 +29,7 @@ func TestCheckpointCount(t *testing.T) {
 			name:      `rule1`,
 			sql:       `SELECT * FROM demo GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
 			size:      5,
-			breakSize: 2,
+			breakSize: 3,
 			cc:        2,
 			r: [][]map[string]interface{}{
 				{{
@@ -81,6 +82,7 @@ func TestCheckpointCount(t *testing.T) {
 	}
 	for j, opt := range options {
 		for i, tt := range tests {
+			cleanStateData()
 			test.ResetClock(1541152486000)
 			p := NewRuleProcessor(DbDir)
 			parser := xsql.NewParser(strings.NewReader(tt.sql))
@@ -110,6 +112,7 @@ func TestCheckpointCount(t *testing.T) {
 			mockSink := test.NewMockSink()
 			sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
 			tp.AddSink(inputs, sink)
+			mockClock := test.GetMockClock()
 			errCh := tp.Open()
 			func() {
 				for i := 0; i < tt.breakSize*len(syncs); i++ {
@@ -129,52 +132,53 @@ func TestCheckpointCount(t *testing.T) {
 					}
 				}
 
-				mockClock := test.GetMockClock()
-				mockClock.Set(common.TimeFromUnixMilli(int64(1541152486014 + tt.breakSize*1000)))
+				mockClock.Set(common.TimeFromUnixMilli(1541152488000))
+				time.Sleep(100 * time.Millisecond)
 				actual := tp.GetCoordinator().GetCompleteCount()
 				if !reflect.DeepEqual(tt.cc, actual) {
 					t.Errorf("%d-%d. checkpoint count\n\nresult mismatch:\n\nexp=%#v\n\ngot=%d\n\n", i, j, tt.cc, actual)
 					return
 				}
-				time.Sleep(1000)
+				time.Sleep(100 * time.Millisecond)
 				tp.Cancel()
-				//TODO window memory
-				//	errCh := tp.Open()
-				//	for i := tt.breakSize; i < tt.size*len(syncs); i++ {
-				//		syncs[i%len(syncs)] <- i
-				//		retry := 100
-				//		for ; retry > 0; retry-- {
-				//			time.Sleep(1)
-				//			if getMetric(tp, "op_window_0_records_in_total") == (i - tt.breakSize + 1) {
-				//				break
-				//			}
-				//		}
-				//		select {
-				//		case err = <-errCh:
-				//			t.Log(err)
-				//			tp.Cancel()
-				//			return
-				//		default:
-				//		}
-				//	}
-				//	time.Sleep(1000)
+				time.Sleep(100 * time.Millisecond)
+				errCh := tp.Open()
+				close(syncs[i%len(syncs)])
+				for i := 0; i < tt.size*len(syncs); i++ {
+					common.Log.Debugf("resending data %d", i)
+					retry := 100
+					for ; retry > 0; retry-- {
+						if getMetric(tp, "op_window_0_records_in_total") == i {
+							break
+						}
+						time.Sleep(1)
+					}
+					select {
+					case err = <-errCh:
+						t.Log(err)
+						tp.Cancel()
+						return
+					default:
+					}
+				}
 			}()
-			//results := mockSink.GetResults()
-			//var maps [][]map[string]interface{}
-			//for _, v := range results {
-			//	var mapRes []map[string]interface{}
-			//	err := json.Unmarshal(v, &mapRes)
-			//	if err != nil {
-			//		t.Errorf("Failed to parse the input into map")
-			//		continue
-			//	}
-			//	maps = append(maps, mapRes)
-			//}
-			//if !reflect.DeepEqual(tt.r, maps) {
-			//	t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
-			//}
-			//tp.Cancel()
+			common.Log.Debugf("done sending data")
+			time.Sleep(400 * time.Millisecond)
+			results := mockSink.GetResults()
+			var maps [][]map[string]interface{}
+			for _, v := range results {
+				var mapRes []map[string]interface{}
+				err := json.Unmarshal(v, &mapRes)
+				if err != nil {
+					t.Errorf("Failed to parse the input into map")
+					continue
+				}
+				maps = append(maps, mapRes)
+			}
+			if !reflect.DeepEqual(tt.r, maps) {
+				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
+			}
+			tp.Cancel()
 		}
-		cleanStateData()
 	}
 }
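
The reworked test drives the pipeline to `breakSize` events, advances the mock clock past a checkpoint boundary, cancels the topology, reopens it, and closes the sync channel so the mock source free-runs through a replay; progress is observed by polling an operator metric. A minimal, self-contained sketch of that poll-until-metric pattern (the metric accessor and values are stand-ins, not kuiper's API):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitFor polls get() until it returns want or retries run out,
// mirroring the retry loop around getMetric in the test above.
func waitFor(get func() int64, want int64, retries int, step time.Duration) bool {
	for ; retries > 0; retries-- {
		if get() == want {
			return true
		}
		time.Sleep(step)
	}
	return false
}

func main() {
	var recordsIn int64 // stand-in for op_window_0_records_in_total
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(10 * time.Millisecond)
			atomic.AddInt64(&recordsIn, 1)
		}
	}()
	ok := waitFor(func() int64 { return atomic.LoadInt64(&recordsIn) }, 5, 100, 5*time.Millisecond)
	fmt.Println("metric reached:", ok)
}
```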

+ 148 - 30
xsql/processors/extension_test.go

@@ -8,7 +8,6 @@ import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"github.com/emqx/kuiper/xstream"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/nodes"
 	"github.com/emqx/kuiper/xstream/test"
@@ -188,6 +187,7 @@ func getExtMockSource(name string, done <-chan int, size int) *nodes.SourceNode
 					"slogan": "Impossible is nothing",
 					"brand":  "Adidas",
 				},
+				Timestamp: 1541152486500,
 			},
 			{
 				Emitter: name,
@@ -195,6 +195,7 @@ func getExtMockSource(name string, done <-chan int, size int) *nodes.SourceNode
 					"slogan": "Stronger than dirt",
 					"brand":  "Ajax",
 				},
+				Timestamp: 1541152487400,
 			},
 			{
 				Emitter: name,
@@ -202,6 +203,7 @@ func getExtMockSource(name string, done <-chan int, size int) *nodes.SourceNode
 					"slogan": "Belong anywhere",
 					"brand":  "Airbnb",
 				},
+				Timestamp: 1541152488300,
 			},
 			{
 				Emitter: name,
@@ -209,6 +211,7 @@ func getExtMockSource(name string, done <-chan int, size int) *nodes.SourceNode
 					"slogan": "I can't believe I ate the whole thing",
 					"brand":  "Alka Seltzer",
 				},
+				Timestamp: 1541152489200,
 			},
 			{
 				Emitter: name,
@@ -216,6 +219,7 @@ func getExtMockSource(name string, done <-chan int, size int) *nodes.SourceNode
 					"slogan": "You're in good hands",
 					"brand":  "Allstate",
 				},
+				Timestamp: 1541152490100,
 			},
 			{
 				Emitter: name,
@@ -223,6 +227,7 @@ func getExtMockSource(name string, done <-chan int, size int) *nodes.SourceNode
 					"slogan": "Don't leave home without it",
 					"brand":  "American Express",
 				},
+				Timestamp: 1541152491200,
 			},
 			{
 				Emitter: name,
@@ -230,6 +235,7 @@ func getExtMockSource(name string, done <-chan int, size int) *nodes.SourceNode
 					"slogan": "Think different",
 					"brand":  "Apple",
 				},
+				Timestamp: 1541152492300,
 			},
 			{
 				Emitter: name,
@@ -237,6 +243,7 @@ func getExtMockSource(name string, done <-chan int, size int) *nodes.SourceNode
 					"slogan": "We try harder",
 					"brand":  "Avis",
 				},
+				Timestamp: 1541152493400,
 			},
 		}
 
@@ -373,7 +380,7 @@ func TestFuncState(t *testing.T) {
 				}
 			}
 			for retry := 100; retry > 0; retry-- {
-				if err := compareMetrics2(tp, tt.m, tt.sql); err == nil {
+				if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
 					break
 				}
 				time.Sleep(time.Duration(retry) * time.Millisecond)
@@ -394,49 +401,160 @@ func TestFuncState(t *testing.T) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
 			continue
 		}
-		if err := compareMetrics2(tp, tt.m, tt.sql); err != nil {
+		if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
 			t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
 		}
 		tp.Cancel()
 	}
 }
 
-func compareMetrics2(tp *xstream.TopologyNew, m map[string]interface{}, sql string) (err error) {
-	keys, values := tp.GetMetrics()
-	//for i, k := range keys {
-	//	log.Printf("%s:%v", k, values[i])
-	//}
-	for k, v := range m {
+func TestFuncStateCheckpoint(t *testing.T) {
+	var tests = []struct {
+		name      string
+		sql       string
+		r         [][]map[string]interface{}
+		size      int
+		cc        int
+		breakSize int
+	}{
+		{
+			name:      `rule1`,
+			sql:       `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
+			size:      8,
+			breakSize: 3,
+			cc:        1,
+			r: [][]map[string]interface{}{
+				{{
+					"wc": float64(3),
+				}},
+				{{
+					"wc": float64(6),
+				}},
+				{{
+					"wc": float64(6),
+				}},
+				{{
+					"wc": float64(8),
+				}},
+				{{
+					"wc": float64(16),
+				}},
+				{{
+					"wc": float64(20),
+				}},
+				{{
+					"wc": float64(25),
+				}},
+				{{
+					"wc": float64(27),
+				}},
+				{{
+					"wc": float64(30),
+				}},
+			},
+		},
+	}
+	p := setup2()
+	for i, tt := range tests {
+		p.ExecDrop(tt.name)
+		cleanStateData()
+		test.ResetClock(1541152485000)
+		mockClock := test.GetMockClock()
+		parser := xsql.NewParser(strings.NewReader(tt.sql))
 		var (
-			index   int
-			key     string
-			matched bool
+			sources []*nodes.SourceNode
+			syncs   []chan int
 		)
-		for index, key = range keys {
-			if k == key {
-				if strings.HasSuffix(k, "process_latency_ms") {
-					if values[index].(int64) >= v.(int64) {
-						matched = true
-						continue
-					} else {
+		if stmt, err := xsql.Language.Parse(parser); err != nil {
+			t.Errorf("parse sql %s error: %s", tt.sql, err)
+		} else {
+			if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
+				t.Errorf("sql %s is not a select statement", tt.sql)
+			} else {
+				streams := xsql.GetStreams(selectStmt)
+				for _, stream := range streams {
+					next := make(chan int)
+					syncs = append(syncs, next)
+					source := getExtMockSource(stream, next, 8)
+					sources = append(sources, source)
+				}
+			}
+		}
+		tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: &api.RuleOption{
+			BufferLength:       100,
+			Qos:                api.AtLeastOnce,
+			CheckpointInterval: 2000,
+		}}, sources)
+		if err != nil {
+			t.Error(err)
+		}
+		mockSink := test.NewMockSink()
+		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
+		tp.AddSink(inputs, sink)
+		errCh := tp.Open()
+		func() {
+			for i := 0; i < tt.breakSize; i++ {
+				mockClock.Add(1000)
+				log.Debugf("before sent %d at %d", i, common.TimeToUnixMilli(mockClock.Now()))
+				syncs[i%len(syncs)] <- i
+				common.Log.Debugf("send out %d", i)
+				select {
+				case err = <-errCh:
+					t.Log(err)
+					tp.Cancel()
+					return
+				default:
+				}
+			}
+			log.Debugf("first send done at %d", common.TimeToUnixMilli(mockClock.Now()))
+
+			actual := tp.GetCoordinator().GetCompleteCount()
+			if !reflect.DeepEqual(tt.cc, actual) {
+				t.Errorf("%d. checkpoint count\n\nresult mismatch:\n\nexp=%#v\n\ngot=%d\n\n", i, tt.cc, actual)
+			}
+			time.Sleep(1000)
+			tp.Cancel()
+			common.Log.Debugf("cancel and resume data %d", i)
+			errCh := tp.Open()
+			close(syncs[i%len(syncs)])
+			for i := 0; i < tt.size*len(syncs); i++ {
+				common.Log.Debugf("resending data %d", i)
+				retry := 100
+				for ; retry > 0; retry-- {
+					common.Log.Debugf("retry %d", retry)
+					if getMetric(tp, "source_text_0_records_in_total") == i {
 						break
 					}
+					time.Sleep(time.Duration(100 * retry))
 				}
-				if values[index] == v {
-					matched = true
+				select {
+				case err = <-errCh:
+					t.Log(err)
+					tp.Cancel()
+					return
+				default:
 				}
-				break
 			}
+		}()
+		common.Log.Debugf("done sending data")
+		results := mockSink.GetResults()
+		var maps [][]map[string]interface{}
+		for _, v := range results {
+			var mapRes []map[string]interface{}
+			err := json.Unmarshal(v, &mapRes)
+			if err != nil {
+				t.Errorf("Failed to parse the input into map")
+				continue
+			}
+			maps = append(maps, mapRes)
 		}
-		if matched {
-			continue
+		if len(tt.r) != len(maps) {
+			tt.r = tt.r[:len(maps)]
 		}
-		//do not find
-		if index < len(values) {
-			return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
-		} else {
-			return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
+		if !reflect.DeepEqual(tt.r, maps) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
+			continue
 		}
+		tp.Cancel()
 	}
-	return nil
 }
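
The new `TestFuncStateCheckpoint` expects duplicated outputs (`3, 6, 6, 8, …`) because at-least-once recovery rolls the function state back to the last checkpoint and replays input that had already produced a result once. A self-contained sketch of that effect, with a plain map standing in for the `accumulateWordCount` plugin's state:

```go
package main

import (
	"fmt"
	"strings"
)

// accumulate adds the word count of s to the running total kept in st.
func accumulate(st map[string]int, s, sep string) int {
	st["wc"] += len(strings.Split(s, sep))
	return st["wc"]
}

func main() {
	st := map[string]int{}
	fmt.Println(accumulate(st, "Impossible is nothing", " ")) // 3
	snap := map[string]int{"wc": st["wc"]}                    // checkpoint: wc=3
	fmt.Println(accumulate(st, "Stronger than dirt", " "))    // 6, after the checkpoint

	st = snap // failure + restore: roll back to wc=3
	fmt.Println(accumulate(st, "Stronger than dirt", " ")) // 6 again (the duplicate)
	fmt.Println(accumulate(st, "Belong anywhere", " "))    // 8, and so on
}
```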

+ 1 - 1
xsql/processors/xsql_processor_test.go

@@ -39,7 +39,7 @@ func cleanStateData() {
 	if err != nil {
 		log.Errorf("%s", err)
 	}
-	s := path.Join(dbDir, "sink")
+	s := path.Join(dbDir, "sink", "cache")
 	err = os.RemoveAll(s)
 	if err != nil {
 		log.Errorf("%s", err)

+ 18 - 2
xstream/nodes/watermark.go

@@ -184,12 +184,27 @@ func (w *WatermarkGenerator) getNextWindow(inputs []*xsql.Tuple, current int64,
 	}
 }
 
-func (o *WindowOperator) execEventWindow(ctx api.StreamContext, errCh chan<- error) {
+func (o *WindowOperator) execEventWindow(ctx api.StreamContext, inputs []*xsql.Tuple, errCh chan<- error) {
+	//Tickers to update watermark
+	switch o.window.Type {
+	case xsql.NOT_WINDOW:
+	case xsql.TUMBLING_WINDOW:
+		o.ticker = common.GetTicker(o.window.Length)
+		o.interval = o.window.Length
+	case xsql.HOPPING_WINDOW:
+		o.ticker = common.GetTicker(o.window.Interval)
+		o.interval = o.window.Interval
+	case xsql.SLIDING_WINDOW:
+		o.interval = o.window.Length
+	case xsql.SESSION_WINDOW:
+		//Use timeout to update watermark
+		o.ticker = common.GetTicker(o.window.Interval)
+		o.interval = o.window.Interval
+	}
 	exeCtx, cancel := ctx.WithCancel()
 	log := ctx.GetLogger()
 	go o.watermarkGenerator.start(exeCtx)
 	var (
-		inputs          []*xsql.Tuple
 		triggered       bool
 		nextWindowEndTs int64
 		prevWindowEndTs int64
@@ -243,6 +258,7 @@ func (o *WindowOperator) execEventWindow(ctx api.StreamContext, errCh chan<- err
 					}
 				}
 				o.statManager.ProcessTimeEnd()
+				ctx.PutState(WINDOW_INPUTS_KEY, inputs)
 			default:
 				o.statManager.IncTotalRecordsIn()
 				o.Broadcast(fmt.Errorf("run Window error: expect xsql.Event type but got %[1]T(%[1]v)", d))
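
`execEventWindow` now receives the restored tuple buffer as a parameter and persists it via `ctx.PutState` after each processed batch; on restart, `Exec` reads it back with `ctx.GetState` before the event loop begins. A minimal sketch of that save/restore contract, using a hypothetical in-memory context in place of kuiper's `api.StreamContext` (whose real implementation goes through the checkpoint store):

```go
package main

import "fmt"

// memCtx is a hypothetical in-memory stand-in for the stream context.
type memCtx struct{ kv map[string]interface{} }

func (c *memCtx) PutState(k string, v interface{})        { c.kv[k] = v }
func (c *memCtx) GetState(k string) (interface{}, error)  { return c.kv[k], nil }

type tuple struct{ Message string }

const windowInputsKey = "$$windowInputs"

func main() {
	ctx := &memCtx{kv: map[string]interface{}{}}

	// after each processed batch, the operator snapshots its buffer
	inputs := []*tuple{{"a"}, {"b"}}
	ctx.PutState(windowInputsKey, inputs)

	// on restart, the buffer is restored before entering the event loop
	var restored []*tuple
	if s, err := ctx.GetState(windowInputsKey); err == nil {
		if st, ok := s.([]*tuple); ok {
			restored = st
		}
	}
	fmt.Printf("restored %d buffered tuples\n", len(restored))
}
```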

+ 44 - 25
xstream/nodes/window_op.go

@@ -1,6 +1,7 @@
 package nodes
 
 import (
+	"encoding/gob"
 	"fmt"
 	"github.com/benbjohnson/clock"
 	"github.com/emqx/kuiper/common"
@@ -28,6 +29,12 @@ type WindowOperator struct {
 	msgCount           int
 }
 
+const WINDOW_INPUTS_KEY = "$$windowInputs"
+
+func init() {
+	gob.Register([]*xsql.Tuple{})
+}
+
 func NewWindowOp(name string, w *xsql.Window, isEventTime bool, lateTolerance int64, streams []string, bufferLength int) (*WindowOperator, error) {
 	o := new(WindowOperator)
 
@@ -55,7 +62,6 @@ func NewWindowOp(name string, w *xsql.Window, isEventTime bool, lateTolerance in
 			Type: xsql.NOT_WINDOW,
 		}
 	}
-
 	if isEventTime {
 		//Create watermark generator
 		if w, err := NewWatermarkGenerator(o.window, lateTolerance, streams, o.input); err != nil {
@@ -63,25 +69,6 @@ func NewWindowOp(name string, w *xsql.Window, isEventTime bool, lateTolerance in
 		} else {
 			o.watermarkGenerator = w
 		}
-	} else {
-		switch o.window.Type {
-		case xsql.NOT_WINDOW:
-		case xsql.TUMBLING_WINDOW:
-			o.ticker = common.GetTicker(o.window.Length)
-			o.interval = o.window.Length
-		case xsql.HOPPING_WINDOW:
-			o.ticker = common.GetTicker(o.window.Interval)
-			o.interval = o.window.Interval
-		case xsql.SLIDING_WINDOW:
-			o.interval = o.window.Length
-		case xsql.SESSION_WINDOW:
-			o.ticker = common.GetTicker(o.window.Length)
-			o.interval = o.window.Interval
-		case xsql.COUNT_WINDOW:
-			o.interval = o.window.Interval
-		default:
-			return nil, fmt.Errorf("unsupported window type %d", o.window.Type)
-		}
 	}
 	return o, nil
 }
@@ -92,7 +79,7 @@ func NewWindowOp(name string, w *xsql.Window, isEventTime bool, lateTolerance in
 func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
 	o.ctx = ctx
 	log := ctx.GetLogger()
-	log.Debugf("Window operator %s is started", o.name)
+	log.Debugf("Window operator %s is started with state %v", o.name, ctx.Snapshot())
 
 	if len(o.outputs) <= 0 {
 		go func() { errCh <- fmt.Errorf("no output channel found") }()
@@ -104,21 +91,50 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
 		return
 	}
 	o.statManager = stats
+	var inputs []*xsql.Tuple
+	if s, err := ctx.GetState(WINDOW_INPUTS_KEY); err == nil {
+		switch st := s.(type) {
+		case []*xsql.Tuple:
+			inputs = st
+		case nil:
+			log.Debugf("Restore window state, nothing")
+		default:
+			errCh <- fmt.Errorf("restore window state %v error, invalid type", st)
+		}
+	} else {
+		log.Warnf("Restore window state fails: %s", err)
+	}
+	log.Infof("Restore window state %+v", inputs)
 	if o.isEventTime {
-		go o.execEventWindow(ctx, errCh)
+		go o.execEventWindow(ctx, inputs, errCh)
 	} else {
-		go o.execProcessingWindow(ctx, errCh)
+		go o.execProcessingWindow(ctx, inputs, errCh)
 	}
 }
 
-func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<- error) {
+func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*xsql.Tuple, errCh chan<- error) {
 	log := ctx.GetLogger()
 	var (
-		inputs        []*xsql.Tuple
 		c             <-chan time.Time
 		timeoutTicker *clock.Timer
 		timeout       <-chan time.Time
 	)
+	switch o.window.Type {
+	case xsql.NOT_WINDOW:
+	case xsql.TUMBLING_WINDOW:
+		o.ticker = common.GetTicker(o.window.Length)
+		o.interval = o.window.Length
+	case xsql.HOPPING_WINDOW:
+		o.ticker = common.GetTicker(o.window.Interval)
+		o.interval = o.window.Interval
+	case xsql.SLIDING_WINDOW:
+		o.interval = o.window.Length
+	case xsql.SESSION_WINDOW:
+		o.ticker = common.GetTicker(o.window.Length)
+		o.interval = o.window.Interval
+	case xsql.COUNT_WINDOW:
+		o.interval = o.window.Interval
+	}
 
 	if o.ticker != nil {
 		c = o.ticker.C
@@ -184,6 +200,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 				}
 				o.statManager.ProcessTimeEnd()
 				o.statManager.SetBufferLength(int64(len(o.input)))
+				ctx.PutState(WINDOW_INPUTS_KEY, inputs)
 			default:
 				o.Broadcast(fmt.Errorf("run Window error: expect xsql.Tuple type but got %[1]T(%[1]v)", d))
 				o.statManager.IncTotalExceptions()
@@ -205,6 +222,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 				log.Debugf("triggered by ticker")
 				inputs, _ = o.scan(inputs, n, ctx)
 				o.statManager.ProcessTimeEnd()
+				ctx.PutState(WINDOW_INPUTS_KEY, inputs)
 			}
 		case now := <-timeout:
 			if len(inputs) > 0 {
@@ -214,6 +232,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 				//expire all inputs, so that when timer scan there is no item
 				inputs = make([]*xsql.Tuple, 0)
 				o.statManager.ProcessTimeEnd()
+				ctx.PutState(WINDOW_INPUTS_KEY, inputs)
 			}
 		// is cancelling
 		case <-ctx.Done():
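
The new `init()` registers `[]*xsql.Tuple` with `encoding/gob` because the state store serializes values held behind `interface{}`, and gob can only encode or decode such a value if its concrete type was registered first. A standalone illustration with a stand-in `Tuple` type:

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

type Tuple struct{ Emitter string }

func init() {
	gob.Register([]*Tuple{}) // without this, encoding the interface value fails
}

func main() {
	var buf bytes.Buffer
	var in interface{} = []*Tuple{{"demo"}, {"text"}}
	if err := gob.NewEncoder(&buf).Encode(&in); err != nil {
		panic(err)
	}
	var out interface{}
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Printf("%T with %d tuples\n", out, len(out.([]*Tuple)))
}
```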

+ 6 - 4
xstream/streams.go

@@ -36,10 +36,6 @@ func NewWithNameAndQos(name string, qos api.Qos, checkpointInterval int) (*Topol
 		qos:                qos,
 		checkpointInterval: checkpointInterval,
 	}
-	var err error
-	if tp.store, err = states.CreateStore(name, qos); err != nil {
-		return nil, err
-	}
 	return tp, nil
 }
 
@@ -48,6 +44,7 @@ func (s *TopologyNew) GetContext() api.StreamContext {
 }
 
 func (s *TopologyNew) Cancel() {
+	s.store = nil
 	s.cancel()
 }
 
@@ -102,6 +99,11 @@ func (s *TopologyNew) Open() <-chan error {
 		return s.drain
 	}
 	s.prepareContext() // ensure context is set
+	var err error
+	if s.store, err = states.CreateStore(s.name, s.qos); err != nil {
+		s.drainErr(err)
+		return s.drain
+	}
 	s.enableCheckpoint()
 	log := s.ctx.GetLogger()
 	log.Infoln("Opening stream")
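
Moving `states.CreateStore` out of the constructor and into `Open` (and clearing the store reference on `Cancel`) means each Cancel/Open cycle reattaches to the persisted checkpoint data, which is what lets the tests above stop a rule and resume it in-process. A minimal sketch of that lifecycle, with a hypothetical store type in place of kuiper's:

```go
package main

import "fmt"

// store is a hypothetical stand-in for the checkpoint state store.
type store struct{ name string }

func createStore(name string) (*store, error) { return &store{name: name}, nil }

type topo struct {
	name  string
	store *store
}

// Open (re)creates the store, so a topology can be resumed after Cancel.
func (t *topo) Open() error {
	var err error
	if t.store, err = createStore(t.name); err != nil {
		return err // the real code pushes this onto the drain channel
	}
	return nil
}

// Cancel drops the store reference; the persisted data stays on disk.
func (t *topo) Cancel() { t.store = nil }

func main() {
	t := &topo{name: "rule1"}
	_ = t.Open()
	t.Cancel()
	_ = t.Open() // resume: reattaches to the same named store
	fmt.Println("store:", t.store.name)
}
```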

+ 17 - 3
xstream/test/mock_source.go

@@ -36,19 +36,33 @@ func (m *MockSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple
 		}
 		log.Debugf("mock source is waiting", i)
 		select {
-		case j := <-m.done:
-			log.Debugf("mock source receives data %d", j)
+		case j, ok := <-m.done:
+			if ok {
+				log.Debugf("mock source receives data %d", j)
+			} else {
+				log.Debugf("sync channel done at %d", i)
+			}
 		case <-ctx.Done():
 			log.Debugf("mock source open DONE")
 			return
 		}
-		log.Debugf("mock source is sending data %s", d)
+
 		if !m.isEventTime {
 			mockClock.Set(common.TimeFromUnixMilli(d.Timestamp))
+			log.Debugf("set time at %d", d.Timestamp)
 		} else {
 			mockClock.Add(1000 * time.Millisecond)
 		}
+
+		select {
+		case <-ctx.Done():
+			log.Debugf("mock source open DONE")
+			return
+		default:
+		}
+
 		consumer <- api.NewDefaultSourceTuple(d.Message, xsql.Metadata{"topic": "mock"})
+		log.Debugf("mock source is sending data %s", d)
 		m.offset = i + 1
 		time.Sleep(1)
 	}
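
The resume phase of the tests relies on Go's closed-channel semantics: after `close(syncs[...])`, the `j, ok := <-m.done` receive returns immediately with `ok == false`, so the source stops waiting for pacing signals and free-runs through its remaining records. A minimal demonstration:

```go
package main

import "fmt"

func main() {
	done := make(chan int)
	go func() {
		done <- 0   // paced send while the channel is open
		close(done) // from here on, receives return immediately
	}()
	for i := 0; i < 3; i++ {
		if j, ok := <-done; ok {
			fmt.Println("paced tick", j)
		} else {
			fmt.Println("closed, free-running at", i)
		}
	}
}
```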