Explorar o código

fix(stream): strict validation defaults to false

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang %!s(int64=2) %!d(string=hai) anos
pai
achega
9088896001

+ 2 - 2
internal/processor/stream_test.go

@@ -82,7 +82,7 @@ func TestStreamCreateProcessor(t *testing.T) {
 			s: `DESCRIBE STREAM topic1;`,
 			r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
 				"array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint, BUILDING struct(NAME string, ROOM bigint))\n\n" +
-				"DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\nSTRICT_VALIDATION: true\n"},
+				"DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\n"},
 		},
 		{
 			s: `DROP STREAM topic1;`,
@@ -171,7 +171,7 @@ func TestTableProcessor(t *testing.T) {
 			s: `DESCRIBE TABLE topic1;`,
 			r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
 				"array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
-				"DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\nSTRICT_VALIDATION: true\n"},
+				"DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\n"},
 		},
 		{
 			s: `DROP TABLE topic1;`,

+ 4 - 1
internal/topo/operator/field_processor.go

@@ -84,7 +84,10 @@ func (p *defaultFieldProcessor) validateAndConvertField(sf *ast.JsonStreamField,
 		if t == nil {
 			return []interface{}(nil), nil
 		} else if jtype == reflect.Slice {
-			a := t.([]interface{})
+			a, ok := t.([]interface{})
+			if !ok {
+				return nil, fmt.Errorf("cannot convert %v to []interface{}", t)
+			}
 			for i, e := range a {
 				ne, err := p.validateAndConvertField(sf.Items, e)
 				if err != nil {

+ 3 - 0
internal/topo/operator/preprocessor.go

@@ -16,6 +16,7 @@ package operator
 
 import (
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
@@ -41,8 +42,10 @@ type Preprocessor struct {
 func NewPreprocessor(isSchemaless bool, fields map[string]*ast.JsonStreamField, _ bool, _ []string, iet bool, timestampField string, timestampFormat string, isBinary bool, strictValidation bool) (*Preprocessor, error) {
 	p := &Preprocessor{
 		isEventTime: iet, timestampField: timestampField, isBinary: isBinary}
+	conf.Log.Infof("preprocessor isSchemaless %v, strictValidation %v, isBinary %v", isSchemaless, strictValidation, strictValidation)
 	if !isSchemaless && (strictValidation || isBinary) {
 		p.checkSchema = true
+		conf.Log.Infof("preprocessor check schema")
 		p.defaultFieldProcessor = defaultFieldProcessor{
 			streamFields: fields, timestampFormat: timestampFormat,
 		}

+ 1 - 1
internal/topo/planner/planner.go

@@ -126,7 +126,7 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 				pp  node.UnOperation
 				err error
 			)
-			if t.iet || (!isSchemaless && (t.streamStmt.Options.STRICT_VALIDATION || !t.isBinary)) {
+			if t.iet || (!isSchemaless && (t.streamStmt.Options.STRICT_VALIDATION || t.isBinary)) {
 				pp, err = operator.NewPreprocessor(isSchemaless, t.streamFields, t.allMeta, t.metaFields, t.iet, t.timestampField, t.timestampFormat, t.isBinary, t.streamStmt.Options.STRICT_VALIDATION)
 				if err != nil {
 					return nil, 0, err

+ 15 - 20
internal/topo/planner/planner_test.go

@@ -2573,10 +2573,9 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 								},
 							},
 							options: &ast.Options{
-								DATASOURCE:        "table1",
-								TYPE:              "sql",
-								STRICT_VALIDATION: true,
-								KIND:              "lookup",
+								DATASOURCE: "table1",
+								TYPE:       "sql",
+								KIND:       "lookup",
 							},
 							conditions: nil,
 						}.Init(),
@@ -2687,10 +2686,9 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 											},
 										},
 										options: &ast.Options{
-											DATASOURCE:        "table1",
-											TYPE:              "sql",
-											STRICT_VALIDATION: true,
-											KIND:              "lookup",
+											DATASOURCE: "table1",
+											TYPE:       "sql",
+											KIND:       "lookup",
 										},
 										conditions: &ast.BinaryExpr{
 											OP: ast.AND,
@@ -2795,10 +2793,9 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 											},
 										},
 										options: &ast.Options{
-											DATASOURCE:        "table1",
-											TYPE:              "sql",
-											STRICT_VALIDATION: true,
-											KIND:              "lookup",
+											DATASOURCE: "table1",
+											TYPE:       "sql",
+											KIND:       "lookup",
 										},
 										conditions: nil,
 									}.Init(),
@@ -2829,10 +2826,9 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 								},
 							},
 							options: &ast.Options{
-								DATASOURCE:        "table2",
-								TYPE:              "sql",
-								STRICT_VALIDATION: true,
-								KIND:              "lookup",
+								DATASOURCE: "table2",
+								TYPE:       "sql",
+								KIND:       "lookup",
 							},
 						}.Init(),
 					},
@@ -2921,10 +2917,9 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 								},
 							},
 							options: &ast.Options{
-								DATASOURCE:        "table1",
-								TYPE:              "sql",
-								STRICT_VALIDATION: true,
-								KIND:              "lookup",
+								DATASOURCE: "table1",
+								TYPE:       "sql",
+								KIND:       "lookup",
 							},
 							conditions: nil,
 						}.Init(),

+ 3 - 3
internal/topo/topotest/mock_topo.go

@@ -280,7 +280,7 @@ func HandleStream(createOrDrop bool, names []string, t *testing.T) {
 					color STRING,
 					size BIGINT,
 					ts BIGINT
-				) WITH (DATASOURCE="demoError", TYPE="mock", FORMAT="json", KEY="ts");`
+				) WITH (DATASOURCE="demoError", TYPE="mock", FORMAT="json", KEY="ts",STRICT_VALIDATION="true");`
 			case "demo1":
 				sql = `CREATE STREAM demo1 (
 					temp FLOAT,
@@ -322,7 +322,7 @@ func HandleStream(createOrDrop bool, names []string, t *testing.T) {
 					color STRING,
 					size BIGINT,
 					ts BIGINT
-				) WITH (DATASOURCE="demoErr", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts");`
+				) WITH (DATASOURCE="demoErr", TYPE="mock", FORMAT="json", KEY="ts", TIMESTAMP="ts",STRICT_VALIDATION="true");`
 			case "ldemo":
 				sql = `CREATE STREAM ldemo (					
 				) WITH (DATASOURCE="ldemo", TYPE="mock", FORMAT="json");`
@@ -333,7 +333,7 @@ func HandleStream(createOrDrop bool, names []string, t *testing.T) {
 				sql = `CREATE STREAM lsessionDemo (
 				) WITH (DATASOURCE="lsessionDemo", TYPE="mock", FORMAT="json");`
 			case "ext":
-				sql = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"ext\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
+				sql = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"ext\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\",STRICT_VALIDATION=\"true\")"
 			case "ext2":
 				sql = "CREATE STREAM ext2 (count bigint) WITH (DATASOURCE=\"ext2\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"dedup\")"
 			case "extpy":

+ 1 - 1
internal/xsql/parser.go

@@ -1410,7 +1410,7 @@ func (p *Parser) parseStreamStructType() (ast.FieldType, error) {
 }
 
 func (p *Parser) parseStreamOptions() (*ast.Options, error) {
-	opts := &ast.Options{STRICT_VALIDATION: true}
+	opts := &ast.Options{STRICT_VALIDATION: false}
 	v := reflect.ValueOf(opts)
 	lStack := &stack.Stack{}
 	if tok, lit := p.scanIgnoreWhitespace(); tok == ast.LPAREN {

+ 36 - 47
internal/xsql/parser_stream_test.go

@@ -56,14 +56,13 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					}},
 				},
 				Options: &ast.Options{
-					DATASOURCE:        "users",
-					FORMAT:            "JSON",
-					KEY:               "USERID",
-					CONF_KEY:          "srv1",
-					TYPE:              "MQTT",
-					TIMESTAMP:         "USERID",
-					TIMESTAMP_FORMAT:  "yyyy-MM-dd''T''HH:mm:ssX'",
-					STRICT_VALIDATION: true,
+					DATASOURCE:       "users",
+					FORMAT:           "JSON",
+					KEY:              "USERID",
+					CONF_KEY:         "srv1",
+					TYPE:             "MQTT",
+					TIMESTAMP:        "USERID",
+					TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
 				},
 			},
 		},
@@ -133,10 +132,9 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "birthday", FieldType: &ast.BasicType{Type: ast.DATETIME}},
 				},
 				Options: &ast.Options{
-					DATASOURCE:        "users",
-					FORMAT:            "JSON",
-					KEY:               "USERID",
-					STRICT_VALIDATION: true,
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
+					KEY:        "USERID",
 				},
 			},
 		},
@@ -163,10 +161,9 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "birthday", FieldType: &ast.BasicType{Type: ast.DATETIME}},
 				},
 				Options: &ast.Options{
-					DATASOURCE:        "users",
-					FORMAT:            "JSON",
-					KEY:               "USERID",
-					STRICT_VALIDATION: true,
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
+					KEY:        "USERID",
 				},
 			},
 		},
@@ -179,10 +176,9 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				Options: &ast.Options{
-					DATASOURCE:        "users",
-					FORMAT:            "JSON",
-					KEY:               "USERID",
-					STRICT_VALIDATION: true,
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
+					KEY:        "USERID",
 				},
 			},
 		},
@@ -193,10 +189,9 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				Options: &ast.Options{
-					DATASOURCE:        "users",
-					FORMAT:            "JSON",
-					KEY:               "USERID",
-					STRICT_VALIDATION: true,
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
+					KEY:        "USERID",
 				},
 			},
 		},
@@ -216,10 +211,9 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
 				},
 				Options: &ast.Options{
-					DATASOURCE:        "users",
-					FORMAT:            "JSON",
-					KEY:               "USERID",
-					STRICT_VALIDATION: true,
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
+					KEY:        "USERID",
 				},
 			},
 		},
@@ -283,7 +277,7 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				StreamFields: []ast.StreamField{
 					{Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 				},
-				Options: &ast.Options{STRICT_VALIDATION: true},
+				Options: &ast.Options{},
 			},
 		},
 
@@ -345,11 +339,10 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					}},
 				},
 				Options: &ast.Options{
-					DATASOURCE:        "test",
-					FORMAT:            "JSON",
-					CONF_KEY:          "democonf",
-					TYPE:              "MQTT",
-					STRICT_VALIDATION: true,
+					DATASOURCE: "test",
+					FORMAT:     "JSON",
+					CONF_KEY:   "democonf",
+					TYPE:       "MQTT",
 				},
 			},
 		}, {
@@ -368,9 +361,8 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "PICTURE", FieldType: &ast.BasicType{Type: ast.BYTEA}},
 				},
 				Options: &ast.Options{
-					DATASOURCE:        "users",
-					FORMAT:            "JSON",
-					STRICT_VALIDATION: true,
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
 				},
 			},
 		}, {
@@ -389,9 +381,8 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "PICTURE", FieldType: &ast.BasicType{Type: ast.BYTEA}},
 				},
 				Options: &ast.Options{
-					DATASOURCE:        "users",
-					FORMAT:            "JSON",
-					STRICT_VALIDATION: true,
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
 				},
 			},
 		}, {
@@ -417,9 +408,8 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "image", FieldType: &ast.BasicType{Type: ast.BYTEA}},
 				},
 				Options: &ast.Options{
-					DATASOURCE:        "users",
-					FORMAT:            "BINARY",
-					STRICT_VALIDATION: true,
+					DATASOURCE: "users",
+					FORMAT:     "BINARY",
 				},
 			},
 		}, {
@@ -429,10 +419,9 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				Options: &ast.Options{
-					DATASOURCE:        "users",
-					FORMAT:            "DELIMITED",
-					DELIMITER:         " ",
-					STRICT_VALIDATION: true,
+					DATASOURCE: "users",
+					FORMAT:     "DELIMITED",
+					DELIMITER:  " ",
 				},
 			},
 		},

+ 20 - 25
internal/xsql/parser_tree_test.go

@@ -39,11 +39,10 @@ func TestParser_ParseTree(t *testing.T) {
 					{Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 				},
 				Options: &ast.Options{
-					DATASOURCE:        "users",
-					FORMAT:            "JSON",
-					KEY:               "USERID",
-					SHARED:            true,
-					STRICT_VALIDATION: true,
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
+					KEY:        "USERID",
+					SHARED:     true,
 				},
 			},
 		},
@@ -57,12 +56,11 @@ func TestParser_ParseTree(t *testing.T) {
 					{Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 				},
 				Options: &ast.Options{
-					DATASOURCE:        "users",
-					FORMAT:            "PROTOBUF",
-					SCHEMAID:          "proto1.Book",
-					KEY:               "USERID",
-					SHARED:            true,
-					STRICT_VALIDATION: true,
+					DATASOURCE: "users",
+					FORMAT:     "PROTOBUF",
+					SCHEMAID:   "proto1.Book",
+					KEY:        "USERID",
+					SHARED:     true,
 				},
 			},
 		},
@@ -76,11 +74,10 @@ func TestParser_ParseTree(t *testing.T) {
 					{Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 				},
 				Options: &ast.Options{
-					DATASOURCE:        "users",
-					FORMAT:            "JSON",
-					KEY:               "USERID",
-					RETAIN_SIZE:       3,
-					STRICT_VALIDATION: true,
+					DATASOURCE:  "users",
+					FORMAT:      "JSON",
+					KEY:         "USERID",
+					RETAIN_SIZE: 3,
 				},
 				StreamType: ast.TypeTable,
 			},
@@ -99,10 +96,9 @@ func TestParser_ParseTree(t *testing.T) {
 					{Name: "id", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 				},
 				Options: &ast.Options{
-					DATASOURCE:        "lookup.json",
-					FORMAT:            "json",
-					CONF_KEY:          "test",
-					STRICT_VALIDATION: true,
+					DATASOURCE: "lookup.json",
+					FORMAT:     "json",
+					CONF_KEY:   "test",
 				},
 				StreamType: ast.TypeTable,
 			},
@@ -121,11 +117,10 @@ func TestParser_ParseTree(t *testing.T) {
 					{Name: "id", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 				},
 				Options: &ast.Options{
-					DATASOURCE:        "devices",
-					STRICT_VALIDATION: true,
-					KIND:              ast.StreamKindLookup,
-					TYPE:              "sql",
-					KEY:               "id",
+					DATASOURCE: "devices",
+					KIND:       ast.StreamKindLookup,
+					TYPE:       "sql",
+					KEY:        "id",
 				},
 				StreamType: ast.TypeTable,
 			},

+ 1 - 1
test/binary_image_process.jmx

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~ Copyright 2021 EMQ Technologies Co., Ltd.
+  ~ Copyright 2021-2022 EMQ Technologies Co., Ltd.
   ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.

+ 4 - 4
test/change_stream_rule.jmx

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~ Copyright 2021 EMQ Technologies Co., Ltd.
+  ~ Copyright 2021-2022 EMQ Technologies Co., Ltd.
   ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -132,7 +132,7 @@
                 <elementProp name="" elementType="HTTPArgument">
                   <boolProp name="HTTPArgument.always_encode">false</boolProp>
                   <stringProp name="Argument.value">{&#xd;
-&quot;sql&quot; : &quot;create stream demo (temperature float, light string) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot; )&quot;&#xd;
+&quot;sql&quot; : &quot;create stream demo (temperature float, light string) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot;,STRICT_VALIDATION=\&quot;true\&quot; )&quot;&#xd;
 }</stringProp>
                   <stringProp name="Argument.metadata">=</stringProp>
                 </elementProp>
@@ -348,7 +348,7 @@
                 <elementProp name="" elementType="HTTPArgument">
                   <boolProp name="HTTPArgument.always_encode">false</boolProp>
                   <stringProp name="Argument.value">{&#xd;
-&quot;sql&quot; : &quot;create stream demo (temperature float, humidity bigint, light bigint) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot; )&quot;&#xd;
+&quot;sql&quot; : &quot;create stream demo (temperature float, humidity bigint, light bigint) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot;,STRICT_VALIDATION=\&quot;true\&quot; )&quot;&#xd;
 }</stringProp>
                   <stringProp name="Argument.metadata">=</stringProp>
                 </elementProp>
@@ -434,7 +434,7 @@
                 <elementProp name="" elementType="HTTPArgument">
                   <boolProp name="HTTPArgument.always_encode">false</boolProp>
                   <stringProp name="Argument.value">{&#xd;
-&quot;sql&quot; : &quot;create stream demo (temperature float, humidity bigint, light bigint) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot; )&quot;&#xd;
+&quot;sql&quot; : &quot;create stream demo (temperature float, humidity bigint, light bigint) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot;,STRICT_VALIDATION=\&quot;true\&quot; )&quot;&#xd;
 }</stringProp>
                   <stringProp name="Argument.metadata">=</stringProp>
                 </elementProp>

+ 2 - 2
test/http_pull_rule.jmx

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~ Copyright 2021 EMQ Technologies Co., Ltd.
+  ~ Copyright 2021-2022 EMQ Technologies Co., Ltd.
   ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -109,7 +109,7 @@
                 <elementProp name="" elementType="HTTPArgument">
                   <boolProp name="HTTPArgument.always_encode">false</boolProp>
                   <stringProp name="Argument.value">{&#xd;
-&quot;sql&quot; : &quot;create stream demo (Temperature float, humidity bigint) WITH (FORMAT=\&quot;JSON\&quot;, TYPE=\&quot;httppull\&quot; DATASOURCE=\&quot;pull\&quot; Conf_key=\&quot;application_conf\&quot;)&quot;&#xd;
+&quot;sql&quot; : &quot;create stream demo (Temperature float, humidity bigint) WITH (FORMAT=\&quot;JSON\&quot;, TYPE=\&quot;httppull\&quot; DATASOURCE=\&quot;pull\&quot; Conf_key=\&quot;application_conf\&quot;,STRICT_VALIDATION=\&quot;true\&quot;)&quot;&#xd;
 }</stringProp>
                   <stringProp name="Argument.metadata">=</stringProp>
                 </elementProp>

+ 2 - 2
test/select_all_rule.jmx

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~ Copyright 2021 EMQ Technologies Co., Ltd.
+  ~ Copyright 2021-2022 EMQ Technologies Co., Ltd.
   ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -131,7 +131,7 @@
                 <elementProp name="" elementType="HTTPArgument">
                   <boolProp name="HTTPArgument.always_encode">false</boolProp>
                   <stringProp name="Argument.value">{&#xd;
-&quot;sql&quot; : &quot;create stream demo (Temperature float, humidity bigint) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot; )&quot;&#xd;
+&quot;sql&quot; : &quot;create stream demo (Temperature float, humidity bigint) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot;, STRICT_VALIDATION=\&quot;true\&quot;)&quot;&#xd;
 }</stringProp>
                   <stringProp name="Argument.metadata">=</stringProp>
                 </elementProp>

+ 2 - 2
test/select_countwindow_rule.jmx

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~ Copyright 2021 EMQ Technologies Co., Ltd.
+  ~ Copyright 2021-2022 EMQ Technologies Co., Ltd.
   ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -131,7 +131,7 @@
                 <elementProp name="" elementType="HTTPArgument">
                   <boolProp name="HTTPArgument.always_encode">false</boolProp>
                   <stringProp name="Argument.value">{&#xd;
-&quot;sql&quot; : &quot;create stream demo (Temperature bigint, humidity bigint) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot; )&quot;&#xd;
+&quot;sql&quot; : &quot;create stream demo (Temperature bigint, humidity bigint) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot;,STRICT_VALIDATION=\&quot;true\&quot; )&quot;&#xd;
 }</stringProp>
                   <stringProp name="Argument.metadata">=</stringProp>
                 </elementProp>