Jelajahi Sumber

fix(stream): enable strictValidation=false

1. Parse strictValidation to true by default
2. Deal with strictValidation=false in preprocessor

Closes: #987

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 tahun lalu
induk
melakukan
1ecc5a6997

+ 4 - 0
internal/conf/time.go

@@ -43,3 +43,7 @@ func GetTimer(duration int) *clock.Timer {
 func GetNowInMilli() int64 {
 func GetNowInMilli() int64 {
 	return cast.TimeToUnixMilli(Clock.Now())
 	return cast.TimeToUnixMilli(Clock.Now())
 }
 }
+
+func GetNow() time.Time {
+	return Clock.Now()
+}

+ 2 - 2
internal/processor/stream_test.go

@@ -75,7 +75,7 @@ func TestStreamCreateProcessor(t *testing.T) {
 			s: `DESCRIBE STREAM topic1;`,
 			s: `DESCRIBE STREAM topic1;`,
 			r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
 			r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
 				"array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint, BUILDING struct(NAME string, ROOM bigint))\n\n" +
 				"array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint, BUILDING struct(NAME string, ROOM bigint))\n\n" +
-				"DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\n"},
+				"DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\nSTRICT_VALIDATION: true\n"},
 		},
 		},
 		{
 		{
 			s: `DROP STREAM topic1;`,
 			s: `DROP STREAM topic1;`,
@@ -164,7 +164,7 @@ func TestTableProcessor(t *testing.T) {
 			s: `DESCRIBE TABLE topic1;`,
 			s: `DESCRIBE TABLE topic1;`,
 			r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
 			r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
 				"array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
 				"array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
-				"DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\n"},
+				"DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\nSTRICT_VALIDATION: true\n"},
 		},
 		},
 		{
 		{
 			s: `DROP TABLE topic1;`,
 			s: `DROP TABLE topic1;`,

+ 42 - 6
internal/topo/operator/field_processor.go

@@ -18,6 +18,7 @@ import (
 	"encoding/base64"
 	"encoding/base64"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
@@ -30,12 +31,13 @@ import (
 )
 )
 
 
 type defaultFieldProcessor struct {
 type defaultFieldProcessor struct {
-	streamFields    []interface{}
-	timestampFormat string
-	isBinary        bool
+	streamFields     []interface{}
+	timestampFormat  string
+	isBinary         bool
+	strictValidation bool
 }
 }
 
 
-func (p *defaultFieldProcessor) processField(tuple *xsql.Tuple, fv *xsql.FunctionValuer) (map[string]interface{}, error) {
+func (p *defaultFieldProcessor) processField(tuple *xsql.Tuple, _ *xsql.FunctionValuer) (map[string]interface{}, error) {
 	result := make(map[string]interface{})
 	result := make(map[string]interface{})
 	if p.streamFields != nil {
 	if p.streamFields != nil {
 		for _, f := range p.streamFields {
 		for _, f := range p.streamFields {
@@ -211,10 +213,44 @@ func (p *defaultFieldProcessor) addRecField(ft ast.FieldType, r map[string]inter
 		default:
 		default:
 			return fmt.Errorf("unsupported type %T", st)
 			return fmt.Errorf("unsupported type %T", st)
 		}
 		}
-		return nil
 	} else {
 	} else {
-		return fmt.Errorf("invalid data %s, field %s not found", j, n)
+		if p.strictValidation {
+			return fmt.Errorf("invalid data %s, field %s not found", j, n)
+		} else {
+			switch st := ft.(type) {
+			case *ast.BasicType:
+				switch st.Type {
+				case ast.UNKNOWN:
+					return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
+				case ast.BIGINT:
+					r[n] = int64(0)
+				case ast.FLOAT:
+					r[n] = 0.0
+				case ast.STRINGS:
+					r[n] = ""
+				case ast.DATETIME:
+					r[n] = conf.GetNow()
+				case ast.BOOLEAN:
+					r[n] = false
+				case ast.BYTEA:
+					r[n] = []byte{}
+				default:
+					return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
+				}
+			case *ast.ArrayType:
+				if tempArr, err := p.addArrayField(st, nil); err != nil {
+					return fmt.Errorf("fail to parse field %s: %s", n, err)
+				} else {
+					r[n] = tempArr
+				}
+			case *ast.RecType:
+				r[n] = make(map[string]interface{})
+			default:
+				return fmt.Errorf("unsupported type %T", st)
+			}
+		}
 	}
 	}
+	return nil
 }
 }
 
 
 //ft must be ast.ArrayType
 //ft must be ast.ArrayType

+ 2 - 2
internal/topo/operator/preprocessor.go

@@ -31,11 +31,11 @@ type Preprocessor struct {
 	timestampField string
 	timestampField string
 }
 }
 
 
-func NewPreprocessor(fields []interface{}, allMeta bool, metaFields []string, iet bool, timestampField string, timestampFormat string, isBinary bool) (*Preprocessor, error) {
+func NewPreprocessor(fields []interface{}, allMeta bool, metaFields []string, iet bool, timestampField string, timestampFormat string, isBinary bool, strictValidation bool) (*Preprocessor, error) {
 	p := &Preprocessor{
 	p := &Preprocessor{
 		allMeta: allMeta, metaFields: metaFields, isEventTime: iet, timestampField: timestampField}
 		allMeta: allMeta, metaFields: metaFields, isEventTime: iet, timestampField: timestampField}
 	p.defaultFieldProcessor = defaultFieldProcessor{
 	p.defaultFieldProcessor = defaultFieldProcessor{
-		streamFields: fields, isBinary: isBinary, timestampFormat: timestampFormat,
+		streamFields: fields, isBinary: isBinary, timestampFormat: timestampFormat, strictValidation: strictValidation,
 	}
 	}
 	return p, nil
 	return p, nil
 }
 }

+ 31 - 0
internal/topo/operator/preprocessor_test.go

@@ -533,6 +533,31 @@ func TestPreprocessor_Apply(t *testing.T) {
 				},
 				},
 			},
 			},
 			},
 			},
+		}, {
+			stmt: &ast.StreamStmt{
+				Name: ast.StreamName("demo"),
+				StreamFields: []ast.StreamField{
+					{Name: "a", FieldType: &ast.RecType{
+						StreamFields: []ast.StreamField{
+							{Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
+						},
+					}},
+					{Name: "b", FieldType: &ast.BasicType{Type: ast.FLOAT}},
+					{Name: "c", FieldType: &ast.ArrayType{Type: ast.BIGINT}},
+				},
+				Options: &ast.Options{
+					STRICT_VALIDATION: false,
+				},
+			},
+			data: []byte(`{"a": {"d" : "hello"}}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": map[string]interface{}{
+					"b": "",
+				},
+				"b": 0.0,
+				"c": []int(nil),
+			},
+			},
 		},
 		},
 	}
 	}
 
 
@@ -543,6 +568,11 @@ func TestPreprocessor_Apply(t *testing.T) {
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	for i, tt := range tests {
 	for i, tt := range tests {
 		pp := &Preprocessor{}
 		pp := &Preprocessor{}
+		if tt.stmt.Options != nil {
+			pp.strictValidation = tt.stmt.Options.STRICT_VALIDATION
+		} else {
+			pp.strictValidation = true
+		}
 		pp.streamFields = convertFields(tt.stmt.StreamFields)
 		pp.streamFields = convertFields(tt.stmt.StreamFields)
 
 
 		dm := make(map[string]interface{})
 		dm := make(map[string]interface{})
@@ -949,6 +979,7 @@ func TestPreprocessorError(t *testing.T) {
 	for i, tt := range tests {
 	for i, tt := range tests {
 
 
 		pp := &Preprocessor{}
 		pp := &Preprocessor{}
+		pp.strictValidation = true
 		pp.streamFields = convertFields(tt.stmt.StreamFields)
 		pp.streamFields = convertFields(tt.stmt.StreamFields)
 		dm := make(map[string]interface{})
 		dm := make(map[string]interface{})
 		if e := json.Unmarshal(tt.data, &dm); e != nil {
 		if e := json.Unmarshal(tt.data, &dm); e != nil {

+ 1 - 0
internal/topo/operator/table_processor.go

@@ -37,6 +37,7 @@ func NewTableProcessor(name string, fields []interface{}, options *ast.Options)
 	p := &TableProcessor{emitterName: name, batchEmitted: true, retainSize: 1}
 	p := &TableProcessor{emitterName: name, batchEmitted: true, retainSize: 1}
 	p.defaultFieldProcessor = defaultFieldProcessor{
 	p.defaultFieldProcessor = defaultFieldProcessor{
 		streamFields: fields, isBinary: false, timestampFormat: options.TIMESTAMP_FORMAT,
 		streamFields: fields, isBinary: false, timestampFormat: options.TIMESTAMP_FORMAT,
+		strictValidation: options.STRICT_VALIDATION,
 	}
 	}
 	if options.RETAIN_SIZE > 0 {
 	if options.RETAIN_SIZE > 0 {
 		p.retainSize = options.RETAIN_SIZE
 		p.retainSize = options.RETAIN_SIZE

+ 6 - 6
internal/topo/planner/planner.go

@@ -117,14 +117,14 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 	case *DataSourcePlan:
 	case *DataSourcePlan:
 		switch t.streamStmt.StreamType {
 		switch t.streamStmt.StreamType {
 		case ast.TypeStream:
 		case ast.TypeStream:
-			pp, err := operator.NewPreprocessor(t.streamFields, t.allMeta, t.metaFields, t.iet, t.timestampField, t.timestampFormat, t.isBinary)
+			pp, err := operator.NewPreprocessor(t.streamFields, t.allMeta, t.metaFields, t.iet, t.timestampField, t.timestampFormat, t.isBinary, t.streamStmt.Options.STRICT_VALIDATION)
 			if err != nil {
 			if err != nil {
 				return nil, 0, err
 				return nil, 0, err
 			}
 			}
 			var srcNode *node.SourceNode
 			var srcNode *node.SourceNode
 			if len(sources) == 0 {
 			if len(sources) == 0 {
-				node := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, t.streamStmt.Options)
-				srcNode = node
+				sourceNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, t.streamStmt.Options)
+				srcNode = sourceNode
 			} else {
 			} else {
 				srcNode = getMockSource(sources, string(t.name))
 				srcNode = getMockSource(sources, string(t.name))
 				if srcNode == nil {
 				if srcNode == nil {
@@ -324,7 +324,7 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.
 }
 }
 
 
 func Transform(op node.UnOperation, name string, options *api.RuleOption) *node.UnaryOperator {
 func Transform(op node.UnOperation, name string, options *api.RuleOption) *node.UnaryOperator {
-	operator := node.New(name, xsql.FuncRegisters, options)
-	operator.SetOperation(op)
-	return operator
+	unaryOperator := node.New(name, xsql.FuncRegisters, options)
+	unaryOperator.SetOperation(op)
+	return unaryOperator
 }
 }

+ 1 - 1
internal/xsql/parser.go

@@ -1226,7 +1226,7 @@ func (p *Parser) parseStreamStructType() (ast.FieldType, error) {
 }
 }
 
 
 func (p *Parser) parseStreamOptions() (*ast.Options, error) {
 func (p *Parser) parseStreamOptions() (*ast.Options, error) {
-	opts := &ast.Options{}
+	opts := &ast.Options{STRICT_VALIDATION: true}
 	v := reflect.ValueOf(opts)
 	v := reflect.ValueOf(opts)
 	lStack := &stack.Stack{}
 	lStack := &stack.Stack{}
 	if tok, lit := p.scanIgnoreWhitespace(); tok == ast.LPAREN {
 	if tok, lit := p.scanIgnoreWhitespace(); tok == ast.LPAREN {

+ 43 - 33
internal/xsql/parser_stream_test.go

@@ -56,13 +56,14 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					}},
 					}},
 				},
 				},
 				Options: &ast.Options{
 				Options: &ast.Options{
-					DATASOURCE:       "users",
-					FORMAT:           "JSON",
-					KEY:              "USERID",
-					CONF_KEY:         "srv1",
-					TYPE:             "MQTT",
-					TIMESTAMP:        "USERID",
-					TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
+					DATASOURCE:        "users",
+					FORMAT:            "JSON",
+					KEY:               "USERID",
+					CONF_KEY:          "srv1",
+					TYPE:              "MQTT",
+					TIMESTAMP:         "USERID",
+					TIMESTAMP_FORMAT:  "yyyy-MM-dd''T''HH:mm:ssX'",
+					STRICT_VALIDATION: true,
 				},
 				},
 			},
 			},
 		},
 		},
@@ -132,9 +133,10 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "birthday", FieldType: &ast.BasicType{Type: ast.DATETIME}},
 					{Name: "birthday", FieldType: &ast.BasicType{Type: ast.DATETIME}},
 				},
 				},
 				Options: &ast.Options{
 				Options: &ast.Options{
-					DATASOURCE: "users",
-					FORMAT:     "JSON",
-					KEY:        "USERID",
+					DATASOURCE:        "users",
+					FORMAT:            "JSON",
+					KEY:               "USERID",
+					STRICT_VALIDATION: true,
 				},
 				},
 			},
 			},
 		},
 		},
@@ -161,9 +163,10 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "birthday", FieldType: &ast.BasicType{Type: ast.DATETIME}},
 					{Name: "birthday", FieldType: &ast.BasicType{Type: ast.DATETIME}},
 				},
 				},
 				Options: &ast.Options{
 				Options: &ast.Options{
-					DATASOURCE: "users",
-					FORMAT:     "JSON",
-					KEY:        "USERID",
+					DATASOURCE:        "users",
+					FORMAT:            "JSON",
+					KEY:               "USERID",
+					STRICT_VALIDATION: true,
 				},
 				},
 			},
 			},
 		},
 		},
@@ -176,9 +179,10 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				Name:         ast.StreamName("demo"),
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				StreamFields: nil,
 				Options: &ast.Options{
 				Options: &ast.Options{
-					DATASOURCE: "users",
-					FORMAT:     "JSON",
-					KEY:        "USERID",
+					DATASOURCE:        "users",
+					FORMAT:            "JSON",
+					KEY:               "USERID",
+					STRICT_VALIDATION: true,
 				},
 				},
 			},
 			},
 		},
 		},
@@ -189,9 +193,10 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				Name:         ast.StreamName("demo"),
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				StreamFields: nil,
 				Options: &ast.Options{
 				Options: &ast.Options{
-					DATASOURCE: "users",
-					FORMAT:     "JSON",
-					KEY:        "USERID",
+					DATASOURCE:        "users",
+					FORMAT:            "JSON",
+					KEY:               "USERID",
+					STRICT_VALIDATION: true,
 				},
 				},
 			},
 			},
 		},
 		},
@@ -211,9 +216,10 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
 					{Name: "NAME", FieldType: &ast.BasicType{Type: ast.STRINGS}},
 				},
 				},
 				Options: &ast.Options{
 				Options: &ast.Options{
-					DATASOURCE: "users",
-					FORMAT:     "JSON",
-					KEY:        "USERID",
+					DATASOURCE:        "users",
+					FORMAT:            "JSON",
+					KEY:               "USERID",
+					STRICT_VALIDATION: true,
 				},
 				},
 			},
 			},
 		},
 		},
@@ -277,7 +283,7 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
 					{Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 					{Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 				},
 				},
-				Options: &ast.Options{},
+				Options: &ast.Options{STRICT_VALIDATION: true},
 			},
 			},
 		},
 		},
 
 
@@ -339,10 +345,11 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					}},
 					}},
 				},
 				},
 				Options: &ast.Options{
 				Options: &ast.Options{
-					DATASOURCE: "test",
-					FORMAT:     "JSON",
-					CONF_KEY:   "democonf",
-					TYPE:       "MQTT",
+					DATASOURCE:        "test",
+					FORMAT:            "JSON",
+					CONF_KEY:          "democonf",
+					TYPE:              "MQTT",
+					STRICT_VALIDATION: true,
 				},
 				},
 			},
 			},
 		}, {
 		}, {
@@ -361,8 +368,9 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "PICTURE", FieldType: &ast.BasicType{Type: ast.BYTEA}},
 					{Name: "PICTURE", FieldType: &ast.BasicType{Type: ast.BYTEA}},
 				},
 				},
 				Options: &ast.Options{
 				Options: &ast.Options{
-					DATASOURCE: "users",
-					FORMAT:     "JSON",
+					DATASOURCE:        "users",
+					FORMAT:            "JSON",
+					STRICT_VALIDATION: true,
 				},
 				},
 			},
 			},
 		}, {
 		}, {
@@ -381,8 +389,9 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "PICTURE", FieldType: &ast.BasicType{Type: ast.BYTEA}},
 					{Name: "PICTURE", FieldType: &ast.BasicType{Type: ast.BYTEA}},
 				},
 				},
 				Options: &ast.Options{
 				Options: &ast.Options{
-					DATASOURCE: "users",
-					FORMAT:     "JSON",
+					DATASOURCE:        "users",
+					FORMAT:            "JSON",
+					STRICT_VALIDATION: true,
 				},
 				},
 			},
 			},
 		}, {
 		}, {
@@ -408,8 +417,9 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "image", FieldType: &ast.BasicType{Type: ast.BYTEA}},
 					{Name: "image", FieldType: &ast.BasicType{Type: ast.BYTEA}},
 				},
 				},
 				Options: &ast.Options{
 				Options: &ast.Options{
-					DATASOURCE: "users",
-					FORMAT:     "BINARY",
+					DATASOURCE:        "users",
+					FORMAT:            "BINARY",
+					STRICT_VALIDATION: true,
 				},
 				},
 			},
 			},
 		},
 		},

+ 14 - 11
internal/xsql/parser_tree_test.go

@@ -39,10 +39,11 @@ func TestParser_ParseTree(t *testing.T) {
 					{Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 					{Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 				},
 				},
 				Options: &ast.Options{
 				Options: &ast.Options{
-					DATASOURCE: "users",
-					FORMAT:     "JSON",
-					KEY:        "USERID",
-					SHARED:     true,
+					DATASOURCE:        "users",
+					FORMAT:            "JSON",
+					KEY:               "USERID",
+					SHARED:            true,
+					STRICT_VALIDATION: true,
 				},
 				},
 			},
 			},
 		},
 		},
@@ -56,10 +57,11 @@ func TestParser_ParseTree(t *testing.T) {
 					{Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 					{Name: "USERID", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 				},
 				},
 				Options: &ast.Options{
 				Options: &ast.Options{
-					DATASOURCE:  "users",
-					FORMAT:      "JSON",
-					KEY:         "USERID",
-					RETAIN_SIZE: 3,
+					DATASOURCE:        "users",
+					FORMAT:            "JSON",
+					KEY:               "USERID",
+					RETAIN_SIZE:       3,
+					STRICT_VALIDATION: true,
 				},
 				},
 				StreamType: ast.TypeTable,
 				StreamType: ast.TypeTable,
 			},
 			},
@@ -78,9 +80,10 @@ func TestParser_ParseTree(t *testing.T) {
 					{Name: "id", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 					{Name: "id", FieldType: &ast.BasicType{Type: ast.BIGINT}},
 				},
 				},
 				Options: &ast.Options{
 				Options: &ast.Options{
-					DATASOURCE: "lookup.json",
-					FORMAT:     "json",
-					CONF_KEY:   "test",
+					DATASOURCE:        "lookup.json",
+					FORMAT:            "json",
+					CONF_KEY:          "test",
+					STRICT_VALIDATION: true,
 				},
 				},
 				StreamType: ast.TypeTable,
 				StreamType: ast.TypeTable,
 			},
 			},