Browse Source

fix: fix source processLatency in metrics (#2067)

* fix latency

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix comment

Signed-off-by: Rui-Gan <1171530954@qq.com>

---------

Signed-off-by: Rui-Gan <1171530954@qq.com>
Regina 1 year ago
parent
commit
7f823d7b02
2 changed files with 4 additions and 0 deletions
  1. 1 0
      extensions/sources/sql/sql.go
  2. 3 0
      internal/io/file/file_source.go

+ 1 - 0
extensions/sources/sql/sql.go

@@ -117,6 +117,7 @@ func (m *sqlsource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple,
 				scanIntoMap(data, columns, cols)
 				m.Query.UpdateMaxIndexValue(data)
 				consumer <- api.NewDefaultSourceTupleWithTime(data, nil, rcvTime)
+				rcvTime = conf.GetNow()
 			}
 		case <-ctx.Done():
 			return

+ 3 - 0
internal/io/file/file_source.go

@@ -283,6 +283,7 @@ func (fs *FileSource) publish(ctx api.StreamContext, file io.Reader, consumer ch
 			if fs.config.SendInterval > 0 {
 				time.Sleep(time.Millisecond * time.Duration(fs.config.SendInterval))
 			}
+			rcvTime = conf.GetNow()
 		}
 		return nil
 	case CSV_TYPE:
@@ -334,6 +335,7 @@ func (fs *FileSource) publish(ctx api.StreamContext, file io.Reader, consumer ch
 			if fs.config.SendInterval > 0 {
 				time.Sleep(time.Millisecond * time.Duration(fs.config.SendInterval))
 			}
+			rcvTime = conf.GetNow()
 		}
 	case LINES_TYPE:
 		scanner := bufio.NewScanner(file)
@@ -360,6 +362,7 @@ func (fs *FileSource) publish(ctx api.StreamContext, file io.Reader, consumer ch
 			if fs.config.SendInterval > 0 {
 				time.Sleep(time.Millisecond * time.Duration(fs.config.SendInterval))
 			}
+			rcvTime = conf.GetNow()
 		}
 	default:
 		return fmt.Errorf("invalid file type %s", fs.config.FileType)