Browse Source

fix(metrics): fix source processLatency in metrics (#1829)

* fix source processLatency

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix ut

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix rcvTime

Signed-off-by: Rui-Gan <1171530954@qq.com>

---------

Signed-off-by: Rui-Gan <1171530954@qq.com>
Regina 2 years ago
parent
commit
c42aa8aa68

+ 8 - 5
extensions/sources/random/random.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -18,12 +18,14 @@ import (
 	"bytes"
 	"bytes"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/lf-edge/ekuiper/pkg/cast"
-	"github.com/lf-edge/ekuiper/pkg/message"
 	"math/rand"
 	"math/rand"
 	"strings"
 	"strings"
 	"time"
 	"time"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/message"
 )
 )
 
 
 const dedupStateKey = "input"
 const dedupStateKey = "input"
@@ -93,13 +95,14 @@ func (s *randomSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 	for {
 	for {
 		select {
 		select {
 		case <-t.C:
 		case <-t.C:
+			rcvTime := conf.GetNow()
 			next := randomize(s.conf.Pattern, s.conf.Seed)
 			next := randomize(s.conf.Pattern, s.conf.Seed)
 			if s.conf.Deduplicate != 0 && s.isDup(ctx, next) {
 			if s.conf.Deduplicate != 0 && s.isDup(ctx, next) {
 				logger.Debugf("find duplicate")
 				logger.Debugf("find duplicate")
 				continue
 				continue
 			}
 			}
 			logger.Debugf("Send out data %v", next)
 			logger.Debugf("Send out data %v", next)
-			consumer <- api.NewDefaultSourceTuple(next, nil)
+			consumer <- api.NewDefaultSourceTupleWithTime(next, nil, rcvTime)
 		case <-ctx.Done():
 		case <-ctx.Done():
 			return
 			return
 		}
 		}

+ 3 - 1
extensions/sources/sql/sql.go

@@ -22,6 +22,7 @@ import (
 	driver2 "github.com/lf-edge/ekuiper/extensions/sqldatabase/driver"
 	driver2 "github.com/lf-edge/ekuiper/extensions/sqldatabase/driver"
 	"github.com/lf-edge/ekuiper/extensions/sqldatabase/sqlgen"
 	"github.com/lf-edge/ekuiper/extensions/sqldatabase/sqlgen"
 	"github.com/lf-edge/ekuiper/extensions/util"
 	"github.com/lf-edge/ekuiper/extensions/util"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 )
@@ -80,6 +81,7 @@ func (m *sqlsource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple,
 	for {
 	for {
 		select {
 		select {
 		case <-t.C:
 		case <-t.C:
+			rcvTime := conf.GetNow()
 			query, err := m.Query.SqlQueryStatement()
 			query, err := m.Query.SqlQueryStatement()
 			if err != nil {
 			if err != nil {
 				logger.Errorf("Get sql query error %v", err)
 				logger.Errorf("Get sql query error %v", err)
@@ -114,7 +116,7 @@ func (m *sqlsource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple,
 
 
 				scanIntoMap(data, columns, cols)
 				scanIntoMap(data, columns, cols)
 				m.Query.UpdateMaxIndexValue(data)
 				m.Query.UpdateMaxIndexValue(data)
-				consumer <- api.NewDefaultSourceTuple(data, nil)
+				consumer <- api.NewDefaultSourceTupleWithTime(data, nil, rcvTime)
 			}
 			}
 		case <-ctx.Done():
 		case <-ctx.Done():
 			return
 			return

+ 3 - 1
extensions/sources/sql/sqlLookup.go

@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"fmt"
 
 
 	"github.com/lf-edge/ekuiper/extensions/util"
 	"github.com/lf-edge/ekuiper/extensions/util"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 )
@@ -60,6 +61,7 @@ func (s *sqlLookupSource) Configure(datasource string, props map[string]interfac
 
 
 func (s *sqlLookupSource) Lookup(ctx api.StreamContext, fields []string, keys []string, values []interface{}) ([]api.SourceTuple, error) {
 func (s *sqlLookupSource) Lookup(ctx api.StreamContext, fields []string, keys []string, values []interface{}) ([]api.SourceTuple, error) {
 	ctx.GetLogger().Debug("Start to lookup tuple")
 	ctx.GetLogger().Debug("Start to lookup tuple")
+	rcvTime := conf.GetNow()
 	query := "SELECT "
 	query := "SELECT "
 	if len(fields) == 0 {
 	if len(fields) == 0 {
 		query += "*"
 		query += "*"
@@ -105,7 +107,7 @@ func (s *sqlLookupSource) Lookup(ctx api.StreamContext, fields []string, keys []
 			return nil, err
 			return nil, err
 		}
 		}
 		scanIntoMap(data, columns, cols)
 		scanIntoMap(data, columns, cols)
-		result = append(result, api.NewDefaultSourceTuple(data, nil))
+		result = append(result, api.NewDefaultSourceTupleWithTime(data, nil, rcvTime))
 	}
 	}
 	return result, nil
 	return result, nil
 }
 }

+ 7 - 4
extensions/sources/video/video.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -17,11 +17,13 @@ package main
 import (
 import (
 	"bytes"
 	"bytes"
 	"fmt"
 	"fmt"
+	"os"
+	"time"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	ffmpeg "github.com/u2takey/ffmpeg-go"
 	ffmpeg "github.com/u2takey/ffmpeg-go"
-	"os"
-	"time"
 )
 )
 
 
 const RTSP_DEFAULT_INTERVAL = 10000
 const RTSP_DEFAULT_INTERVAL = 10000
@@ -71,6 +73,7 @@ func (rps *VideoPullSource) initTimerPull(ctx api.StreamContext, consumer chan<-
 	for {
 	for {
 		select {
 		select {
 		case <-ticker.C:
 		case <-ticker.C:
+			rcvTime := conf.GetNow()
 			buf := rps.readFrameAsJpeg(ctx)
 			buf := rps.readFrameAsJpeg(ctx)
 			result, e := ctx.Decode(buf.Bytes())
 			result, e := ctx.Decode(buf.Bytes())
 			meta := make(map[string]interface{})
 			meta := make(map[string]interface{})
@@ -80,7 +83,7 @@ func (rps *VideoPullSource) initTimerPull(ctx api.StreamContext, consumer chan<-
 			}
 			}
 
 
 			select {
 			select {
-			case consumer <- api.NewDefaultSourceTuple(result, meta):
+			case consumer <- api.NewDefaultSourceTupleWithTime(result, meta, rcvTime):
 				logger.Debugf("send data to device node")
 				logger.Debugf("send data to device node")
 			case <-ctx.Done():
 			case <-ctx.Done():
 				return
 				return

+ 5 - 2
extensions/sources/zmq/zmq.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -17,6 +17,8 @@ package main
 import (
 import (
 	"context"
 	"context"
 	"fmt"
 	"fmt"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	zmq "github.com/pebbe/zmq4"
 	zmq "github.com/pebbe/zmq4"
 )
 )
@@ -62,6 +64,7 @@ func (s *zmqSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple,
 			id, err := s.subscriber.GetIdentity()
 			id, err := s.subscriber.GetIdentity()
 			errCh <- fmt.Errorf("zmq source getting message %s error: %v", id, err)
 			errCh <- fmt.Errorf("zmq source getting message %s error: %v", id, err)
 		} else {
 		} else {
+			rcvTime := conf.GetNow()
 			logger.Debugf("zmq source receive %v", msgs)
 			logger.Debugf("zmq source receive %v", msgs)
 			var m []byte
 			var m []byte
 			for i, msg := range msgs {
 			for i, msg := range msgs {
@@ -78,7 +81,7 @@ func (s *zmqSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple,
 			if e != nil {
 			if e != nil {
 				logger.Errorf("Invalid data format, cannot decode %v with error %s", m, e)
 				logger.Errorf("Invalid data format, cannot decode %v with error %s", m, e)
 			} else {
 			} else {
-				consumer <- api.NewDefaultSourceTuple(result, meta)
+				consumer <- api.NewDefaultSourceTupleWithTime(result, meta, rcvTime)
 			}
 			}
 		}
 		}
 		select {
 		select {

+ 3 - 1
internal/io/edgex/edgex_source.go

@@ -25,6 +25,7 @@ import (
 	"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/requests"
 	"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/requests"
 	"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
 	"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
 	"github.com/fxamacker/cbor/v2"
 	"github.com/fxamacker/cbor/v2"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/clients"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/clients"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
@@ -100,6 +101,7 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 			case e1 := <-subErrs:
 			case e1 := <-subErrs:
 				log.Errorf("Subscription to edgex messagebus received error %v.\n", e1)
 				log.Errorf("Subscription to edgex messagebus received error %v.\n", e1)
 			case msg, ok := <-messages:
 			case msg, ok := <-messages:
+				rcvTime := conf.GetNow()
 				if !ok { // the source is closed
 				if !ok { // the source is closed
 					log.Infof("Exit subscription to edgex messagebus topic %s.", es.topic)
 					log.Infof("Exit subscription to edgex messagebus topic %s.", es.topic)
 					return
 					return
@@ -190,7 +192,7 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 					meta["correlationid"] = env.CorrelationID
 					meta["correlationid"] = env.CorrelationID
 
 
 					select {
 					select {
-					case consumer <- api.NewDefaultSourceTuple(result, meta):
+					case consumer <- api.NewDefaultSourceTupleWithTime(result, meta, rcvTime):
 						log.Debugf("send data to device node")
 						log.Debugf("send data to device node")
 					case <-ctx.Done():
 					case <-ctx.Done():
 						return
 						return

+ 6 - 4
internal/io/file/file_source.go

@@ -171,6 +171,7 @@ func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTupl
 }
 }
 
 
 func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error {
 func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error {
+	rcvTime := conf.GetNow()
 	if fs.isDir {
 	if fs.isDir {
 		ctx.GetLogger().Debugf("Monitor dir %s", fs.file)
 		ctx.GetLogger().Debugf("Monitor dir %s", fs.file)
 		entries, err := os.ReadDir(fs.file)
 		entries, err := os.ReadDir(fs.file)
@@ -197,7 +198,7 @@ func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTupl
 	// Send EOF if retain size not set if used in table
 	// Send EOF if retain size not set if used in table
 	if fs.config.IsTable {
 	if fs.config.IsTable {
 		select {
 		select {
-		case consumer <- api.NewDefaultSourceTuple(nil, nil):
+		case consumer <- api.NewDefaultSourceTupleWithTime(nil, nil, rcvTime):
 			// do nothing
 			// do nothing
 		case <-ctx.Done():
 		case <-ctx.Done():
 			return nil
 			return nil
@@ -243,6 +244,7 @@ func (fs *FileSource) parseFile(ctx api.StreamContext, file string, consumer cha
 
 
 func (fs *FileSource) publish(ctx api.StreamContext, file io.Reader, consumer chan<- api.SourceTuple, meta map[string]interface{}) error {
 func (fs *FileSource) publish(ctx api.StreamContext, file io.Reader, consumer chan<- api.SourceTuple, meta map[string]interface{}) error {
 	ctx.GetLogger().Debug("Start to load")
 	ctx.GetLogger().Debug("Start to load")
+	rcvTime := conf.GetNow()
 	switch fs.config.FileType {
 	switch fs.config.FileType {
 	case JSON_TYPE:
 	case JSON_TYPE:
 		r := json.NewDecoder(file)
 		r := json.NewDecoder(file)
@@ -254,7 +256,7 @@ func (fs *FileSource) publish(ctx api.StreamContext, file io.Reader, consumer ch
 		ctx.GetLogger().Debug("Sending tuples")
 		ctx.GetLogger().Debug("Sending tuples")
 		for _, m := range resultMap {
 		for _, m := range resultMap {
 			select {
 			select {
-			case consumer <- api.NewDefaultSourceTuple(m, meta):
+			case consumer <- api.NewDefaultSourceTupleWithTime(m, meta, rcvTime):
 			case <-ctx.Done():
 			case <-ctx.Done():
 				return nil
 				return nil
 			}
 			}
@@ -305,7 +307,7 @@ func (fs *FileSource) publish(ctx api.StreamContext, file io.Reader, consumer ch
 				}
 				}
 			}
 			}
 			select {
 			select {
-			case consumer <- api.NewDefaultSourceTuple(m, meta):
+			case consumer <- api.NewDefaultSourceTupleWithTime(m, meta, rcvTime):
 			case <-ctx.Done():
 			case <-ctx.Done():
 				return nil
 				return nil
 			}
 			}
@@ -324,7 +326,7 @@ func (fs *FileSource) publish(ctx api.StreamContext, file io.Reader, consumer ch
 					Error: fmt.Errorf("Invalid data format, cannot decode %s with error %s", scanner.Text(), err),
 					Error: fmt.Errorf("Invalid data format, cannot decode %s with error %s", scanner.Text(), err),
 				}
 				}
 			} else {
 			} else {
-				tuple = api.NewDefaultSourceTuple(m, meta)
+				tuple = api.NewDefaultSourceTupleWithTime(m, meta, rcvTime)
 			}
 			}
 			select {
 			select {
 			case consumer <- tuple:
 			case consumer <- tuple:

+ 26 - 19
internal/io/file/file_source_test.go

@@ -16,6 +16,8 @@ package file
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"github.com/benbjohnson/clock"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"io"
 	"io"
@@ -33,10 +35,11 @@ func TestJsonFile(t *testing.T) {
 	meta := map[string]interface{}{
 	meta := map[string]interface{}{
 		"file": filepath.Join(path, "test", "test.json"),
 		"file": filepath.Join(path, "test", "test.json"),
 	}
 	}
+	mc := conf.Clock.(*clock.Mock)
 	exp := []api.SourceTuple{
 	exp := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(1), "name": "John Doe"}, meta),
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(2), "name": "Jane Doe"}, meta),
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(3), "name": "John Smith"}, meta),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(1), "name": "John Doe"}, meta, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(2), "name": "Jane Doe"}, meta, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(3), "name": "John Smith"}, meta, mc.Now()),
 	}
 	}
 	p := map[string]interface{}{
 	p := map[string]interface{}{
 		"path": filepath.Join(path, "test"),
 		"path": filepath.Join(path, "test"),
@@ -55,14 +58,15 @@ func TestJsonFolder(t *testing.T) {
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
+	mc := conf.Clock.(*clock.Mock)
 	moveToFolder := filepath.Join(path, "test", "moveTo")
 	moveToFolder := filepath.Join(path, "test", "moveTo")
 	exp := []api.SourceTuple{
 	exp := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(1), "name": "John Doe", "height": 1.82}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f1.json")}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(2), "name": "Jane Doe", "height": 1.65}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f1.json")}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(3), "name": "Will Doe", "height": 1.76}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f2.json")}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(4), "name": "Dude Doe", "height": 1.92}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f3.json")}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(5), "name": "Jane Doe", "height": 1.72}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f3.json")}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(6), "name": "John Smith", "height": 2.22}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f3.json")}),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(1), "name": "John Doe", "height": 1.82}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f1.json")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(2), "name": "Jane Doe", "height": 1.65}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f1.json")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(3), "name": "Will Doe", "height": 1.76}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f2.json")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(4), "name": "Dude Doe", "height": 1.92}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f3.json")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(5), "name": "Jane Doe", "height": 1.72}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f3.json")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(6), "name": "John Smith", "height": 2.22}, map[string]interface{}{"file": filepath.Join(path, "test", "json", "f3.json")}, mc.Now()),
 	}
 	}
 	p := map[string]interface{}{
 	p := map[string]interface{}{
 		"path":            filepath.Join(path, "test"),
 		"path":            filepath.Join(path, "test"),
@@ -111,12 +115,13 @@ func TestCSVFolder(t *testing.T) {
 			t.Fatal(err)
 			t.Fatal(err)
 		}
 		}
 	}
 	}
+	mc := conf.Clock.(*clock.Mock)
 	// Start testing
 	// Start testing
 	exp := []api.SourceTuple{
 	exp := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"@": "#", "id": "1", "ts": "1670170500", "value": "161.927872"}, map[string]interface{}{"file": filepath.Join(path, "test", "csvTemp", "a.csv")}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"@": "#", "id": "2", "ts": "1670170900", "value": "176"}, map[string]interface{}{"file": filepath.Join(path, "test", "csvTemp", "a.csv")}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": "33", "ts": "1670270500", "humidity": "89"}, map[string]interface{}{"file": filepath.Join(path, "test", "csvTemp", "b.csv")}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": "44", "ts": "1670270900", "humidity": "76"}, map[string]interface{}{"file": filepath.Join(path, "test", "csvTemp", "b.csv")}),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"@": "#", "id": "1", "ts": "1670170500", "value": "161.927872"}, map[string]interface{}{"file": filepath.Join(path, "test", "csvTemp", "a.csv")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"@": "#", "id": "2", "ts": "1670170900", "value": "176"}, map[string]interface{}{"file": filepath.Join(path, "test", "csvTemp", "a.csv")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": "33", "ts": "1670270500", "humidity": "89"}, map[string]interface{}{"file": filepath.Join(path, "test", "csvTemp", "b.csv")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": "44", "ts": "1670270900", "humidity": "76"}, map[string]interface{}{"file": filepath.Join(path, "test", "csvTemp", "b.csv")}, mc.Now()),
 	}
 	}
 	p := map[string]interface{}{
 	p := map[string]interface{}{
 		"fileType":         "csv",
 		"fileType":         "csv",
@@ -175,10 +180,11 @@ func TestCSVFile(t *testing.T) {
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
+	mc := conf.Clock.(*clock.Mock)
 	exp := []api.SourceTuple{
 	exp := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"ns": "@", "id": "id", "ts": "ts", "number": "value"}, map[string]interface{}{"file": filepath.Join(path, "test", "csv", "a.csv")}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"ns": "#", "id": "1", "ts": "1670170500", "number": "161.927872"}, map[string]interface{}{"file": filepath.Join(path, "test", "csv", "a.csv")}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"ns": "#", "id": "2", "ts": "1670170900", "number": "176"}, map[string]interface{}{"file": filepath.Join(path, "test", "csv", "a.csv")}),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"ns": "@", "id": "id", "ts": "ts", "number": "value"}, map[string]interface{}{"file": filepath.Join(path, "test", "csv", "a.csv")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"ns": "#", "id": "1", "ts": "1670170500", "number": "161.927872"}, map[string]interface{}{"file": filepath.Join(path, "test", "csv", "a.csv")}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"ns": "#", "id": "2", "ts": "1670170900", "number": "176"}, map[string]interface{}{"file": filepath.Join(path, "test", "csv", "a.csv")}, mc.Now()),
 	}
 	}
 	p := map[string]interface{}{
 	p := map[string]interface{}{
 		"fileType":         "csv",
 		"fileType":         "csv",
@@ -205,10 +211,11 @@ func TestJsonLines(t *testing.T) {
 	meta := map[string]interface{}{
 	meta := map[string]interface{}{
 		"file": filepath.Join(path, "test", "test.lines"),
 		"file": filepath.Join(path, "test", "test.lines"),
 	}
 	}
+	mc := conf.Clock.(*clock.Mock)
 	exp := []api.SourceTuple{
 	exp := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(1), "name": "John Doe"}, meta),
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(2), "name": "Jane Doe"}, meta),
-		api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(3), "name": "John Smith"}, meta),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(1), "name": "John Doe"}, meta, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(2), "name": "Jane Doe"}, meta, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(3), "name": "John Smith"}, meta, mc.Now()),
 	}
 	}
 	p := map[string]interface{}{
 	p := map[string]interface{}{
 		"path":     filepath.Join(path, "test"),
 		"path":     filepath.Join(path, "test"),

+ 18 - 2
internal/io/file/file_stream_test.go

@@ -1,6 +1,21 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package file
 package file
 
 
 import (
 import (
+	"github.com/benbjohnson/clock"
 	"github.com/lf-edge/ekuiper/internal/compressor"
 	"github.com/lf-edge/ekuiper/internal/compressor"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
@@ -157,9 +172,10 @@ func TestFileSinkCompress_Collect(t *testing.T) {
 			meta := map[string]interface{}{
 			meta := map[string]interface{}{
 				"file": filepath.Join(dir, filename),
 				"file": filepath.Join(dir, filename),
 			}
 			}
+			mc := conf.Clock.(*clock.Mock)
 			exp := []api.SourceTuple{
 			exp := []api.SourceTuple{
-				api.NewDefaultSourceTuple(map[string]interface{}{"key": "value1"}, meta),
-				api.NewDefaultSourceTuple(map[string]interface{}{"key": "value2"}, meta),
+				api.NewDefaultSourceTupleWithTime(map[string]interface{}{"key": "value1"}, meta, mc.Now()),
+				api.NewDefaultSourceTupleWithTime(map[string]interface{}{"key": "value2"}, meta, mc.Now()),
 			}
 			}
 			mock.TestSourceOpen(r, exp, t)
 			mock.TestSourceOpen(r, exp, t)
 
 

+ 4 - 2
internal/io/http/httppull_source.go

@@ -15,9 +15,10 @@
 package http
 package http
 
 
 import (
 import (
-	"github.com/lf-edge/ekuiper/pkg/infra"
 	"time"
 	"time"
 
 
+	"github.com/lf-edge/ekuiper/pkg/infra"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
@@ -70,6 +71,7 @@ func (hps *PullSource) initTimerPull(ctx api.StreamContext, consumer chan<- api.
 	for {
 	for {
 		select {
 		select {
 		case <-ticker.C:
 		case <-ticker.C:
+			rcvTime := conf.GetNow()
 			headers, err := hps.parseHeaders(ctx, hps.tokens)
 			headers, err := hps.parseHeaders(ctx, hps.tokens)
 			if err != nil {
 			if err != nil {
 				continue
 				continue
@@ -90,7 +92,7 @@ func (hps *PullSource) initTimerPull(ctx api.StreamContext, consumer chan<- api.
 				}
 				}
 				meta := make(map[string]interface{})
 				meta := make(map[string]interface{})
 				select {
 				select {
-				case consumer <- api.NewDefaultSourceTuple(result, meta):
+				case consumer <- api.NewDefaultSourceTupleWithTime(result, meta, rcvTime):
 					logger.Debugf("send data to device node")
 					logger.Debugf("send data to device node")
 				case <-ctx.Done():
 				case <-ctx.Done():
 					return
 					return

+ 8 - 6
internal/io/http/httppull_source_test.go

@@ -18,7 +18,9 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
+	"github.com/benbjohnson/clock"
 	"github.com/gorilla/mux"
 	"github.com/gorilla/mux"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"net"
 	"net"
@@ -793,9 +795,9 @@ func TestPullWithAuth(t *testing.T) {
 		t.Errorf(err.Error())
 		t.Errorf(err.Error())
 		return
 		return
 	}
 	}
-
+	mc := conf.Clock.(*clock.Mock)
 	exp := []api.SourceTuple{
 	exp := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}, mc.Now()),
 	}
 	}
 	mock.TestSourceOpen(r, exp, t)
 	mock.TestSourceOpen(r, exp, t)
 }
 }
@@ -815,11 +817,11 @@ func TestPullIncremental(t *testing.T) {
 		t.Errorf(err.Error())
 		t.Errorf(err.Error())
 		return
 		return
 	}
 	}
-
+	mc := conf.Clock.(*clock.Mock)
 	exp := []api.SourceTuple{
 	exp := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device0", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device0", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "device2", "humidity": 60.0, "temperature": 25.5}}, map[string]interface{}{}, mc.Now()),
 	}
 	}
 	mock.TestSourceOpen(r, exp, t)
 	mock.TestSourceOpen(r, exp, t)
 }
 }

+ 6 - 3
internal/io/memory/lookupsource_test.go

@@ -16,6 +16,7 @@ package memory
 
 
 import (
 import (
 	gocontext "context"
 	gocontext "context"
+	"github.com/benbjohnson/clock"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
 	"github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
@@ -62,8 +63,9 @@ func TestUpdateLookup(t *testing.T) {
 			}
 			}
 		}
 		}
 	}()
 	}()
+	mc := conf.Clock.(*clock.Mock)
 	expected := []api.SourceTuple{
 	expected := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test"}),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test"}, mc.Now()),
 	}
 	}
 	result, err := ls.Lookup(ctx, []string{}, []string{"ff"}, []interface{}{"value1"})
 	result, err := ls.Lookup(ctx, []string{}, []string{"ff"}, []interface{}{"value1"})
 	if !reflect.DeepEqual(result, expected) {
 	if !reflect.DeepEqual(result, expected) {
@@ -109,9 +111,10 @@ func TestLookup(t *testing.T) {
 			}
 			}
 		}
 		}
 	}()
 	}()
+	mc := conf.Clock.(*clock.Mock)
 	expected := []api.SourceTuple{
 	expected := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test2"}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"ff": "value1", "gg": "value4"}, map[string]interface{}{"topic": "test2"}),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test2"}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"ff": "value1", "gg": "value4"}, map[string]interface{}{"topic": "test2"}, mc.Now()),
 	}
 	}
 	result, err := ls.Lookup(ctx, []string{}, []string{"ff"}, []interface{}{"value1"})
 	result, err := ls.Lookup(ctx, []string{}, []string{"ff"}, []interface{}{"value1"})
 	if len(result) != 2 {
 	if len(result) != 2 {

+ 7 - 3
internal/io/memory/memory_test.go

@@ -16,6 +16,7 @@ package memory
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"github.com/benbjohnson/clock"
 	"github.com/gdexlab/go-render/render"
 	"github.com/gdexlab/go-render/render"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
 	"github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
@@ -76,7 +77,8 @@ func TestSharedInmemoryNode(t *testing.T) {
 	for {
 	for {
 		select {
 		select {
 		case res := <-consumer:
 		case res := <-consumer:
-			expected := api.NewDefaultSourceTuple(data, map[string]interface{}{"topic": "test_id"})
+			mc := conf.Clock.(*clock.Mock)
+			expected := api.NewDefaultSourceTupleWithTime(data, map[string]interface{}{"topic": "test_id"}, mc.Now())
 			if !reflect.DeepEqual(expected, res) {
 			if !reflect.DeepEqual(expected, res) {
 				t.Errorf("result %s should be equal to %s", res, expected)
 				t.Errorf("result %s should be equal to %s", res, expected)
 			}
 			}
@@ -400,7 +402,9 @@ func TestMultipleTopics(t *testing.T) {
 	for res := range consumer {
 	for res := range consumer {
 		results = append(results, res)
 		results = append(results, res)
 	}
 	}
-	if !reflect.DeepEqual(expected, results) {
-		t.Errorf("Expect\t %v\n but got\t\t\t\t %v", render.AsCode(expected), render.AsCode(results))
+	for i, r := range results {
+		if !reflect.DeepEqual(r.Message(), expected[i].Message()) || !reflect.DeepEqual(r.Meta(), expected[i].Meta()) {
+			t.Errorf("Expect\t %v\n but got\t\t\t\t %v", render.AsCode(expected[i]), render.AsCode(r))
+		}
 	}
 	}
 }
 }

+ 2 - 2
internal/io/memory/pubsub/manager.go

@@ -110,12 +110,12 @@ func RemovePub(topic string) {
 }
 }
 
 
 func Produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
 func Produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
-	doProduce(ctx, topic, api.NewDefaultSourceTuple(data, map[string]interface{}{"topic": topic}))
+	doProduce(ctx, topic, api.NewDefaultSourceTupleWithTime(data, map[string]interface{}{"topic": topic}, conf.GetNow()))
 }
 }
 
 
 func ProduceUpdatable(ctx api.StreamContext, topic string, data map[string]interface{}, rowkind string, keyval interface{}) {
 func ProduceUpdatable(ctx api.StreamContext, topic string, data map[string]interface{}, rowkind string, keyval interface{}) {
 	doProduce(ctx, topic, &UpdatableTuple{
 	doProduce(ctx, topic, &UpdatableTuple{
-		DefaultSourceTuple: api.NewDefaultSourceTuple(data, map[string]interface{}{"topic": topic}),
+		DefaultSourceTuple: api.NewDefaultSourceTupleWithTime(data, map[string]interface{}{"topic": topic}, conf.GetNow()),
 		Rowkind:            rowkind,
 		Rowkind:            rowkind,
 		Keyval:             keyval,
 		Keyval:             keyval,
 	})
 	})

+ 6 - 4
internal/io/memory/sink_test.go

@@ -16,6 +16,7 @@ package memory
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"github.com/benbjohnson/clock"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
 	"github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
@@ -56,24 +57,25 @@ func TestUpdate(t *testing.T) {
 		fmt.Println(d)
 		fmt.Println(d)
 		actual = append(actual, d)
 		actual = append(actual, d)
 	}
 	}
+	mc := conf.Clock.(*clock.Mock)
 	expects := []api.SourceTuple{
 	expects := []api.SourceTuple{
 		&pubsub.UpdatableTuple{
 		&pubsub.UpdatableTuple{
-			DefaultSourceTuple: api.NewDefaultSourceTuple(map[string]interface{}{"id": "1", "verb": "insert", "name": "test1"}, map[string]interface{}{"topic": "testupdate"}),
+			DefaultSourceTuple: api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": "1", "verb": "insert", "name": "test1"}, map[string]interface{}{"topic": "testupdate"}, mc.Now()),
 			Rowkind:            "insert",
 			Rowkind:            "insert",
 			Keyval:             "1",
 			Keyval:             "1",
 		},
 		},
 		&pubsub.UpdatableTuple{
 		&pubsub.UpdatableTuple{
-			DefaultSourceTuple: api.NewDefaultSourceTuple(map[string]interface{}{"id": "2", "verb": "insert", "name": "test2"}, map[string]interface{}{"topic": "testupdate"}),
+			DefaultSourceTuple: api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": "2", "verb": "insert", "name": "test2"}, map[string]interface{}{"topic": "testupdate"}, mc.Now()),
 			Rowkind:            "insert",
 			Rowkind:            "insert",
 			Keyval:             "2",
 			Keyval:             "2",
 		},
 		},
 		&pubsub.UpdatableTuple{
 		&pubsub.UpdatableTuple{
-			DefaultSourceTuple: api.NewDefaultSourceTuple(map[string]interface{}{"id": "1", "verb": "update", "name": "test1"}, map[string]interface{}{"topic": "testupdate"}),
+			DefaultSourceTuple: api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": "1", "verb": "update", "name": "test1"}, map[string]interface{}{"topic": "testupdate"}, mc.Now()),
 			Rowkind:            "update",
 			Rowkind:            "update",
 			Keyval:             "1",
 			Keyval:             "1",
 		},
 		},
 		&pubsub.UpdatableTuple{
 		&pubsub.UpdatableTuple{
-			DefaultSourceTuple: api.NewDefaultSourceTuple(map[string]interface{}{"id": "2", "verb": "delete", "name": "test2"}, map[string]interface{}{"topic": "testupdate"}),
+			DefaultSourceTuple: api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": "2", "verb": "delete", "name": "test2"}, map[string]interface{}{"topic": "testupdate"}, mc.Now()),
 			Rowkind:            "delete",
 			Rowkind:            "delete",
 			Keyval:             "2",
 			Keyval:             "2",
 		},
 		},

+ 14 - 11
internal/io/memory/store/db_test.go

@@ -15,20 +15,23 @@
 package store
 package store
 
 
 import (
 import (
+	"github.com/benbjohnson/clock"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"reflect"
 	"reflect"
 	"testing"
 	"testing"
 )
 )
 
 
 func TestTable(t *testing.T) {
 func TestTable(t *testing.T) {
+	mc := conf.Clock.(*clock.Mock)
 	tb := createTable("topicT", "a")
 	tb := createTable("topicT", "a")
-	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "0"}, nil))
-	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 2, "b": "0"}, nil))
-	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 3, "b": "4"}, nil))
-	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil))
+	tb.add(api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 1, "b": "0"}, nil, mc.Now()))
+	tb.add(api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 2, "b": "0"}, nil, mc.Now()))
+	tb.add(api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 3, "b": "4"}, nil, mc.Now()))
+	tb.add(api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 1, "b": "1"}, nil, mc.Now()))
 	v, _ := tb.Read([]string{"a"}, []interface{}{1})
 	v, _ := tb.Read([]string{"a"}, []interface{}{1})
 	exp := []api.SourceTuple{
 	exp := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 1, "b": "1"}, nil, mc.Now()),
 	}
 	}
 	if !reflect.DeepEqual(v, exp) {
 	if !reflect.DeepEqual(v, exp) {
 		t.Errorf("read a 1 expect %v, but got %v", exp, v)
 		t.Errorf("read a 1 expect %v, but got %v", exp, v)
@@ -36,19 +39,19 @@ func TestTable(t *testing.T) {
 	}
 	}
 	v, _ = tb.Read([]string{"b"}, []interface{}{"0"})
 	v, _ = tb.Read([]string{"b"}, []interface{}{"0"})
 	exp = []api.SourceTuple{
 	exp = []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"a": 2, "b": "0"}, nil),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 2, "b": "0"}, nil, mc.Now()),
 	}
 	}
 	if !reflect.DeepEqual(v, exp) {
 	if !reflect.DeepEqual(v, exp) {
 		t.Errorf("read b 0 expect %v, but got %v", exp, v)
 		t.Errorf("read b 0 expect %v, but got %v", exp, v)
 		return
 		return
 	}
 	}
-	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 5, "b": "0"}, nil))
+	tb.add(api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 5, "b": "0"}, nil, mc.Now()))
 	tb.delete(3)
 	tb.delete(3)
-	tb.add(api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil))
+	tb.add(api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 1, "b": "1"}, nil, mc.Now()))
 	v, _ = tb.Read([]string{"b"}, []interface{}{"0"})
 	v, _ = tb.Read([]string{"b"}, []interface{}{"0"})
 	exp = []api.SourceTuple{
 	exp = []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"a": 2, "b": "0"}, nil),
-		api.NewDefaultSourceTuple(map[string]interface{}{"a": 5, "b": "0"}, nil),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 2, "b": "0"}, nil, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 5, "b": "0"}, nil, mc.Now()),
 	}
 	}
 	if len(v) != 2 {
 	if len(v) != 2 {
 		t.Errorf("read 1 again expect %v, but got %v", exp, v)
 		t.Errorf("read 1 again expect %v, but got %v", exp, v)
@@ -65,7 +68,7 @@ func TestTable(t *testing.T) {
 
 
 	v, _ = tb.Read([]string{"a", "b"}, []interface{}{1, "1"})
 	v, _ = tb.Read([]string{"a", "b"}, []interface{}{1, "1"})
 	exp = []api.SourceTuple{
 	exp = []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"a": 1, "b": "1"}, nil),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 1, "b": "1"}, nil, mc.Now()),
 	}
 	}
 	if !reflect.DeepEqual(v, exp) {
 	if !reflect.DeepEqual(v, exp) {
 		t.Errorf("read a,b expect %v, but got %v", exp, v)
 		t.Errorf("read a,b expect %v, but got %v", exp, v)

+ 4 - 2
internal/io/mock/test_source.go

@@ -70,7 +70,9 @@ outerloop:
 		return
 		return
 	}
 	}
 	cancel()
 	cancel()
-	if !reflect.DeepEqual(exp, result) {
-		t.Errorf("result mismatch:\n  exp=%s\n  got=%s\n\n", exp, result)
+	for i, v := range result {
+		if !reflect.DeepEqual(exp[i].Message(), v.Message()) || !reflect.DeepEqual(exp[i].Meta(), v.Meta()) {
+			t.Errorf("result mismatch:\n  exp=%s\n  got=%s\n\n", exp[i], v)
+		}
 	}
 	}
 }
 }

+ 5 - 3
internal/io/mqtt/mqtt_source.go

@@ -16,6 +16,9 @@ package mqtt
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"path"
+	"strconv"
+
 	pahoMqtt "github.com/eclipse/paho.mqtt.golang"
 	pahoMqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/lf-edge/ekuiper/internal/compressor"
 	"github.com/lf-edge/ekuiper/internal/compressor"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
@@ -24,8 +27,6 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/message"
 	"github.com/lf-edge/ekuiper/pkg/message"
-	"path"
-	"strconv"
 )
 )
 
 
 type MQTTSource struct {
 type MQTTSource struct {
@@ -149,6 +150,7 @@ func subscribe(ms *MQTTSource, ctx api.StreamContext, consumer chan<- api.Source
 }
 }
 
 
 func getTuple(ctx api.StreamContext, ms *MQTTSource, env interface{}) api.SourceTuple {
 func getTuple(ctx api.StreamContext, ms *MQTTSource, env interface{}) api.SourceTuple {
+	rcvTime := conf.GetNow()
 	msg, ok := env.(pahoMqtt.Message)
 	msg, ok := env.(pahoMqtt.Message)
 	if !ok { // should never happen
 	if !ok { // should never happen
 		return &xsql.ErrorSourceTuple{
 		return &xsql.ErrorSourceTuple{
@@ -182,7 +184,7 @@ func getTuple(ctx api.StreamContext, ms *MQTTSource, env interface{}) api.Source
 			ctx.GetLogger().Errorf(v)
 			ctx.GetLogger().Errorf(v)
 		}
 		}
 	}
 	}
-	return api.NewDefaultSourceTuple(result, meta)
+	return api.NewDefaultSourceTupleWithTime(result, meta, rcvTime)
 }
 }
 
 
 func (ms *MQTTSource) Close(ctx api.StreamContext) error {
 func (ms *MQTTSource) Close(ctx api.StreamContext) error {

+ 9 - 6
internal/io/neuron/multiple_test.go

@@ -15,6 +15,8 @@
 package neuron
 package neuron
 
 
 import (
 import (
+	"github.com/benbjohnson/clock"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"reflect"
 	"reflect"
@@ -28,15 +30,16 @@ func TestMultiNeuron(t *testing.T) {
 	// start and test 2 sources
 	// start and test 2 sources
 	url1 := "tcp://127.0.0.1:33331"
 	url1 := "tcp://127.0.0.1:33331"
 	url2 := "tcp://127.0.0.1:33332"
 	url2 := "tcp://127.0.0.1:33332"
+	mc := conf.Clock.(*clock.Mock)
 	exp1 := []api.SourceTuple{
 	exp1 := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron_tcp://127.0.0.1:33331"}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_tcp://127.0.0.1:33331"}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_tcp://127.0.0.1:33331"}),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron_tcp://127.0.0.1:33331"}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_tcp://127.0.0.1:33331"}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_tcp://127.0.0.1:33331"}, mc.Now()),
 	}
 	}
 	exp2 := []api.SourceTuple{
 	exp2 := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron_tcp://127.0.0.1:33332"}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_tcp://127.0.0.1:33332"}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_tcp://127.0.0.1:33332"}),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron_tcp://127.0.0.1:33332"}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_tcp://127.0.0.1:33332"}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_tcp://127.0.0.1:33332"}, mc.Now()),
 	}
 	}
 	s1 := GetSource()
 	s1 := GetSource()
 	err := s1.Configure("new", map[string]interface{}{"url": url1})
 	err := s1.Configure("new", map[string]interface{}{"url": url1})

+ 6 - 3
internal/io/neuron/neuron_test.go

@@ -16,6 +16,8 @@ package neuron
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"github.com/benbjohnson/clock"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"go.nanomsg.org/mangos/v3"
 	"go.nanomsg.org/mangos/v3"
@@ -78,11 +80,12 @@ func mockNeuron(send bool, recv bool, url string) (mangos.Socket, chan []byte) {
 
 
 // Test scenario of multiple neuron sources and sinks
 // Test scenario of multiple neuron sources and sinks
 func TestMultiSourceSink(t *testing.T) {
 func TestMultiSourceSink(t *testing.T) {
+	mc := conf.Clock.(*clock.Mock)
 	// start and test 2 sources
 	// start and test 2 sources
 	exp := []api.SourceTuple{
 	exp := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}, mc.Now()),
 	}
 	}
 	s1 := GetSource()
 	s1 := GetSource()
 	err := s1.Configure("new", nil)
 	err := s1.Configure("new", nil)

+ 6 - 3
internal/io/neuron/source_test.go

@@ -16,6 +16,8 @@ package neuron
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"github.com/benbjohnson/clock"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
@@ -26,10 +28,11 @@ import (
 )
 )
 
 
 func TestRun(t *testing.T) {
 func TestRun(t *testing.T) {
+	mc := conf.Clock.(*clock.Mock)
 	exp := []api.SourceTuple{
 	exp := []api.SourceTuple{
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}),
-		api.NewDefaultSourceTuple(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "yellow"}, "errors": map[string]interface{}{"tag_name3": 122.0}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 11.22, "tag_name2": "green", "tag_name3": 60.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}, mc.Now()),
+		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"group_name": "group1", "timestamp": 1646125996000.0, "node_name": "node1", "values": map[string]interface{}{"tag_name1": 15.4, "tag_name2": "green", "tag_name3": 70.0}, "errors": map[string]interface{}{}}, map[string]interface{}{"topic": "$$neuron_ipc:///tmp/neuron-ekuiper.ipc"}, mc.Now()),
 	}
 	}
 	s := GetSource()
 	s := GetSource()
 	err := s.Configure("new", nil)
 	err := s.Configure("new", nil)

+ 4 - 2
internal/io/redis/lookup.go

@@ -21,6 +21,7 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
+	cnf "github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/redis/go-redis/v9"
 	"github.com/redis/go-redis/v9"
@@ -79,6 +80,7 @@ func (s *lookupSource) Open(ctx api.StreamContext) error {
 }
 }
 
 
 func (s *lookupSource) Lookup(ctx api.StreamContext, _ []string, keys []string, values []interface{}) ([]api.SourceTuple, error) {
 func (s *lookupSource) Lookup(ctx api.StreamContext, _ []string, keys []string, values []interface{}) ([]api.SourceTuple, error) {
+	rcvTime := cnf.GetNow()
 	ctx.GetLogger().Debugf("Lookup redis %v", keys)
 	ctx.GetLogger().Debugf("Lookup redis %v", keys)
 	if len(keys) != 1 {
 	if len(keys) != 1 {
 		return nil, fmt.Errorf("redis lookup only support one key, but got %v", keys)
 		return nil, fmt.Errorf("redis lookup only support one key, but got %v", keys)
@@ -97,7 +99,7 @@ func (s *lookupSource) Lookup(ctx api.StreamContext, _ []string, keys []string,
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
-		return []api.SourceTuple{api.NewDefaultSourceTuple(m, nil)}, nil
+		return []api.SourceTuple{api.NewDefaultSourceTupleWithTime(m, nil, rcvTime)}, nil
 	} else {
 	} else {
 		res, err := s.cli.LRange(ctx, v, 0, -1).Result()
 		res, err := s.cli.LRange(ctx, v, 0, -1).Result()
 		if err != nil {
 		if err != nil {
@@ -113,7 +115,7 @@ func (s *lookupSource) Lookup(ctx api.StreamContext, _ []string, keys []string,
 			if err != nil {
 			if err != nil {
 				return nil, err
 				return nil, err
 			}
 			}
-			ret = append(ret, api.NewDefaultSourceTuple(m, nil))
+			ret = append(ret, api.NewDefaultSourceTupleWithTime(m, nil, rcvTime))
 		}
 		}
 		return ret, nil
 		return ret, nil
 	}
 	}

+ 19 - 7
internal/io/redis/lookup_test.go

@@ -18,6 +18,7 @@ package redis
 
 
 import (
 import (
 	"github.com/alicebob/miniredis/v2"
 	"github.com/alicebob/miniredis/v2"
+	"github.com/benbjohnson/clock"
 	econf "github.com/lf-edge/ekuiper/internal/conf"
 	econf "github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
@@ -62,6 +63,7 @@ func TestSingle(t *testing.T) {
 		t.Error(err)
 		t.Error(err)
 		return
 		return
 	}
 	}
+	mc := econf.Clock.(*clock.Mock)
 	var tests = []struct {
 	var tests = []struct {
 		value  int
 		value  int
 		result []api.SourceTuple
 		result []api.SourceTuple
@@ -69,12 +71,12 @@ func TestSingle(t *testing.T) {
 		{
 		{
 			value: 1,
 			value: 1,
 			result: []api.SourceTuple{
 			result: []api.SourceTuple{
-				api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(1), "name": "John", "address": float64(34), "mobile": "334433"}, nil),
+				api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(1), "name": "John", "address": float64(34), "mobile": "334433"}, nil, mc.Now()),
 			},
 			},
 		}, {
 		}, {
 			value: 2,
 			value: 2,
 			result: []api.SourceTuple{
 			result: []api.SourceTuple{
-				api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(2), "name": "Susan", "address": float64(22), "mobile": "666433"}, nil),
+				api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(2), "name": "Susan", "address": float64(22), "mobile": "666433"}, nil, mc.Now()),
 			},
 			},
 		}, {
 		}, {
 			value:  3,
 			value:  3,
@@ -87,7 +89,7 @@ func TestSingle(t *testing.T) {
 			t.Errorf("Test %d: %v", i, err)
 			t.Errorf("Test %d: %v", i, err)
 			continue
 			continue
 		}
 		}
-		if !reflect.DeepEqual(actual, tt.result) {
+		if !deepEqual(actual, tt.result) {
 			t.Errorf("Test %d: expected %v, actual %v", i, tt.result, actual)
 			t.Errorf("Test %d: expected %v, actual %v", i, tt.result, actual)
 			continue
 			continue
 		}
 		}
@@ -108,6 +110,7 @@ func TestList(t *testing.T) {
 		t.Error(err)
 		t.Error(err)
 		return
 		return
 	}
 	}
+	mc := econf.Clock.(*clock.Mock)
 	var tests = []struct {
 	var tests = []struct {
 		value  string
 		value  string
 		result []api.SourceTuple
 		result []api.SourceTuple
@@ -115,13 +118,13 @@ func TestList(t *testing.T) {
 		{
 		{
 			value: "group1",
 			value: "group1",
 			result: []api.SourceTuple{
 			result: []api.SourceTuple{
-				api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(2), "name": "Susan"}, nil),
-				api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(1), "name": "John"}, nil),
+				api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(2), "name": "Susan"}, nil, mc.Now()),
+				api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(1), "name": "John"}, nil, mc.Now()),
 			},
 			},
 		}, {
 		}, {
 			value: "group2",
 			value: "group2",
 			result: []api.SourceTuple{
 			result: []api.SourceTuple{
-				api.NewDefaultSourceTuple(map[string]interface{}{"id": float64(3), "name": "Nancy"}, nil),
+				api.NewDefaultSourceTupleWithTime(map[string]interface{}{"id": float64(3), "name": "Nancy"}, nil, mc.Now()),
 			},
 			},
 		}, {
 		}, {
 			value:  "group4",
 			value:  "group4",
@@ -134,9 +137,18 @@ func TestList(t *testing.T) {
 			t.Errorf("Test %d: %v", i, err)
 			t.Errorf("Test %d: %v", i, err)
 			continue
 			continue
 		}
 		}
-		if !reflect.DeepEqual(actual, tt.result) {
+		if !deepEqual(actual, tt.result) {
 			t.Errorf("Test %d: expected %v, actual %v", i, tt.result, actual)
 			t.Errorf("Test %d: expected %v, actual %v", i, tt.result, actual)
 			continue
 			continue
 		}
 		}
 	}
 	}
 }
 }
+
+func deepEqual(a []api.SourceTuple, b []api.SourceTuple) bool {
+	for i, val := range a {
+		if !reflect.DeepEqual(val.Message(), b[i].Message()) || !reflect.DeepEqual(val.Meta(), b[i].Meta()) {
+			return false
+		}
+	}
+	return true
+}

+ 3 - 1
internal/plugin/portable/runtime/source.go

@@ -18,6 +18,8 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 	"go.nanomsg.org/mangos/v3"
 	"go.nanomsg.org/mangos/v3"
@@ -113,7 +115,7 @@ func (ps *PortableSource) Open(ctx api.StreamContext, consumer chan<- api.Source
 			infra.DrainError(ctx, fmt.Errorf("cannot receive from mangos Socket: %s", err.Error()), errCh)
 			infra.DrainError(ctx, fmt.Errorf("cannot receive from mangos Socket: %s", err.Error()), errCh)
 			return
 			return
 		}
 		}
-		result := &api.DefaultSourceTuple{}
+		result := &api.DefaultSourceTuple{Time: conf.GetNow()}
 		e := json.Unmarshal(msg, result)
 		e := json.Unmarshal(msg, result)
 		if e != nil {
 		if e != nil {
 			ctx.GetLogger().Errorf("Invalid data format, cannot decode %s to json format with error %s", string(msg), e)
 			ctx.GetLogger().Errorf("Invalid data format, cannot decode %s to json format with error %s", string(msg), e)

+ 5 - 5
internal/topo/lookup/cache/cache_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -28,8 +28,8 @@ func TestExpiration(t *testing.T) {
 	defer c.Close()
 	defer c.Close()
 	clock := conf.Clock.(*clock.Mock)
 	clock := conf.Clock.(*clock.Mock)
 	expects := [][]api.SourceTuple{
 	expects := [][]api.SourceTuple{
-		{api.NewDefaultSourceTuple(map[string]interface{}{"a": 1}, nil)},
-		{api.NewDefaultSourceTuple(map[string]interface{}{"a": 2}, nil), api.NewDefaultSourceTuple(map[string]interface{}{"a": 3}, nil)},
+		{api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 1}, nil, clock.Now())},
+		{api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 2}, nil, clock.Now()), api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 3}, nil, clock.Now())},
 		{},
 		{},
 	}
 	}
 	c.Set("a", expects[0])
 	c.Set("a", expects[0])
@@ -86,8 +86,8 @@ func TestNoExpiration(t *testing.T) {
 	defer c.Close()
 	defer c.Close()
 	clock := conf.Clock.(*clock.Mock)
 	clock := conf.Clock.(*clock.Mock)
 	expects := [][]api.SourceTuple{
 	expects := [][]api.SourceTuple{
-		{api.NewDefaultSourceTuple(map[string]interface{}{"a": 1}, nil)},
-		{api.NewDefaultSourceTuple(map[string]interface{}{"a": 2}, nil), api.NewDefaultSourceTuple(map[string]interface{}{"a": 3}, nil)},
+		{api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 1}, nil, clock.Now())},
+		{api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 2}, nil, clock.Now()), api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 3}, nil, clock.Now())},
 		{},
 		{},
 	}
 	}
 	c.Set("a", expects[0])
 	c.Set("a", expects[0])

+ 5 - 2
internal/topo/node/dynamic_channel_buffer_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@ package node
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"github.com/benbjohnson/clock"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"testing"
 	"testing"
 	"time"
 	"time"
@@ -25,10 +27,11 @@ func TestBuffer(t *testing.T) {
 	b := NewDynamicChannelBuffer()
 	b := NewDynamicChannelBuffer()
 	b.SetLimit(100)
 	b.SetLimit(100)
 	stopSign := make(chan struct{})
 	stopSign := make(chan struct{})
+	mc := conf.Clock.(*clock.Mock)
 	go func(done chan struct{}) {
 	go func(done chan struct{}) {
 		for i := 0; i < 100; i++ {
 		for i := 0; i < 100; i++ {
 			select {
 			select {
-			case b.In <- api.NewDefaultSourceTuple(map[string]interface{}{"a": 5}, nil):
+			case b.In <- api.NewDefaultSourceTupleWithTime(map[string]interface{}{"a": 5}, nil, mc.Now()):
 				fmt.Printf("feed in %d\n", i)
 				fmt.Printf("feed in %d\n", i)
 			default:
 			default:
 				t.Errorf("message %d dropped, should not drop message", i)
 				t.Errorf("message %d dropped, should not drop message", i)

+ 14 - 13
internal/topo/node/lookup_node_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -45,14 +45,15 @@ func (m *mockLookupSrc) Configure(_ string, _ map[string]interface{}) error {
 
 
 // Lookup accept int value as the first array value
 // Lookup accept int value as the first array value
 func (m *mockLookupSrc) Lookup(_ api.StreamContext, fields []string, _ []string, values []interface{}) ([]api.SourceTuple, error) {
 func (m *mockLookupSrc) Lookup(_ api.StreamContext, fields []string, _ []string, values []interface{}) ([]api.SourceTuple, error) {
+	mc := conf.Clock.(*clock.Mock)
 	if len(fields) > 0 { // if fields is not empty, the value will be kept
 	if len(fields) > 0 { // if fields is not empty, the value will be kept
 		if m.data != nil {
 		if m.data != nil {
 			return m.data, nil
 			return m.data, nil
 		} else {
 		} else {
-			m.data = []api.SourceTuple{api.NewDefaultSourceTuple(map[string]interface{}{
+			m.data = []api.SourceTuple{api.NewDefaultSourceTupleWithTime(map[string]interface{}{
 				"newA": 1000,
 				"newA": 1000,
 				"newB": 1000,
 				"newB": 1000,
-			}, nil)}
+			}, nil, mc.Now())}
 		}
 		}
 	}
 	}
 	a1, ok := values[0].(int)
 	a1, ok := values[0].(int)
@@ -60,39 +61,39 @@ func (m *mockLookupSrc) Lookup(_ api.StreamContext, fields []string, _ []string,
 		var result []api.SourceTuple
 		var result []api.SourceTuple
 		c := a1 % 2
 		c := a1 % 2
 		if c != 0 {
 		if c != 0 {
-			result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
+			result = append(result, api.NewDefaultSourceTupleWithTime(map[string]interface{}{
 				"newA": c,
 				"newA": c,
 				"newB": c * 2,
 				"newB": c * 2,
-			}, nil))
+			}, nil, mc.Now()))
 		}
 		}
 		c = a1 % 3
 		c = a1 % 3
 		if c != 0 {
 		if c != 0 {
-			result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
+			result = append(result, api.NewDefaultSourceTupleWithTime(map[string]interface{}{
 				"newA": c,
 				"newA": c,
 				"newB": c * 2,
 				"newB": c * 2,
-			}, nil))
+			}, nil, mc.Now()))
 		}
 		}
 		c = a1 % 5
 		c = a1 % 5
 		if c != 0 {
 		if c != 0 {
-			result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
+			result = append(result, api.NewDefaultSourceTupleWithTime(map[string]interface{}{
 				"newA": c,
 				"newA": c,
 				"newB": c * 2,
 				"newB": c * 2,
-			}, nil))
+			}, nil, mc.Now()))
 		}
 		}
 		c = a1 % 7
 		c = a1 % 7
 		if c != 0 {
 		if c != 0 {
-			result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
+			result = append(result, api.NewDefaultSourceTupleWithTime(map[string]interface{}{
 				"newA": c,
 				"newA": c,
 				"newB": c * 2,
 				"newB": c * 2,
-			}, nil))
+			}, nil, mc.Now()))
 		}
 		}
 		return result, nil
 		return result, nil
 	} else {
 	} else {
 		return []api.SourceTuple{
 		return []api.SourceTuple{
-			api.NewDefaultSourceTuple(map[string]interface{}{
+			api.NewDefaultSourceTupleWithTime(map[string]interface{}{
 				"newA": 0,
 				"newA": 0,
 				"newB": 0,
 				"newB": 0,
-			}, nil),
+			}, nil, mc.Now()),
 		}, nil
 		}, nil
 	}
 	}
 }
 }

+ 8 - 1
internal/topo/node/metric/stats_manager.go

@@ -16,8 +16,9 @@ package metric
 
 
 import (
 import (
 	"fmt"
 	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/api"
 	"time"
 	"time"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
 )
 )
 
 
 const RecordsInTotal = "records_in_total"
 const RecordsInTotal = "records_in_total"
@@ -38,6 +39,7 @@ type StatManager interface {
 	ProcessTimeStart()
 	ProcessTimeStart()
 	ProcessTimeEnd()
 	ProcessTimeEnd()
 	SetBufferLength(l int64)
 	SetBufferLength(l int64)
+	SetProcessTimeStart(t time.Time)
 	GetMetrics() []interface{}
 	GetMetrics() []interface{}
 	// Clean remove all metrics history
 	// Clean remove all metrics history
 	Clean(ruleId string)
 	Clean(ruleId string)
@@ -115,6 +117,11 @@ func (sm *DefaultStatManager) SetBufferLength(l int64) {
 	sm.bufferLength = l
 	sm.bufferLength = l
 }
 }
 
 
+func (sm *DefaultStatManager) SetProcessTimeStart(t time.Time) {
+	sm.processTimeStart = t
+	sm.lastInvocation = t
+}
+
 func (sm *DefaultStatManager) GetMetrics() []interface{} {
 func (sm *DefaultStatManager) GetMetrics() []interface{} {
 	result := []interface{}{
 	result := []interface{}{
 		sm.totalRecordsIn,
 		sm.totalRecordsIn,

+ 8 - 3
internal/topo/node/source_node.go

@@ -16,6 +16,8 @@ package node
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"sync"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/converter"
 	"github.com/lf-edge/ekuiper/internal/converter"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
@@ -26,7 +28,6 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 	"github.com/lf-edge/ekuiper/pkg/infra"
-	"sync"
 )
 )
 
 
 type SourceNode struct {
 type SourceNode struct {
@@ -149,8 +150,12 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 									continue
 									continue
 								}
 								}
 								stats.IncTotalRecordsIn()
 								stats.IncTotalRecordsIn()
-								stats.ProcessTimeStart()
-								tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: conf.GetNowInMilli(), Metadata: data.Meta()}
+								rcvTime := conf.GetNow()
+								if !data.Timestamp().IsZero() {
+									rcvTime = data.Timestamp()
+								}
+								stats.SetProcessTimeStart(rcvTime)
+								tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: rcvTime.UnixMilli(), Metadata: data.Meta()}
 								var processedData interface{}
 								var processedData interface{}
 								if m.preprocessOp != nil {
 								if m.preprocessOp != nil {
 									processedData = m.preprocessOp.Apply(ctx, tuple, nil, nil)
 									processedData = m.preprocessOp.Apply(ctx, tuple, nil, nil)

+ 2 - 2
internal/topo/topotest/mocknode/mock_source.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -56,7 +56,7 @@ func (m *MockSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple
 		case <-next:
 		case <-next:
 			m.Lock()
 			m.Lock()
 			m.offset = i + 1
 			m.offset = i + 1
-			consumer <- api.NewDefaultSourceTuple(d.Message, xsql.Metadata{"topic": "mock"})
+			consumer <- api.NewDefaultSourceTupleWithTime(d.Message, xsql.Metadata{"topic": "mock"}, mockClock.Now())
 			log.Debugf("%d: mock source %s is sending data %d:%s", cast.TimeToUnixMilli(mockClock.Now()), ctx.GetOpId(), i, d)
 			log.Debugf("%d: mock source %s is sending data %d:%s", cast.TimeToUnixMilli(mockClock.Now()), ctx.GetOpId(), i, d)
 			m.Unlock()
 			m.Unlock()
 		case <-ctx.Done():
 		case <-ctx.Done():

+ 11 - 1
internal/xsql/source_tuple.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -14,6 +14,12 @@
 
 
 package xsql
 package xsql
 
 
+import (
+	"time"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
+)
+
 type ErrorSourceTuple struct {
 type ErrorSourceTuple struct {
 	Error error `json:"error"`
 	Error error `json:"error"`
 }
 }
@@ -25,3 +31,7 @@ func (t *ErrorSourceTuple) Message() map[string]interface{} {
 func (t *ErrorSourceTuple) Meta() map[string]interface{} {
 func (t *ErrorSourceTuple) Meta() map[string]interface{} {
 	return nil
 	return nil
 }
 }
+
+func (t *ErrorSourceTuple) Timestamp() time.Time {
+	return conf.GetNow()
+}

+ 16 - 1
pkg/api/stream.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -17,22 +17,33 @@ package api
 import (
 import (
 	"context"
 	"context"
 	"sync"
 	"sync"
+	"time"
 )
 )
 
 
 type SourceTuple interface {
 type SourceTuple interface {
 	Message() map[string]interface{}
 	Message() map[string]interface{}
 	Meta() map[string]interface{}
 	Meta() map[string]interface{}
+	Timestamp() time.Time
 }
 }
 
 
 type DefaultSourceTuple struct {
 type DefaultSourceTuple struct {
 	Mess map[string]interface{} `json:"message"`
 	Mess map[string]interface{} `json:"message"`
 	M    map[string]interface{} `json:"meta"`
 	M    map[string]interface{} `json:"meta"`
+	Time time.Time              `json:"timestamp"`
 }
 }
 
 
 func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
 func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
 	return &DefaultSourceTuple{
 	return &DefaultSourceTuple{
 		Mess: message,
 		Mess: message,
 		M:    meta,
 		M:    meta,
+		Time: time.Now(),
+	}
+}
+func NewDefaultSourceTupleWithTime(message map[string]interface{}, meta map[string]interface{}, timestamp time.Time) *DefaultSourceTuple {
+	return &DefaultSourceTuple{
+		Mess: message,
+		M:    meta,
+		Time: timestamp,
 	}
 	}
 }
 }
 
 
@@ -43,6 +54,10 @@ func (t *DefaultSourceTuple) Meta() map[string]interface{} {
 	return t.M
 	return t.M
 }
 }
 
 
+func (t *DefaultSourceTuple) Timestamp() time.Time {
+	return t.Time
+}
+
 type Logger interface {
 type Logger interface {
 	Debug(args ...interface{})
 	Debug(args ...interface{})
 	Info(args ...interface{})
 	Info(args ...interface{})