Browse Source

fix(source): always apply timestamp format (#2095)

Previously, only apply timestamp format when schema is set. Extend it to al cases.

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
ngjaying 1 year ago
parent
commit
ba602c4d2e

+ 5 - 4
internal/topo/operator/preprocessor.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -44,13 +44,14 @@ func NewPreprocessor(isSchemaless bool, fields map[string]*ast.JsonStreamField,
 	p := &Preprocessor{
 		isEventTime: iet, timestampField: timestampField, isBinary: isBinary,
 	}
+	p.defaultFieldProcessor = defaultFieldProcessor{
+		timestampFormat: timestampFormat,
+	}
 	conf.Log.Infof("preprocessor isSchemaless %v, strictValidation %v, isBinary %v", isSchemaless, strictValidation, strictValidation)
 	if !isSchemaless && (strictValidation || isBinary) {
 		p.checkSchema = true
 		conf.Log.Infof("preprocessor check schema")
-		p.defaultFieldProcessor = defaultFieldProcessor{
-			streamFields: fields, timestampFormat: timestampFormat,
-		}
+		p.defaultFieldProcessor.streamFields = fields
 	}
 	return p, nil
 }

+ 7 - 4
internal/topo/operator/preprocessor_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -26,6 +26,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/converter"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
@@ -688,11 +690,12 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "TestPreprocessorTime_Apply")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	for i, tt := range tests {
-		pp := &Preprocessor{checkSchema: true}
-		pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
+		timestampFormat := ""
 		if tt.stmt.Options != nil {
-			pp.timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
+			timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
 		}
+		pp, e := NewPreprocessor(false, tt.stmt.StreamFields.ToJsonSchema(), false, nil, false, "", timestampFormat, false, true)
+		assert.NoError(t, e)
 		dm := make(map[string]interface{})
 		if e := json.Unmarshal(tt.data, &dm); e != nil {
 			log.Fatal(e)