data_server.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  //
  // Licensed under the Apache License, Version 2.0 (the "License");
  // you may not use this file except in compliance with the License.
  // You may obtain a copy of the License at
  //
  //     http://www.apache.org/licenses/LICENSE-2.0
  //
  // Unless required by applicable law or agreed to in writing, software
  // distributed under the License is distributed on an "AS IS" BASIS,
  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  // See the License for the specific language governing permissions and
  // limitations under the License.
  14. package httpserver
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "net/http"
  19. "sync"
  20. "time"
  21. "github.com/gorilla/handlers"
  22. "github.com/gorilla/mux"
  23. "github.com/lf-edge/ekuiper/internal/conf"
  24. "github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
  25. kctx "github.com/lf-edge/ekuiper/internal/topo/context"
  26. "github.com/lf-edge/ekuiper/internal/topo/state"
  27. "github.com/lf-edge/ekuiper/pkg/api"
  28. )
  29. // manage the global http data server
  30. var (
  31. refCount int32
  32. server *http.Server
  33. router *mux.Router
  34. done chan struct{}
  35. sctx api.StreamContext
  36. lock sync.RWMutex
  37. )
  38. const TopicPrefix = "$$httppush/"
  39. func init() {
  40. contextLogger := conf.Log.WithField("httppush_connection", 0)
  41. ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
  42. ruleId := "$$httppush_connection"
  43. opId := "$$httppush_connection"
  44. store, err := state.CreateStore(ruleId, 0)
  45. if err != nil {
  46. ctx.GetLogger().Errorf("neuron connection create store error %v", err)
  47. panic(err)
  48. }
  49. sctx = ctx.WithMeta(ruleId, opId, store)
  50. }
  51. func registerInit() error {
  52. lock.Lock()
  53. defer lock.Unlock()
  54. if server == nil {
  55. var err error
  56. server, router, err = createDataServer()
  57. if err != nil {
  58. return err
  59. }
  60. }
  61. refCount++
  62. return nil
  63. }
  64. func RegisterEndpoint(endpoint string, method string, _ string) (string, chan struct{}, error) {
  65. err := registerInit()
  66. if err != nil {
  67. return "", nil, err
  68. }
  69. topic := TopicPrefix + endpoint
  70. pubsub.CreatePub(topic)
  71. router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
  72. sctx.GetLogger().Debugf("receive http request: %s", r.URL.String())
  73. defer r.Body.Close()
  74. m := make(map[string]interface{})
  75. err := json.NewDecoder(r.Body).Decode(&m)
  76. if err != nil {
  77. handleError(w, err, "Fail to decode data")
  78. pubsub.ProduceError(sctx, topic, fmt.Errorf("fail to decode data %s: %v", r.Body, err))
  79. return
  80. }
  81. sctx.GetLogger().Debugf("httppush received message %s", m)
  82. pubsub.Produce(sctx, topic, m)
  83. w.WriteHeader(http.StatusOK)
  84. _, _ = w.Write([]byte("ok"))
  85. }).Methods(method)
  86. return topic, done, nil
  87. }
  88. func UnregisterEndpoint(endpoint string) {
  89. lock.Lock()
  90. defer lock.Unlock()
  91. pubsub.RemovePub(TopicPrefix + endpoint)
  92. refCount--
  93. // TODO async close server
  94. if refCount == 0 {
  95. sctx.GetLogger().Infof("shutting down http data server...")
  96. if err := server.Shutdown(sctx); err != nil {
  97. sctx.GetLogger().Errorf("shutdown: %s", err)
  98. }
  99. sctx.GetLogger().Infof("http data server exiting")
  100. server = nil
  101. router = nil
  102. }
  103. }
  104. // createDataServer creates a new http data server. Must run inside lock
  105. func createDataServer() (*http.Server, *mux.Router, error) {
  106. r := mux.NewRouter()
  107. s := &http.Server{
  108. Addr: fmt.Sprintf("%s:%d", conf.Config.Source.HttpServerIp, conf.Config.Source.HttpServerPort),
  109. // Good practice to set timeouts to avoid Slowloris attacks.
  110. WriteTimeout: time.Second * 60 * 5,
  111. ReadTimeout: time.Second * 60 * 5,
  112. IdleTimeout: time.Second * 60,
  113. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin", "Authorization"}), handlers.AllowedMethods([]string{"POST", "GET", "PUT", "DELETE", "HEAD"}))(r),
  114. }
  115. done = make(chan struct{})
  116. go func() {
  117. var err error
  118. if conf.Config.Source.HttpServerTls == nil {
  119. err = s.ListenAndServe()
  120. } else {
  121. err = s.ListenAndServeTLS(conf.Config.Source.HttpServerTls.Certfile, conf.Config.Source.HttpServerTls.Keyfile)
  122. }
  123. if err != nil {
  124. sctx.GetLogger().Errorf("http data server error: %v", err)
  125. close(done)
  126. }
  127. }()
  128. sctx.GetLogger().Infof("Serving http data server on port http://%s:%d", conf.Config.Source.HttpServerIp, conf.Config.Source.HttpServerPort)
  129. return s, r, nil
  130. }
  // handleError replies to the request with 400 Bad Request and a plain-text
  // body of the form "<prefix>: <err>".
  //
  // An empty prefix yields just the error text. A nil err yields just the
  // prefix instead of panicking on err.Error().
  func handleError(w http.ResponseWriter, err error, prefix string) {
  	message := prefix
  	if err != nil {
  		if message != "" {
  			message += ": "
  		}
  		message += err.Error()
  	}
  	http.Error(w, message, http.StatusBadRequest)
  }