|
@@ -33,8 +33,8 @@ type sinkConf struct {
|
|
|
RollingInterval int64 `json:"rollingInterval"`
|
|
|
RollingCount int `json:"rollingCount"`
|
|
|
RollingNamePattern string `json:"rollingNamePattern"` // where to add the timestamp to the file name
|
|
|
- CheckInterval *int64 `json:"checkInterval"` // Once interval removed, this will be NOT nullable
|
|
|
- Path string `json:"path"` // support dynamic property, when rolling, make sure the path is updated
|
|
|
+ CheckInterval int64 `json:"checkInterval"`
|
|
|
+ Path string `json:"path"` // support dynamic property, when rolling, make sure the path is updated
|
|
|
FileType FileType `json:"fileType"`
|
|
|
HasHeader bool `json:"hasHeader"`
|
|
|
Delimiter string `json:"delimiter"`
|
|
@@ -52,17 +52,14 @@ type fileSink struct {
|
|
|
|
|
|
func (m *fileSink) Configure(props map[string]interface{}) error {
|
|
|
c := &sinkConf{
|
|
|
- RollingCount: 1000000,
|
|
|
- Path: "cache",
|
|
|
- FileType: LINES_TYPE,
|
|
|
+ RollingCount: 1000000,
|
|
|
+ Path: "cache",
|
|
|
+ FileType: LINES_TYPE,
|
|
|
+ CheckInterval: (5 * time.Minute).Milliseconds(),
|
|
|
}
|
|
|
if err := cast.MapToStruct(props, c); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- if c.CheckInterval == nil { // set checkInterval default value if both interval and checkInerval are not set
|
|
|
- t := (5 * time.Minute).Milliseconds()
|
|
|
- c.CheckInterval = &t
|
|
|
- }
|
|
|
if c.RollingInterval < 0 {
|
|
|
return fmt.Errorf("rollingInterval must be positive")
|
|
|
}
|
|
@@ -70,7 +67,7 @@ func (m *fileSink) Configure(props map[string]interface{}) error {
|
|
|
return fmt.Errorf("rollingCount must be positive")
|
|
|
}
|
|
|
|
|
|
- if *c.CheckInterval < 0 {
|
|
|
+ if c.CheckInterval < 0 {
|
|
|
return fmt.Errorf("checkInterval must be positive")
|
|
|
}
|
|
|
if c.RollingInterval == 0 && c.RollingCount == 0 {
|
|
@@ -107,8 +104,8 @@ func (m *fileSink) Configure(props map[string]interface{}) error {
|
|
|
func (m *fileSink) Open(ctx api.StreamContext) error {
|
|
|
ctx.GetLogger().Debug("Opening file sink")
|
|
|
// Check if the files have opened longer than the rolling interval, if so close it and create a new one
|
|
|
- if *m.c.CheckInterval > 0 {
|
|
|
- t := conf.GetTicker(*m.c.CheckInterval)
|
|
|
+ if m.c.CheckInterval > 0 {
|
|
|
+ t := conf.GetTicker(m.c.CheckInterval)
|
|
|
go func() {
|
|
|
defer t.Stop()
|
|
|
for {
|