Browse Source

feat: introduce fastjson in converter (#1954)

* feat: introduce fastjson with schema (#1939)

* introduce fastjson

Signed-off-by: yisaer <disxiaofei@163.com>

* feat: support column pruning in deserialization (#1946)

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: Song Gao <disxiaofei@163.com>

* add test

Signed-off-by: Song Gao <disxiaofei@163.com>

* add stack

Signed-off-by: Song Gao <disxiaofei@163.com>

* add stack

Signed-off-by: Song Gao <disxiaofei@163.com>

* fix test

Signed-off-by: Song Gao <disxiaofei@163.com>

* fix test

Signed-off-by: Song Gao <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Signed-off-by: Song Gao <disxiaofei@163.com>
Song Gao 1 year ago
parent
commit
e13cd2a6fb

+ 2 - 1
extensions/sinks/tdengine/tdengine.go

@@ -18,10 +18,11 @@ import (
 	"database/sql"
 	"encoding/json"
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"reflect"
 	"strings"
 
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
+
 	_ "github.com/taosdata/driver-go/v2/taosSql"
 
 	"github.com/lf-edge/ekuiper/internal/conf"

+ 1 - 0
go.mod

@@ -34,6 +34,7 @@ require (
 	github.com/stretchr/testify v1.8.2
 	github.com/ugorji/go/codec v1.2.10
 	github.com/urfave/cli v1.22.12
+	github.com/valyala/fastjson v1.6.4
 	go.nanomsg.org/mangos/v3 v3.4.2
 	golang.org/x/text v0.9.0
 	google.golang.org/genproto v0.0.0-20230227214838-9b19f0bdc514

+ 2 - 0
go.sum

@@ -233,6 +233,8 @@ github.com/ugorji/go/codec v1.2.10 h1:eimT6Lsr+2lzmSZxPhLFoOWFmQqwk0fllJJ5hEbTXt
 github.com/ugorji/go/codec v1.2.10/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
 github.com/urfave/cli v1.22.12 h1:igJgVw1JdKH+trcLWLeLwZjU9fEfPesQ+9/e4MQ44S8=
 github.com/urfave/cli v1.22.12/go.mod h1:sSBEIC79qR6OvcmsD4U3KABeOTxDqQtdDnaFuUN30b8=
+github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ=
+github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
 github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
 github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=

+ 4 - 0
internal/converter/converter.go

@@ -47,6 +47,10 @@ func GetOrCreateConverter(options *ast.Options) (message.Converter, error) {
 	if t == "" {
 		t = message.FormatJson
 	}
+	if t == message.FormatJson && len(options.Schema) > 0 {
+		return json.NewFastJsonConverter(options.Schema), nil
+	}
+
 	schemaFile := ""
 	schemaName := options.SCHEMAID
 	if schemaName != "" {

+ 88 - 8
internal/converter/json/convert_bench_test.go

@@ -17,23 +17,103 @@ package json
 import (
 	"os"
 	"testing"
+
+	"github.com/lf-edge/ekuiper/pkg/ast"
 )
 
 func BenchmarkSimpleTuples(b *testing.B) {
-	payload := []byte(`{"key": "value"}`)
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		converter.Decode(payload)
+	benchmarkByFiles("./testdata/simple.json", b, nil)
+}
+
+func BenchmarkSimpleTuplesWithSchema(b *testing.B) {
+	schema := map[string]*ast.JsonStreamField{
+		"key": {
+			Type: "string",
+		},
+	}
+	benchmarkByFiles("./testdata/simple.json", b, schema)
+}
+
+func BenchmarkSmallJSON(b *testing.B) {
+	benchmarkByFiles("./testdata/small.json", b, nil)
+}
+
+func BenchmarkSmallJSONWithSchema(b *testing.B) {
+	schema := map[string]*ast.JsonStreamField{
+		"sid": {
+			Type: "bigint",
+		},
 	}
+	benchmarkByFiles("./testdata/small.json", b, schema)
+}
+
+func BenchmarkMediumJSON(b *testing.B) {
+	benchmarkByFiles("./testdata/medium.json", b, nil)
+}
+
+func BenchmarkMediumJSONWithSchema(b *testing.B) {
+	schema := map[string]*ast.JsonStreamField{
+		"person": {
+			Type: "struct",
+			Properties: map[string]*ast.JsonStreamField{
+				"id": {
+					Type: "string",
+				},
+			},
+		},
+	}
+	benchmarkByFiles("./testdata/medium.json", b, schema)
+}
+
+func BenchmarkLargeJSON(b *testing.B) {
+	benchmarkByFiles("./testdata/large.json", b, nil)
+}
+
+func BenchmarkLargeJSONWithSchema(b *testing.B) {
+	schema := map[string]*ast.JsonStreamField{
+		"users": {
+			Type: "array",
+			Items: &ast.JsonStreamField{
+				Type: "struct",
+				Properties: map[string]*ast.JsonStreamField{
+					"id": {
+						Type: "bigint",
+					},
+				},
+			},
+		},
+	}
+	benchmarkByFiles("./testdata/large.json", b, schema)
 }
 
 func BenchmarkComplexTuples(b *testing.B) {
-	payload, err := os.ReadFile("./testdata/MDFD.json")
+	benchmarkByFiles("./testdata/MDFD.json", b, nil)
+}
+
+func BenchmarkComplexTuplesWithSchema(b *testing.B) {
+	schema := map[string]*ast.JsonStreamField{
+		"STD_AbsoluteWindDirection": {
+			Type: "float",
+		},
+	}
+	benchmarkByFiles("./testdata/MDFD.json", b, schema)
+}
+
+func benchmarkByFiles(filePath string, b *testing.B, schema map[string]*ast.JsonStreamField) {
+	payload, err := os.ReadFile(filePath)
 	if err != nil {
 		b.Fatalf(err.Error())
 	}
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		converter.Decode(payload)
+	if schema != nil {
+		f := NewFastJsonConverter(schema)
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			f.Decode(payload)
+		}
+	} else {
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			converter.Decode(payload)
+		}
 	}
 }

+ 281 - 0
internal/converter/json/converter.go

@@ -16,7 +16,11 @@ package json
 
 import (
 	"encoding/json"
+	"fmt"
 
+	"github.com/valyala/fastjson"
+
+	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/message"
 )
 
@@ -40,3 +44,280 @@ func (c *Converter) Decode(b []byte) (interface{}, error) {
 	}
 	return r0, nil
 }
+
+type FastJsonConverter struct {
+	schema map[string]*ast.JsonStreamField
+}
+
+func NewFastJsonConverter(schema map[string]*ast.JsonStreamField) *FastJsonConverter {
+	return &FastJsonConverter{
+		schema: schema,
+	}
+}
+
+func (c *FastJsonConverter) Encode(d interface{}) ([]byte, error) {
+	return json.Marshal(d)
+}
+
+func (c *FastJsonConverter) Decode(b []byte) (interface{}, error) {
+	return c.decodeWithSchema(b, c.schema)
+}
+
+func (f *FastJsonConverter) decodeWithSchema(b []byte, schema map[string]*ast.JsonStreamField) (interface{}, error) {
+	var p fastjson.Parser
+	v, err := p.ParseBytes(b)
+	if err != nil {
+		return nil, err
+	}
+	switch v.Type() {
+	case fastjson.TypeArray:
+		array, err := v.Array()
+		if err != nil {
+			return nil, err
+		}
+		ms := make([]map[string]interface{}, len(array))
+		for i, v := range array {
+			obj, err := v.Object()
+			if err != nil {
+				return nil, err
+			}
+			subMap, err := f.decodeObject(obj, schema)
+			if err != nil {
+				return nil, err
+			}
+			ms[i] = subMap
+		}
+		return ms, nil
+	case fastjson.TypeObject:
+		obj, err := v.Object()
+		if err != nil {
+			return nil, err
+		}
+		m, err := f.decodeObject(obj, schema)
+		if err != nil {
+			return nil, err
+		}
+		return m, nil
+	}
+	return nil, fmt.Errorf("only map[string]interface{} and []map[string]interface{} is supported")
+}
+
+func (f *FastJsonConverter) decodeArray(array []*fastjson.Value, field *ast.JsonStreamField) ([]interface{}, error) {
+	vs := make([]interface{}, len(array))
+	switch field.Type {
+	case "bigint", "float":
+		for i, item := range array {
+			typ := item.Type()
+			switch typ {
+			case fastjson.TypeNumber:
+				f64, err := item.Float64()
+				if err != nil {
+					return nil, err
+				}
+				vs[i] = f64
+			default:
+				return nil, fmt.Errorf("array has wrong type:%v, expect:%v", typ.String(), field.Type)
+			}
+		}
+	case "string", "bytea":
+		for i, item := range array {
+			typ := item.Type()
+			switch typ {
+			case fastjson.TypeString:
+				s, err := item.StringBytes()
+				if err != nil {
+					return nil, err
+				}
+				vs[i] = string(s)
+			case fastjson.TypeNumber:
+				f64, err := item.Float64()
+				if err != nil {
+					return nil, err
+				}
+				vs[i] = f64
+			default:
+				return nil, fmt.Errorf("array has wrong type:%v, expect:%v", typ.String(), field.Type)
+			}
+		}
+	case "array":
+		for i, item := range array {
+			typ := item.Type()
+			switch typ {
+			case fastjson.TypeArray:
+				childArrays, err := item.Array()
+				if err != nil {
+					return nil, err
+				}
+				subList, err := f.decodeArray(childArrays, field.Items)
+				if err != nil {
+					return nil, err
+				}
+				vs[i] = subList
+			default:
+				return nil, fmt.Errorf("array has wrong type:%v, expect:%v", typ.String(), field.Type)
+			}
+		}
+	case "struct":
+		for i, item := range array {
+			typ := item.Type()
+			switch typ {
+			case fastjson.TypeObject:
+				childObj, err := item.Object()
+				if err != nil {
+					return nil, err
+				}
+				subMap, err := f.decodeObject(childObj, field.Properties)
+				if err != nil {
+					return nil, err
+				}
+				vs[i] = subMap
+			default:
+				return nil, fmt.Errorf("array has wrong type:%v, expect:%v", typ.String(), field.Type)
+			}
+		}
+	case "boolean":
+		for i, item := range array {
+			typ := item.Type()
+			switch typ {
+			case fastjson.TypeTrue, fastjson.TypeFalse:
+				b, err := item.Bool()
+				if err != nil {
+					return nil, err
+				}
+				vs[i] = b
+			default:
+				return nil, fmt.Errorf("array has wrong type:%v, expect:%v", typ.String(), field.Type)
+			}
+		}
+	case "datetime":
+		for i, item := range array {
+			typ := item.Type()
+			switch typ {
+			case fastjson.TypeNumber:
+				f64, err := item.Float64()
+				if err != nil {
+					return nil, err
+				}
+				vs[i] = f64
+			case fastjson.TypeString:
+				s, err := item.StringBytes()
+				if err != nil {
+					return nil, err
+				}
+				vs[i] = string(s)
+			default:
+				return nil, fmt.Errorf("array has wrong type:%v, expect:%v", typ.String(), field.Type)
+			}
+		}
+
+	default:
+		return nil, fmt.Errorf("unknown field type:%s", field.Type)
+	}
+	return vs, nil
+}
+
+func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string]*ast.JsonStreamField) (map[string]interface{}, error) {
+	m := make(map[string]interface{})
+	for key, field := range schema {
+		if obj.Get(key) == nil {
+			continue
+		}
+		switch field.Type {
+		case "bigint", "float":
+			typ := obj.Get(key).Type()
+			switch typ {
+			case fastjson.TypeNumber:
+				f64v, err := obj.Get(key).Float64()
+				if err != nil {
+					return nil, err
+				}
+				m[key] = f64v
+			default:
+				return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", key, typ.String(), field.Type)
+			}
+		case "string", "bytea":
+			typ := obj.Get(key).Type()
+			switch typ {
+			case fastjson.TypeString:
+				s, err := obj.Get(key).StringBytes()
+				if err != nil {
+					return nil, err
+				}
+				m[key] = string(s)
+			case fastjson.TypeNumber:
+				f64v, err := obj.Get(key).Float64()
+				if err != nil {
+					return nil, err
+				}
+				m[key] = f64v
+			default:
+				return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", key, typ.String(), field.Type)
+			}
+		case "array":
+			typ := obj.Get(key).Type()
+			switch typ {
+			case fastjson.TypeArray:
+				childArray, err := obj.Get(key).Array()
+				if err != nil {
+					return nil, err
+				}
+				subList, err := f.decodeArray(childArray, schema[key].Items)
+				if err != nil {
+					return nil, err
+				}
+				m[key] = subList
+			default:
+				return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", key, typ.String(), field.Type)
+			}
+		case "struct":
+			typ := obj.Get(key).Type()
+			switch typ {
+			case fastjson.TypeObject:
+				childObj, err := obj.Get(key).Object()
+				if err != nil {
+					return nil, err
+				}
+				childMap, err := f.decodeObject(childObj, schema[key].Properties)
+				if err != nil {
+					return nil, err
+				}
+				m[key] = childMap
+			default:
+				return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", key, typ.String(), field.Type)
+			}
+		case "boolean":
+			typ := obj.Get(key).Type()
+			switch typ {
+			case fastjson.TypeFalse, fastjson.TypeTrue:
+				b, err := obj.Get(key).Bool()
+				if err != nil {
+					return nil, err
+				}
+				m[key] = b
+			default:
+				return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", key, typ.String(), field.Type)
+			}
+		case "datetime":
+			typ := obj.Get(key).Type()
+			switch typ {
+			case fastjson.TypeString:
+				s, err := obj.Get(key).StringBytes()
+				if err != nil {
+					return nil, err
+				}
+				m[key] = string(s)
+			case fastjson.TypeNumber:
+				f64v, err := obj.Get(key).Float64()
+				if err != nil {
+					return nil, err
+				}
+				m[key] = f64v
+			default:
+				return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", key, typ.String(), field.Type)
+			}
+		default:
+			return nil, fmt.Errorf("unknown field type:%s", field.Type)
+		}
+	}
+	return m, nil
+}

+ 335 - 1
internal/converter/json/converter_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -21,6 +21,10 @@ import (
 	"path"
 	"reflect"
 	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/lf-edge/ekuiper/pkg/ast"
 )
 
 func TestMessageDecode(t *testing.T) {
@@ -73,3 +77,333 @@ func TestMessageDecode(t *testing.T) {
 		}
 	}
 }
+
+func TestFastJsonConverterWithSchema(t *testing.T) {
+	testcases := []struct {
+		schema  map[string]*ast.JsonStreamField
+		payload []byte
+		require map[string]interface{}
+	}{
+		{
+			payload: []byte(`{"a":1}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "bigint",
+				},
+			},
+			require: map[string]interface{}{
+				"a": float64(1),
+			},
+		},
+		{
+			payload: []byte(`{"a":1}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "float",
+				},
+			},
+			require: map[string]interface{}{
+				"a": float64(1),
+			},
+		},
+		{
+			payload: []byte(`{"a":"a"}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "string",
+				},
+			},
+			require: map[string]interface{}{
+				"a": "a",
+			},
+		},
+		{
+			payload: []byte(`{"a":"a"}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "bytea",
+				},
+			},
+			require: map[string]interface{}{
+				"a": "a",
+			},
+		},
+		{
+			payload: []byte(`{"a":true}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "boolean",
+				},
+			},
+			require: map[string]interface{}{
+				"a": true,
+			},
+		},
+		{
+			payload: []byte(`{"a":123}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "datetime",
+				},
+			},
+			require: map[string]interface{}{
+				"a": float64(123),
+			},
+		},
+		{
+			payload: []byte(`{"a":"123"}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "datetime",
+				},
+			},
+			require: map[string]interface{}{
+				"a": "123",
+			},
+		},
+		{
+			payload: []byte(`{"a":{"b":1}}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "struct",
+					Properties: map[string]*ast.JsonStreamField{
+						"b": {
+							Type: "bigint",
+						},
+					},
+				},
+			},
+			require: map[string]interface{}{
+				"a": map[string]interface{}{
+					"b": float64(1),
+				},
+			},
+		},
+	}
+	for _, tc := range testcases {
+		f := NewFastJsonConverter(tc.schema)
+		v, err := f.Decode(tc.payload)
+		require.NoError(t, err)
+		require.Equal(t, v, tc.require)
+	}
+
+	for _, tc := range testcases {
+		arrayPayload := []byte(fmt.Sprintf("[%s]", string(tc.payload)))
+		arrayRequire := []map[string]interface{}{
+			tc.require,
+		}
+		f := NewFastJsonConverter(tc.schema)
+		v, err := f.Decode(arrayPayload)
+		require.NoError(t, err)
+		require.Equal(t, v, arrayRequire)
+	}
+}
+
+func TestFastJsonConverterWithSchemaError(t *testing.T) {
+	testcases := []struct {
+		schema  map[string]*ast.JsonStreamField
+		payload []byte
+		err     error
+	}{
+		{
+			payload: []byte(`{123}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "bigint",
+				},
+			},
+			err: fmt.Errorf(`cannot parse JSON: cannot parse object: cannot find opening '"" for object key; unparsed tail: "123}"`),
+		},
+		{
+			payload: []byte(`123`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "bigint",
+				},
+			},
+			err: fmt.Errorf("only map[string]interface{} and []map[string]interface{} is supported"),
+		},
+		{
+			payload: []byte(`{"a":"123"}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "bigint",
+				},
+			},
+			err: fmt.Errorf("a has wrong type:string, expect:bigint"),
+		},
+		{
+			payload: []byte(`{"a":true}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "string",
+				},
+			},
+			err: fmt.Errorf("a has wrong type:true, expect:string"),
+		},
+		{
+			payload: []byte(`{"a":123}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "array",
+				},
+			},
+			err: fmt.Errorf("a has wrong type:number, expect:array"),
+		},
+		{
+			payload: []byte(`{"a":123}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "struct",
+				},
+			},
+			err: fmt.Errorf("a has wrong type:number, expect:struct"),
+		},
+		{
+			payload: []byte(`{"a":123}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "boolean",
+				},
+			},
+			err: fmt.Errorf("a has wrong type:number, expect:boolean"),
+		},
+		{
+			payload: []byte(`{"a":true}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "datetime",
+				},
+			},
+			err: fmt.Errorf("a has wrong type:true, expect:datetime"),
+		},
+		{
+			payload: []byte(`{"a":["123"]}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "array",
+					Items: &ast.JsonStreamField{
+						Type: "bigint",
+					},
+				},
+			},
+			err: fmt.Errorf("array has wrong type:string, expect:bigint"),
+		},
+		{
+			payload: []byte(`{"a":[true]}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "array",
+					Items: &ast.JsonStreamField{
+						Type: "string",
+					},
+				},
+			},
+			err: fmt.Errorf("array has wrong type:true, expect:string"),
+		},
+		{
+			payload: []byte(`{"a":[123]}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "array",
+					Items: &ast.JsonStreamField{
+						Type: "array",
+					},
+				},
+			},
+			err: fmt.Errorf("array has wrong type:number, expect:array"),
+		},
+		{
+			payload: []byte(`{"a":[123]}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "array",
+					Items: &ast.JsonStreamField{
+						Type: "struct",
+					},
+				},
+			},
+			err: fmt.Errorf("array has wrong type:number, expect:struct"),
+		},
+		{
+			payload: []byte(`{"a":[123]}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "array",
+					Items: &ast.JsonStreamField{
+						Type: "boolean",
+					},
+				},
+			},
+			err: fmt.Errorf("array has wrong type:number, expect:boolean"),
+		},
+		{
+			payload: []byte(`{"a":[true]}`),
+			schema: map[string]*ast.JsonStreamField{
+				"a": {
+					Type: "array",
+					Items: &ast.JsonStreamField{
+						Type: "datetime",
+					},
+				},
+			},
+			err: fmt.Errorf("array has wrong type:true, expect:datetime"),
+		},
+	}
+
+	for _, tc := range testcases {
+		f := NewFastJsonConverter(tc.schema)
+		_, err := f.Decode(tc.payload)
+		require.Error(t, err)
+		require.Equal(t, err, tc.err)
+	}
+}
+
+func TestFastJsonEncode(t *testing.T) {
+	a := make(map[string]int)
+	a["a"] = 1
+	f := NewFastJsonConverter(nil)
+	v, err := f.Encode(a)
+	require.NoError(t, err)
+	require.Equal(t, v, []byte(`{"a":1}`))
+}
+
+func TestArrayWithArray(t *testing.T) {
+	payload := []byte(`{
+    "a":[
+        [
+            {
+                "c":1
+            }
+        ]
+    ]
+}`)
+	schema := map[string]*ast.JsonStreamField{
+		"a": {
+			Type: "array",
+			Items: &ast.JsonStreamField{
+				Type: "array",
+				Items: &ast.JsonStreamField{
+					Type: "struct",
+					Properties: map[string]*ast.JsonStreamField{
+						"c": {
+							Type: "bigint",
+						},
+					},
+				},
+			},
+		},
+	}
+	f := NewFastJsonConverter(schema)
+	v, err := f.Decode(payload)
+	require.NoError(t, err)
+	require.Equal(t, v, map[string]interface{}{
+		"a": []interface{}{
+			[]interface{}{
+				map[string]interface{}{
+					"c": float64(1),
+				},
+			},
+		},
+	})
+}

File diff suppressed because it is too large
+ 1 - 0
internal/converter/json/testdata/large.json


+ 93 - 0
internal/converter/json/testdata/medium.json

@@ -0,0 +1,93 @@
+{
+  "person": {
+    "id": "d50887ca-a6ce-4e59-b89f-14f0b5d03b03",
+    "name": {
+      "fullName": "Leonid Bugaev",
+      "givenName": "Leonid",
+      "familyName": "Bugaev"
+    },
+    "email": "leonsbox@gmail.com",
+    "gender": "male",
+    "location": "Saint Petersburg, Saint Petersburg, RU",
+    "geo": {
+      "city": "Saint Petersburg",
+      "state": "Saint Petersburg",
+      "country": "Russia",
+      "lat": 59.9342802,
+      "lng": 30.3350986
+    },
+    "bio": "Senior engineer at Granify.com",
+    "site": "http://flickfaver.com",
+    "avatar": "https://d1ts43dypk8bqh.cloudfront.net/v1/avatars/d50887ca-a6ce-4e59-b89f-14f0b5d03b03",
+    "employment": {
+      "name": "www.latera.ru",
+      "title": "Software Engineer",
+      "domain": "gmail.com"
+    },
+    "facebook": {
+      "handle": "leonid.bugaev"
+    },
+    "github": {
+      "handle": "buger",
+      "id": 14009,
+      "avatar": "https://avatars.githubusercontent.com/u/14009?v=3",
+      "company": "Granify",
+      "blog": "http://leonsbox.com",
+      "followers": 95,
+      "following": 10
+    },
+    "twitter": {
+      "handle": "flickfaver",
+      "id": 77004410,
+      "bio": null,
+      "followers": 2,
+      "following": 1,
+      "statuses": 5,
+      "favorites": 0,
+      "location": "",
+      "site": "http://flickfaver.com",
+      "avatar": null
+    },
+    "linkedin": {
+      "handle": "in/leonidbugaev"
+    },
+    "googleplus": {
+      "handle": null
+    },
+    "angellist": {
+      "handle": "leonid-bugaev",
+      "id": 61541,
+      "bio": "Senior engineer at Granify.com",
+      "blog": "http://buger.github.com",
+      "site": "http://buger.github.com",
+      "followers": 41,
+      "avatar": "https://d1qb2nb5cznatu.cloudfront.net/users/61541-medium_jpg?1405474390"
+    },
+    "klout": {
+      "handle": null,
+      "score": null
+    },
+    "foursquare": {
+      "handle": null
+    },
+    "aboutme": {
+      "handle": "leonid.bugaev",
+      "bio": null,
+      "avatar": null
+    },
+    "gravatar": {
+      "handle": "buger",
+      "urls": [
+      ],
+      "avatar": "http://1.gravatar.com/avatar/f7c8edd577d13b8930d5522f28123510",
+      "avatars": [
+        {
+          "url": "http://1.gravatar.com/avatar/f7c8edd577d13b8930d5522f28123510",
+          "type": "thumbnail"
+        }
+      ]
+    },
+    "fuzzy": false
+  },
+  "company": null
+}

+ 3 - 0
internal/converter/json/testdata/simple.json

@@ -0,0 +1,3 @@
+{
+  "key": "value"
+}

+ 11 - 0
internal/converter/json/testdata/small.json

@@ -0,0 +1,11 @@
+{
+    "st": 1,
+    "sid": 486,
+    "tt": "active",
+    "gr": 0,
+    "uuid": "de305d54-75b4-431b-adb2-eb6b9e546014",
+    "ip": "127.0.0.1",
+    "ua": "user_agent",
+    "tz": -6,
+    "v": 1
+}

+ 0 - 1
internal/io/mqtt/mqtt_source.go

@@ -38,7 +38,6 @@ type MQTTSource struct {
 
 	config map[string]interface{}
 	model  modelVersion
-	schema map[string]interface{}
 
 	cli          api.MessageClient
 	decompressor message.Decompressor

+ 2 - 1
internal/service/executors_msgpack.go

@@ -18,12 +18,13 @@ package service
 
 import (
 	"fmt"
-	"github.com/ugorji/go/codec"
 	"net"
 	"net/rpc"
 	"reflect"
 	"sync"
 
+	"github.com/ugorji/go/codec"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 )
 

+ 2 - 1
internal/service/executors_msgpack_test.go

@@ -17,11 +17,12 @@
 package service
 
 import (
-	"github.com/msgpack-rpc/msgpack-rpc-go/rpc"
 	"net"
 	"reflect"
 	"testing"
 
+	"github.com/msgpack-rpc/msgpack-rpc-go/rpc"
+
 	"github.com/lf-edge/ekuiper/internal/topo/topotest"
 	"github.com/lf-edge/ekuiper/pkg/api"
 )

+ 4 - 2
internal/topo/context/decoder.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -20,7 +20,9 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/message"
 )
 
-const DecodeKey = "$$decode"
+const (
+	DecodeKey = "$$decode"
+)
 
 func (c *DefaultContext) Decode(data []byte) (map[string]interface{}, error) {
 	v := c.Value(DecodeKey)

+ 16 - 3
internal/topo/node/source_node.go

@@ -40,9 +40,10 @@ type SourceNode struct {
 	mutex        sync.RWMutex
 	sources      []api.Source
 	preprocessOp UnOperation
+	schema       map[string]*ast.JsonStreamField
 }
 
-func NewSourceNode(name string, st ast.StreamType, op UnOperation, options *ast.Options, sendError bool) *SourceNode {
+func NewSourceNode(name string, st ast.StreamType, op UnOperation, options *ast.Options, sendError bool, schema map[string]*ast.JsonStreamField) *SourceNode {
 	t := options.TYPE
 	if t == "" {
 		if st == ast.TypeStream {
@@ -62,6 +63,7 @@ func NewSourceNode(name string, st ast.StreamType, op UnOperation, options *ast.
 		},
 		preprocessOp: op,
 		options:      options,
+		schema:       schema,
 	}
 }
 
@@ -95,13 +97,17 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 				props["isTable"] = true
 			}
 			props["delimiter"] = m.options.DELIMITER
-			converter, err := converter.GetOrCreateConverter(m.options)
+			m.options.Schema = nil
+			if m.schema != nil {
+				m.options.Schema = m.schema
+			}
+			converterTool, err := converter.GetOrCreateConverter(m.options)
 			if err != nil {
 				msg := fmt.Sprintf("cannot get converter from format %s, schemaId %s: %v", m.options.FORMAT, m.options.SCHEMAID, err)
 				logger.Warnf(msg)
 				return fmt.Errorf(msg)
 			}
-			ctx = context.WithValue(ctx.(*context.DefaultContext), context.DecodeKey, converter)
+			ctx = context.WithValue(ctx.(*context.DefaultContext), context.DecodeKey, converterTool)
 			m.reset()
 			logger.Infof("open source node with props %v, concurrency: %d, bufferLength: %d", conf.Printable(m.props), m.concurrency, m.bufferLength)
 			for i := 0; i < m.concurrency; i++ { // workers
@@ -140,6 +146,13 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 						for {
 							select {
 							case <-ctx.Done():
+								// We should clear the schema after we close the topo in order to avoid the following problem:
+								// 1. stop the rule
+								// 2. change the schema
+								// 3. restart the rule
+								// As the schema has changed, it will be error if we hold the old schema here
+								// TODO: fetch the latest stream schema after we open the topo
+								m.schema = nil
 								return nil
 							case err := <-si.errorCh:
 								return err

+ 2 - 2
internal/topo/node/source_node_test.go

@@ -42,7 +42,7 @@ func TestGetConf_Apply(t *testing.T) {
 	n := NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
 		DATASOURCE: "/feed",
 		TYPE:       "httppull",
-	}, false)
+	}, false, nil)
 	conf := nodeConf.GetSourceConf(n.sourceType, n.options)
 	if !reflect.DeepEqual(result, conf) {
 		t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
@@ -69,7 +69,7 @@ func TestGetConfAndConvert_Apply(t *testing.T) {
 		DATASOURCE: "/feed",
 		TYPE:       "httppull",
 		CONF_KEY:   "application_conf",
-	}, false)
+	}, false, nil)
 	conf := nodeConf.GetSourceConf(n.sourceType, n.options)
 	if !reflect.DeepEqual(result, conf) {
 		t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)

+ 4 - 4
internal/topo/node/source_pool_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -29,7 +29,7 @@ func TestSourcePool(t *testing.T) {
 		DATASOURCE: "demo",
 		TYPE:       "mock",
 		SHARED:     true,
-	}, false)
+	}, false, nil)
 	n.concurrency = 2
 	contextLogger := conf.Log.WithField("rule", "mockRule0")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
@@ -39,7 +39,7 @@ func TestSourcePool(t *testing.T) {
 		DATASOURCE: "demo1",
 		TYPE:       "mock",
 		SHARED:     true,
-	}, false)
+	}, false, nil)
 
 	contextLogger = conf.Log.WithField("rule", "mockRule1")
 	ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
@@ -48,7 +48,7 @@ func TestSourcePool(t *testing.T) {
 	n2 := NewSourceNode("test2", ast.TypeStream, nil, &ast.Options{
 		DATASOURCE: "demo1",
 		TYPE:       "mock",
-	}, false)
+	}, false, nil)
 	contextLogger = conf.Log.WithField("rule", "mockRule2")
 	ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 	tempStore, _ = state.CreateStore("mockRule2", api.AtMostOnce)

+ 11 - 2
internal/topo/planner/planner.go

@@ -194,7 +194,12 @@ func transformSourceNode(t *DataSourcePlan, sources []*node.SourceNode, options
 		}
 		var srcNode *node.SourceNode
 		if len(sources) == 0 {
-			sourceNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options.SendError)
+			var sourceNode *node.SourceNode
+			schema := t.streamFields
+			if t.isSchemaless {
+				schema = nil
+			}
+			sourceNode = node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options.SendError, schema)
 			srcNode = sourceNode
 		} else {
 			srcNode = getMockSource(sources, string(t.name))
@@ -213,7 +218,11 @@ func transformSourceNode(t *DataSourcePlan, sources []*node.SourceNode, options
 			srcNode = getMockSource(sources, string(t.name))
 		}
 		if srcNode == nil {
-			srcNode = node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options.SendError)
+			schema := t.streamFields
+			if t.isSchemaless {
+				schema = nil
+			}
+			srcNode = node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options.SendError, schema)
 		}
 		return srcNode, nil
 	}

+ 1 - 1
internal/topo/planner/planner_graph.go

@@ -497,7 +497,7 @@ func parseSource(nodeName string, gn *api.GraphNode, rule *api.Rule, store kv.Ke
 			if err != nil {
 				return nil, ILLEGAL, "", err
 			}
-			srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options.SendError)
+			srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options.SendError, nil)
 			return srcNode, STREAM, nodeName, nil
 		case "table":
 			return nil, ILLEGAL, "", fmt.Errorf("anonymouse table source is not supported, please create it prior to the rule")

+ 26 - 1
internal/topo/planner/planner_test.go

@@ -3106,6 +3106,11 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 }
 
 func TestTransformSourceNode(t *testing.T) {
+	schema := map[string]*ast.JsonStreamField{
+		"a": {
+			Type: "bigint",
+		},
+	}
 	testCases := []struct {
 		name string
 		plan *DataSourcePlan
@@ -3129,7 +3134,27 @@ func TestTransformSourceNode(t *testing.T) {
 			},
 			node: node.NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
 				TYPE: "file",
-			}, false),
+			}, false, nil),
+		},
+		{
+			name: "schema source node",
+			plan: &DataSourcePlan{
+				name: "test",
+				streamStmt: &ast.StreamStmt{
+					StreamType: ast.TypeStream,
+					Options: &ast.Options{
+						TYPE: "file",
+					},
+				},
+				streamFields: schema,
+				allMeta:      false,
+				metaFields:   []string{},
+				iet:          false,
+				isBinary:     false,
+			},
+			node: node.NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
+				TYPE: "file",
+			}, false, schema),
 		},
 	}
 	for _, tc := range testCases {

+ 3 - 1
pkg/ast/sourceStmt.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -235,6 +235,8 @@ type Options struct {
 	KIND string `json:"kind,omitempty"`
 	// for delimited format only
 	DELIMITER string `json:"delimiter,omitempty"`
+
+	Schema map[string]*JsonStreamField `json:"-"`
 }
 
 func (o Options) node() {}