Переглянути джерело

perf: Avoid string copying during JSON parsing (#2010)

* perf: Avoid string copying during JSON parsing

Signed-off-by: t_max <1172915550@qq.com>

* refactor: move hack to cast

Signed-off-by: t_max <1172915550@qq.com>

---------

Signed-off-by: t_max <1172915550@qq.com>
Xuefeng Tan 1 рік тому
батько
коміт
cf03440dc9

+ 2 - 1
cmd/kuiper/main.go

@@ -28,6 +28,7 @@ import (
 
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/model"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 )
 
@@ -1104,7 +1105,7 @@ func main() {
 						rulesArray := c.String("rules")
 						if rulesArray != "" {
 							var rules []string
-							err := json.Unmarshal([]byte(rulesArray), &rules)
+							err := json.Unmarshal(cast.StringToBytes(rulesArray), &rules)
 							if err != nil {
 								fmt.Printf("rules %s unmarshal error %s", rules, err)
 								return nil

+ 1 - 1
internal/binder/function/funcs_misc.go

@@ -85,7 +85,7 @@ func registerMiscFunc() {
 				return fmt.Errorf("fail to convert %v to string", args[0]), false
 			}
 			var data interface{}
-			err = json.Unmarshal([]byte(text), &data)
+			err = json.Unmarshal(cast.StringToBytes(text), &data)
 			if err != nil {
 				return fmt.Errorf("fail to parse json: %v", err), false
 			}

+ 1 - 1
internal/conf/jsonpath_eval.go

@@ -47,7 +47,7 @@ func (e *gvalPathEval) Eval(data interface{}) (interface{}, error) {
 			input = cast.ConvertSlice(data)
 		case reflect.String:
 			v, _ := data.(string)
-			err := json.Unmarshal([]byte(v), &input)
+			err := json.Unmarshal(cast.StringToBytes(v), &input)
 			if err != nil {
 				return nil, fmt.Errorf("data '%v' is not a valid json string", data)
 			}

+ 6 - 6
internal/io/edgex/edgex_source.go

@@ -257,21 +257,21 @@ func (es *EdgexSource) getValue(r dtos.BaseReading, logger api.Logger) (interfac
 		return v, nil
 	case v3.ValueTypeBoolArray:
 		var val []bool
-		if e := json.Unmarshal([]byte(v), &val); e == nil {
+		if e := json.Unmarshal(cast.StringToBytes(v), &val); e == nil {
 			return val, nil
 		} else {
 			return nil, e
 		}
 	case v3.ValueTypeInt8Array, v3.ValueTypeInt16Array, v3.ValueTypeInt32Array, v3.ValueTypeInt64Array, v3.ValueTypeUint8Array, v3.ValueTypeUint16Array, v3.ValueTypeUint32Array:
 		var val []int
-		if e := json.Unmarshal([]byte(v), &val); e == nil {
+		if e := json.Unmarshal(cast.StringToBytes(v), &val); e == nil {
 			return val, nil
 		} else {
 			return nil, e
 		}
 	case v3.ValueTypeUint64Array:
 		var val []uint64
-		if e := json.Unmarshal([]byte(v), &val); e == nil {
+		if e := json.Unmarshal(cast.StringToBytes(v), &val); e == nil {
 			return val, nil
 		} else {
 			return nil, e
@@ -282,7 +282,7 @@ func (es *EdgexSource) getValue(r dtos.BaseReading, logger api.Logger) (interfac
 		return convertFloatArray(v, 64)
 	case v3.ValueTypeStringArray:
 		var val []string
-		if e := json.Unmarshal([]byte(v), &val); e == nil {
+		if e := json.Unmarshal(cast.StringToBytes(v), &val); e == nil {
 			return val, nil
 		} else {
 			return nil, e
@@ -299,7 +299,7 @@ func (es *EdgexSource) getValue(r dtos.BaseReading, logger api.Logger) (interfac
 
 func convertFloatArray(v string, bitSize int) (interface{}, error) {
 	var val1 []string
-	if e := json.Unmarshal([]byte(v), &val1); e == nil {
+	if e := json.Unmarshal(cast.StringToBytes(v), &val1); e == nil {
 		var ret []float64
 		for _, v := range val1 {
 			if fv, err := strconv.ParseFloat(v, bitSize); err != nil {
@@ -311,7 +311,7 @@ func convertFloatArray(v string, bitSize int) (interface{}, error) {
 		return ret, nil
 	} else {
 		var val []float64
-		if e := json.Unmarshal([]byte(v), &val); e == nil {
+		if e := json.Unmarshal(cast.StringToBytes(v), &val); e == nil {
 			return val, nil
 		} else {
 			return nil, e

+ 1 - 1
internal/io/http/client.go

@@ -309,7 +309,7 @@ func (cc *ClientConf) parseHeaders(ctx api.StreamContext, data interface{}) (map
 		if err != nil {
 			return nil, fmt.Errorf("fail to parse the header template %s: %v", cc.config.HeadersTemplate, err)
 		}
-		err = json.Unmarshal([]byte(tstr), &headers)
+		err = json.Unmarshal(cast.StringToBytes(tstr), &headers)
 		if err != nil {
 			return nil, fmt.Errorf("parsed header template is not json: %s", tstr)
 		}

+ 2 - 2
internal/io/redis/lookup.go

@@ -97,7 +97,7 @@ func (s *lookupSource) Lookup(ctx api.StreamContext, _ []string, keys []string,
 			return nil, err
 		}
 		m := make(map[string]interface{})
-		err = json.Unmarshal([]byte(res), &m)
+		err = json.Unmarshal(cast.StringToBytes(res), &m)
 		if err != nil {
 			return nil, err
 		}
@@ -113,7 +113,7 @@ func (s *lookupSource) Lookup(ctx api.StreamContext, _ []string, keys []string,
 		ret := make([]api.SourceTuple, 0, len(res))
 		for _, r := range res {
 			m := make(map[string]interface{})
-			err = json.Unmarshal([]byte(r), &m)
+			err = json.Unmarshal(cast.StringToBytes(r), &m)
 			if err != nil {
 				return nil, err
 			}

+ 7 - 6
internal/meta/yamlConfigMeta.go

@@ -22,6 +22,7 @@ import (
 
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 )
 
@@ -547,7 +548,7 @@ func LoadConfigurations(configSets YamlConfigurationSet) YamlConfigurationSet {
 
 	for key, val := range srcResources {
 		configs := YamlConfigurations{}
-		err := json.Unmarshal([]byte(val), &configs)
+		err := json.Unmarshal(cast.StringToBytes(val), &configs)
 		if err != nil {
 			_ = ConfigManager.sourceConfigStatusDb.Set(key, err.Error())
 			configResponse.Sources[key] = err.Error()
@@ -563,7 +564,7 @@ func LoadConfigurations(configSets YamlConfigurationSet) YamlConfigurationSet {
 
 	for key, val := range sinkResources {
 		configs := YamlConfigurations{}
-		err := json.Unmarshal([]byte(val), &configs)
+		err := json.Unmarshal(cast.StringToBytes(val), &configs)
 		if err != nil {
 			_ = ConfigManager.sinkConfigStatusDb.Set(key, err.Error())
 			configResponse.Sinks[key] = err.Error()
@@ -579,7 +580,7 @@ func LoadConfigurations(configSets YamlConfigurationSet) YamlConfigurationSet {
 
 	for key, val := range connectionResources {
 		configs := YamlConfigurations{}
-		err := json.Unmarshal([]byte(val), &configs)
+		err := json.Unmarshal(cast.StringToBytes(val), &configs)
 		if err != nil {
 			_ = ConfigManager.connectionConfigStatusDb.Set(key, err.Error())
 			configResponse.Connections[key] = err.Error()
@@ -608,7 +609,7 @@ func LoadConfigurationsPartial(configSets YamlConfigurationSet) YamlConfiguratio
 
 	for key, val := range srcResources {
 		configs := YamlConfigurations{}
-		err := json.Unmarshal([]byte(val), &configs)
+		err := json.Unmarshal(cast.StringToBytes(val), &configs)
 		if err != nil {
 			configResponse.Sources[key] = err.Error()
 			continue
@@ -622,7 +623,7 @@ func LoadConfigurationsPartial(configSets YamlConfigurationSet) YamlConfiguratio
 
 	for key, val := range sinkResources {
 		configs := YamlConfigurations{}
-		err := json.Unmarshal([]byte(val), &configs)
+		err := json.Unmarshal(cast.StringToBytes(val), &configs)
 		if err != nil {
 			configResponse.Sinks[key] = err.Error()
 			continue
@@ -636,7 +637,7 @@ func LoadConfigurationsPartial(configSets YamlConfigurationSet) YamlConfiguratio
 
 	for key, val := range connectionResources {
 		configs := YamlConfigurations{}
-		err := json.Unmarshal([]byte(val), &configs)
+		err := json.Unmarshal(cast.StringToBytes(val), &configs)
 		if err != nil {
 			configResponse.Connections[key] = err.Error()
 			continue

+ 2 - 1
internal/plugin/native/manager.go

@@ -39,6 +39,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
 	plugin2 "github.com/lf-edge/ekuiper/internal/plugin"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 )
@@ -882,7 +883,7 @@ func (rr *Manager) clearInstallFlag() {
 func (rr *Manager) pluginRegisterForImport(key, script string) error {
 	plgType := plugin2.PluginTypeMap[strings.Split(key, "_")[0]]
 	sd := plugin2.NewPluginByType(plgType)
-	err := json.Unmarshal([]byte(script), &sd)
+	err := json.Unmarshal(cast.StringToBytes(script), &sd)
 	if err != nil {
 		return err
 	}

+ 2 - 1
internal/plugin/portable/manager.go

@@ -34,6 +34,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/plugin"
 	"github.com/lf-edge/ekuiper/internal/plugin/portable/runtime"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 )
 
@@ -430,7 +431,7 @@ func (m *Manager) GetAllPluginsStatus() map[string]string {
 
 func (m *Manager) pluginRegisterForImport(k, v string) error {
 	sd := plugin.NewPluginByType(plugin.PORTABLE)
-	err := json.Unmarshal([]byte(v), &sd)
+	err := json.Unmarshal(cast.StringToBytes(v), &sd)
 	if err != nil {
 		return err
 	}

+ 2 - 1
internal/processor/rule.go

@@ -23,6 +23,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 )
@@ -162,7 +163,7 @@ func (p *RuleProcessor) GetRuleByJsonValidated(ruleJson string) (*api.Rule, erro
 		Triggered: true,
 		Options:   clone(opt),
 	}
-	if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
+	if err := json.Unmarshal(cast.StringToBytes(ruleJson), &rule); err != nil {
 		return nil, fmt.Errorf("Parse rule %s error : %s.", ruleJson, err)
 	}
 	if rule.Options == nil {

+ 5 - 4
internal/processor/stream.go

@@ -29,6 +29,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/topo/lookup"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/ast"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 )
@@ -126,7 +127,7 @@ func (p *StreamProcessor) RecoverLookupTable() error {
 	)
 	for _, k := range keys {
 		if ok, _ := p.db.Get(k, &v); ok {
-			if err := json.Unmarshal([]byte(v), vs); err == nil && vs.StreamType == ast.TypeTable {
+			if err := json.Unmarshal(cast.StringToBytes(v), vs); err == nil && vs.StreamType == ast.TypeTable {
 				parser := xsql.NewParser(strings.NewReader(vs.Statement))
 				stmt, e := xsql.Language.Parse(parser)
 				if e != nil {
@@ -232,7 +233,7 @@ func (p *StreamProcessor) ShowStream(st ast.StreamType) ([]string, error) {
 	)
 	for _, k := range keys {
 		if ok, _ := p.db.Get(k, &v); ok {
-			if err := json.Unmarshal([]byte(v), vs); err == nil && vs.StreamType == st {
+			if err := json.Unmarshal(cast.StringToBytes(v), vs); err == nil && vs.StreamType == st {
 				result = append(result, k)
 			}
 		}
@@ -255,7 +256,7 @@ func (p *StreamProcessor) ShowTable(kind string) ([]string, error) {
 	)
 	for _, k := range keys {
 		if ok, _ := p.db.Get(k, &v); ok {
-			if err := json.Unmarshal([]byte(v), vs); err == nil && vs.StreamType == ast.TypeTable {
+			if err := json.Unmarshal(cast.StringToBytes(v), vs); err == nil && vs.StreamType == ast.TypeTable {
 				if kind == "scan" && (vs.StreamKind == ast.StreamKindScan || vs.StreamKind == "") {
 					result = append(result, k)
 				} else if kind == "lookup" && vs.StreamKind == ast.StreamKindLookup {
@@ -471,7 +472,7 @@ func (p *StreamProcessor) GetAll() (result map[string]map[string]string, e error
 		"tables":  make(map[string]string),
 	}
 	for k, v := range defs {
-		if err := json.Unmarshal([]byte(v), vs); err == nil {
+		if err := json.Unmarshal(cast.StringToBytes(v), vs); err == nil {
 			switch vs.StreamType {
 			case ast.TypeStream:
 				result["streams"][k] = vs.Statement

+ 2 - 1
internal/schema/ext_inferer_custom.go

@@ -24,6 +24,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/def"
 	"github.com/lf-edge/ekuiper/pkg/ast"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/message"
 )
 
@@ -59,7 +60,7 @@ func InferCustom(schemaFile string, messageName string) (ast.StreamFields, error
 	if ok {
 		sj := mc.GetSchemaJson()
 		var result ast.StreamFields
-		err := json.Unmarshal([]byte(sj), &result)
+		err := json.Unmarshal(cast.StringToBytes(sj), &result)
 		if err != nil {
 			return nil, fmt.Errorf("invalid schema json %s: %v", sj, err)
 		}

+ 3 - 2
internal/schema/registry.go

@@ -26,6 +26,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/pkg/def"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 )
 
@@ -271,7 +272,7 @@ func UninstallAllSchema() {
 	}
 	for key, value := range schemaMaps {
 		info := &Info{}
-		_ = json.Unmarshal([]byte(value), info)
+		_ = json.Unmarshal(cast.StringToBytes(value), info)
 		_ = DeleteSchema(info.Type, key)
 	}
 }
@@ -320,7 +321,7 @@ func SchemaPartialImport(schemas map[string]string) map[string]string {
 
 func schemaRegisterForImport(k, v string) error {
 	info := &Info{}
-	err := json.Unmarshal([]byte(v), info)
+	err := json.Unmarshal(cast.StringToBytes(v), info)
 	if err != nil {
 		return err
 	}

+ 2 - 1
internal/server/rpc_plugin.go

@@ -22,6 +22,7 @@ import (
 
 	"github.com/lf-edge/ekuiper/internal/pkg/model"
 	"github.com/lf-edge/ekuiper/internal/plugin"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 func (t *Server) CreatePlugin(arg *model.PluginDesc, reply *string) error {
@@ -102,7 +103,7 @@ func (t *Server) ShowPlugins(arg int, reply *string) error {
 func getPluginByJson(arg *model.PluginDesc, pt plugin.PluginType) (plugin.Plugin, error) {
 	p := plugin.NewPluginByType(pt)
 	if arg.Json != "" {
-		if err := json.Unmarshal([]byte(arg.Json), p); err != nil {
+		if err := json.Unmarshal(cast.StringToBytes(arg.Json), p); err != nil {
 			return nil, fmt.Errorf("Parse plugin %s error : %s.", arg.Json, err)
 		}
 	}

+ 2 - 1
internal/server/rpc_schema.go

@@ -24,12 +24,13 @@ import (
 	"github.com/lf-edge/ekuiper/internal/pkg/def"
 	"github.com/lf-edge/ekuiper/internal/pkg/model"
 	"github.com/lf-edge/ekuiper/internal/schema"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 func (t *Server) CreateSchema(arg *model.RPCTypedArgDesc, reply *string) error {
 	sd := &schema.Info{Type: def.SchemaType(arg.Type)}
 	if arg.Json != "" {
-		if err := json.Unmarshal([]byte(arg.Json), sd); err != nil {
+		if err := json.Unmarshal(cast.StringToBytes(arg.Json), sd); err != nil {
 			return fmt.Errorf("Parse service %s error : %s.", arg.Json, err)
 		}
 	}

+ 2 - 1
internal/server/rpc_service.go

@@ -23,12 +23,13 @@ import (
 
 	"github.com/lf-edge/ekuiper/internal/pkg/model"
 	"github.com/lf-edge/ekuiper/internal/service"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 func (t *Server) CreateService(arg *model.RPCArgDesc, reply *string) error {
 	sd := &service.ServiceCreationRequest{}
 	if arg.Json != "" {
-		if err := json.Unmarshal([]byte(arg.Json), sd); err != nil {
+		if err := json.Unmarshal(cast.StringToBytes(arg.Json), sd); err != nil {
 			return fmt.Errorf("Parse service %s error : %s.", arg.Json, err)
 		}
 	}

+ 2 - 1
internal/service/manager.go

@@ -30,6 +30,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/plugin"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 )
 
@@ -478,7 +479,7 @@ func (m *Manager) UninstallAllServices() {
 
 func (m *Manager) servicesRegisterForImport(_, v string) error {
 	req := &ServiceCreationRequest{}
-	err := json.Unmarshal([]byte(v), req)
+	err := json.Unmarshal(cast.StringToBytes(v), req)
 	if err != nil {
 		return err
 	}

+ 1 - 1
internal/topo/operator/field_processor.go

@@ -115,7 +115,7 @@ func (p *defaultFieldProcessor) validateAndConvertField(sf *ast.JsonStreamField,
 				return nil, fmt.Errorf("expect map but found %[1]T(%[1]v)", t)
 			}
 		} else if jtype == reflect.String {
-			err := json.Unmarshal([]byte(t.(string)), &nextJ)
+			err := json.Unmarshal(cast.StringToBytes(t.(string)), &nextJ)
 			if err != nil {
 				return nil, fmt.Errorf("invalid data type for %s, expect map but found %[1]T(%[1]v)", t)
 			}

+ 2 - 1
internal/xsql/stmtx.go

@@ -20,6 +20,7 @@ import (
 	"strings"
 
 	"github.com/lf-edge/ekuiper/pkg/ast"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/kv"
 )
@@ -66,7 +67,7 @@ func GetDataSourceStatement(m kv.KeyValue, name string) (*StreamInfo, error) {
 		vs = &StreamInfo{}
 	)
 	if ok, _ := m.Get(name, &v); ok {
-		if err := json.Unmarshal([]byte(v), vs); err != nil {
+		if err := json.Unmarshal(cast.StringToBytes(v), vs); err != nil {
 			return nil, fmt.Errorf("error unmarshall %s, the data in db may be corrupted", name)
 		} else {
 			return vs, nil

+ 7 - 0
pkg/cast/hack.go

@@ -0,0 +1,7 @@
+package cast
+
+import "unsafe"
+
+// StringToBytes returns a byte slice that shares memory with str instead of
+// copying it. Strings are immutable, so the returned slice must be treated as
+// read-only: writing to it is undefined behavior. It returns nil for the
+// empty string.
+func StringToBytes(str string) []byte {
+	// unsafe.StringData is unspecified for "", so do not rely on it yielding
+	// a nil pointer; answer the empty case explicitly.
+	if len(str) == 0 {
+		return nil
+	}
+	return unsafe.Slice(unsafe.StringData(str), len(str))
+}

+ 63 - 0
pkg/cast/hack_test.go

@@ -0,0 +1,63 @@
+package cast
+
+import (
+	"reflect"
+	"strings"
+	"testing"
+)
+
+// TestString2bytes checks StringToBytes against a small table of inputs,
+// comparing each result with the expected bytes via reflect.DeepEqual.
+func TestString2bytes(t *testing.T) {
+	cases := []struct {
+		name     string
+		input    string
+		expected []byte
+	}{
+		{name: "common", input: "abc", expected: []byte{'a', 'b', 'c'}},
+		// DeepEqual distinguishes nil from an empty slice, so this case
+		// requires a nil result for the empty string.
+		{name: "nil", input: "", expected: nil},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			got := StringToBytes(tc.input)
+			if reflect.DeepEqual(got, tc.expected) {
+				return
+			}
+			t.Errorf("StringToBytes() = %v, want %v", got, tc.expected)
+		})
+	}
+}
+
+// Shared benchmark input: a 1 MiB string and its expected length in bytes.
+var (
+	l   = 1024 * 1024
+	str = strings.Repeat("a", l)
+)
+
+// BenchmarkStringToBytes measures the built-in []byte(str) conversion as the
+// baseline and fails if the result length differs from l.
+func BenchmarkStringToBytes(b *testing.B) {
+	for n := 0; n < b.N; n++ {
+		if converted := []byte(str); len(converted) != l {
+			b.Fatal()
+		}
+	}
+}
+
+// BenchmarkStringToBytesUnsafe measures StringToBytes on the same input and
+// fails if the result length differs from l.
+func BenchmarkStringToBytesUnsafe(b *testing.B) {
+	for n := 0; n < b.N; n++ {
+		if converted := StringToBytes(str); len(converted) != l {
+			b.Fatal()
+		}
+	}
+}