server.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package server
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/plugins"
  6. "github.com/emqx/kuiper/xsql/processors"
  7. "github.com/prometheus/client_golang/prometheus/promhttp"
  8. "net"
  9. "net/http"
  10. "net/rpc"
  11. "path"
  12. )
  13. var (
  14. dataDir string
  15. logger = common.Log
  16. ruleProcessor *processors.RuleProcessor
  17. streamProcessor *processors.StreamProcessor
  18. pluginManager *plugins.Manager
  19. )
  20. func StartUp(Version string) {
  21. common.InitConf()
  22. dr, err := common.GetDataLoc()
  23. if err != nil {
  24. logger.Panic(err)
  25. } else {
  26. logger.Infof("db location is %s", dr)
  27. dataDir = dr
  28. }
  29. ruleProcessor = processors.NewRuleProcessor(path.Dir(dataDir))
  30. streamProcessor = processors.NewStreamProcessor(path.Join(path.Dir(dataDir), "stream"))
  31. pluginManager, err = plugins.NewPluginManager()
  32. if err != nil {
  33. logger.Panic(err)
  34. }
  35. registry = &RuleRegistry{internal: make(map[string]*RuleState)}
  36. server := new(Server)
  37. //Start rules
  38. if rules, err := ruleProcessor.GetAllRules(); err != nil {
  39. logger.Infof("Start rules error: %s", err)
  40. } else {
  41. logger.Info("Starting rules")
  42. var reply string
  43. for _, rule := range rules {
  44. err = server.StartRule(rule, &reply)
  45. if err != nil {
  46. logger.Info(err)
  47. } else {
  48. logger.Info(reply)
  49. }
  50. }
  51. }
  52. //Start server
  53. err = rpc.Register(server)
  54. if err != nil {
  55. logger.Fatal("Format of service Server isn't correct. ", err)
  56. }
  57. // Register a HTTP handler
  58. rpc.HandleHTTP()
  59. // Listen to TPC connections on port 1234
  60. listener, e := net.Listen("tcp", fmt.Sprintf(":%d", common.Config.Port))
  61. if e != nil {
  62. logger.Fatal("Listen error: ", e)
  63. }
  64. if common.Config.Prometheus {
  65. go func() {
  66. port := common.Config.PrometheusPort
  67. if port <= 0 {
  68. logger.Fatal("Miss configuration prometheusPort")
  69. }
  70. listener, e := net.Listen("tcp", fmt.Sprintf(":%d", port))
  71. if e != nil {
  72. logger.Fatal("Listen prometheus error: ", e)
  73. }
  74. logger.Infof("Serving prometheus metrics on port http://localhost:%d/metrics", port)
  75. http.Handle("/metrics", promhttp.Handler())
  76. http.Serve(listener, nil)
  77. }()
  78. }
  79. //Start rest service
  80. srv := createRestServer(common.Config.RestPort)
  81. go func() {
  82. if err := srv.ListenAndServe(); err != nil {
  83. logger.Fatal("Error serving rest service: ", err)
  84. }
  85. }()
  86. msg := fmt.Sprintf("Serving kuiper (version - %s) on port %d, and restful api on port %d. \n", Version, common.Config.Port, common.Config.RestPort)
  87. logger.Info(msg)
  88. fmt.Printf(msg)
  89. // Start accept incoming HTTP connections
  90. err = http.Serve(listener, nil)
  91. if err != nil {
  92. logger.Fatal("Error serving: ", err)
  93. }
  94. }