server.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package server
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/plugins"
  6. "github.com/emqx/kuiper/xsql/processors"
  7. "github.com/prometheus/client_golang/prometheus/promhttp"
  8. "net"
  9. "net/http"
  10. "net/rpc"
  11. "path"
  12. )
  13. var (
  14. dataDir string
  15. logger = common.Log
  16. ruleProcessor *processors.RuleProcessor
  17. streamProcessor *processors.StreamProcessor
  18. pluginManager *plugins.Manager
  19. )
  20. func StartUp(Version string) {
  21. common.InitConf()
  22. dr, err := common.GetDataLoc()
  23. if err != nil {
  24. logger.Panic(err)
  25. } else {
  26. logger.Infof("db location is %s", dr)
  27. dataDir = dr
  28. }
  29. ruleProcessor = processors.NewRuleProcessor(path.Dir(dataDir))
  30. streamProcessor = processors.NewStreamProcessor(path.Join(path.Dir(dataDir), "stream"))
  31. pluginManager, err = plugins.NewPluginManager()
  32. if err != nil {
  33. logger.Panic(err)
  34. }
  35. registry = &RuleRegistry{internal: make(map[string]*RuleState)}
  36. server := new(Server)
  37. //Start rules
  38. if rules, err := ruleProcessor.GetAllRules(); err != nil {
  39. logger.Infof("Start rules error: %s", err)
  40. } else {
  41. logger.Info("Starting rules")
  42. var reply string
  43. for _, rule := range rules {
  44. err = server.StartRule(rule, &reply)
  45. if err != nil {
  46. logger.Info(err)
  47. } else {
  48. logger.Info(reply)
  49. }
  50. }
  51. }
  52. //Start server
  53. err = rpc.Register(server)
  54. if err != nil {
  55. logger.Fatal("Format of service Server isn't correct. ", err)
  56. }
  57. // Register a HTTP handler
  58. rpc.HandleHTTP()
  59. // Listen to TPC connections on port 1234
  60. listener, e := net.Listen("tcp", fmt.Sprintf(":%d", common.Config.Port))
  61. if e != nil {
  62. m := fmt.Sprintf("Listen error: %s", e)
  63. fmt.Printf(m)
  64. logger.Fatal(m)
  65. }
  66. if common.Config.Prometheus {
  67. go func() {
  68. port := common.Config.PrometheusPort
  69. if port <= 0 {
  70. logger.Fatal("Miss configuration prometheusPort")
  71. }
  72. listener, e := net.Listen("tcp", fmt.Sprintf(":%d", port))
  73. if e != nil {
  74. logger.Fatal("Listen prometheus error: ", e)
  75. }
  76. logger.Infof("Serving prometheus metrics on port http://localhost:%d/metrics", port)
  77. http.Handle("/metrics", promhttp.Handler())
  78. http.Serve(listener, nil)
  79. }()
  80. }
  81. //Start rest service
  82. srv := createRestServer(common.Config.RestPort)
  83. go func() {
  84. var err error
  85. if common.Config.RestTls == nil {
  86. err = srv.ListenAndServe()
  87. } else {
  88. err = srv.ListenAndServeTLS(common.Config.RestTls.Certfile, common.Config.RestTls.Keyfile)
  89. }
  90. if err != nil {
  91. logger.Fatal("Error serving rest service: ", err)
  92. }
  93. }()
  94. t := "http"
  95. if common.Config.RestTls != nil {
  96. t = "https"
  97. }
  98. msg := fmt.Sprintf("Serving kuiper (version - %s) on port %d, and restful api on %s://0.0.0.0:%d. \n", Version, common.Config.Port, t, common.Config.RestPort)
  99. logger.Info(msg)
  100. fmt.Printf(msg)
  101. // Start accept incoming HTTP connections
  102. err = http.Serve(listener, nil)
  103. if err != nil {
  104. logger.Fatal("Error serving: ", err)
  105. }
  106. }