Browse Source

perf: reduce data duplication (#2050)

Signed-off-by: t_max <1172915550@qq.com>
Xuefeng Tan 1 year ago
parent
commit
524604094b

+ 4 - 3
internal/converter/delimited/converter.go

@@ -15,6 +15,7 @@
 package delimited
 
 import (
+	"bytes"
 	"fmt"
 	"sort"
 	"strconv"
@@ -43,7 +44,7 @@ func (c *Converter) SetColumns(cols []string) {
 func (c *Converter) Encode(d interface{}) ([]byte, error) {
 	switch m := d.(type) {
 	case map[string]interface{}:
-		var sb strings.Builder
+		sb := &bytes.Buffer{}
 		if len(c.cols) == 0 {
 			keys := make([]string, 0, len(m))
 			for k := range m {
@@ -57,9 +58,9 @@ func (c *Converter) Encode(d interface{}) ([]byte, error) {
 			if i > 0 {
 				sb.WriteString(c.delimiter)
 			}
-			sb.WriteString(fmt.Sprintf("%v", m[v]))
+			fmt.Fprintf(sb, "%v", m[v])
 		}
-		return []byte(sb.String()), nil
+		return sb.Bytes(), nil
 	default:
 		return nil, fmt.Errorf("unsupported type %v, must be a map", d)
 	}

+ 4 - 4
internal/io/file/file_source.go

@@ -398,15 +398,15 @@ func (fs *FileSource) prepareFile(ctx api.StreamContext, file string) (io.Reader
 
 			ln := 0
 			// This is a queue to store the lines that should be ignored
-			tempLines := make([]string, 0, fs.config.IgnoreEndLines)
+			tempLines := make([][]byte, 0, fs.config.IgnoreEndLines)
 			for scanner.Scan() {
 				if ln >= fs.config.IgnoreStartLines {
 					if fs.config.IgnoreEndLines > 0 { // the last n line are left in the tempLines
 						slot := (ln - fs.config.IgnoreStartLines) % fs.config.IgnoreEndLines
 						if len(tempLines) <= slot { // first round
-							tempLines = append(tempLines, scanner.Text())
+							tempLines = append(tempLines, scanner.Bytes())
 						} else {
-							_, err := w.Write([]byte(tempLines[slot]))
+							_, err := w.Write(tempLines[slot])
 							if err != nil {
 								ctx.GetLogger().Error(err)
 								break
@@ -416,7 +416,7 @@ func (fs *FileSource) prepareFile(ctx api.StreamContext, file string) (io.Reader
 								ctx.GetLogger().Error(err)
 								break
 							}
-							tempLines[slot] = scanner.Text()
+							tempLines[slot] = scanner.Bytes()
 						}
 					} else {
 						_, err = w.Write(scanner.Bytes())

+ 2 - 2
internal/io/http/client.go

@@ -235,7 +235,7 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro
 
 // initialize the oAuth access token
 func (cc *ClientConf) auth(ctx api.StreamContext) error {
-	if resp, e := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.accessConf.Url, nil, true, []byte(cc.accessConf.Body)); e == nil {
+	if resp, e := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.accessConf.Url, nil, true, cc.accessConf.Body); e == nil {
 		conf.Log.Infof("try to get access token got response %v", resp)
 		tokens, _, e := cc.parseResponse(ctx, resp, true, nil)
 		if e != nil {
@@ -273,7 +273,7 @@ func (cc *ClientConf) refresh(ctx api.StreamContext) error {
 				return fmt.Errorf("fail to parse the header for refresh token request %s: %v", k, err)
 			}
 		}
-		rr, ee := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.refreshConf.Url, headers, true, []byte(cc.accessConf.Body))
+		rr, ee := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.refreshConf.Url, headers, true, cc.accessConf.Body)
 		if ee != nil {
 			return fmt.Errorf("fail to get refresh token: %v", ee)
 		}

+ 1 - 1
internal/io/http/httppull_source.go

@@ -76,7 +76,7 @@ func (hps *PullSource) initTimerPull(ctx api.StreamContext, consumer chan<- api.
 				continue
 			}
 			ctx.GetLogger().Debugf("rest sink sending request url: %s, headers: %v, body %s", hps.config.Url, headers, hps.config.Body)
-			if resp, e := httpx.Send(logger, hps.client, hps.config.BodyType, hps.config.Method, hps.config.Url, headers, true, []byte(hps.config.Body)); e != nil {
+			if resp, e := httpx.Send(logger, hps.client, hps.config.BodyType, hps.config.Method, hps.config.Url, headers, true, hps.config.Body); e != nil {
 				logger.Warnf("Found error %s when trying to reach %v ", e, hps)
 			} else {
 				logger.Debugf("rest sink got response %v", resp)

+ 2 - 2
internal/pkg/httpx/http.go

@@ -43,12 +43,12 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string
 			return nil, fmt.Errorf("fail to create request: %v", err)
 		}
 	case "json", "text", "javascript", "html", "xml":
-		body := &(bytes.Buffer{})
+		var body io.Reader
 		switch t := v.(type) {
 		case []byte:
 			body = bytes.NewBuffer(t)
 		case string:
-			body = bytes.NewBufferString(t)
+			body = strings.NewReader(t)
 		default:
 			vj, err := json.Marshal(v)
 			if err != nil {

+ 1 - 2
internal/pkg/store/redis/redisKv.go

@@ -17,7 +17,6 @@
 package redis
 
 import (
-	"bytes"
 	"context"
 	"encoding/gob"
 	"fmt"
@@ -74,7 +73,7 @@ func (kv redisKvStore) Get(key string, value interface{}) (bool, error) {
 	if err != nil {
 		return false, nil
 	}
-	dec := gob.NewDecoder(bytes.NewBuffer([]byte(val)))
+	dec := gob.NewDecoder(strings.NewReader(val))
 	if err := dec.Decode(value); err != nil {
 		return false, err
 	}

+ 3 - 3
internal/pkg/store/redis/redisTs.go

@@ -18,11 +18,11 @@
 package redis
 
 import (
-	"bytes"
 	"context"
 	"encoding/gob"
 	"fmt"
 	"strconv"
+	"strings"
 
 	"github.com/redis/go-redis/v9"
 
@@ -88,7 +88,7 @@ func (t *ts) Get(key int64, value interface{}) (bool, error) {
 	if len(reply) == 0 {
 		return false, fmt.Errorf("record under %s key and %d score not found", t.key, key)
 	}
-	dec := gob.NewDecoder(bytes.NewBuffer([]byte(reply[0])))
+	dec := gob.NewDecoder(strings.NewReader(reply[0]))
 	err := dec.Decode(value)
 	if err != nil {
 		return false, err
@@ -122,7 +122,7 @@ func getLast(db *redis.Client, key string, value interface{}) (int64, error) {
 	if len(reply) > 0 {
 		if value != nil {
 			v := reply[0].Member.(string)
-			dec := gob.NewDecoder(bytes.NewBuffer([]byte(v)))
+			dec := gob.NewDecoder(strings.NewReader(v))
 			if err := dec.Decode(value); err != nil {
 				return 0, err
 			}

+ 1 - 1
internal/processor/rule.go

@@ -236,7 +236,7 @@ func (p *RuleProcessor) ExecDesc(name string) (string, error) {
 		return "", fmt.Errorf("Rule %s is not found.", name)
 	}
 	dst := &bytes.Buffer{}
-	if err := json.Indent(dst, []byte(s1), "", "  "); err != nil {
+	if err := json.Indent(dst, cast.StringToBytes(s1), "", "  "); err != nil {
 		return "", err
 	}
 

+ 1 - 1
internal/schema/registry.go

@@ -154,7 +154,7 @@ func CreateOrUpdateSchema(info *Info) error {
 			defer file.Close()
 		}
 		if info.Content != "" {
-			err := os.WriteFile(schemaFile, []byte(info.Content), 0o666)
+			err := os.WriteFile(schemaFile, cast.StringToBytes(info.Content), 0o666)
 			if err != nil {
 				return err
 			}

+ 1 - 1
internal/server/rpc.go

@@ -170,7 +170,7 @@ func (t *Server) GetTopoRule(name string, reply *string) error {
 		return err
 	} else {
 		dst := &bytes.Buffer{}
-		if err = json.Indent(dst, []byte(r), "", "  "); err != nil {
+		if err = json.Indent(dst, cast.StringToBytes(r), "", "  "); err != nil {
 			*reply = r
 		} else {
 			*reply = dst.String()

+ 2 - 1
internal/server/rule_manager.go

@@ -25,6 +25,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/rule"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 )
@@ -240,7 +241,7 @@ func getRuleStatus(name string) (string, error) {
 			}
 			metrics = metrics[:len(metrics)-1] + "}"
 			dst := &bytes.Buffer{}
-			if err = json.Indent(dst, []byte(metrics), "", "  "); err != nil {
+			if err = json.Indent(dst, cast.StringToBytes(metrics), "", "  "); err != nil {
 				result = metrics
 			} else {
 				result = dst.String()

+ 2 - 1
internal/topo/transform/func.go

@@ -27,6 +27,7 @@ import (
 	"github.com/Masterminds/sprig/v3"
 
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 func RegisterAdditionalFuncs() {
@@ -57,7 +58,7 @@ func Base64Encode(para interface{}) (string, error) {
 		fv := strconv.FormatFloat(v.Float(), 'f', -1, 64)
 		return base64.StdEncoding.EncodeToString([]byte(fv)), nil
 	case reflect.String:
-		return base64.StdEncoding.EncodeToString([]byte(v.String())), nil
+		return base64.StdEncoding.EncodeToString(cast.StringToBytes(v.String())), nil
 	case reflect.Map:
 		if a, err := json.Marshal(para); err != nil {
 			return "", err

+ 2 - 1
test/benchmark/multiple_rules/ruleCreator.go

@@ -19,6 +19,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"strings"
 	"time"
 )
 
@@ -45,7 +46,7 @@ func create() {
 
 func createStream() {
 	s := `{"sql":"CREATE STREAM rawdata() WITH (DATASOURCE=\"rawdata\", SHARED=\"TRUE\");"}`
-	resp, err := http.Post(url+"streams", "application/json", bytes.NewReader([]byte(s)))
+	resp, err := http.Post(url+"streams", "application/json", strings.NewReader(s))
 	if err != nil {
 		fmt.Println(err)
 	}

+ 3 - 3
tools/kubernetes/conf/conf.go

@@ -15,7 +15,6 @@
 package conf
 
 import (
-	"bytes"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -24,6 +23,7 @@ import (
 	"path"
 	"path/filepath"
 	"runtime"
+	"strings"
 	"time"
 
 	"github.com/sirupsen/logrus"
@@ -196,7 +196,7 @@ func Get(inUrl string) (data []byte, err error) {
 }
 
 func Post(inHead, inBody string) (data []byte, err error) {
-	request, err := http.NewRequest(http.MethodPost, inHead, bytes.NewBuffer([]byte(inBody)))
+	request, err := http.NewRequest(http.MethodPost, inHead, strings.NewReader(inBody))
 	if nil != err {
 		return nil, err
 	}
@@ -205,7 +205,7 @@ func Post(inHead, inBody string) (data []byte, err error) {
 }
 
 func Put(inHead, inBody string) (data []byte, err error) {
-	request, err := http.NewRequest(http.MethodPut, inHead, bytes.NewBuffer([]byte(inBody)))
+	request, err := http.NewRequest(http.MethodPut, inHead, strings.NewReader(inBody))
 	if nil != err {
 		return nil, err
 	}