data_server.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package httpserver
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "net/http"
  19. "sync"
  20. "time"
  21. "github.com/gorilla/handlers"
  22. "github.com/gorilla/mux"
  23. "github.com/lf-edge/ekuiper/internal/conf"
  24. "github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
  25. kctx "github.com/lf-edge/ekuiper/internal/topo/context"
  26. "github.com/lf-edge/ekuiper/internal/topo/state"
  27. "github.com/lf-edge/ekuiper/pkg/api"
  28. "github.com/lf-edge/ekuiper/pkg/cast"
  29. )
  30. // manage the global http data server
  31. var (
  32. refCount int32
  33. server *http.Server
  34. router *mux.Router
  35. done chan struct{}
  36. sctx api.StreamContext
  37. lock sync.RWMutex
  38. )
  39. const TopicPrefix = "$$httppush/"
  40. func init() {
  41. contextLogger := conf.Log.WithField("httppush_connection", 0)
  42. ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
  43. ruleId := "$$httppush_connection"
  44. opId := "$$httppush_connection"
  45. store, err := state.CreateStore(ruleId, 0)
  46. if err != nil {
  47. ctx.GetLogger().Errorf("neuron connection create store error %v", err)
  48. panic(err)
  49. }
  50. sctx = ctx.WithMeta(ruleId, opId, store)
  51. }
  52. func registerInit() error {
  53. lock.Lock()
  54. defer lock.Unlock()
  55. if server == nil {
  56. var err error
  57. server, router, err = createDataServer()
  58. if err != nil {
  59. return err
  60. }
  61. }
  62. refCount++
  63. return nil
  64. }
  65. func RegisterEndpoint(endpoint string, method string, _ string) (string, chan struct{}, error) {
  66. err := registerInit()
  67. if err != nil {
  68. return "", nil, err
  69. }
  70. topic := TopicPrefix + endpoint
  71. pubsub.CreatePub(topic)
  72. router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
  73. sctx.GetLogger().Debugf("receive http request: %s", r.URL.String())
  74. defer r.Body.Close()
  75. m := make(map[string]interface{})
  76. err := json.NewDecoder(r.Body).Decode(&m)
  77. if err != nil {
  78. handleError(w, err, "Fail to decode data")
  79. pubsub.ProduceError(sctx, topic, fmt.Errorf("fail to decode data %s: %v", r.Body, err))
  80. return
  81. }
  82. sctx.GetLogger().Debugf("httppush received message %s", m)
  83. pubsub.Produce(sctx, topic, m)
  84. w.WriteHeader(http.StatusOK)
  85. _, _ = w.Write([]byte("ok"))
  86. }).Methods(method)
  87. return topic, done, nil
  88. }
  89. func UnregisterEndpoint(endpoint string) {
  90. lock.Lock()
  91. defer lock.Unlock()
  92. pubsub.RemovePub(TopicPrefix + endpoint)
  93. refCount--
  94. // TODO async close server
  95. if refCount == 0 {
  96. sctx.GetLogger().Infof("shutting down http data server...")
  97. if err := server.Shutdown(sctx); err != nil {
  98. sctx.GetLogger().Errorf("shutdown: %s", err)
  99. }
  100. sctx.GetLogger().Infof("http data server exiting")
  101. server = nil
  102. router = nil
  103. }
  104. }
  105. // createDataServer creates a new http data server. Must run inside lock
  106. func createDataServer() (*http.Server, *mux.Router, error) {
  107. r := mux.NewRouter()
  108. s := &http.Server{
  109. Addr: cast.JoinHostPortInt(conf.Config.Source.HttpServerIp, conf.Config.Source.HttpServerPort),
  110. // Good practice to set timeouts to avoid Slowloris attacks.
  111. WriteTimeout: time.Second * 60 * 5,
  112. ReadTimeout: time.Second * 60 * 5,
  113. IdleTimeout: time.Second * 60,
  114. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin", "Authorization"}), handlers.AllowedMethods([]string{"POST", "GET", "PUT", "DELETE", "HEAD"}))(r),
  115. }
  116. done = make(chan struct{})
  117. go func(done chan struct{}) {
  118. var err error
  119. if conf.Config.Source.HttpServerTls == nil {
  120. err = s.ListenAndServe()
  121. } else {
  122. err = s.ListenAndServeTLS(conf.Config.Source.HttpServerTls.Certfile, conf.Config.Source.HttpServerTls.Keyfile)
  123. }
  124. if err != nil {
  125. sctx.GetLogger().Errorf("http data server error: %v", err)
  126. close(done)
  127. }
  128. }(done)
  129. sctx.GetLogger().Infof("Serving http data server on port http://%s", cast.JoinHostPortInt(conf.Config.Source.HttpServerIp, conf.Config.Source.HttpServerPort))
  130. return s, r, nil
  131. }
  132. func handleError(w http.ResponseWriter, err error, prefix string) {
  133. message := prefix
  134. if message != "" {
  135. message += ": "
  136. }
  137. message += err.Error()
  138. http.Error(w, message, http.StatusBadRequest)
  139. }