Browse Source

fix: correctly concatenate ipv6 addr (#2034)

Fixes #2029

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>
Jason Lyu 1 year ago
parent
commit
a2c52d6ef1

+ 2 - 2
cmd/kuiper/main.go

@@ -88,9 +88,9 @@ func main() {
 		}
 	}
 
-	fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
+	fmt.Printf("Connecting to %s... \n", cast.JoinHostPortInt(config.Host, config.Port))
 	// Create a TCP connection to localhost on port 1234
-	client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
+	client, err := rpc.DialHTTP("tcp", cast.JoinHostPortInt(config.Host, config.Port))
 	if err != nil {
 		fmt.Printf("Failed to connect the server, please start the server.\n")
 		return

+ 1 - 1
extensions/sinks/tdengine/tdengine.go

@@ -191,7 +191,7 @@ func (m *taosSink) Configure(props map[string]interface{}) error {
 	if cfg.STable != "" && len(cfg.TagFields) == 0 {
 		return fmt.Errorf("property tagFields is required when sTable is set")
 	}
-	m.url = fmt.Sprintf(`%s:%s@tcp(%s:%d)/%s`, cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database)
+	m.url = fmt.Sprintf(`%s:%s@tcp(%s)/%s`, cfg.User, cfg.Password, cast.JoinHostPortInt(cfg.Host, cfg.Port), cfg.Database)
 	if cfg.DataField == "" {
 		cfg.DataField = cfg.TableDataField
 	}

+ 3 - 2
internal/io/http/httpserver/data_server.go

@@ -29,6 +29,7 @@ import (
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 // manage the global http data server
@@ -117,7 +118,7 @@ func UnregisterEndpoint(endpoint string) {
 func createDataServer() (*http.Server, *mux.Router, error) {
 	r := mux.NewRouter()
 	s := &http.Server{
-		Addr: fmt.Sprintf("%s:%d", conf.Config.Source.HttpServerIp, conf.Config.Source.HttpServerPort),
+		Addr: cast.JoinHostPortInt(conf.Config.Source.HttpServerIp, conf.Config.Source.HttpServerPort),
 		// Good practice to set timeouts to avoid Slowloris attacks.
 		WriteTimeout: time.Second * 60 * 5,
 		ReadTimeout:  time.Second * 60 * 5,
@@ -137,7 +138,7 @@ func createDataServer() (*http.Server, *mux.Router, error) {
 			close(done)
 		}
 	}()
-	sctx.GetLogger().Infof("Serving http data server on port http://%s:%d", conf.Config.Source.HttpServerIp, conf.Config.Source.HttpServerPort)
+	sctx.GetLogger().Infof("Serving http data server on port http://%s", cast.JoinHostPortInt(conf.Config.Source.HttpServerIp, conf.Config.Source.HttpServerPort))
 	return s, r, nil
 }
 

+ 3 - 3
internal/pkg/store/redis/redis.go

@@ -18,18 +18,18 @@
 package redis
 
 import (
-	"fmt"
 	"time"
 
 	"github.com/redis/go-redis/v9"
 
 	"github.com/lf-edge/ekuiper/internal/pkg/store/definition"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 func NewRedisFromConf(c definition.Config) *redis.Client {
 	conf := c.Redis
 	return redis.NewClient(&redis.Options{
-		Addr:        fmt.Sprintf("%s:%d", conf.Host, conf.Port),
+		Addr:        cast.JoinHostPortInt(conf.Host, conf.Port),
 		Password:    conf.Password,
 		DialTimeout: time.Duration(conf.Timeout) * time.Millisecond,
 	})
@@ -37,6 +37,6 @@ func NewRedisFromConf(c definition.Config) *redis.Client {
 
 func NewRedis(host string, port int) *redis.Client {
 	return redis.NewClient(&redis.Options{
-		Addr: fmt.Sprintf("%s:%d", host, port),
+		Addr: cast.JoinHostPortInt(host, port),
 	})
 }

+ 2 - 1
internal/server/rest.go

@@ -40,6 +40,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/server/middleware"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 )
@@ -157,7 +158,7 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
 	}
 
 	server := &http.Server{
-		Addr: fmt.Sprintf("%s:%d", ip, port),
+		Addr: cast.JoinHostPortInt(ip, port),
 		// Good practice to set timeouts to avoid Slowloris attacks.
 		WriteTimeout: time.Second * 60 * 5,
 		ReadTimeout:  time.Second * 60 * 5,

+ 2 - 1
internal/server/rpc.go

@@ -33,6 +33,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/io/sink"
 	"github.com/lf-edge/ekuiper/internal/pkg/model"
 	"github.com/lf-edge/ekuiper/internal/topo/rule"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 )
 
@@ -59,7 +60,7 @@ func (r *rpcComp) serve() {
 		logger.Fatal("Format of service Server isn'restHttpType correct. ", err)
 	}
 	srvRpc := &http.Server{
-		Addr:         fmt.Sprintf("%s:%d", ipRpc, portRpc),
+		Addr:         cast.JoinHostPortInt(ipRpc, portRpc),
 		WriteTimeout: time.Second * 15,
 		ReadTimeout:  time.Second * 15,
 		IdleTimeout:  time.Second * 60,

+ 2 - 1
internal/server/server.go

@@ -39,6 +39,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/topo/connection/factory"
 	"github.com/lf-edge/ekuiper/internal/topo/rule"
 	"github.com/lf-edge/ekuiper/pkg/ast"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 var (
@@ -171,7 +172,7 @@ func StartUp(Version, LoadFileType string) {
 	if conf.Config.Basic.RestTls != nil {
 		restHttpType = "https"
 	}
-	msg := fmt.Sprintf("Serving kuiper (version - %s) on port %d, and restful api on %s://%s:%d.", Version, conf.Config.Basic.Port, restHttpType, conf.Config.Basic.RestIp, conf.Config.Basic.RestPort)
+	msg := fmt.Sprintf("Serving kuiper (version - %s) on port %d, and restful api on %s://%s.", Version, conf.Config.Basic.Port, restHttpType, cast.JoinHostPortInt(conf.Config.Basic.RestIp, conf.Config.Basic.RestPort))
 	logger.Info(msg)
 	fmt.Println(msg)
 

+ 24 - 0
pkg/cast/net.go

@@ -0,0 +1,24 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cast
+
+import (
+	"net"
+	"strconv"
+)
+
+func JoinHostPortInt(host string, port int) string {
+	return net.JoinHostPort(host, strconv.Itoa(port))
+}

+ 53 - 0
pkg/cast/net_test.go

@@ -0,0 +1,53 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cast
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestJoinHostPortInt(t *testing.T) {
+	tests := []struct {
+		host string
+		port int
+		want string
+	}{
+		{
+			"0.0.0.0",
+			8080,
+			"0.0.0.0:8080",
+		},
+		{
+			"0.0.0.0",
+			0,
+			"0.0.0.0:0",
+		},
+		{
+			"::1",
+			8080,
+			"[::1]:8080",
+		},
+		{
+			"example.com",
+			8080,
+			"example.com:8080",
+		},
+	}
+	for _, tt := range tests {
+		assert.Equal(t, tt.want, JoinHostPortInt(tt.host, tt.port))
+	}
+}

+ 2 - 1
tools/kubernetes/util/util.go

@@ -23,6 +23,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	kconf "github.com/lf-edge/ekuiper/tools/kubernetes/conf"
 )
 
@@ -170,7 +171,7 @@ func (s *server) processDir() bool {
 		return false
 	}
 	conf := kconf.GetConf()
-	host := fmt.Sprintf(`http://%s:%d`, conf.GetIp(), conf.GetPort())
+	host := "http://" + cast.JoinHostPortInt(conf.GetIp(), conf.GetPort())
 	for _, entry := range dirEntries {
 		if !strings.HasSuffix(entry.Name(), ".json") {
 			continue

+ 2 - 1
tools/plugin_server/plugin_test_server.go

@@ -31,6 +31,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 // Only support to test a single plugin Testing process.
@@ -112,7 +113,7 @@ func createRestServer(ip string, port int) *http.Server {
 	r.HandleFunc("/symbol/start", startSymbolHandler).Methods(http.MethodPost)
 	r.HandleFunc("/symbol/stop", stopSymbolHandler).Methods(http.MethodPost)
 	server := &http.Server{
-		Addr: fmt.Sprintf("%s:%d", ip, port),
+		Addr: cast.JoinHostPortInt(ip, port),
 		// Good practice to set timeouts to avoid Slowloris attacks.
 		WriteTimeout: time.Second * 60 * 5,
 		ReadTimeout:  time.Second * 60 * 5,