server.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package server
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/plugins"
  6. "github.com/emqx/kuiper/xsql/processors"
  7. "github.com/prometheus/client_golang/prometheus/promhttp"
  8. "net"
  9. "net/http"
  10. "net/rpc"
  11. "os"
  12. "os/signal"
  13. "path"
  14. "syscall"
  15. "time"
  16. )
  17. var (
  18. dataDir string
  19. logger = common.Log
  20. startTimeStamp int64
  21. version = ""
  22. ruleProcessor *processors.RuleProcessor
  23. streamProcessor *processors.StreamProcessor
  24. pluginManager *plugins.Manager
  25. )
  26. func StartUp(Version, LoadFileType string) {
  27. version = Version
  28. common.LoadFileType = LoadFileType
  29. startTimeStamp = time.Now().Unix()
  30. common.InitConf()
  31. dr, err := common.GetDataLoc()
  32. if err != nil {
  33. logger.Panic(err)
  34. } else {
  35. logger.Infof("db location is %s", dr)
  36. dataDir = dr
  37. }
  38. c := make(chan os.Signal)
  39. signal.Notify(c, os.Interrupt, syscall.SIGTERM)
  40. go func() {
  41. <-c
  42. logger.Printf("Kuiper is terminated.\n")
  43. os.Exit(0)
  44. }()
  45. ruleProcessor = processors.NewRuleProcessor(path.Dir(dataDir))
  46. streamProcessor = processors.NewStreamProcessor(path.Join(path.Dir(dataDir), "stream"))
  47. pluginManager, err = plugins.NewPluginManager()
  48. if err != nil {
  49. logger.Panic(err)
  50. }
  51. registry = &RuleRegistry{internal: make(map[string]*RuleState)}
  52. server := new(Server)
  53. //Start rules
  54. if rules, err := ruleProcessor.GetAllRules(); err != nil {
  55. logger.Infof("Start rules error: %s", err)
  56. } else {
  57. logger.Info("Starting rules")
  58. var reply string
  59. for _, rule := range rules {
  60. //err = server.StartRule(rule, &reply)
  61. reply = recoverRule(rule)
  62. if 0 != len(reply) {
  63. logger.Info(reply)
  64. }
  65. }
  66. }
  67. //Start server
  68. err = rpc.Register(server)
  69. if err != nil {
  70. logger.Fatal("Format of service Server isn't correct. ", err)
  71. }
  72. // Register a HTTP handler
  73. rpc.HandleHTTP()
  74. // Listen to TPC connections on port 1234
  75. listener, e := net.Listen("tcp", fmt.Sprintf(":%d", common.Config.Basic.Port))
  76. if e != nil {
  77. m := fmt.Sprintf("Listen error: %s", e)
  78. fmt.Printf(m)
  79. logger.Fatal(m)
  80. }
  81. if common.Config.Basic.Prometheus {
  82. go func() {
  83. port := common.Config.Basic.PrometheusPort
  84. if port <= 0 {
  85. logger.Fatal("Miss configuration prometheusPort")
  86. }
  87. listener, e := net.Listen("tcp", fmt.Sprintf(":%d", port))
  88. if e != nil {
  89. logger.Fatal("Listen prometheus error: ", e)
  90. }
  91. logger.Infof("Serving prometheus metrics on port http://localhost:%d/metrics", port)
  92. http.Handle("/metrics", promhttp.Handler())
  93. http.Serve(listener, nil)
  94. }()
  95. }
  96. //Start rest service
  97. srv := createRestServer(common.Config.Basic.RestPort)
  98. go func() {
  99. var err error
  100. if common.Config.Basic.RestTls == nil {
  101. err = srv.ListenAndServe()
  102. } else {
  103. err = srv.ListenAndServeTLS(common.Config.Basic.RestTls.Certfile, common.Config.Basic.RestTls.Keyfile)
  104. }
  105. if err != nil {
  106. logger.Fatal("Error serving rest service: ", err)
  107. }
  108. }()
  109. t := "http"
  110. if common.Config.Basic.RestTls != nil {
  111. t = "https"
  112. }
  113. msg := fmt.Sprintf("Serving kuiper (version - %s) on port %d, and restful api on %s://0.0.0.0:%d. \n", Version, common.Config.Basic.Port, t, common.Config.Basic.RestPort)
  114. logger.Info(msg)
  115. fmt.Printf(msg)
  116. // Start accept incoming HTTP connections
  117. err = http.Serve(listener, nil)
  118. if err != nil {
  119. logger.Fatal("Error serving: ", err)
  120. }
  121. }