Переглянути джерело

feat(planner): planner to deal with inferred schema

Validation at compile time for field reference

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 роки тому
батько
коміт
29825ad32f

+ 3 - 3
internal/plugin/portable/test/portable_rule_test.go

@@ -64,13 +64,13 @@ func TestSourceAndFunc(t *testing.T) {
 			Rule: `{"sql":"SELECT count as ee FROM ext","actions":[{"memory":{"topic":"cache"}}]}`,
 			Rule: `{"sql":"SELECT count as ee FROM ext","actions":[{"memory":{"topic":"cache"}}]}`,
 			R: [][]map[string]interface{}{
 			R: [][]map[string]interface{}{
 				{{
 				{{
-					"ee": 50,
+					"ee": int64(50),
 				}},
 				}},
 				{{
 				{{
-					"ee": 50,
+					"ee": int64(50),
 				}},
 				}},
 				{{
 				{{
-					"ee": 50,
+					"ee": int64(50),
 				}},
 				}},
 			},
 			},
 			M: map[string]interface{}{
 			M: map[string]interface{}{

+ 1 - 39
internal/processor/stream.go

@@ -378,45 +378,7 @@ func (p *StreamProcessor) GetInferredJsonSchema(name string, st ast.StreamType)
 			return nil, err
 			return nil, err
 		}
 		}
 	}
 	}
-	return convertSchema(sfs), nil
-}
-
-func convertSchema(sfs ast.StreamFields) map[string]*ast.JsonStreamField {
-	result := make(map[string]*ast.JsonStreamField, len(sfs))
-	for _, sf := range sfs {
-		result[sf.Name] = convertFieldType(sf.FieldType)
-	}
-	return result
-}
-
-func convertFieldType(sf ast.FieldType) *ast.JsonStreamField {
-	switch t := sf.(type) {
-	case *ast.BasicType:
-		return &ast.JsonStreamField{
-			Type: t.Type.String(),
-		}
-	case *ast.ArrayType:
-		var items *ast.JsonStreamField
-		switch t.Type {
-		case ast.ARRAY, ast.STRUCT:
-			items = convertFieldType(t.FieldType)
-		default:
-			items = &ast.JsonStreamField{
-				Type: t.Type.String(),
-			}
-		}
-		return &ast.JsonStreamField{
-			Type:  "array",
-			Items: items,
-		}
-	case *ast.RecType:
-		return &ast.JsonStreamField{
-			Type:       "struct",
-			Properties: convertSchema(t.StreamFields),
-		}
-	default: // should never happen
-		return nil
-	}
+	return sfs.ToJsonSchema(), nil
 }
 }
 
 
 func (p *StreamProcessor) execExplain(stmt ast.NameNode, st ast.StreamType) (string, error) {
 func (p *StreamProcessor) execExplain(stmt ast.NameNode, st ast.StreamType) (string, error) {

+ 6 - 1
internal/topo/node/source_node.go

@@ -163,7 +163,12 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 								stats.IncTotalRecordsIn()
 								stats.IncTotalRecordsIn()
 								stats.ProcessTimeStart()
 								stats.ProcessTimeStart()
 								tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: conf.GetNowInMilli(), Metadata: data.Meta()}
 								tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: conf.GetNowInMilli(), Metadata: data.Meta()}
-								processedData := m.preprocessOp.Apply(ctx, tuple, nil, nil)
+								var processedData interface{}
+								if m.preprocessOp != nil {
+									processedData = m.preprocessOp.Apply(ctx, tuple, nil, nil)
+								} else {
+									processedData = tuple
+								}
 								stats.ProcessTimeEnd()
 								stats.ProcessTimeEnd()
 								//blocking
 								//blocking
 								switch val := processedData.(type) {
 								switch val := processedData.(type) {

+ 81 - 415
internal/topo/operator/field_processor.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -15,446 +15,112 @@
 package operator
 package operator
 
 
 import (
 import (
-	"encoding/base64"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
-	"github.com/lf-edge/ekuiper/pkg/message"
-	"math"
 	"reflect"
 	"reflect"
-	"strconv"
 	"time"
 	"time"
 )
 )
 
 
+// Only runs when strict validation mode is on, fields are defined and the format is not binary.
+// Does not convert types.
 type defaultFieldProcessor struct {
 type defaultFieldProcessor struct {
-	streamFields     []interface{}
-	timestampFormat  string
-	isBinary         bool
-	strictValidation bool
+	streamFields    map[string]*ast.JsonStreamField
+	timestampFormat string
 }
 }
 
 
-func (p *defaultFieldProcessor) processField(tuple *xsql.Tuple, _ *xsql.FunctionValuer) (map[string]interface{}, error) {
-	result := make(map[string]interface{})
-	if p.streamFields != nil {
-		for _, f := range p.streamFields {
-			switch sf := f.(type) {
-			case *ast.StreamField:
-				if p.isBinary {
-					tuple.Message[sf.Name] = tuple.Message[message.DefaultField]
-				}
-				if e := p.addRecField(sf.FieldType, result, tuple.Message, sf.Name); e != nil {
-					return nil, e
-				}
-			case string: //schemaless
-				if p.isBinary {
-					result = tuple.Message
-				} else {
-					if m, ok := tuple.Message.Value(sf, ""); ok {
-						result[sf] = m
-					}
-				}
-			}
-			if p.isBinary {
-				break //binary format should only have ONE field
-			}
-		}
-	} else {
-		result = tuple.Message
-	}
-	return result, nil
+func (p *defaultFieldProcessor) validateAndConvert(tuple *xsql.Tuple) error {
+	_, err := p.validateAndConvertMessage(p.streamFields, tuple.Message)
+	return err
 }
 }
 
 
-func (p *defaultFieldProcessor) addRecField(ft ast.FieldType, r map[string]interface{}, j xsql.Message, n string) error {
-	if t, ok := j.Value(n, ""); ok {
-		v := reflect.ValueOf(t)
-		jtype := v.Kind()
-		switch st := ft.(type) {
-		case *ast.BasicType:
-			switch st.Type {
-			case ast.UNKNOWN:
-				return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
-			case ast.BIGINT:
-				if jtype == reflect.Int {
-					r[n] = t.(int)
-				} else if jtype == reflect.Float64 {
-					if tt, ok1 := t.(float64); ok1 {
-						if tt > math.MaxInt64 {
-							r[n] = uint64(tt)
-						} else {
-							r[n] = int(tt)
-						}
-					}
-				} else if jtype == reflect.String {
-					if i, err := strconv.Atoi(t.(string)); err != nil {
-						return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
-					} else {
-						r[n] = i
-					}
-				} else if jtype == reflect.Uint64 {
-					r[n] = t.(uint64)
-				} else {
-					return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
-				}
-			case ast.FLOAT:
-				if jtype == reflect.Float64 {
-					r[n] = t.(float64)
-				} else if jtype == reflect.String {
-					if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
-						return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
-					} else {
-						r[n] = f
-					}
-				} else {
-					return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
-				}
-			case ast.STRINGS:
-				if jtype == reflect.String {
-					r[n] = t.(string)
-				} else {
-					return fmt.Errorf("invalid data type for %s, expect string but found %[2]T(%[2]v)", n, t)
-				}
-			case ast.DATETIME:
-				switch jtype {
-				case reflect.Int:
-					ai := t.(int64)
-					r[n] = cast.TimeFromUnixMilli(ai)
-				case reflect.Float64:
-					ai := int64(t.(float64))
-					r[n] = cast.TimeFromUnixMilli(ai)
-				case reflect.String:
-					if t, err := p.parseTime(t.(string)); err != nil {
-						return fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", n, err)
-					} else {
-						r[n] = t
-					}
-				default:
-					return fmt.Errorf("invalid data type for %s, expect datatime but find %[2]T(%[2]v)", n, t)
-				}
-			case ast.BOOLEAN:
-				if jtype == reflect.Bool {
-					r[n] = t.(bool)
-				} else if jtype == reflect.String {
-					if i, err := strconv.ParseBool(t.(string)); err != nil {
-						return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
-					} else {
-						r[n] = i
-					}
-				} else {
-					return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
-				}
-			case ast.BYTEA:
-				if jtype == reflect.String {
-					if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
-						return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v) which cannot base64 decode", n, t)
-					} else {
-						r[n] = b
-					}
-				} else if jtype == reflect.Slice {
-					if b, ok := t.([]byte); ok {
-						r[n] = b
-					} else {
-						return fmt.Errorf("invalid data type for %s, expect bytea but found %[2]T(%[2]v)", n, t)
-					}
-				}
-			default:
-				return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
-			}
-		case *ast.ArrayType:
-			var s []interface{}
-			if t == nil {
-				s = nil
-			} else if jtype == reflect.Slice {
-				s = t.([]interface{})
-			} else if jtype == reflect.String {
-				err := json.Unmarshal([]byte(t.(string)), &s)
-				if err != nil {
-					return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
-				}
-			} else {
-				return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
-			}
-
-			if tempArr, err := p.addArrayField(st, s); err != nil {
-				return fmt.Errorf("fail to parse field %s: %s", n, err)
-			} else {
-				r[n] = tempArr
-			}
-		case *ast.RecType:
-			nextJ := make(map[string]interface{})
-			if t == nil {
-				nextJ = nil
-				r[n] = nextJ
-				return nil
-			} else if jtype == reflect.Map {
-				nextJ, ok = t.(map[string]interface{})
-				if !ok {
-					return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
-				}
-			} else if jtype == reflect.String {
-				err := json.Unmarshal([]byte(t.(string)), &nextJ)
-				if err != nil {
-					return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
-				}
-			} else {
-				return fmt.Errorf("invalid data type for %s, expect struct but found %[2]T(%[2]v)", n, t)
-			}
-			nextR := make(map[string]interface{})
-			for _, nextF := range st.StreamFields {
-				nextP := nextF.Name
-				if e := p.addRecField(nextF.FieldType, nextR, nextJ, nextP); e != nil {
-					return e
-				}
-			}
-			r[n] = nextR
-		default:
-			return fmt.Errorf("unsupported type %T", st)
+func (p *defaultFieldProcessor) validateAndConvertMessage(schema map[string]*ast.JsonStreamField, message xsql.Message) (map[string]interface{}, error) {
+	for name, sf := range schema {
+		v, ok := message.Value(name, "")
+		if !ok {
+			return nil, fmt.Errorf("field %s is not found", name)
 		}
 		}
-	} else {
-		if p.strictValidation {
-			return fmt.Errorf("invalid data %s, field %s not found", j, n)
+		if nv, err := p.validateAndConvertField(sf, v); err != nil {
+			return nil, fmt.Errorf("field %s type mismatch: %v", name, err)
 		} else {
 		} else {
-			switch st := ft.(type) {
-			case *ast.BasicType:
-				switch st.Type {
-				case ast.UNKNOWN:
-					return fmt.Errorf("invalid data type unknown defined for %s, please check the stream definition", t)
-				case ast.BIGINT:
-					r[n] = int64(0)
-				case ast.FLOAT:
-					r[n] = 0.0
-				case ast.STRINGS:
-					r[n] = ""
-				case ast.DATETIME:
-					r[n] = conf.GetNow()
-				case ast.BOOLEAN:
-					r[n] = false
-				case ast.BYTEA:
-					r[n] = []byte{}
-				default:
-					return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
-				}
-			case *ast.ArrayType:
-				if tempArr, err := p.addArrayField(st, nil); err != nil {
-					return fmt.Errorf("fail to parse field %s: %s", n, err)
-				} else {
-					r[n] = tempArr
-				}
-			case *ast.RecType:
-				r[n] = make(map[string]interface{})
-			default:
-				return fmt.Errorf("unsupported type %T", st)
-			}
+			message[name] = nv
 		}
 		}
 	}
 	}
-	return nil
+	return message, nil
 }
 }
 
 
-//ft must be ast.ArrayType
-//side effect: r[p] will be set to the new array
-func (p *defaultFieldProcessor) addArrayField(ft *ast.ArrayType, srcSlice []interface{}) (interface{}, error) {
-	if ft.FieldType != nil { //complex type array or struct
-		switch st := ft.FieldType.(type) { //Only two complex types supported here
-		case *ast.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
-			if srcSlice == nil {
-				return [][]interface{}(nil), nil
-			}
-			var s []interface{}
-			var tempSlice reflect.Value
-			for i, t := range srcSlice {
-				jtype := reflect.ValueOf(t).Kind()
-				if t == nil {
-					s = nil
-				} else if jtype == reflect.Slice || jtype == reflect.Array {
-					s = t.([]interface{})
-				} else if jtype == reflect.String {
-					err := json.Unmarshal([]byte(t.(string)), &s)
-					if err != nil {
-						return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
-					}
-				} else {
-					return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
-				}
-				if tempArr, err := p.addArrayField(st, s); err != nil {
-					return nil, err
-				} else {
-					if !tempSlice.IsValid() {
-						s := reflect.TypeOf(tempArr)
-						tempSlice = reflect.MakeSlice(reflect.SliceOf(s), 0, 0)
-					}
-					tempSlice = reflect.Append(tempSlice, reflect.ValueOf(tempArr))
-				}
-			}
-			return tempSlice.Interface(), nil
-		case *ast.RecType:
-			if srcSlice == nil {
-				return []map[string]interface{}(nil), nil
-			}
-			tempSlice := make([]map[string]interface{}, 0)
-			for i, t := range srcSlice {
-				jtype := reflect.ValueOf(t).Kind()
-				j := make(map[string]interface{})
-				var ok bool
-				if t == nil {
-					j = nil
-					tempSlice = append(tempSlice, j)
-					continue
-				} else if jtype == reflect.Map {
-					j, ok = t.(map[string]interface{})
-					if !ok {
-						return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
-					}
-
-				} else if jtype == reflect.String {
-					err := json.Unmarshal([]byte(t.(string)), &j)
-					if err != nil {
-						return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
-					}
-				} else {
-					return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
-				}
-				r := make(map[string]interface{})
-				for _, f := range st.StreamFields {
-					n := f.Name
-					if e := p.addRecField(f.FieldType, r, j, n); e != nil {
-						return nil, e
-					}
-				}
-				tempSlice = append(tempSlice, r)
-			}
-			return tempSlice, nil
-		default:
-			return nil, fmt.Errorf("unsupported type %T", st)
+// Validate and convert field value to the type defined in schema
+func (p *defaultFieldProcessor) validateAndConvertField(sf *ast.JsonStreamField, t interface{}) (interface{}, error) {
+	v := reflect.ValueOf(t)
+	jtype := v.Kind()
+	switch sf.Type {
+	case (ast.BIGINT).String():
+		if jtype == reflect.Int64 {
+			return t, nil
 		}
 		}
-	} else { //basic type
-		switch ft.Type {
-		case ast.UNKNOWN:
-			return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
-		case ast.BIGINT:
-			if srcSlice == nil {
-				return []int(nil), nil
-			}
-			tempSlice := make([]int, 0)
-			for i, t := range srcSlice {
-				jtype := reflect.ValueOf(t).Kind()
-				if jtype == reflect.Float64 {
-					tempSlice = append(tempSlice, int(t.(float64)))
-				} else if jtype == reflect.String {
-					if v, err := strconv.Atoi(t.(string)); err != nil {
-						return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
-					} else {
-						tempSlice = append(tempSlice, v)
-					}
-				} else {
-					return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
-				}
-			}
-			return tempSlice, nil
-		case ast.FLOAT:
-			if srcSlice == nil {
-				return []float64(nil), nil
-			}
-			tempSlice := make([]float64, 0)
-			for i, t := range srcSlice {
-				jtype := reflect.ValueOf(t).Kind()
-				if jtype == reflect.Float64 {
-					tempSlice = append(tempSlice, t.(float64))
-				} else if jtype == reflect.String {
-					if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
-						return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
-					} else {
-						tempSlice = append(tempSlice, f)
-					}
-				} else {
-					return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
-				}
-			}
-			return tempSlice, nil
-		case ast.STRINGS:
-			if srcSlice == nil {
-				return []string(nil), nil
-			}
-			tempSlice := make([]string, 0)
-			for i, t := range srcSlice {
-				if reflect.ValueOf(t).Kind() == reflect.String {
-					tempSlice = append(tempSlice, t.(string))
-				} else {
-					return nil, fmt.Errorf("invalid data type for [%d], expect string but found %[2]T(%[2]v)", i, t)
+		return cast.ToInt64(t, cast.CONVERT_SAMEKIND)
+	case (ast.FLOAT).String():
+		if jtype == reflect.Float64 {
+			return t, nil
+		}
+		return cast.ToFloat64(t, cast.CONVERT_SAMEKIND)
+	case (ast.BOOLEAN).String():
+		if jtype == reflect.Bool {
+			return t, nil
+		}
+		return cast.ToBool(t, cast.CONVERT_SAMEKIND)
+	case (ast.STRINGS).String():
+		if jtype == reflect.String {
+			return t, nil
+		}
+		return cast.ToString(t, cast.CONVERT_SAMEKIND)
+	case (ast.DATETIME).String():
+		return cast.InterfaceToTime(t, p.timestampFormat)
+	case (ast.BYTEA).String():
+		return cast.ToByteA(t, cast.CONVERT_SAMEKIND)
+	case (ast.ARRAY).String():
+		if t == nil {
+			return []interface{}(nil), nil
+		} else if jtype == reflect.Slice {
+			a := t.([]interface{})
+			for i, e := range a {
+				ne, err := p.validateAndConvertField(sf.Items, e)
+				if err != nil {
+					return nil, fmt.Errorf("array element type mismatch: %v", err)
 				}
 				}
-			}
-			return tempSlice, nil
-		case ast.DATETIME:
-			if srcSlice == nil {
-				return []time.Time(nil), nil
-			}
-			tempSlice := make([]time.Time, 0)
-			for i, t := range srcSlice {
-				jtype := reflect.ValueOf(t).Kind()
-				switch jtype {
-				case reflect.Int:
-					ai := t.(int64)
-					tempSlice = append(tempSlice, cast.TimeFromUnixMilli(ai))
-				case reflect.Float64:
-					ai := int64(t.(float64))
-					tempSlice = append(tempSlice, cast.TimeFromUnixMilli(ai))
-				case reflect.String:
-					if ai, err := p.parseTime(t.(string)); err != nil {
-						return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %[2]T(%[2]v)", t, err)
-					} else {
-						tempSlice = append(tempSlice, ai)
-					}
-				default:
-					return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %[2]T(%[2]v)", i, t)
+				if ne != nil {
+					a[i] = ne
 				}
 				}
 			}
 			}
-			return tempSlice, nil
-		case ast.BOOLEAN:
-			if srcSlice == nil {
-				return []bool(nil), nil
-			}
-			tempSlice := make([]bool, 0)
-			for i, t := range srcSlice {
-				jtype := reflect.ValueOf(t).Kind()
-				if jtype == reflect.Bool {
-					tempSlice = append(tempSlice, t.(bool))
-				} else if jtype == reflect.String {
-					if v, err := strconv.ParseBool(t.(string)); err != nil {
-						return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
-					} else {
-						tempSlice = append(tempSlice, v)
-					}
-				} else {
-					return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
-				}
-			}
-			return tempSlice, nil
-		case ast.BYTEA:
-			if srcSlice == nil {
-				return [][]byte(nil), nil
-			}
-			tempSlice := make([][]byte, 0)
-			for i, t := range srcSlice {
-				jtype := reflect.ValueOf(t).Kind()
-				if jtype == reflect.String {
-					if b, err := base64.StdEncoding.DecodeString(t.(string)); err != nil {
-						return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v) which cannot base64 decode", i, t)
-					} else {
-						tempSlice = append(tempSlice, b)
-					}
-				} else if jtype == reflect.Slice {
-					if b, ok := t.([]byte); ok {
-						tempSlice = append(tempSlice, b)
-					} else {
-						return nil, fmt.Errorf("invalid data type for [%d], expect bytea but found %[2]T(%[2]v)", i, t)
-					}
-				}
+			return a, nil
+		} else {
+			return nil, fmt.Errorf("expect array but got %v", t)
+		}
+	case (ast.STRUCT).String():
+		var (
+			nextJ map[string]interface{}
+			ok    bool
+		)
+		if t == nil {
+			return map[string]interface{}(nil), nil
+		} else if jtype == reflect.Map {
+			nextJ, ok = t.(map[string]interface{})
+			if !ok {
+				return nil, fmt.Errorf("expect map but found %[1]T(%[1]v)", t)
+			}
+		} else if jtype == reflect.String {
+			err := json.Unmarshal([]byte(t.(string)), &nextJ)
+			if err != nil {
+				return nil, fmt.Errorf("invalid data type for %s, expect map but found %[1]T(%[1]v)", t)
 			}
 			}
-			return tempSlice, nil
-		default:
-			return nil, fmt.Errorf("invalid data type for %T", ft.Type)
+		} else {
+			return nil, fmt.Errorf("expect struct but found %[1]T(%[1]v)", t)
 		}
 		}
+		return p.validateAndConvertMessage(sf.Properties, nextJ)
+	default:
+		return nil, fmt.Errorf("unsupported type %s", sf.Type)
 	}
 	}
 }
 }
 
 

+ 35 - 25
internal/topo/operator/preprocessor.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -18,31 +18,40 @@ import (
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/message"
 )
 )
 
 
+// Preprocessor is only planned when
+// 1. event time is used, to convert the timestamp field
+// 2. schema validation and conversion are required, i.e. strict_validation is on and the field type is not binary
+// Does not convert types.
 type Preprocessor struct {
 type Preprocessor struct {
 	//Pruned stream fields. Could be streamField(with data type info) or string
 	//Pruned stream fields. Could be streamField(with data type info) or string
 	defaultFieldProcessor
 	defaultFieldProcessor
-	allMeta        bool
-	metaFields     []string //only needed if not allMeta
+	//allMeta        bool
+	//metaFields     []string //only needed if not allMeta
 	isEventTime    bool
 	isEventTime    bool
-	isSchemaless   bool
 	timestampField string
 	timestampField string
+	checkSchema    bool
+	isBinary       bool
 }
 }
 
 
-func NewPreprocessor(isSchemaless bool, fields []interface{}, allMeta bool, metaFields []string, iet bool, timestampField string, timestampFormat string, isBinary bool, strictValidation bool) (*Preprocessor, error) {
+func NewPreprocessor(isSchemaless bool, fields map[string]*ast.JsonStreamField, _ bool, _ []string, iet bool, timestampField string, timestampFormat string, isBinary bool, strictValidation bool) (*Preprocessor, error) {
 	p := &Preprocessor{
 	p := &Preprocessor{
-		allMeta: allMeta, metaFields: metaFields, isEventTime: iet,
-		isSchemaless: isSchemaless, timestampField: timestampField}
-	p.defaultFieldProcessor = defaultFieldProcessor{
-		streamFields: fields, isBinary: isBinary, timestampFormat: timestampFormat, strictValidation: strictValidation,
+		isEventTime: iet, timestampField: timestampField, isBinary: isBinary}
+	if !isSchemaless && (strictValidation || isBinary) {
+		p.checkSchema = true
+		p.defaultFieldProcessor = defaultFieldProcessor{
+			streamFields: fields, timestampFormat: timestampFormat,
+		}
 	}
 	}
 	return p, nil
 	return p, nil
 }
 }
 
 
-/*
- *	input: *xsql.Tuple
+// Apply the preprocessor to the tuple
+/*	input: *xsql.Tuple
  *	output: *xsql.Tuple
  *	output: *xsql.Tuple
  */
  */
 func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
@@ -53,29 +62,30 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, _ *xsql.Fu
 	}
 	}
 
 
 	log.Debugf("preprocessor receive %s", tuple.Message)
 	log.Debugf("preprocessor receive %s", tuple.Message)
-	var (
-		result map[string]interface{}
-		err    error
-	)
-	if !p.isSchemaless && p.streamFields != nil {
-		result, err = p.processField(tuple, nil)
-		if err != nil {
-			return fmt.Errorf("error in preprocessor: %s", err)
+	if p.checkSchema {
+		if !p.isBinary {
+			err := p.validateAndConvert(tuple)
+			if err != nil {
+				return fmt.Errorf("error in preprocessor: %s", err)
+			}
+		} else {
+			for name := range p.streamFields {
+				tuple.Message[name] = tuple.Message[message.DefaultField]
+				delete(tuple.Message, message.DefaultField)
+				break
+			}
 		}
 		}
-		tuple.Message = result
-	} else {
-		result = tuple.Message
 	}
 	}
 	if p.isEventTime {
 	if p.isEventTime {
-		if t, ok := result[p.timestampField]; ok {
+		if t, ok := tuple.Message[p.timestampField]; ok {
 			if ts, err := cast.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
 			if ts, err := cast.InterfaceToUnixMilli(t, p.timestampFormat); err != nil {
 				return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
 				return fmt.Errorf("cannot convert timestamp field %s to timestamp with error %v", p.timestampField, err)
 			} else {
 			} else {
 				tuple.Timestamp = ts
 				tuple.Timestamp = ts
-				log.Debugf("preprocessor calculate timstamp %d", tuple.Timestamp)
+				log.Debugf("preprocessor calculate timestamp %d", tuple.Timestamp)
 			}
 			}
 		} else {
 		} else {
-			return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result)
+			return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, tuple.Message)
 		}
 		}
 	}
 	}
 	// No need to reconstruct meta as the memory has been allocated earlier
 	// No need to reconstruct meta as the memory has been allocated earlier

+ 103 - 177
internal/topo/operator/preprocessor_test.go

@@ -43,7 +43,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 		result interface{}
 		result interface{}
 	}{
 	}{
 		//Basic type
 		//Basic type
-		{
+		{ // 0
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -51,9 +51,9 @@ func TestPreprocessor_Apply(t *testing.T) {
 				},
 				},
 			},
 			},
 			data:   []byte(`{"a": 6}`),
 			data:   []byte(`{"a": 6}`),
-			result: errors.New("error in preprocessor: invalid data map[a:%!s(float64=6)], field abc not found"),
+			result: errors.New("error in preprocessor: field abc is not found"),
 		},
 		},
-		{
+		{ // 1
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -61,9 +61,9 @@ func TestPreprocessor_Apply(t *testing.T) {
 				},
 				},
 			},
 			},
 			data:   []byte(`{"abc": null}`),
 			data:   []byte(`{"abc": null}`),
-			result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found <nil>(<nil>)"),
+			result: errors.New("error in preprocessor: field abc type mismatch: cannot convert <nil>(<nil>) to int64"),
 		},
 		},
-		{
+		{ // 2
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name:         ast.StreamName("demo"),
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				StreamFields: nil,
@@ -74,7 +74,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 3
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -83,11 +83,11 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 			data: []byte(`{"abc": 6}`),
 			data: []byte(`{"abc": 6}`),
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
-				"abc": 6,
+				"abc": int64(6),
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 4
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name:         ast.StreamName("demo"),
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				StreamFields: nil,
@@ -98,7 +98,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 5
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -110,10 +110,12 @@ func TestPreprocessor_Apply(t *testing.T) {
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
 				"abc": float64(34),
 				"abc": float64(34),
 				"dEf": "hello",
 				"dEf": "hello",
+				"def": "hello",
+				"ghi": float64(50),
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 6
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name:         ast.StreamName("demo"),
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				StreamFields: nil,
@@ -126,7 +128,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 7
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -134,14 +136,10 @@ func TestPreprocessor_Apply(t *testing.T) {
 					{Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
 					{Name: "def", FieldType: &ast.BasicType{Type: ast.STRINGS}},
 				},
 				},
 			},
 			},
-			data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
-			result: &xsql.Tuple{Message: xsql.Message{
-				"abc": float64(34),
-				"def": "hello",
-			},
-			},
+			data:   []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
+			result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(34) to float64"),
 		},
 		},
-		{
+		{ // 8
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -150,9 +148,9 @@ func TestPreprocessor_Apply(t *testing.T) {
 				},
 				},
 			},
 			},
 			data:   []byte(`{"abc": 77, "def" : "hello"}`),
 			data:   []byte(`{"abc": 77, "def" : "hello"}`),
-			result: errors.New("error in preprocessor: invalid data type for def, expect boolean but found string(hello)"),
+			result: errors.New("error in preprocessor: field def type mismatch: cannot convert string(hello) to bool"),
 		},
 		},
-		{
+		{ // 9
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -161,9 +159,9 @@ func TestPreprocessor_Apply(t *testing.T) {
 				},
 				},
 			},
 			},
 			data:   []byte(`{"a": {"b" : "hello"}}`),
 			data:   []byte(`{"a": {"b" : "hello"}}`),
-			result: errors.New("error in preprocessor: invalid data map[a:map[b:hello]], field abc not found"),
+			result: errors.New("error in preprocessor: field abc is not found"),
 		},
 		},
-		{
+		{ // 10
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name:         ast.StreamName("demo"),
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				StreamFields: nil,
@@ -177,7 +175,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 		},
 		},
 		//Rec type
 		//Rec type
-		{
+		{ // 11
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -196,7 +194,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 12
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -207,15 +205,10 @@ func TestPreprocessor_Apply(t *testing.T) {
 					}},
 					}},
 				},
 				},
 			},
 			},
-			data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
-			result: &xsql.Tuple{Message: xsql.Message{
-				"a": map[string]interface{}{
-					"b": float64(32),
-				},
-			},
-			},
+			data:   []byte(`{"a": "{\"b\" : \"32\"}"}`),
+			result: errors.New("error in preprocessor: field a type mismatch: field b type mismatch: cannot convert string(32) to float64"),
 		},
 		},
-		{
+		{ // 13
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name:         ast.StreamName("demo"),
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				StreamFields: nil,
@@ -229,7 +222,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 		},
 		},
 		//Array of complex type
 		//Array of complex type
-		{
+		{ // 14
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -245,14 +238,14 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 			data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
 			data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
-				"a": []map[string]interface{}{
-					{"b": "hello1"},
-					{"b": "hello2"},
+				"a": []interface{}{
+					map[string]interface{}{"b": "hello1"},
+					map[string]interface{}{"b": "hello2"},
 				},
 				},
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 15
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -268,11 +261,11 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 			data: []byte(`{"a": []}`),
 			data: []byte(`{"a": []}`),
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
-				"a": make([]map[string]interface{}, 0),
+				"a": make([]interface{}, 0),
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 16
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -288,11 +281,11 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 			data: []byte(`{"a": null}`),
 			data: []byte(`{"a": null}`),
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
-				"a": []map[string]interface{}(nil),
+				"a": []interface{}(nil),
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 17
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -308,14 +301,14 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 			data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
 			data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
-				"a": []map[string]interface{}{
-					nil,
-					{"b": "hello2"},
+				"a": []interface{}{
+					map[string]interface{}(nil),
+					map[string]interface{}{"b": "hello2"},
 				},
 				},
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 18
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -329,15 +322,15 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 			data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
 			data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
-				"a": [][]int{
-					{50, 60, 70},
-					{66},
-					{77},
+				"a": []interface{}{
+					[]interface{}{int64(50), int64(60), int64(70)},
+					[]interface{}{int64(66)},
+					[]interface{}{int64(77)},
 				},
 				},
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 19
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -351,15 +344,15 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 			data: []byte(`{"a": [null, [66], [77]]}`),
 			data: []byte(`{"a": [null, [66], [77]]}`),
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
-				"a": [][]int{
-					[]int(nil),
-					{66},
-					{77},
+				"a": []interface{}{
+					[]interface{}(nil),
+					[]interface{}{int64(66)},
+					[]interface{}{int64(77)},
 				},
 				},
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 20
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name:         ast.StreamName("demo"),
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				StreamFields: nil,
@@ -373,7 +366,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 21
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -382,16 +375,10 @@ func TestPreprocessor_Apply(t *testing.T) {
 					}},
 					}},
 				},
 				},
 			},
 			},
-			data: []byte(`{"a": "[\"55\", \"77\"]"}`),
-			result: &xsql.Tuple{Message: xsql.Message{
-				"a": []float64{
-					55,
-					77,
-				},
-			},
-			},
+			data:   []byte(`{"a": "[\"55\", \"77\"]"}`),
+			result: errors.New("error in preprocessor: field a type mismatch: expect array but got [\"55\", \"77\"]"),
 		},
 		},
-		{
+		{ // 22
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name:         ast.StreamName("demo"),
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				StreamFields: nil,
@@ -406,7 +393,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 		},
 		},
 		//Rec of complex type
 		//Rec of complex type
-		{
+		{ // 23
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -427,13 +414,13 @@ func TestPreprocessor_Apply(t *testing.T) {
 				"a": map[string]interface{}{
 				"a": map[string]interface{}{
 					"b": "hello",
 					"b": "hello",
 					"c": map[string]interface{}{
 					"c": map[string]interface{}{
-						"d": 35,
+						"d": int64(35),
 					},
 					},
 				},
 				},
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 24
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -455,7 +442,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 25
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -473,14 +460,14 @@ func TestPreprocessor_Apply(t *testing.T) {
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
 				"a": map[string]interface{}{
 				"a": map[string]interface{}{
 					"b": "hello",
 					"b": "hello",
-					"c": []float64{
+					"c": []interface{}{
 						35.2, 38.2,
 						35.2, 38.2,
 					},
 					},
 				},
 				},
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 26
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -498,12 +485,12 @@ func TestPreprocessor_Apply(t *testing.T) {
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
 				"a": map[string]interface{}{
 				"a": map[string]interface{}{
 					"b": "hello",
 					"b": "hello",
-					"c": []float64(nil),
+					"c": []interface{}(nil),
 				},
 				},
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 27
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -518,9 +505,9 @@ func TestPreprocessor_Apply(t *testing.T) {
 				},
 				},
 			},
 			},
 			data:   []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
 			data:   []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
-			result: errors.New("error in preprocessor: fail to parse field c: invalid data type for [0], expect float but found <nil>(<nil>)"),
+			result: errors.New("error in preprocessor: field a type mismatch: field c type mismatch: array element type mismatch: cannot convert <nil>(<nil>) to float64"),
 		},
 		},
-		{
+		{ // 28
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name:         ast.StreamName("demo"),
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				StreamFields: nil,
@@ -535,31 +522,6 @@ func TestPreprocessor_Apply(t *testing.T) {
 				},
 				},
 			},
 			},
 			},
 			},
-		}, {
-			stmt: &ast.StreamStmt{
-				Name: ast.StreamName("demo"),
-				StreamFields: []ast.StreamField{
-					{Name: "a", FieldType: &ast.RecType{
-						StreamFields: []ast.StreamField{
-							{Name: "b", FieldType: &ast.BasicType{Type: ast.STRINGS}},
-						},
-					}},
-					{Name: "b", FieldType: &ast.BasicType{Type: ast.FLOAT}},
-					{Name: "c", FieldType: &ast.ArrayType{Type: ast.BIGINT}},
-				},
-				Options: &ast.Options{
-					STRICT_VALIDATION: false,
-				},
-			},
-			data: []byte(`{"a": {"d" : "hello"}}`),
-			result: &xsql.Tuple{Message: xsql.Message{
-				"a": map[string]interface{}{
-					"b": "",
-				},
-				"b": 0.0,
-				"c": []int(nil),
-			},
-			},
 		},
 		},
 	}
 	}
 
 
@@ -569,13 +531,8 @@ func TestPreprocessor_Apply(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "TestPreprocessor_Apply")
 	contextLogger := conf.Log.WithField("rule", "TestPreprocessor_Apply")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	for i, tt := range tests {
 	for i, tt := range tests {
-		pp := &Preprocessor{}
-		if tt.stmt.Options != nil {
-			pp.strictValidation = tt.stmt.Options.STRICT_VALIDATION
-		} else {
-			pp.strictValidation = true
-		}
-		pp.streamFields = convertFields(tt.stmt.StreamFields)
+		pp := &Preprocessor{checkSchema: true}
+		pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
 
 
 		dm := make(map[string]interface{})
 		dm := make(map[string]interface{})
 		if e := json.Unmarshal(tt.data, &dm); e != nil {
 		if e := json.Unmarshal(tt.data, &dm); e != nil {
@@ -599,7 +556,7 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 		data   []byte
 		data   []byte
 		result interface{}
 		result interface{}
 	}{
 	}{
-		{
+		{ // 0
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -614,7 +571,7 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 1
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name:         ast.StreamName("demo"),
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				StreamFields: nil,
@@ -626,7 +583,7 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 2
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -635,9 +592,9 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 				},
 				},
 			},
 			},
 			data:   []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
 			data:   []byte(`{"abc": "2019-09-19T00:55:1dd5Z", "def" : 111568854573431}`),
-			result: errors.New("error in preprocessor: invalid data type for abc, cannot convert to datetime: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
+			result: errors.New("error in preprocessor: field abc type mismatch: parsing time \"2019-09-19T00:55:1dd5Z\" as \"2006-01-02T15:04:05.000Z07:00\": cannot parse \"1dd5Z\" as \"05\""),
 		},
 		},
-		{
+		{ // 3
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -661,7 +618,7 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 			}},
 			}},
 		},
 		},
 		//Array type
 		//Array type
-		{
+		{ // 4
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -672,14 +629,14 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 			},
 			},
 			data: []byte(`{"a": [1568854515123, 1568854573431]}`),
 			data: []byte(`{"a": [1568854515123, 1568854573431]}`),
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
-				"a": []time.Time{
+				"a": []interface{}{
 					cast.TimeFromUnixMilli(1568854515123),
 					cast.TimeFromUnixMilli(1568854515123),
 					cast.TimeFromUnixMilli(1568854573431),
 					cast.TimeFromUnixMilli(1568854573431),
 				},
 				},
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 5
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -708,8 +665,8 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "TestPreprocessorTime_Apply")
 	contextLogger := conf.Log.WithField("rule", "TestPreprocessorTime_Apply")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	for i, tt := range tests {
 	for i, tt := range tests {
-		pp := &Preprocessor{}
-		pp.streamFields = convertFields(tt.stmt.StreamFields)
+		pp := &Preprocessor{checkSchema: true}
+		pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
 		if tt.stmt.Options != nil {
 		if tt.stmt.Options != nil {
 			pp.timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
 			pp.timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
 		}
 		}
@@ -754,7 +711,7 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 		result interface{}
 		result interface{}
 	}{
 	}{
 		//Basic type
 		//Basic type
-		{
+		{ // 0
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -772,11 +729,11 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 			},
 			},
 			data: []byte(`{"abc": 1568854515000}`),
 			data: []byte(`{"abc": 1568854515000}`),
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
-				"abc": 1568854515000,
+				"abc": int64(1568854515000),
 			}, Timestamp: 1568854515000,
 			}, Timestamp: 1568854515000,
 			},
 			},
 		},
 		},
-		{
+		{ // 1
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name:         ast.StreamName("demo"),
 				Name:         ast.StreamName("demo"),
 				StreamFields: nil,
 				StreamFields: nil,
@@ -796,7 +753,7 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 			}, Timestamp: 1568854515000,
 			}, Timestamp: 1568854515000,
 			},
 			},
 		},
 		},
-		{
+		{ // 2
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -810,7 +767,7 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 			data:   []byte(`{"abc": true}`),
 			data:   []byte(`{"abc": true}`),
 			result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
 			result: errors.New("cannot convert timestamp field abc to timestamp with error unsupported type to convert to timestamp true"),
 		},
 		},
-		{
+		{ // 3
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -826,10 +783,11 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
 				"abc": float64(34),
 				"abc": float64(34),
 				"def": "2019-09-23T02:47:29.754Z",
 				"def": "2019-09-23T02:47:29.754Z",
+				"ghi": float64(50),
 			}, Timestamp: int64(1569206849754),
 			}, Timestamp: int64(1569206849754),
 			},
 			},
 		},
 		},
-		{
+		{ // 4
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -848,7 +806,7 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 			}, Timestamp: int64(1568854515000),
 			}, Timestamp: int64(1568854515000),
 			},
 			},
 		},
 		},
-		{
+		{ // 5
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -865,10 +823,11 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
 				"abc": float64(34),
 				"abc": float64(34),
 				"def": "2019-09-23AT02:47:29",
 				"def": "2019-09-23AT02:47:29",
+				"ghi": float64(50),
 			}, Timestamp: int64(1569206849000),
 			}, Timestamp: int64(1569206849000),
 			},
 			},
 		},
 		},
-		{
+		{ // 6
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -894,9 +853,9 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 	for i, tt := range tests {
 	for i, tt := range tests {
 
 
 		pp := &Preprocessor{
 		pp := &Preprocessor{
+			checkSchema: true,
 			defaultFieldProcessor: defaultFieldProcessor{
 			defaultFieldProcessor: defaultFieldProcessor{
-				streamFields:    convertFields(tt.stmt.StreamFields),
-				isBinary:        false,
+				streamFields:    tt.stmt.StreamFields.ToJsonSchema(),
 				timestampFormat: tt.stmt.Options.TIMESTAMP_FORMAT,
 				timestampFormat: tt.stmt.Options.TIMESTAMP_FORMAT,
 			},
 			},
 			isEventTime:    true,
 			isEventTime:    true,
@@ -939,7 +898,7 @@ func TestPreprocessorError(t *testing.T) {
 				},
 				},
 			},
 			},
 			data:   []byte(`{"abc": "dafsad"}`),
 			data:   []byte(`{"abc": "dafsad"}`),
-			result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(dafsad)"),
+			result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(dafsad) to int64"),
 		}, {
 		}, {
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
@@ -952,7 +911,7 @@ func TestPreprocessorError(t *testing.T) {
 				},
 				},
 			},
 			},
 			data:   []byte(`{"a": {"d" : "hello"}}`),
 			data:   []byte(`{"a": {"d" : "hello"}}`),
-			result: errors.New("error in preprocessor: invalid data map[d:hello], field b not found"),
+			result: errors.New("error in preprocessor: field a type mismatch: field b is not found"),
 		}, {
 		}, {
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
@@ -970,7 +929,7 @@ func TestPreprocessorError(t *testing.T) {
 				},
 				},
 			},
 			},
 			data:   []byte(`{"abc": "not a time"}`),
 			data:   []byte(`{"abc": "not a time"}`),
-			result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(not a time)"),
+			result: errors.New("error in preprocessor: field abc type mismatch: cannot convert string(not a time) to int64"),
 		},
 		},
 	}
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -980,9 +939,8 @@ func TestPreprocessorError(t *testing.T) {
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	for i, tt := range tests {
 	for i, tt := range tests {
 
 
-		pp := &Preprocessor{}
-		pp.strictValidation = true
-		pp.streamFields = convertFields(tt.stmt.StreamFields)
+		pp := &Preprocessor{checkSchema: true}
+		pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
 		dm := make(map[string]interface{})
 		dm := make(map[string]interface{})
 		if e := json.Unmarshal(tt.data, &dm); e != nil {
 		if e := json.Unmarshal(tt.data, &dm); e != nil {
 			log.Fatal(e)
 			log.Fatal(e)
@@ -1009,40 +967,12 @@ func TestPreprocessorForBinary(t *testing.T) {
 		t.Errorf("Cannot read image: %v", err)
 		t.Errorf("Cannot read image: %v", err)
 	}
 	}
 	b64img := base64.StdEncoding.EncodeToString(image)
 	b64img := base64.StdEncoding.EncodeToString(image)
-	//TODO test bytea type conversion to string or else
 	var tests = []struct {
 	var tests = []struct {
-		stmt     *ast.StreamStmt
-		data     []byte
-		isBinary bool
-		result   interface{}
+		stmt   *ast.StreamStmt
+		data   []byte
+		result interface{}
 	}{
 	}{
-		{
-			stmt: &ast.StreamStmt{
-				Name:         ast.StreamName("demo"),
-				StreamFields: nil,
-			},
-			data:     image,
-			isBinary: true,
-			result: &xsql.Tuple{Message: xsql.Message{
-				"self": image,
-			},
-			},
-		},
-		{
-			stmt: &ast.StreamStmt{
-				Name: ast.StreamName("demo"),
-				StreamFields: []ast.StreamField{
-					{Name: "img", FieldType: &ast.BasicType{Type: ast.BYTEA}},
-				},
-			},
-			data:     image,
-			isBinary: true,
-			result: &xsql.Tuple{Message: xsql.Message{
-				"img": image,
-			},
-			},
-		},
-		{
+		{ // 0
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -1061,7 +991,7 @@ func TestPreprocessorForBinary(t *testing.T) {
 			},
 			},
 			},
 			},
 		},
 		},
-		{
+		{ // 1
 			stmt: &ast.StreamStmt{
 			stmt: &ast.StreamStmt{
 				Name: ast.StreamName("demo"),
 				Name: ast.StreamName("demo"),
 				StreamFields: []ast.StreamField{
 				StreamFields: []ast.StreamField{
@@ -1072,7 +1002,7 @@ func TestPreprocessorForBinary(t *testing.T) {
 			},
 			},
 			data: []byte(fmt.Sprintf(`{"a": ["%s"]}`, b64img)),
 			data: []byte(fmt.Sprintf(`{"a": ["%s"]}`, b64img)),
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
-				"a": [][]byte{
+				"a": []interface{}{
 					image,
 					image,
 				},
 				},
 			},
 			},
@@ -1094,8 +1024,8 @@ func TestPreprocessorForBinary(t *testing.T) {
 			},
 			},
 			data: []byte(fmt.Sprintf(`{"a": [{"b":"%s"}]}`, b64img)),
 			data: []byte(fmt.Sprintf(`{"a": [{"b":"%s"}]}`, b64img)),
 			result: &xsql.Tuple{Message: xsql.Message{
 			result: &xsql.Tuple{Message: xsql.Message{
-				"a": []map[string]interface{}{
-					{"b": image},
+				"a": []interface{}{
+					map[string]interface{}{"b": image},
 				},
 				},
 			},
 			},
 			},
 			},
@@ -1108,15 +1038,11 @@ func TestPreprocessorForBinary(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "TestPreprocessorForBinary")
 	contextLogger := conf.Log.WithField("rule", "TestPreprocessorForBinary")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	for i, tt := range tests {
 	for i, tt := range tests {
-		pp := &Preprocessor{}
-		pp.streamFields = convertFields(tt.stmt.StreamFields)
-		pp.isBinary = tt.isBinary
+		pp := &Preprocessor{checkSchema: true}
+		pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
 		format := message.FormatJson
 		format := message.FormatJson
-		if tt.isBinary {
-			format = message.FormatBinary
-		}
-		converter, _ := converter.GetOrCreateConverter(format, "", "")
-		nCtx := context.WithValue(ctx, context.DecodeKey, converter)
+		ccc, _ := converter.GetOrCreateConverter(format, "", "")
+		nCtx := context.WithValue(ctx, context.DecodeKey, ccc)
 		if dm, e := nCtx.Decode(tt.data); e != nil {
 		if dm, e := nCtx.Decode(tt.data); e != nil {
 			log.Fatal(e)
 			log.Fatal(e)
 			return
 			return

+ 10 - 9
internal/topo/operator/table_processor.go

@@ -25,7 +25,7 @@ type TableProcessor struct {
 	//Pruned stream fields. Could be streamField(with data type info) or string
 	//Pruned stream fields. Could be streamField(with data type info) or string
 	defaultFieldProcessor
 	defaultFieldProcessor
 
 
-	isSchemaless bool
+	checkSchema  bool
 	isBatchInput bool // whether the inputs are batched, such as file which sends multiple messages at a batch. If batch input, only fires when EOF is received. This is mutual exclusive with retainSize.
 	isBatchInput bool // whether the inputs are batched, such as file which sends multiple messages at a batch. If batch input, only fires when EOF is received. This is mutual exclusive with retainSize.
 	retainSize   int  // how many(maximum) messages to be retained for each output
 	retainSize   int  // how many(maximum) messages to be retained for each output
 	emitterName  string
 	emitterName  string
@@ -34,11 +34,13 @@ type TableProcessor struct {
 	batchEmitted bool               // if batch input, this is the signal for whether the last batch has emitted. If true, reinitialize.
 	batchEmitted bool               // if batch input, this is the signal for whether the last batch has emitted. If true, reinitialize.
 }
 }
 
 
-func NewTableProcessor(isSchemaless bool, name string, fields []interface{}, options *ast.Options) (*TableProcessor, error) {
-	p := &TableProcessor{emitterName: name, batchEmitted: true, retainSize: 1, isSchemaless: isSchemaless}
-	p.defaultFieldProcessor = defaultFieldProcessor{
-		streamFields: fields, isBinary: false, timestampFormat: options.TIMESTAMP_FORMAT,
-		strictValidation: options.STRICT_VALIDATION,
+func NewTableProcessor(isSchemaless bool, name string, fields map[string]*ast.JsonStreamField, options *ast.Options) (*TableProcessor, error) {
+	p := &TableProcessor{emitterName: name, batchEmitted: true, retainSize: 1}
+	if !isSchemaless && options.STRICT_VALIDATION {
+		p.defaultFieldProcessor = defaultFieldProcessor{
+			streamFields: fields, timestampFormat: options.TIMESTAMP_FORMAT,
+		}
+		p.checkSchema = true
 	}
 	}
 	if options.RETAIN_SIZE > 0 {
 	if options.RETAIN_SIZE > 0 {
 		p.retainSize = options.RETAIN_SIZE
 		p.retainSize = options.RETAIN_SIZE
@@ -67,12 +69,11 @@ func (p *TableProcessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql
 		p.batchEmitted = false
 		p.batchEmitted = false
 	}
 	}
 	if tuple.Message != nil {
 	if tuple.Message != nil {
-		if !p.isSchemaless && p.streamFields != nil {
-			result, err := p.processField(tuple, nil)
+		if p.checkSchema {
+			err := p.validateAndConvert(tuple)
 			if err != nil {
 			if err != nil {
 				return fmt.Errorf("error in preprocessor: %s", err)
 				return fmt.Errorf("error in preprocessor: %s", err)
 			}
 			}
-			tuple.Message = result
 		}
 		}
 		var newTuples []xsql.TupleRow
 		var newTuples []xsql.TupleRow
 		_ = p.output.Range(func(i int, r xsql.ReadonlyRow) (bool, error) {
 		_ = p.output.Range(func(i int, r xsql.ReadonlyRow) (bool, error) {

+ 11 - 11
internal/topo/operator/table_processor_test.go

@@ -50,27 +50,27 @@ func TestTableProcessor_Apply(t *testing.T) {
 				Content: []xsql.TupleRow{
 				Content: []xsql.TupleRow{
 					&xsql.Tuple{
 					&xsql.Tuple{
 						Message: xsql.Message{
 						Message: xsql.Message{
-							"a": []map[string]interface{}{
-								{"b": "hello1"},
-								{"b": "hello2"},
+							"a": []interface{}{
+								map[string]interface{}{"b": "hello1"},
+								map[string]interface{}{"b": "hello2"},
 							},
 							},
 						},
 						},
 						Emitter: "demo",
 						Emitter: "demo",
 					},
 					},
 					&xsql.Tuple{
 					&xsql.Tuple{
 						Message: xsql.Message{
 						Message: xsql.Message{
-							"a": []map[string]interface{}{
-								{"b": "hello2"},
-								{"b": "hello3"},
+							"a": []interface{}{
+								map[string]interface{}{"b": "hello2"},
+								map[string]interface{}{"b": "hello3"},
 							},
 							},
 						},
 						},
 						Emitter: "demo",
 						Emitter: "demo",
 					},
 					},
 					&xsql.Tuple{
 					&xsql.Tuple{
 						Message: xsql.Message{
 						Message: xsql.Message{
-							"a": []map[string]interface{}{
-								{"b": "hello3"},
-								{"b": "hello4"},
+							"a": []interface{}{
+								map[string]interface{}{"b": "hello3"},
+								map[string]interface{}{"b": "hello4"},
 							},
 							},
 						},
 						},
 						Emitter: "demo",
 						Emitter: "demo",
@@ -117,8 +117,8 @@ func TestTableProcessor_Apply(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "TestPreprocessor_Apply")
 	contextLogger := conf.Log.WithField("rule", "TestPreprocessor_Apply")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	for i, tt := range tests {
 	for i, tt := range tests {
-		pp := &TableProcessor{isBatchInput: true, emitterName: "demo"}
-		pp.streamFields = convertFields(tt.stmt.StreamFields)
+		pp := &TableProcessor{isBatchInput: true, emitterName: "demo", checkSchema: true}
+		pp.streamFields = tt.stmt.StreamFields.ToJsonSchema()
 		pp.output = &xsql.WindowTuples{
 		pp.output = &xsql.WindowTuples{
 			Content: make([]xsql.TupleRow, 0),
 			Content: make([]xsql.TupleRow, 0),
 		}
 		}

+ 23 - 8
internal/topo/planner/analyzer.go

@@ -17,40 +17,55 @@ package planner
 import (
 import (
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/binder/function"
 	"github.com/lf-edge/ekuiper/internal/binder/function"
+	"github.com/lf-edge/ekuiper/internal/schema"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 	"strings"
 	"strings"
 )
 )
 
 
+type streamInfo struct {
+	stmt   *ast.StreamStmt
+	schema ast.StreamFields
+}
+
 // Analyze the select statement by decorating the info from stream statement.
 // Analyze the select statement by decorating the info from stream statement.
 // Typically, set the correct stream name for fieldRefs
 // Typically, set the correct stream name for fieldRefs
-func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*ast.StreamStmt, []*ast.Call, error) {
+func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]streamInfo, []*ast.Call, error) {
 	streamsFromStmt := xsql.GetStreams(s)
 	streamsFromStmt := xsql.GetStreams(s)
-	streamStmts := make([]*ast.StreamStmt, len(streamsFromStmt))
+	streamStmts := make([]streamInfo, len(streamsFromStmt))
 	isSchemaless := false
 	isSchemaless := false
 	for i, s := range streamsFromStmt {
 	for i, s := range streamsFromStmt {
 		streamStmt, err := xsql.GetDataSource(store, s)
 		streamStmt, err := xsql.GetDataSource(store, s)
 		if err != nil {
 		if err != nil {
 			return nil, nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
 			return nil, nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
 		}
 		}
-		streamStmts[i] = streamStmt
-		// TODO fine grain control of schemaless
-		if streamStmt.StreamFields == nil {
+		ss := streamStmt.StreamFields
+		if streamStmt.Options.SCHEMAID != "" {
+			ss, err = schema.InferFromSchemaFile(streamStmt.Options.FORMAT, streamStmt.Options.SCHEMAID)
+		}
+		if err != nil {
+			return nil, nil, err
+		}
+		streamStmts[i] = streamInfo{
+			stmt:   streamStmt,
+			schema: ss,
+		}
+		if ss == nil {
 			isSchemaless = true
 			isSchemaless = true
 		}
 		}
 	}
 	}
 
 
 	dsn := ast.DefaultStream
 	dsn := ast.DefaultStream
 	if len(streamsFromStmt) == 1 {
 	if len(streamsFromStmt) == 1 {
-		dsn = streamStmts[0].Name
+		dsn = streamStmts[0].stmt.Name
 	}
 	}
 	// [fieldName][streamsName][*aliasRef] if alias, with special key alias/default. Each key has exactly one value
 	// [fieldName][streamsName][*aliasRef] if alias, with special key alias/default. Each key has exactly one value
 	fieldsMap := newFieldsMap(isSchemaless, dsn)
 	fieldsMap := newFieldsMap(isSchemaless, dsn)
 	if !isSchemaless {
 	if !isSchemaless {
 		for _, streamStmt := range streamStmts {
 		for _, streamStmt := range streamStmts {
-			for _, field := range streamStmt.StreamFields {
-				fieldsMap.reserve(field.Name, streamStmt.Name)
+			for _, field := range streamStmt.schema {
+				fieldsMap.reserve(field.Name, streamStmt.stmt.Name)
 			}
 			}
 		}
 		}
 	}
 	}

+ 40 - 46
internal/topo/planner/dataSourcePlan.go

@@ -16,7 +16,6 @@ package planner
 
 
 import (
 import (
 	"fmt"
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/message"
 	"github.com/lf-edge/ekuiper/pkg/message"
 	"sort"
 	"sort"
@@ -28,9 +27,11 @@ type DataSourcePlan struct {
 	name ast.StreamName
 	name ast.StreamName
 	// calculated properties
 	// calculated properties
 	// initialized with stream definition, pruned with rule
 	// initialized with stream definition, pruned with rule
-	streamFields []interface{}
-	metaFields   []string
-	// passon properties
+	metaFields []string
+	// pass-on and converted state. For schemaless, the value is always nil
+	streamFields map[string]*ast.JsonStreamField
+	// pass-on properties
+	isSchemaless    bool
 	streamStmt      *ast.StreamStmt
 	streamStmt      *ast.StreamStmt
 	allMeta         bool
 	allMeta         bool
 	isBinary        bool
 	isBinary        bool
@@ -39,7 +40,7 @@ type DataSourcePlan struct {
 	timestampField  string
 	timestampField  string
 	// intermediate status
 	// intermediate status
 	isWildCard bool
 	isWildCard bool
-	fields     map[string]interface{}
+	fields     map[string]*ast.JsonStreamField
 	metaMap    map[string]string
 	metaMap    map[string]string
 }
 }
 
 
@@ -98,12 +99,20 @@ func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	p.fields = make(map[string]interface{})
+	p.fields = make(map[string]*ast.JsonStreamField)
 	if !p.allMeta {
 	if !p.allMeta {
 		p.metaMap = make(map[string]string)
 		p.metaMap = make(map[string]string)
 	}
 	}
 	if p.timestampField != "" {
 	if p.timestampField != "" {
-		p.fields[p.timestampField] = p.timestampField
+		if !p.isSchemaless {
+			tsf, ok := p.streamFields[p.timestampField]
+			if !ok {
+				return fmt.Errorf("timestamp field %s not found", p.timestampField)
+			}
+			p.fields[p.timestampField] = tsf
+		} else {
+			p.fields[p.timestampField] = nil
+		}
 	}
 	}
 	for _, field := range fields {
 	for _, field := range fields {
 		switch f := field.(type) {
 		switch f := field.(type) {
@@ -112,8 +121,11 @@ func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error {
 		case *ast.FieldRef:
 		case *ast.FieldRef:
 			if !p.isWildCard && (f.StreamName == ast.DefaultStream || f.StreamName == p.name) {
 			if !p.isWildCard && (f.StreamName == ast.DefaultStream || f.StreamName == p.name) {
 				if _, ok := p.fields[f.Name]; !ok {
 				if _, ok := p.fields[f.Name]; !ok {
-					sf := p.getField(f.Name)
-					if sf != nil {
+					sf, err := p.getField(f.Name, f.StreamName == p.name)
+					if err != nil {
+						return err
+					}
+					if p.isSchemaless || sf != nil {
 						p.fields[f.Name] = sf
 						p.fields[f.Name] = sf
 					}
 					}
 				}
 				}
@@ -132,8 +144,11 @@ func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error {
 			}
 			}
 		case *ast.SortField:
 		case *ast.SortField:
 			if !p.isWildCard {
 			if !p.isWildCard {
-				sf := p.getField(f.Name)
-				if sf != nil {
+				sf, err := p.getField(f.Name, f.StreamName == p.name)
+				if err != nil {
+					return err
+				}
+				if p.isSchemaless || sf != nil {
 					p.fields[f.Name] = sf
 					p.fields[f.Name] = sf
 				}
 				}
 			}
 			}
@@ -145,47 +160,26 @@ func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error {
 	return nil
 	return nil
 }
 }
 
 
-func (p *DataSourcePlan) getField(name string) interface{} {
-	if p.streamStmt.StreamFields != nil {
-		for _, f := range p.streamStmt.StreamFields { // The input can only be StreamFields
-			if f.Name == name {
-				return &f
+func (p *DataSourcePlan) getField(name string, strict bool) (*ast.JsonStreamField, error) {
+	if !p.isSchemaless {
+		r, ok := p.streamFields[name]
+		if !ok {
+			if strict {
+				return nil, fmt.Errorf("field %s not found in stream %s", name, p.name)
 			}
 			}
+		} else {
+			return r, nil
 		}
 		}
-	} else {
-		return name
 	}
 	}
-	return nil
+	// always return nil for schemaless
+	return nil, nil
 }
 }
 
 
+// Do not prune fields now for preprocessor
+// TODO provide field information to the source for it to prune
 func (p *DataSourcePlan) getAllFields() {
 func (p *DataSourcePlan) getAllFields() {
-	// convert fields
-	p.streamFields = make([]interface{}, 0)
-	if p.isWildCard {
-		if p.streamStmt.StreamFields != nil {
-			for k := range p.streamStmt.StreamFields { // The input can only be StreamFields
-				p.streamFields = append(p.streamFields, &p.streamStmt.StreamFields[k])
-			}
-		} else {
-			p.streamFields = nil
-		}
-	} else {
-		sfs := make([]interface{}, 0, len(p.fields))
-		if conf.IsTesting {
-			var keys []string
-			for k := range p.fields {
-				keys = append(keys, k)
-			}
-			sort.Strings(keys)
-			for _, k := range keys {
-				sfs = append(sfs, p.fields[k])
-			}
-		} else {
-			for _, v := range p.fields {
-				sfs = append(sfs, v)
-			}
-		}
-		p.streamFields = sfs
+	if !p.isWildCard {
+		p.streamFields = p.fields
 	}
 	}
 	p.metaFields = make([]string, 0, len(p.metaMap))
 	p.metaFields = make([]string, 0, len(p.metaMap))
 	for _, v := range p.metaMap {
 	for _, v := range p.metaMap {

+ 21 - 14
internal/topo/planner/planner.go

@@ -119,12 +119,18 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 	)
 	)
 	switch t := lp.(type) {
 	switch t := lp.(type) {
 	case *DataSourcePlan:
 	case *DataSourcePlan:
-		isSchemaless := t.streamStmt.StreamFields == nil
+		isSchemaless := t.isSchemaless
 		switch t.streamStmt.StreamType {
 		switch t.streamStmt.StreamType {
 		case ast.TypeStream:
 		case ast.TypeStream:
-			pp, err := operator.NewPreprocessor(isSchemaless, t.streamFields, t.allMeta, t.metaFields, t.iet, t.timestampField, t.timestampFormat, t.isBinary, t.streamStmt.Options.STRICT_VALIDATION)
-			if err != nil {
-				return nil, 0, err
+			var (
+				pp  node.UnOperation
+				err error
+			)
+			if t.iet || (!isSchemaless && (t.streamStmt.Options.STRICT_VALIDATION || !t.isBinary)) {
+				pp, err = operator.NewPreprocessor(isSchemaless, t.streamFields, t.allMeta, t.metaFields, t.iet, t.timestampField, t.timestampFormat, t.isBinary, t.streamStmt.Options.STRICT_VALIDATION)
+				if err != nil {
+					return nil, 0, err
+				}
 			}
 			}
 			var srcNode *node.SourceNode
 			var srcNode *node.SourceNode
 			if len(sources) == 0 {
 			if len(sources) == 0 {
@@ -229,24 +235,26 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	for _, streamStmt := range streamStmts {
-		if streamStmt.StreamType == ast.TypeTable && streamStmt.Options.KIND == ast.StreamKindLookup {
+	for _, sInfo := range streamStmts {
+		if sInfo.stmt.StreamType == ast.TypeTable && sInfo.stmt.Options.KIND == ast.StreamKindLookup {
 			if lookupTableChildren == nil {
 			if lookupTableChildren == nil {
 				lookupTableChildren = make(map[string]*ast.Options)
 				lookupTableChildren = make(map[string]*ast.Options)
 			}
 			}
-			lookupTableChildren[string(streamStmt.Name)] = streamStmt.Options
+			lookupTableChildren[string(sInfo.stmt.Name)] = sInfo.stmt.Options
 		} else {
 		} else {
 			p = DataSourcePlan{
 			p = DataSourcePlan{
-				name:       streamStmt.Name,
-				streamStmt: streamStmt,
-				iet:        opt.IsEventTime,
-				allMeta:    opt.SendMetaToSink,
+				name:         sInfo.stmt.Name,
+				streamStmt:   sInfo.stmt,
+				streamFields: sInfo.schema.ToJsonSchema(),
+				isSchemaless: sInfo.schema == nil,
+				iet:          opt.IsEventTime,
+				allMeta:      opt.SendMetaToSink,
 			}.Init()
 			}.Init()
-			if streamStmt.StreamType == ast.TypeStream {
+			if sInfo.stmt.StreamType == ast.TypeStream {
 				children = append(children, p)
 				children = append(children, p)
 			} else {
 			} else {
 				scanTableChildren = append(scanTableChildren, p)
 				scanTableChildren = append(scanTableChildren, p)
-				scanTableEmitters = append(scanTableEmitters, string(streamStmt.Name))
+				scanTableEmitters = append(scanTableEmitters, string(sInfo.stmt.Name))
 			}
 			}
 		}
 		}
 	}
 	}
@@ -337,7 +345,6 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.
 		p.SetChildren(children)
 		p.SetChildren(children)
 		children = []LogicalPlan{p}
 		children = []LogicalPlan{p}
 	}
 	}
-	// TODO handle aggregateAlias in optimization as it does not only happen in select fields
 	if dimensions != nil {
 	if dimensions != nil {
 		ds = dimensions.GetGroups()
 		ds = dimensions.GetGroups()
 		if ds != nil && len(ds) > 0 {
 		if ds != nil && len(ds) > 0 {

+ 264 - 275
internal/topo/planner/planner_test.go

@@ -104,14 +104,15 @@ func Test_createLogicalPlan(t *testing.T) {
 						DataSourcePlan{
 						DataSourcePlan{
 							baseLogicalPlan: baseLogicalPlan{},
 							baseLogicalPlan: baseLogicalPlan{},
 							name:            "src1",
 							name:            "src1",
-							streamFields: []interface{}{
-								&ast.StreamField{
-									Name:      "myarray",
-									FieldType: &ast.ArrayType{Type: ast.STRINGS},
+							streamFields: map[string]*ast.JsonStreamField{
+								"myarray": {
+									Type: "array",
+									Items: &ast.JsonStreamField{
+										Type: "string",
+									},
 								},
 								},
-								&ast.StreamField{
-									Name:      "temp",
-									FieldType: &ast.BasicType{Type: ast.BIGINT},
+								"temp": {
+									Type: "bigint",
 								},
 								},
 							},
 							},
 							streamStmt: streams["src1"],
 							streamStmt: streams["src1"],
@@ -152,14 +153,12 @@ func Test_createLogicalPlan(t *testing.T) {
 											children: []LogicalPlan{
 											children: []LogicalPlan{
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src1",
 													name: "src1",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "name",
-															FieldType: &ast.BasicType{Type: ast.STRINGS},
+													streamFields: map[string]*ast.JsonStreamField{
+														"name": {
+															Type: "string",
 														},
 														},
-														&ast.StreamField{
-															Name:      "temp",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"temp": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt: streams["src1"],
 													streamStmt: streams["src1"],
@@ -205,14 +204,12 @@ func Test_createLogicalPlan(t *testing.T) {
 											children: []LogicalPlan{
 											children: []LogicalPlan{
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src1",
 													name: "src1",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "id1",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"id1": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "temp",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"temp": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt: streams["src1"],
 													streamStmt: streams["src1"],
@@ -220,14 +217,12 @@ func Test_createLogicalPlan(t *testing.T) {
 												}.Init(),
 												}.Init(),
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src2",
 													name: "src2",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "hum",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"hum": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "id2",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"id2": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt:      streams["src2"],
 													streamStmt:      streams["src2"],
@@ -295,18 +290,15 @@ func Test_createLogicalPlan(t *testing.T) {
 											children: []LogicalPlan{
 											children: []LogicalPlan{
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src1",
 													name: "src1",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "id1",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"id1": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "name",
-															FieldType: &ast.BasicType{Type: ast.STRINGS},
+														"name": {
+															Type: "string",
 														},
 														},
-														&ast.StreamField{
-															Name:      "temp",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"temp": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt: streams["src1"],
 													streamStmt: streams["src1"],
@@ -364,22 +356,21 @@ func Test_createLogicalPlan(t *testing.T) {
 															DataSourcePlan{
 															DataSourcePlan{
 																name:       "src1",
 																name:       "src1",
 																isWildCard: true,
 																isWildCard: true,
-																streamFields: []interface{}{
-																	&ast.StreamField{
-																		Name:      "id1",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																streamFields: map[string]*ast.JsonStreamField{
+																	"id1": {
+																		Type: "bigint",
 																	},
 																	},
-																	&ast.StreamField{
-																		Name:      "temp",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																	"temp": {
+																		Type: "bigint",
 																	},
 																	},
-																	&ast.StreamField{
-																		Name:      "name",
-																		FieldType: &ast.BasicType{Type: ast.STRINGS},
+																	"name": {
+																		Type: "string",
 																	},
 																	},
-																	&ast.StreamField{
-																		Name:      "myarray",
-																		FieldType: &ast.ArrayType{Type: ast.STRINGS},
+																	"myarray": {
+																		Type: "array",
+																		Items: &ast.JsonStreamField{
+																			Type: "string",
+																		},
 																	},
 																	},
 																},
 																},
 																streamStmt: streams["src1"],
 																streamStmt: streams["src1"],
@@ -438,14 +429,12 @@ func Test_createLogicalPlan(t *testing.T) {
 														children: []LogicalPlan{
 														children: []LogicalPlan{
 															DataSourcePlan{
 															DataSourcePlan{
 																name: "src1",
 																name: "src1",
-																streamFields: []interface{}{
-																	&ast.StreamField{
-																		Name:      "id1",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																streamFields: map[string]*ast.JsonStreamField{
+																	"id1": {
+																		Type: "bigint",
 																	},
 																	},
-																	&ast.StreamField{
-																		Name:      "temp",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																	"temp": {
+																		Type: "bigint",
 																	},
 																	},
 																},
 																},
 																streamStmt: streams["src1"],
 																streamStmt: streams["src1"],
@@ -472,14 +461,12 @@ func Test_createLogicalPlan(t *testing.T) {
 														children: []LogicalPlan{
 														children: []LogicalPlan{
 															DataSourcePlan{
 															DataSourcePlan{
 																name: "src2",
 																name: "src2",
-																streamFields: []interface{}{
-																	&ast.StreamField{
-																		Name:      "hum",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																streamFields: map[string]*ast.JsonStreamField{
+																	"hum": {
+																		Type: "bigint",
 																	},
 																	},
-																	&ast.StreamField{
-																		Name:      "id2",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																	"id2": {
+																		Type: "bigint",
 																	},
 																	},
 																},
 																},
 																streamStmt:      streams["src2"],
 																streamStmt:      streams["src2"],
@@ -547,14 +534,12 @@ func Test_createLogicalPlan(t *testing.T) {
 														children: []LogicalPlan{
 														children: []LogicalPlan{
 															DataSourcePlan{
 															DataSourcePlan{
 																name: "src1",
 																name: "src1",
-																streamFields: []interface{}{
-																	&ast.StreamField{
-																		Name:      "id1",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																streamFields: map[string]*ast.JsonStreamField{
+																	"id1": {
+																		Type: "bigint",
 																	},
 																	},
-																	&ast.StreamField{
-																		Name:      "temp",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																	"temp": {
+																		Type: "bigint",
 																	},
 																	},
 																},
 																},
 																streamStmt: streams["src1"],
 																streamStmt: streams["src1"],
@@ -570,14 +555,12 @@ func Test_createLogicalPlan(t *testing.T) {
 												}.Init(),
 												}.Init(),
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src2",
 													name: "src2",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "hum",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"hum": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "id2",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"id2": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt:      streams["src2"],
 													streamStmt:      streams["src2"],
@@ -657,14 +640,12 @@ func Test_createLogicalPlan(t *testing.T) {
 														children: []LogicalPlan{
 														children: []LogicalPlan{
 															DataSourcePlan{
 															DataSourcePlan{
 																name: "src1",
 																name: "src1",
-																streamFields: []interface{}{
-																	&ast.StreamField{
-																		Name:      "id1",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																streamFields: map[string]*ast.JsonStreamField{
+																	"id1": {
+																		Type: "bigint",
 																	},
 																	},
-																	&ast.StreamField{
-																		Name:      "temp",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																	"temp": {
+																		Type: "bigint",
 																	},
 																	},
 																},
 																},
 																streamStmt: streams["src1"],
 																streamStmt: streams["src1"],
@@ -688,14 +669,12 @@ func Test_createLogicalPlan(t *testing.T) {
 												}.Init(),
 												}.Init(),
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "tableInPlanner",
 													name: "tableInPlanner",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "hum",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"hum": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "id",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"id": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt: streams["tableInPlanner"],
 													streamStmt: streams["tableInPlanner"],
@@ -759,14 +738,12 @@ func Test_createLogicalPlan(t *testing.T) {
 
 
 															DataSourcePlan{
 															DataSourcePlan{
 																name: "src1",
 																name: "src1",
-																streamFields: []interface{}{
-																	&ast.StreamField{
-																		Name:      "id1",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																streamFields: map[string]*ast.JsonStreamField{
+																	"id1": {
+																		Type: "bigint",
 																	},
 																	},
-																	&ast.StreamField{
-																		Name:      "temp",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																	"temp": {
+																		Type: "bigint",
 																	},
 																	},
 																},
 																},
 																streamStmt: streams["src1"],
 																streamStmt: streams["src1"],
@@ -782,14 +759,12 @@ func Test_createLogicalPlan(t *testing.T) {
 												}.Init(),
 												}.Init(),
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "tableInPlanner",
 													name: "tableInPlanner",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "hum",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"hum": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "id",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"id": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt: streams["tableInPlanner"],
 													streamStmt: streams["tableInPlanner"],
@@ -862,10 +837,9 @@ func Test_createLogicalPlan(t *testing.T) {
 								children: []LogicalPlan{
 								children: []LogicalPlan{
 									DataSourcePlan{
 									DataSourcePlan{
 										name: "src1",
 										name: "src1",
-										streamFields: []interface{}{
-											&ast.StreamField{
-												Name:      "temp",
-												FieldType: &ast.BasicType{Type: ast.BIGINT},
+										streamFields: map[string]*ast.JsonStreamField{
+											"temp": {
+												Type: "bigint",
 											},
 											},
 										},
 										},
 										streamStmt: streams["src1"],
 										streamStmt: streams["src1"],
@@ -938,14 +912,12 @@ func Test_createLogicalPlan(t *testing.T) {
 											children: []LogicalPlan{
 											children: []LogicalPlan{
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src2",
 													name: "src2",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "hum",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"hum": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "id2",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"id2": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt:      streams["src2"],
 													streamStmt:      streams["src2"],
@@ -954,14 +926,12 @@ func Test_createLogicalPlan(t *testing.T) {
 												}.Init(),
 												}.Init(),
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "tableInPlanner",
 													name: "tableInPlanner",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "hum",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"hum": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "id",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"id": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt: streams["tableInPlanner"],
 													streamStmt: streams["tableInPlanner"],
@@ -1051,10 +1021,9 @@ func Test_createLogicalPlan(t *testing.T) {
 								children: []LogicalPlan{
 								children: []LogicalPlan{
 									DataSourcePlan{
 									DataSourcePlan{
 										name: "src1",
 										name: "src1",
-										streamFields: []interface{}{
-											&ast.StreamField{
-												Name:      "temp",
-												FieldType: &ast.BasicType{Type: ast.BIGINT},
+										streamFields: map[string]*ast.JsonStreamField{
+											"temp": {
+												Type: "bigint",
 											},
 											},
 										},
 										},
 										streamStmt: streams["src1"],
 										streamStmt: streams["src1"],
@@ -1114,18 +1083,15 @@ func Test_createLogicalPlan(t *testing.T) {
 											children: []LogicalPlan{
 											children: []LogicalPlan{
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src1",
 													name: "src1",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "id1",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"id1": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "name",
-															FieldType: &ast.BasicType{Type: ast.STRINGS},
+														"name": {
+															Type: "string",
 														},
 														},
-														&ast.StreamField{
-															Name:      "temp",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"temp": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt: streams["src1"],
 													streamStmt: streams["src1"],
@@ -1200,22 +1166,21 @@ func Test_createLogicalPlan(t *testing.T) {
 						DataSourcePlan{
 						DataSourcePlan{
 							baseLogicalPlan: baseLogicalPlan{},
 							baseLogicalPlan: baseLogicalPlan{},
 							name:            "src1",
 							name:            "src1",
-							streamFields: []interface{}{
-								&ast.StreamField{
-									Name:      "id1",
-									FieldType: &ast.BasicType{Type: ast.BIGINT},
+							streamFields: map[string]*ast.JsonStreamField{
+								"id1": {
+									Type: "bigint",
 								},
 								},
-								&ast.StreamField{
-									Name:      "temp",
-									FieldType: &ast.BasicType{Type: ast.BIGINT},
+								"temp": {
+									Type: "bigint",
 								},
 								},
-								&ast.StreamField{
-									Name:      "name",
-									FieldType: &ast.BasicType{Type: ast.STRINGS},
+								"name": {
+									Type: "string",
 								},
 								},
-								&ast.StreamField{
-									Name:      "myarray",
-									FieldType: &ast.ArrayType{Type: ast.STRINGS},
+								"myarray": {
+									Type: "array",
+									Items: &ast.JsonStreamField{
+										Type: "string",
+									},
 								},
 								},
 							},
 							},
 							streamStmt: streams["src1"],
 							streamStmt: streams["src1"],
@@ -1267,18 +1232,15 @@ func Test_createLogicalPlan(t *testing.T) {
 											children: []LogicalPlan{
 											children: []LogicalPlan{
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src1",
 													name: "src1",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "id1",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"id1": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "name",
-															FieldType: &ast.BasicType{Type: ast.STRINGS},
+														"name": {
+															Type: "string",
 														},
 														},
-														&ast.StreamField{
-															Name:      "temp",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"temp": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt: streams["src1"],
 													streamStmt: streams["src1"],
@@ -1363,18 +1325,15 @@ func Test_createLogicalPlan(t *testing.T) {
 											children: []LogicalPlan{
 											children: []LogicalPlan{
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src1",
 													name: "src1",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "id1",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"id1": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "name",
-															FieldType: &ast.BasicType{Type: ast.STRINGS},
+														"name": {
+															Type: "string",
 														},
 														},
-														&ast.StreamField{
-															Name:      "temp",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"temp": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt: streams["src1"],
 													streamStmt: streams["src1"],
@@ -1467,22 +1426,21 @@ func Test_createLogicalPlan(t *testing.T) {
 															DataSourcePlan{
 															DataSourcePlan{
 																name:       "src1",
 																name:       "src1",
 																isWildCard: true,
 																isWildCard: true,
-																streamFields: []interface{}{
-																	&ast.StreamField{
-																		Name:      "id1",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																streamFields: map[string]*ast.JsonStreamField{
+																	"id1": {
+																		Type: "bigint",
 																	},
 																	},
-																	&ast.StreamField{
-																		Name:      "temp",
-																		FieldType: &ast.BasicType{Type: ast.BIGINT},
+																	"temp": {
+																		Type: "bigint",
 																	},
 																	},
-																	&ast.StreamField{
-																		Name:      "name",
-																		FieldType: &ast.BasicType{Type: ast.STRINGS},
+																	"name": {
+																		Type: "string",
 																	},
 																	},
-																	&ast.StreamField{
-																		Name:      "myarray",
-																		FieldType: &ast.ArrayType{Type: ast.STRINGS},
+																	"myarray": {
+																		Type: "array",
+																		Items: &ast.JsonStreamField{
+																			Type: "string",
+																		},
 																	},
 																	},
 																},
 																},
 																streamStmt: streams["src1"],
 																streamStmt: streams["src1"],
@@ -1619,11 +1577,12 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 						DataSourcePlan{
 						DataSourcePlan{
 							baseLogicalPlan: baseLogicalPlan{},
 							baseLogicalPlan: baseLogicalPlan{},
 							name:            "src1",
 							name:            "src1",
-							streamFields: []interface{}{
-								"name",
+							streamFields: map[string]*ast.JsonStreamField{
+								"name": nil,
 							},
 							},
-							streamStmt: streams["src1"],
-							metaFields: []string{},
+							streamStmt:   streams["src1"],
+							isSchemaless: true,
+							metaFields:   []string{},
 						}.Init(),
 						}.Init(),
 					},
 					},
 				},
 				},
@@ -1650,11 +1609,13 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 											children: []LogicalPlan{
 											children: []LogicalPlan{
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src1",
 													name: "src1",
-													streamFields: []interface{}{
-														"name", "temp",
+													streamFields: map[string]*ast.JsonStreamField{
+														"name": nil,
+														"temp": nil,
 													},
 													},
-													streamStmt: streams["src1"],
-													metaFields: []string{},
+													streamStmt:   streams["src1"],
+													metaFields:   []string{},
+													isSchemaless: true,
 												}.Init(),
 												}.Init(),
 											},
 											},
 										},
 										},
@@ -1696,19 +1657,24 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 											children: []LogicalPlan{
 											children: []LogicalPlan{
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src1",
 													name: "src1",
-													streamFields: []interface{}{
-														"id1", "temp",
+													streamFields: map[string]*ast.JsonStreamField{
+														"id1":  nil,
+														"temp": nil,
 													},
 													},
-													streamStmt: streams["src1"],
-													metaFields: []string{},
+													streamStmt:   streams["src1"],
+													metaFields:   []string{},
+													isSchemaless: true,
 												}.Init(),
 												}.Init(),
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src2",
 													name: "src2",
-													streamFields: []interface{}{ // can't determine where is id1 belonged to
-														"hum", "id1", "id2",
+													streamFields: map[string]*ast.JsonStreamField{ // cannot determine which stream id1 belongs to
+														"hum": nil,
+														"id1": nil,
+														"id2": nil,
 													},
 													},
-													streamStmt: streams["src2"],
-													metaFields: []string{},
+													isSchemaless: true,
+													streamStmt:   streams["src2"],
+													metaFields:   []string{},
 												}.Init(),
 												}.Init(),
 											},
 											},
 										},
 										},
@@ -1771,11 +1737,14 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 											children: []LogicalPlan{
 											children: []LogicalPlan{
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src1",
 													name: "src1",
-													streamFields: []interface{}{
-														"id1", "name", "temp",
+													streamFields: map[string]*ast.JsonStreamField{
+														"id1":  nil,
+														"name": nil,
+														"temp": nil,
 													},
 													},
-													streamStmt: streams["src1"],
-													metaFields: []string{},
+													isSchemaless: true,
+													streamStmt:   streams["src1"],
+													metaFields:   []string{},
 												}.Init(),
 												}.Init(),
 											},
 											},
 										},
 										},
@@ -1829,9 +1798,10 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 															DataSourcePlan{
 															DataSourcePlan{
 																name:         "src1",
 																name:         "src1",
 																isWildCard:   true,
 																isWildCard:   true,
-																streamFields: nil,
+																streamFields: map[string]*ast.JsonStreamField{},
 																streamStmt:   streams["src1"],
 																streamStmt:   streams["src1"],
 																metaFields:   []string{},
 																metaFields:   []string{},
+																isSchemaless: true,
 															}.Init(),
 															}.Init(),
 														},
 														},
 													},
 													},
@@ -1886,11 +1856,13 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 														children: []LogicalPlan{
 														children: []LogicalPlan{
 															DataSourcePlan{
 															DataSourcePlan{
 																name: "src1",
 																name: "src1",
-																streamFields: []interface{}{
-																	"id1", "temp",
+																streamFields: map[string]*ast.JsonStreamField{
+																	"id1":  nil,
+																	"temp": nil,
 																},
 																},
-																streamStmt: streams["src1"],
-																metaFields: []string{},
+																isSchemaless: true,
+																streamStmt:   streams["src1"],
+																metaFields:   []string{},
 															}.Init(),
 															}.Init(),
 														},
 														},
 													},
 													},
@@ -1913,11 +1885,14 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 														children: []LogicalPlan{
 														children: []LogicalPlan{
 															DataSourcePlan{
 															DataSourcePlan{
 																name: "src2",
 																name: "src2",
-																streamFields: []interface{}{
-																	"hum", "id1", "id2",
+																streamFields: map[string]*ast.JsonStreamField{
+																	"hum": nil,
+																	"id1": nil,
+																	"id2": nil,
 																},
 																},
-																streamStmt: streams["src2"],
-																metaFields: []string{},
+																isSchemaless: true,
+																streamStmt:   streams["src2"],
+																metaFields:   []string{},
 															}.Init(),
 															}.Init(),
 														},
 														},
 													},
 													},
@@ -1980,11 +1955,13 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 														children: []LogicalPlan{
 														children: []LogicalPlan{
 															DataSourcePlan{
 															DataSourcePlan{
 																name: "src1",
 																name: "src1",
-																streamFields: []interface{}{
-																	"id1", "temp",
+																streamFields: map[string]*ast.JsonStreamField{
+																	"id1":  nil,
+																	"temp": nil,
 																},
 																},
-																streamStmt: streams["src1"],
-																metaFields: []string{},
+																isSchemaless: true,
+																streamStmt:   streams["src1"],
+																metaFields:   []string{},
 															}.Init(),
 															}.Init(),
 														},
 														},
 													},
 													},
@@ -1996,11 +1973,14 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 												}.Init(),
 												}.Init(),
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src2",
 													name: "src2",
-													streamFields: []interface{}{
-														"hum", "id1", "id2",
+													streamFields: map[string]*ast.JsonStreamField{
+														"hum": nil,
+														"id1": nil,
+														"id2": nil,
 													},
 													},
-													streamStmt: streams["src2"],
-													metaFields: []string{},
+													isSchemaless: true,
+													streamStmt:   streams["src2"],
+													metaFields:   []string{},
 												}.Init(),
 												}.Init(),
 											},
 											},
 										},
 										},
@@ -2075,11 +2055,14 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 														children: []LogicalPlan{
 														children: []LogicalPlan{
 															DataSourcePlan{
 															DataSourcePlan{
 																name: "src1",
 																name: "src1",
-																streamFields: []interface{}{
-																	"hum", "id1", "temp",
+																streamFields: map[string]*ast.JsonStreamField{
+																	"hum":  nil,
+																	"id1":  nil,
+																	"temp": nil,
 																},
 																},
-																streamStmt: streams["src1"],
-																metaFields: []string{},
+																isSchemaless: true,
+																streamStmt:   streams["src1"],
+																metaFields:   []string{},
 															}.Init(),
 															}.Init(),
 														},
 														},
 													},
 													},
@@ -2099,14 +2082,12 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 												}.Init(),
 												}.Init(),
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "tableInPlanner",
 													name: "tableInPlanner",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "hum",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"hum": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "id",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"id": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt: streams["tableInPlanner"],
 													streamStmt: streams["tableInPlanner"],
@@ -2169,11 +2150,13 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 														children: []LogicalPlan{
 														children: []LogicalPlan{
 															DataSourcePlan{
 															DataSourcePlan{
 																name: "src1",
 																name: "src1",
-																streamFields: []interface{}{
-																	"id1", "temp",
+																streamFields: map[string]*ast.JsonStreamField{
+																	"id1":  nil,
+																	"temp": nil,
 																},
 																},
-																streamStmt: streams["src1"],
-																metaFields: []string{},
+																isSchemaless: true,
+																streamStmt:   streams["src1"],
+																metaFields:   []string{},
 															}.Init(),
 															}.Init(),
 														},
 														},
 													},
 													},
@@ -2185,14 +2168,12 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 												}.Init(),
 												}.Init(),
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "tableInPlanner",
 													name: "tableInPlanner",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "hum",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"hum": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "id",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"id": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt: streams["tableInPlanner"],
 													streamStmt: streams["tableInPlanner"],
@@ -2265,11 +2246,12 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 								children: []LogicalPlan{
 								children: []LogicalPlan{
 									DataSourcePlan{
 									DataSourcePlan{
 										name: "src1",
 										name: "src1",
-										streamFields: []interface{}{
-											"temp",
+										streamFields: map[string]*ast.JsonStreamField{
+											"temp": nil,
 										},
 										},
-										streamStmt: streams["src1"],
-										metaFields: []string{"Humidity", "device", "id"},
+										isSchemaless: true,
+										streamStmt:   streams["src1"],
+										metaFields:   []string{"Humidity", "device", "id"},
 									}.Init(),
 									}.Init(),
 								},
 								},
 							},
 							},
@@ -2338,22 +2320,23 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 											children: []LogicalPlan{
 											children: []LogicalPlan{
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "src2",
 													name: "src2",
-													streamFields: []interface{}{
-														"hum", "id", "id2",
+													streamFields: map[string]*ast.JsonStreamField{
+														"hum": nil,
+														"id":  nil,
+														"id2": nil,
 													},
 													},
-													streamStmt: streams["src2"],
-													metaFields: []string{},
+													isSchemaless: true,
+													streamStmt:   streams["src2"],
+													metaFields:   []string{},
 												}.Init(),
 												}.Init(),
 												DataSourcePlan{
 												DataSourcePlan{
 													name: "tableInPlanner",
 													name: "tableInPlanner",
-													streamFields: []interface{}{
-														&ast.StreamField{
-															Name:      "hum",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+													streamFields: map[string]*ast.JsonStreamField{
+														"hum": {
+															Type: "bigint",
 														},
 														},
-														&ast.StreamField{
-															Name:      "id",
-															FieldType: &ast.BasicType{Type: ast.BIGINT},
+														"id": {
+															Type: "bigint",
 														},
 														},
 													},
 													},
 													streamStmt: streams["tableInPlanner"],
 													streamStmt: streams["tableInPlanner"],
@@ -2441,11 +2424,12 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 						DataSourcePlan{
 						DataSourcePlan{
 							baseLogicalPlan: baseLogicalPlan{},
 							baseLogicalPlan: baseLogicalPlan{},
 							name:            "src1",
 							name:            "src1",
-							streamFields: []interface{}{
-								"name",
+							streamFields: map[string]*ast.JsonStreamField{
+								"name": nil,
 							},
 							},
-							streamStmt: streams["src1"],
-							metaFields: []string{},
+							isSchemaless: true,
+							streamStmt:   streams["src1"],
+							metaFields:   []string{},
 						}.Init(),
 						}.Init(),
 					},
 					},
 				},
 				},
@@ -2555,11 +2539,12 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 									DataSourcePlan{
 									DataSourcePlan{
 										baseLogicalPlan: baseLogicalPlan{},
 										baseLogicalPlan: baseLogicalPlan{},
 										name:            "src1",
 										name:            "src1",
-										streamFields: []interface{}{
-											"a",
+										streamFields: map[string]*ast.JsonStreamField{
+											"a": nil,
 										},
 										},
-										streamStmt: streams["src1"],
-										metaFields: []string{},
+										isSchemaless: true,
+										streamStmt:   streams["src1"],
+										metaFields:   []string{},
 									}.Init(),
 									}.Init(),
 								},
 								},
 							},
 							},
@@ -2636,11 +2621,12 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 															DataSourcePlan{
 															DataSourcePlan{
 																baseLogicalPlan: baseLogicalPlan{},
 																baseLogicalPlan: baseLogicalPlan{},
 																name:            "src1",
 																name:            "src1",
-																streamFields: []interface{}{
-																	"a",
+																streamFields: map[string]*ast.JsonStreamField{
+																	"a": nil,
 																},
 																},
-																streamStmt: streams["src1"],
-																metaFields: []string{},
+																isSchemaless: true,
+																streamStmt:   streams["src1"],
+																metaFields:   []string{},
 															}.Init(),
 															}.Init(),
 														},
 														},
 													},
 													},
@@ -2775,11 +2761,12 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 												DataSourcePlan{
 												DataSourcePlan{
 													baseLogicalPlan: baseLogicalPlan{},
 													baseLogicalPlan: baseLogicalPlan{},
 													name:            "src1",
 													name:            "src1",
-													streamFields: []interface{}{
-														"a",
+													streamFields: map[string]*ast.JsonStreamField{
+														"a": nil,
 													},
 													},
-													streamStmt: streams["src1"],
-													metaFields: []string{},
+													isSchemaless: true,
+													streamStmt:   streams["src1"],
+													metaFields:   []string{},
 												}.Init(),
 												}.Init(),
 											},
 											},
 										},
 										},
@@ -2895,8 +2882,10 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 													baseLogicalPlan: baseLogicalPlan{},
 													baseLogicalPlan: baseLogicalPlan{},
 													name:            "src1",
 													name:            "src1",
 													streamStmt:      streams["src1"],
 													streamStmt:      streams["src1"],
+													streamFields:    map[string]*ast.JsonStreamField{},
 													metaFields:      []string{},
 													metaFields:      []string{},
 													isWildCard:      true,
 													isWildCard:      true,
+													isSchemaless:    true,
 												}.Init(),
 												}.Init(),
 											},
 											},
 										},
 										},

+ 4 - 4
internal/topo/topotest/mocknode/mock_data.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -76,7 +76,7 @@ var TestData = map[string][]*xsql.Tuple{
 		{
 		{
 			Emitter: "demoError",
 			Emitter: "demoError",
 			Message: map[string]interface{}{
 			Message: map[string]interface{}{
-				"color": 3,
+				"color": "blue",
 				"size":  "red",
 				"size":  "red",
 				"ts":    1541152486013,
 				"ts":    1541152486013,
 			},
 			},
@@ -87,7 +87,7 @@ var TestData = map[string][]*xsql.Tuple{
 			Message: map[string]interface{}{
 			Message: map[string]interface{}{
 				"color": "blue",
 				"color": "blue",
 				"size":  6,
 				"size":  6,
-				"ts":    "1541152486822",
+				"ts":    1541152486822,
 			},
 			},
 			Timestamp: 1541152486822,
 			Timestamp: 1541152486822,
 		},
 		},
@@ -508,7 +508,7 @@ var TestData = map[string][]*xsql.Tuple{
 			Emitter: "demoErr",
 			Emitter: "demoErr",
 			Message: map[string]interface{}{
 			Message: map[string]interface{}{
 				"color": 2,
 				"color": 2,
-				"size":  "blue",
+				"size":  5,
 				"ts":    1541152487632,
 				"ts":    1541152487632,
 			},
 			},
 			Timestamp: 1541152487722,
 			Timestamp: 1541152487722,

+ 2 - 2
internal/topo/topotest/rule_test.go

@@ -153,7 +153,7 @@ func TestSingleSQL(t *testing.T) {
 			Sql:  `SELECT size as Int8, ts FROM demoError where size > 3`,
 			Sql:  `SELECT size as Int8, ts FROM demoError where size > 3`,
 			R: [][]map[string]interface{}{
 			R: [][]map[string]interface{}{
 				{{
 				{{
-					"error": "error in preprocessor: invalid data type for size, expect bigint but found string(red)",
+					"error": "error in preprocessor: field size type mismatch: cannot convert string(red) to int64",
 				}},
 				}},
 				{{
 				{{
 					"Int8": float64(6),
 					"Int8": float64(6),
@@ -164,7 +164,7 @@ func TestSingleSQL(t *testing.T) {
 					"ts":   float64(1541152488442),
 					"ts":   float64(1541152488442),
 				}},
 				}},
 				{{
 				{{
-					"error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
+					"error": "error in preprocessor: field size type mismatch: cannot convert string(blue) to int64",
 				}},
 				}},
 			},
 			},
 			M: map[string]interface{}{
 			M: map[string]interface{}{

+ 4 - 4
internal/topo/topotest/window_rule_test.go

@@ -419,7 +419,7 @@ func TestWindow(t *testing.T) {
 			Sql:  `SELECT * FROM demoError GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
 			Sql:  `SELECT * FROM demoError GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
 			R: [][]map[string]interface{}{
 			R: [][]map[string]interface{}{
 				{{
 				{{
-					"error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
+					"error": "error in preprocessor: field size type mismatch: cannot convert string(red) to int64",
 				}},
 				}},
 				{{
 				{{
 					"color": "blue",
 					"color": "blue",
@@ -436,7 +436,7 @@ func TestWindow(t *testing.T) {
 					"ts":    float64(1541152487632),
 					"ts":    float64(1541152487632),
 				}},
 				}},
 				{{
 				{{
-					"error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
+					"error": "error in preprocessor: field color type mismatch: cannot convert int(7) to string",
 				}},
 				}},
 				{{
 				{{
 					"color": "blue",
 					"color": "blue",
@@ -444,7 +444,7 @@ func TestWindow(t *testing.T) {
 					"ts":    float64(1541152487632),
 					"ts":    float64(1541152487632),
 				}},
 				}},
 				{{
 				{{
-					"error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
+					"error": "error in preprocessor: field size type mismatch: cannot convert string(blue) to int64",
 				}},
 				}},
 				{},
 				{},
 			},
 			},
@@ -1015,7 +1015,7 @@ func TestEventWindow(t *testing.T) {
 			Sql:  `SELECT * FROM demoErr GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
 			Sql:  `SELECT * FROM demoErr GROUP BY HOPPINGWINDOW(ss, 2, 1)`,
 			R: [][]map[string]interface{}{
 			R: [][]map[string]interface{}{
 				{{
 				{{
-					"error": "error in preprocessor: invalid data type for color, expect string but found int(2)",
+					"error": "error in preprocessor: field color type mismatch: cannot convert int(2) to string",
 				}},
 				}},
 				{{
 				{{
 					"color": "red",
 					"color": "red",

+ 41 - 0
pkg/ast/sourceStmt.go

@@ -84,6 +84,47 @@ func (sf *StreamFields) UnmarshalFromMap(data map[string]*JsonStreamField) error
 	*sf = t
 	*sf = t
 	return nil
 	return nil
 }
 }
+// ToJsonSchema returns the stream fields as a map keyed by field name, with
+// each field's type rendered as a JsonStreamField (see convertSchema).
+// The receiver is dereferenced unconditionally, so it must not be nil.
+func (sf *StreamFields) ToJsonSchema() map[string]*JsonStreamField {
+	return convertSchema(*sf)
+}
+
+// convertSchema builds a map from field name to the JsonStreamField produced
+// by convertFieldType for that field. The returned map is always non-nil; if
+// two fields share a name, the later one overwrites the earlier.
+func convertSchema(sfs StreamFields) map[string]*JsonStreamField {
+	result := make(map[string]*JsonStreamField, len(sfs))
+	for _, sf := range sfs {
+		result[sf.Name] = convertFieldType(sf.FieldType)
+	}
+	return result
+}
+
+// convertFieldType translates a FieldType into its JsonStreamField form:
+//   - *BasicType becomes a field whose Type is t.Type.String();
+//   - *ArrayType becomes Type "array" with Items describing the element;
+//   - *RecType becomes Type "struct" with Properties built by convertSchema.
+//
+// Any other value, including an untyped nil, yields nil.
+func convertFieldType(sf FieldType) *JsonStreamField {
+	switch t := sf.(type) {
+	case *BasicType:
+		return &JsonStreamField{
+			Type: t.Type.String(),
+		}
+	case *ArrayType:
+		var items *JsonStreamField
+		switch t.Type {
+		case ARRAY, STRUCT:
+			// Composite element: describe it by recursing into the element's FieldType.
+			items = convertFieldType(t.FieldType)
+		default:
+			// Any other element kind is named directly by t.Type.
+			items = &JsonStreamField{
+				Type: t.Type.String(),
+			}
+		}
+		return &JsonStreamField{
+			Type:  "array",
+			Items: items,
+		}
+	case *RecType:
+		return &JsonStreamField{
+			Type:       "struct",
+			Properties: convertSchema(t.StreamFields),
+		}
+	default: // should never happen
+		return nil
+	}
+}
 
 
 func fieldsTypeFromSchema(mjsf map[string]*JsonStreamField) (StreamFields, error) {
 func fieldsTypeFromSchema(mjsf map[string]*JsonStreamField) (StreamFields, error) {
 	sfs := make(StreamFields, 0, len(mjsf))
 	sfs := make(StreamFields, 0, len(mjsf))

+ 73 - 1
pkg/ast/sourceStmt_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -64,3 +64,75 @@ func TestPrintFieldType(t *testing.T) {
 		}
 		}
 	}
 	}
 }
 }
+
+// TestToJsonFields checks that StreamFields.ToJsonSchema produces the expected
+// JsonStreamField map for basic, array, struct and array-of-struct fields.
+func TestToJsonFields(t *testing.T) {
+	var tests = []struct {
+		input  StreamFields
+		output map[string]*JsonStreamField
+	}{
+		{
+			// A single basic field.
+			input: StreamFields{
+				{Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
+			},
+			output: map[string]*JsonStreamField{
+				"STREET_NAME": {
+					Type: "string",
+				},
+			},
+		}, {
+			// Mixed basic types, an array of strings and a nested struct.
+			input: []StreamField{
+				{Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
+				{Name: "FIRST_NAME", FieldType: &BasicType{Type: STRINGS}},
+				{Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
+				{Name: "NICKNAMES", FieldType: &ArrayType{Type: STRINGS}},
+				{Name: "data", FieldType: &BasicType{Type: BYTEA}},
+				{Name: "Gender", FieldType: &BasicType{Type: BOOLEAN}},
+				{Name: "ADDRESS", FieldType: &RecType{
+					StreamFields: []StreamField{
+						{Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
+						{Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
+					},
+				}},
+			},
+			output: map[string]*JsonStreamField{
+				"USERID":     {Type: "bigint"},
+				"FIRST_NAME": {Type: "string"},
+				"LAST_NAME":  {Type: "string"},
+				"NICKNAMES":  {Type: "array", Items: &JsonStreamField{Type: "string"}},
+				"data":       {Type: "bytea"},
+				"Gender":     {Type: "boolean"},
+				"ADDRESS": {Type: "struct", Properties: map[string]*JsonStreamField{
+					"STREET_NAME": {Type: "string"},
+					"NUMBER":      {Type: "bigint"},
+				}},
+			},
+		}, {
+			// An array whose element type is a struct.
+			input: []StreamField{
+				{Name: "ADDRESSES", FieldType: &ArrayType{
+					Type: STRUCT,
+					FieldType: &RecType{
+						StreamFields: []StreamField{
+							{Name: "STREET_NAME", FieldType: &BasicType{Type: STRINGS}},
+							{Name: "NUMBER", FieldType: &BasicType{Type: BIGINT}},
+						},
+					},
+				}},
+			},
+			output: map[string]*JsonStreamField{
+				"ADDRESSES": {Type: "array", Items: &JsonStreamField{
+					Type: "struct", Properties: map[string]*JsonStreamField{
+						"STREET_NAME": {Type: "string"},
+						"NUMBER":      {Type: "bigint"},
+					}},
+				},
+			},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		result := tt.input.ToJsonSchema()
+		// Maps are compared structurally, so field order in the input does not matter.
+		if !reflect.DeepEqual(tt.output, result) {
+			t.Errorf("%d. \nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.output, result)
+		}
+	}
+}

+ 16 - 0
pkg/cast/cast.go

@@ -15,6 +15,7 @@
 package cast
 package cast
 
 
 import (
 import (
+	"encoding/base64"
 	"fmt"
 	"fmt"
 	"github.com/mitchellh/mapstructure"
 	"github.com/mitchellh/mapstructure"
 	"reflect"
 	"reflect"
@@ -804,6 +805,21 @@ func ToBytes(input interface{}, sn Strictness) ([]byte, error) {
 	return nil, fmt.Errorf("cannot convert %[1]T(%[1]v) to bytes", input)
 	return nil, fmt.Errorf("cannot convert %[1]T(%[1]v) to bytes", input)
 }
 }
 
 
+// ToByteA converts input to eKuiper's internal byte array representation ([]byte).
+//
+// A []byte input is returned unchanged (the same slice, not a copy). A string
+// input is decoded with base64.StdEncoding. Any other input type, or a string
+// that fails to decode, yields a nil slice and an error. The Strictness
+// argument is accepted but ignored.
+func ToByteA(input interface{}, _ Strictness) ([]byte, error) {
+	switch b := input.(type) {
+	case []byte:
+		return b, nil
+	case string:
+		r, err := base64.StdEncoding.DecodeString(b)
+		if err != nil {
+			// The decoder's own error is replaced by a fixed message naming the input.
+			return nil, fmt.Errorf("illegal string %s, must be base64 encoded string", b)
+		}
+		return r, nil
+	}
+	return nil, fmt.Errorf("cannot convert %[1]T(%[1]v) to bytes", input)
+}
+
 func ToStringMap(input interface{}) (map[string]interface{}, error) {
 func ToStringMap(input interface{}) (map[string]interface{}, error) {
 	var m = map[string]interface{}{}
 	var m = map[string]interface{}{}
 
 

+ 40 - 1
pkg/cast/cast_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
 package cast
 package cast
 
 
 import (
 import (
+	"encoding/hex"
 	"fmt"
 	"fmt"
 	"reflect"
 	"reflect"
 	"testing"
 	"testing"
@@ -396,3 +397,41 @@ func TestMapToStructTag(t *testing.T) {
 		})
 		})
 	}
 	}
 }
 }
+
+// TestToByteA verifies ToByteA for a non-base64 string, a raw byte slice, an
+// unsupported input type, and a valid base64 string whose payload contains a
+// NUL byte and a UTF-8 byte order mark.
+func TestToByteA(t *testing.T) {
+	bytea, decErr := hex.DecodeString("736f6d6520646174612077697468200020616e6420efbbbf")
+	if decErr != nil {
+		t.Fatalf("invalid hex fixture: %v", decErr)
+	}
+	tests := []struct {
+		input  interface{}
+		output []byte
+		err    string
+	}{
+		{
+			input: "foo",
+			err:   "illegal string foo, must be base64 encoded string",
+		}, {
+			input:  []byte("foo"),
+			output: []byte("foo"),
+			err:    "",
+		}, {
+			input:  1,
+			output: nil,
+			err:    "cannot convert int(1) to bytes",
+		}, {
+			input:  "c29tZSBkYXRhIHdpdGggACBhbmQg77u/",
+			output: bytea,
+		},
+	}
+	for i, tt := range tests {
+		r, err := ToByteA(tt.input, CONVERT_SAMEKIND)
+		// Compare the error text in both directions so that a missing expected
+		// error is reported, not only an unexpected or different one.
+		got := ""
+		if err != nil {
+			got = err.Error()
+		}
+		if got != tt.err {
+			t.Errorf("%d, ToByteA() error = %v, wantErr %v", i, err, tt.err)
+			continue
+		}
+		// The output is only meaningful when no error was returned.
+		if err == nil && !reflect.DeepEqual(r, tt.output) {
+			t.Errorf("%d: ToByteA() = %x, want %x", i, r, tt.output)
+		}
+	}
+}