123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- package server
- import (
- "fmt"
- "github.com/emqx/kuiper/common"
- "github.com/emqx/kuiper/plugins"
- "github.com/emqx/kuiper/xsql/processors"
- "github.com/prometheus/client_golang/prometheus/promhttp"
- "net"
- "net/http"
- "net/rpc"
- "path"
- )
- var (
- dataDir string
- logger = common.Log
- ruleProcessor *processors.RuleProcessor
- streamProcessor *processors.StreamProcessor
- pluginManager *plugins.Manager
- )
- func StartUp(Version string) {
- common.InitConf()
- dr, err := common.GetDataLoc()
- if err != nil {
- logger.Panic(err)
- } else {
- logger.Infof("db location is %s", dr)
- dataDir = dr
- }
- ruleProcessor = processors.NewRuleProcessor(path.Dir(dataDir))
- streamProcessor = processors.NewStreamProcessor(path.Join(path.Dir(dataDir), "stream"))
- pluginManager, err = plugins.NewPluginManager()
- if err != nil {
- logger.Panic(err)
- }
- registry = &RuleRegistry{internal: make(map[string]*RuleState)}
- server := new(Server)
- //Start rules
- if rules, err := ruleProcessor.GetAllRules(); err != nil {
- logger.Infof("Start rules error: %s", err)
- } else {
- logger.Info("Starting rules")
- var reply string
- for _, rule := range rules {
- err = server.StartRule(rule, &reply)
- if err != nil {
- logger.Info(err)
- } else {
- logger.Info(reply)
- }
- }
- }
- //Start server
- err = rpc.Register(server)
- if err != nil {
- logger.Fatal("Format of service Server isn't correct. ", err)
- }
- // Register a HTTP handler
- rpc.HandleHTTP()
- // Listen to TPC connections on port 1234
- listener, e := net.Listen("tcp", fmt.Sprintf(":%d", common.Config.Port))
- if e != nil {
- m := fmt.Sprintf("Listen error: %s", e)
- fmt.Printf(m)
- logger.Fatal(m)
- }
- if common.Config.Prometheus {
- go func() {
- port := common.Config.PrometheusPort
- if port <= 0 {
- logger.Fatal("Miss configuration prometheusPort")
- }
- listener, e := net.Listen("tcp", fmt.Sprintf(":%d", port))
- if e != nil {
- logger.Fatal("Listen prometheus error: ", e)
- }
- logger.Infof("Serving prometheus metrics on port http://localhost:%d/metrics", port)
- http.Handle("/metrics", promhttp.Handler())
- http.Serve(listener, nil)
- }()
- }
- //Start rest service
- srv := createRestServer(common.Config.RestPort)
- go func() {
- var err error
- if common.Config.RestTls == nil {
- err = srv.ListenAndServe()
- } else {
- err = srv.ListenAndServeTLS(common.Config.RestTls.Certfile, common.Config.RestTls.Keyfile)
- }
- if err != nil {
- logger.Fatal("Error serving rest service: ", err)
- }
- }()
- t := "http"
- if common.Config.RestTls != nil {
- t = "https"
- }
- msg := fmt.Sprintf("Serving kuiper (version - %s) on port %d, and restful api on %s://0.0.0.0:%d. \n", Version, common.Config.Port, t, common.Config.RestPort)
- logger.Info(msg)
- fmt.Printf(msg)
- // Start accept incoming HTTP connections
- err = http.Serve(listener, nil)
- if err != nil {
- logger.Fatal("Error serving: ", err)
- }
- }
|