Browse Source

feat(main): gracefully shut down restful and rpc services

gzj 4 years atrás
parent
commit
0515506b99
1 changed files with 74 additions and 47 deletions
  1. 74 47
      xstream/server/server/server.go

+ 74 - 47
xstream/server/server/server.go

@@ -1,12 +1,13 @@
 package server
 
 import (
-	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xsql/processors"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
-	"net"
+
+	"context"
+	"fmt"
 	"net/http"
 	"net/rpc"
 	"os"
@@ -40,14 +41,6 @@ func StartUp(Version, LoadFileType string) {
 		dataDir = dr
 	}
 
-	c := make(chan os.Signal)
-	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
-	go func() {
-		<-c
-		logger.Printf("Kuiper is terminated.\n")
-		os.Exit(0)
-	}()
-
 	ruleProcessor = processors.NewRuleProcessor(path.Dir(dataDir))
 	streamProcessor = processors.NewStreamProcessor(path.Join(path.Dir(dataDir), "stream"))
 	pluginManager, err = plugins.NewPluginManager()
@@ -73,62 +66,96 @@ func StartUp(Version, LoadFileType string) {
 		}
 	}
 
-	//Start server
-	err = rpc.Register(server)
-	if err != nil {
-		logger.Fatal("Format of service Server isn't correct. ", err)
-	}
-	// Register a HTTP handler
-	rpc.HandleHTTP()
-	// Listen to TPC connections on port 1234
-	listener, e := net.Listen("tcp", fmt.Sprintf(":%d", common.Config.Basic.Port))
-	if e != nil {
-		m := fmt.Sprintf("Listen error: %s", e)
-		fmt.Printf(m)
-		logger.Fatal(m)
-	}
-
+	//Start prometheus service
+	var srvPrometheus *http.Server = nil
 	if common.Config.Basic.Prometheus {
+		portPrometheus := common.Config.Basic.PrometheusPort
+		if portPrometheus <= 0 {
+			logger.Fatal("Miss configuration prometheusPort")
+		}
+		mux := http.NewServeMux()
+		mux.Handle("/metrics", promhttp.Handler())
+		srvPrometheus = &http.Server{
+			Addr:         fmt.Sprintf("0.0.0.0:%d", portPrometheus),
+			WriteTimeout: time.Second * 15,
+			ReadTimeout:  time.Second * 15,
+			IdleTimeout:  time.Second * 60,
+			Handler:      mux,
+		}
 		go func() {
-			port := common.Config.Basic.PrometheusPort
-			if port <= 0 {
-				logger.Fatal("Miss configuration prometheusPort")
+			if err := srvPrometheus.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+				logger.Fatal("Listen prometheus error: ", err)
 			}
-			listener, e := net.Listen("tcp", fmt.Sprintf(":%d", port))
-			if e != nil {
-				logger.Fatal("Listen prometheus error: ", e)
-			}
-			logger.Infof("Serving prometheus metrics on port http://localhost:%d/metrics", port)
-			http.Handle("/metrics", promhttp.Handler())
-			http.Serve(listener, nil)
 		}()
+		msg := fmt.Sprintf("Serving prometheus metrics on port http://localhost:%d/metrics", portPrometheus)
+		logger.Infof(msg)
+		fmt.Println(msg)
 	}
 
 	//Start rest service
-	srv := createRestServer(common.Config.Basic.RestPort)
-
+	srvRest := createRestServer(common.Config.Basic.RestPort)
 	go func() {
 		var err error
 		if common.Config.Basic.RestTls == nil {
-			err = srv.ListenAndServe()
+			err = srvRest.ListenAndServe()
 		} else {
-			err = srv.ListenAndServeTLS(common.Config.Basic.RestTls.Certfile, common.Config.Basic.RestTls.Keyfile)
+			err = srvRest.ListenAndServeTLS(common.Config.Basic.RestTls.Certfile, common.Config.Basic.RestTls.Keyfile)
 		}
-		if err != nil {
+		if err != nil && err != http.ErrServerClosed {
 			logger.Fatal("Error serving rest service: ", err)
 		}
 	}()
-	t := "http"
+
+	// Start rpc service
+	portRpc := common.Config.Basic.Port
+	rpcSrv := rpc.NewServer()
+	err = rpcSrv.Register(server)
+	if err != nil {
+		logger.Fatal("Format of service Server isn'restHttpType correct. ", err)
+	}
+	srvRpc := &http.Server{
+		Addr:         fmt.Sprintf("0.0.0.0:%d", portRpc),
+		WriteTimeout: time.Second * 15,
+		ReadTimeout:  time.Second * 15,
+		IdleTimeout:  time.Second * 60,
+		Handler:      rpcSrv,
+	}
+	go func() {
+		if err = srvRpc.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+			logger.Fatal("Error serving rpc service:", err)
+		}
+	}()
+
+	//Startup message
+	restHttpType := "http"
 	if common.Config.Basic.RestTls != nil {
-		t = "https"
+		restHttpType = "https"
 	}
-	msg := fmt.Sprintf("Serving kuiper (version - %s) on port %d, and restful api on %s://0.0.0.0:%d. \n", Version, common.Config.Basic.Port, t, common.Config.Basic.RestPort)
+	msg := fmt.Sprintf("Serving kuiper (version - %s) on port %d, and restful api on %s://0.0.0.0:%d. \n", Version, common.Config.Basic.Port, restHttpType, common.Config.Basic.RestPort)
 	logger.Info(msg)
 	fmt.Printf(msg)
 
-	// Start accept incoming HTTP connections
-	err = http.Serve(listener, nil)
-	if err != nil {
-		logger.Fatal("Error serving: ", err)
+	//Stop the services
+	sigint := make(chan os.Signal, 1)
+	signal.Notify(sigint, os.Interrupt, syscall.SIGTERM)
+	<-sigint
+
+	if err = srvRpc.Shutdown(context.TODO()); err != nil {
+		logger.Errorf("rpc server shutdown error: %v", err)
+	}
+	logger.Info("rpc server shutdown.")
+
+	if err = srvRest.Shutdown(context.TODO()); err != nil {
+		logger.Errorf("rest server shutdown error: %v", err)
 	}
+	logger.Info("rest server successfully shutdown.")
+
+	if srvPrometheus != nil {
+		if err = srvPrometheus.Shutdown(context.TODO()); err != nil {
+			logger.Errorf("prometheus server shutdown error: %v", err)
+		}
+		logger.Info("prometheus server successfully shutdown.")
+	}
+
+	os.Exit(0)
 }