Ver código fonte

fix(planner): set timestamp format property in processing time mode

The timestampFormat property is also useful in processing time mode when defining a datetime type and receive a string value and vise versa. So the property must be complied in processing time mode.

Closes: #954

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 anos atrás
pai
commit
6f6a3fefa1

+ 2 - 0
docs/en_US/sqls/streams.md

@@ -37,6 +37,8 @@ CREATE STREAM
 | StrictValidation     | true | To control validation behavior of message field against stream schema. See [Strict Validation](#Strict Validation) for more info. |
 | CONF_KEY | true | If additional configuration items are requied to be configured, then specify the config key here. See [MQTT stream](../rules/sources/mqtt.md) for more info. |
 | SHARED | true | Whether the source instance will be shared across all rules using this stream |
+| TIMESTAMP | true | The field to represent the event's timestamp. If specified, the rule will run with event time. Otherwise, it will run with processing time. Please refer to [timestamp management](./windows.md#timestamp-management) for details. |
+| TIMESTAMP_FORMAT | true | The default format to be used when converting string to or from datetime type. |
 
 **Example 1,**
 

+ 2 - 0
docs/zh_CN/sqls/streams.md

@@ -39,6 +39,8 @@ CREATE STREAM
 | StrictValidation     | 是  | 针对流模式控制消息字段的验证行为。 有关更多信息,请参见 [Strict Validation](#Strict Validation) |
 | CONF_KEY | 是 | 如果需要配置其他配置项,请在此处指定 config 键。 有关更多信息,请参见 [MQTT stream](../rules/sources/mqtt.md) 。 |
 | SHARED | 是 | 是否在使用该流的规则中共享源的实例 |
+| TIMESTAMP | 是 | 代表该事件时间戳的字段名。如果有设置,则使用此流的规则将采用事件时间;否则将采用处理时间。详情请看[时间戳管理](./windows.md#时间戳管理)。 |
+| TIMESTAMP_FORMAT | 是 | 字符串和时间格式转换时使用的默认格式。 |
 
 **示例1**
 

+ 9 - 6
internal/topo/planner/dataSourcePlan.go

@@ -91,7 +91,10 @@ func (p *DataSourcePlan) extract(expr ast.Expr) (ast.Expr, ast.Expr) {
 
 func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error {
 	//init values
-	p.getProps()
+	err := p.getProps()
+	if err != nil {
+		return err
+	}
 	p.fields = make(map[string]interface{})
 	if !p.allMeta {
 		p.metaMap = make(map[string]string)
@@ -157,7 +160,7 @@ func (p *DataSourcePlan) getAllFields() {
 	p.streamFields = make([]interface{}, 0)
 	if p.isWildCard {
 		if p.streamStmt.StreamFields != nil {
-			for k, _ := range p.streamStmt.StreamFields { // The input can only be StreamFields
+			for k := range p.streamStmt.StreamFields { // The input can only be StreamFields
 				p.streamFields = append(p.streamFields, &p.streamStmt.StreamFields[k])
 			}
 		} else {
@@ -167,7 +170,7 @@ func (p *DataSourcePlan) getAllFields() {
 		sfs := make([]interface{}, 0, len(p.fields))
 		if conf.IsTesting {
 			var keys []string
-			for k, _ := range p.fields {
+			for k := range p.fields {
 				keys = append(keys, k)
 			}
 			sort.Strings(keys)
@@ -198,9 +201,9 @@ func (p *DataSourcePlan) getProps() error {
 		} else {
 			return fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
 		}
-		if p.streamStmt.Options.TIMESTAMP_FORMAT != "" {
-			p.timestampFormat = p.streamStmt.Options.TIMESTAMP_FORMAT
-		}
+	}
+	if p.streamStmt.Options.TIMESTAMP_FORMAT != "" {
+		p.timestampFormat = p.streamStmt.Options.TIMESTAMP_FORMAT
 	}
 	if strings.ToLower(p.streamStmt.Options.FORMAT) == message.FormatBinary {
 		p.isBinary = true

+ 13 - 9
internal/topo/planner/planner_test.go

@@ -48,7 +48,7 @@ func Test_createLogicalPlan(t *testing.T) {
 		"src2": `CREATE STREAM src2 (
 					id2 BIGINT,
 					hum BIGINT
-				) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
+				) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
 		"tableInPlanner": `CREATE TABLE tableInPlanner (
 					id BIGINT,
 					name STRING,
@@ -226,8 +226,9 @@ func Test_createLogicalPlan(t *testing.T) {
 															FieldType: &ast.BasicType{Type: ast.BIGINT},
 														},
 													},
-													streamStmt: streams["src2"],
-													metaFields: []string{},
+													streamStmt:      streams["src2"],
+													metaFields:      []string{},
+													timestampFormat: "YYYY-MM-dd HH:mm:ss",
 												}.Init(),
 											},
 										},
@@ -477,8 +478,9 @@ func Test_createLogicalPlan(t *testing.T) {
 																		FieldType: &ast.BasicType{Type: ast.BIGINT},
 																	},
 																},
-																streamStmt: streams["src2"],
-																metaFields: []string{},
+																streamStmt:      streams["src2"],
+																metaFields:      []string{},
+																timestampFormat: "YYYY-MM-dd HH:mm:ss",
 															}.Init(),
 														},
 													},
@@ -574,8 +576,9 @@ func Test_createLogicalPlan(t *testing.T) {
 															FieldType: &ast.BasicType{Type: ast.BIGINT},
 														},
 													},
-													streamStmt: streams["src2"],
-													metaFields: []string{},
+													streamStmt:      streams["src2"],
+													metaFields:      []string{},
+													timestampFormat: "YYYY-MM-dd HH:mm:ss",
 												}.Init(),
 											},
 										},
@@ -948,8 +951,9 @@ func Test_createLogicalPlan(t *testing.T) {
 															FieldType: &ast.BasicType{Type: ast.BIGINT},
 														},
 													},
-													streamStmt: streams["src2"],
-													metaFields: []string{},
+													streamStmt:      streams["src2"],
+													metaFields:      []string{},
+													timestampFormat: "YYYY-MM-dd HH:mm:ss",
 												}.Init(),
 												DataSourcePlan{
 													name: "tableInPlanner",