Преглед изворни кода

feat(source): http push source to have a global server

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang пре 2 година
родитељ
комит
57c837c7e4

+ 8 - 0
docs/directory.json

@@ -193,6 +193,10 @@
 									"path": "rules/sources/builtin/http_pull"
 								},
 								{
+									"title": "HTTP 推送源",
+									"path": "rules/sources/builtin/http_push"
+								},
+								{
 									"title": "内存源",
 									"path": "rules/sources/builtin/memory"
 								},
@@ -743,6 +747,10 @@
 									"path": "rules/sources/builtin/http_pull"
 								},
 								{
+									"title": "HTTP push source",
+									"path": "rules/sources/builtin/http_push"
+								},
+								{
 									"title": "Memory source",
 									"path": "rules/sources/builtin/memory"
 								},

+ 39 - 22
docs/en_US/rules/sources/builtin/http_push.md

@@ -1,37 +1,54 @@
 # HTTP push source 
 
-eKuiper provides built-in support for push HTTP source stream, which can receive the message from HTTP client.  The configuration file of HTTP push source is at `etc/sources/httppush.yaml`. Below is the file format.
+eKuiper provides built-in HTTP source stream, which serves as an HTTP server and can receive the message from HTTP client. There will be a single global HTTP server for all HTTP push sources. Each source can have its own endpoint so that multiple endpoints are supported.
+
+## Configurations
+
+There are two kinds of configurations: global server configuration and the source configuration.
+
+### Server Configuration
+
+The server configuration is in the `source` section in `etc/kuiper.yaml`.
 
 ```yaml
-#Global httppull configurations
-default:
-  # the address to listen on
-  server: ":8900"
-    
-#Override the global configurations
-application_conf: #Conf_key
-  server: ":9000"
+source:
+  ## Configurations for the global http data server for httppush source
+  # HTTP data service ip
+  httpServerIp: 0.0.0.0
+  # HTTP data service port
+  httpServerPort: 10081
+  # httpServerTls:
+  #    certfile: /var/https-server.crt
+  #    keyfile: /var/https-server.key
 ```
 
-## Global HTTP push configurations
+User can specify the following properties:
 
-Use can specify the global HTTP push settings here. The configuration items specified in `default` section will be taken as default settings for all HTTP connections. 
+- httpServerIp: the ip to bind the http data server.
+- httpServerPort: the port to bind the http data server.
+- httpServerTls: the configuration of the http TLS.
 
-### server
+The global server will start once any rules needs a httppush source starts. It will shut down once all referred rules are closed.
 
-The server address for http push listen on.
+### Source Configuration
 
+Each stream can configure its url endpoint and http method. The endpoint property is mapped to the `datasource` property in create stream statement.
 
-## Override the default settings
+- Example: Bind the source to `/api/data` endpoint. Thus, with the default server configuration, it will listen on `http://localhost:10081/api/data`.
 
-If you have a specific connection that need to overwrite the default settings, you can create a customized section. In the previous sample, we create a specific setting named with `application_conf`.  Then you can specify the configuration with option `CONF_KEY` when creating the stream definition (see [stream specs](../../../sqls/streams.md) for more info).
+```sql
+CREATE STREAM httpDemo() WITH (DATASOURCE="/api/data", FORMAT="json", TYPE="httppush")
+```
 
-**Sample**
+The configuration file of HTTP push source is at `etc/sources/httppush.yaml`. Right now, only one property `method` is allowed to configure the http method to listen on.
 
+```yaml
+#Global httppush configurations
+default:
+  # the request method to listen on
+  method: "POST"
+    
+#Override the global configurations
+application_conf: #Conf_key
+  server: "PUT"
 ```
-demo (
-		...
-	) WITH (DATASOURCE="/feed", FORMAT="JSON", TYPE="httppush", KEY="USERID", CONF_KEY="application_conf");
-```
-
-The configuration keys used for these specific settings are the same as in `default` settings, any values specified in specific settings will overwrite the values in `default` section.

+ 38 - 24
docs/zh_CN/rules/sources/builtin/http_push.md

@@ -1,40 +1,54 @@
-# HTTP 接收
+# HTTP push 
 
-eKuiper 为接收 HTTP 源流提供了内置支持,该支持可从 HTTP 客户端接收消息并输入 eKuiper 处理管道。 HTTP接收源的配置文件位于 `etc/sources/httppush.yaml`中。 以下是文件格式
+eKuiper 提供了内置的 HTTP 源,它作为一个 HTTP 服务器,可以接收来自 HTTP 客户端的消息。所有的 HTTP 推送源共用单一的全局 HTTP 数据服务器。每个源可以有自己的 URL,这样就可以支持多个端点
 
-```yaml
-#全局httppull配置
-default:
-  # 接收服务器地址
-  server: ":8900" 
- 
+## 配置
 
-#重载全局配置
-application_conf: #Conf_key
-  server: ":9000"
-```
+配置分成两个部分:全局服务器配置和源配置。
 
-## 全局HTTP接收配置
+### 服务器配置
 
-用户可以在此处指定全局 HTTP 接收设置。 `default` 部分中指定的配置项将用作所有HTTP 连接的默认设置
+服务器配置在 `etc/kuiper.yaml` 中的 `source` 部分
 
-### server
+```yaml
+source:
+  ## Configurations for the global http data server for httppush source
+  # HTTP data service ip
+  httpServerIp: 0.0.0.0
+  # HTTP data service port
+  httpServerPort: 10081
+  # httpServerTls:
+  #    certfile: /var/https-server.crt
+  #    keyfile: /var/https-server.key
+```
 
-接收数据的服务器地址。
+用户可以指定以下属性:
 
+- httpServerIp:用于绑定 http 数据服务器的IP。
+- httpServerPort:用于绑定 http 数据服务器的端口。
+- httpServerTls: http 服务器 TLS 的配置。
 
+一旦有任何需要 httppush 源的规则启动,全局服务器就会启动。一旦所有引用的规则都关闭,它就会关闭。
 
-## 重载默认设置
+### 源配
 
-如果您有特定的连接需要重载默认设置,则可以创建一个自定义部分。 在上一个示例中,我们创建了一个名为 `application_conf` 的特定设置。 然后,您可以在创建流定义时使用选项 `CONF_KEY` 指定配置(有关更多信息,请参见 [流规格](../../../sqls/streams.md))。
+每个流可以配置它的 URL 端点和 http 请求方法。端点属性被映射到创建流语句中的 `datasource` 属性
 
-**样例**
+在以下示例中,源被绑定到 `/api/data` 端点。此时,在默认的服务器配置下,它将监听`http://localhost:10081/api/data` 。
 
-```
-demo (
-		...
-	) WITH (DATASOURCE="/feed", FORMAT="JSON", TYPE="httppush", KEY="USERID", CONF_KEY="application_conf");
+```sql
+CREATE STREAM httpDemo() WITH (DATASOURCE="/api/data", FORMAT="json", TYPE="httppush")
 ```
 
-这些特定设置所使用的配置键与 `default` 设置中的配置键相同,在特定设置中指定的任何值都将重载 `default` 部分中的值
+HTTP 推送源的配置文件在 `etc/sources/httppush.yaml` 。目前仅一个属性 `method` ,用于配置 HTTP 监听的请求方法
 
+```yaml
+#Global httppush configurations
+default:
+  # the request method to listen on
+  method: "POST"
+    
+#Override the global configurations
+application_conf: #Conf_key
+  server: "PUT"
+```

+ 10 - 0
etc/kuiper.yaml

@@ -61,6 +61,16 @@ sink:
   # Whether to clean the cache when the rule stops
   cleanCacheAtStop: false
 
+source:
+  ## Configurations for the global http data server for httppush source
+  # HTTP data service ip
+  httpServerIp: 0.0.0.0
+  # HTTP data service port
+  httpServerPort: 10081
+  # httpServerTls:
+  #    certfile: /var/https-server.crt
+  #    keyfile: /var/https-server.key
+
 store:
   #Type of store that will be used for keeping state of the application
   type: sqlite

+ 12 - 11
etc/sources/httppush.json

@@ -9,28 +9,29 @@
 			"website": "https://www.emqx.io"
 		},
 		"helpUrl": {
-			"en_US": "https://github.com/lf-edge/ekuiper/blob/master/docs/en_US/rules/sources/builtin/http_pull.md",
-			"zh_CN": "https://github.com/lf-edge/ekuiper/blob/master/docs/zh_CN/rules/sources/builtin/http_pull.md"
+			"en_US": "https://github.com/lf-edge/ekuiper/blob/master/docs/en_US/rules/sources/builtin/http_push.md",
+			"zh_CN": "https://github.com/lf-edge/ekuiper/blob/master/docs/zh_CN/rules/sources/builtin/http_push.md"
 		},
 		"description": {
-			"en_US": "eKuiper provides built-in support for pulling HTTP source stream, which can pull the message from HTTP server broker and feed into the eKuiper processing pipeline.",
-			"zh_CN": "eKuiper 为提取 HTTP 源流提供了内置支持,该支持可从 HTTP 服务器代理提取消息并输入 eKuiper 处理管道。"
+			"en_US": "eKuiper provides built-in support for HTTP server to receive the stream data pushed by HTTP protocol.",
+			"zh_CN": "eKuiper 提供了内置的 HTTP 服务支持,用于接收 HTTP 协议推送的流数据。"
 		}
 	},
 	"properties": {
 		"default": [{
-			"name": "server",
-			"default": ":9999",
-			"optional": false,
+			"name": "method",
+			"default": "POST",
+			"optional": true,
 			"control": "text",
 			"type": "string",
+			"values": ["POST","PUT"],
 			"hint": {
-				"en_US": "The address to listen on.",
-				"zh_CN": "http服务器监听地址"
+				"en_US": "The method to bind the http service",
+				"zh_CN": "http 请求方法"
 			},
 			"label": {
-				"en_US": "Server",
-				"zh_CN": "服务器"
+				"en_US": "Method",
+				"zh_CN": "请求方法"
 			}
 		}]
 	}

+ 3 - 4
etc/sources/httppush.yaml

@@ -1,5 +1,4 @@
-#Global httppull configurations
+#Global httppush configurations
 default:
-  # the address to listen on
-  server: ":8900"
-  endpoint: "/"
+  # the http method to use
+  method: "POST"

+ 27 - 3
internal/conf/conf.go

@@ -93,6 +93,25 @@ func (sc *SinkConf) Validate() error {
 	return e.GetError()
 }
 
+type SourceConf struct {
+	HttpServerIp   string   `json:"httpServerIp" yaml:"httpServerIp"`
+	HttpServerPort int      `json:"httpServerPort" yaml:"httpServerPort"`
+	HttpServerTls  *tlsConf `json:"httpServerTls" yaml:"httpServerTls"`
+}
+
+func (sc *SourceConf) Validate() error {
+	e := make(errorx.MultiError)
+	if sc.HttpServerIp == "" {
+		sc.HttpServerIp = "0.0.0.0"
+	}
+	if sc.HttpServerPort <= 0 || sc.HttpServerPort > 65535 {
+		Log.Warnf("invalid source.httpServerPort configuration %d, set to 10081", sc.HttpServerPort)
+		e["invalidHttpServerPort"] = fmt.Errorf("httpServerPort must between 0 and 65535")
+		sc.HttpServerPort = 10081
+	}
+	return e
+}
+
 type KuiperConf struct {
 	Basic struct {
 		Debug          bool     `yaml:"debug"`
@@ -111,9 +130,10 @@ type KuiperConf struct {
 		Authentication bool     `yaml:"authentication"`
 		IgnoreCase     bool     `yaml:"ignoreCase"`
 	}
-	Rule  api.RuleOption
-	Sink  *SinkConf
-	Store struct {
+	Rule   api.RuleOption
+	Sink   *SinkConf
+	Source *SourceConf
+	Store  struct {
 		Type  string `yaml:"type"`
 		Redis struct {
 			Host               string `yaml:"host"`
@@ -199,6 +219,10 @@ func InitConf() {
 	if Config.Portable.PythonBin == "" {
 		Config.Portable.PythonBin = "python"
 	}
+	if Config.Source == nil {
+		Config.Source = &SourceConf{}
+	}
+	_ = Config.Source.Validate()
 	if Config.Sink == nil {
 		Config.Sink = &SinkConf{}
 	}

+ 81 - 0
internal/conf/conf_test.go

@@ -0,0 +1,81 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package conf
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+)
+
+func TestSourceConfValidate(t *testing.T) {
+	var tests = []struct {
+		s   *SourceConf
+		e   *SourceConf
+		err string
+	}{
+		{
+			s: &SourceConf{},
+			e: &SourceConf{
+				HttpServerIp:   "0.0.0.0",
+				HttpServerPort: 10081,
+			},
+			err: "invalidHttpServerPort:httpServerPort must between 0 and 65535",
+		}, {
+			s: &SourceConf{
+				HttpServerIp: "192.168.0.1",
+			},
+			e: &SourceConf{
+				HttpServerIp:   "192.168.0.1",
+				HttpServerPort: 10081,
+			},
+			err: "invalidHttpServerPort:httpServerPort must between 0 and 65535",
+		}, {
+			s: &SourceConf{
+				HttpServerPort: 99999,
+			},
+			e: &SourceConf{
+				HttpServerIp:   "0.0.0.0",
+				HttpServerPort: 10081,
+			},
+			err: "invalidHttpServerPort:httpServerPort must between 0 and 65535",
+		}, {
+			s: &SourceConf{
+				HttpServerPort: 9090,
+				HttpServerTls: &tlsConf{
+					Certfile: "certfile",
+					Keyfile:  "keyfile",
+				},
+			},
+			e: &SourceConf{
+				HttpServerIp:   "0.0.0.0",
+				HttpServerPort: 9090,
+				HttpServerTls: &tlsConf{
+					Certfile: "certfile",
+					Keyfile:  "keyfile",
+				},
+			},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		err := tt.s.Validate()
+		if err != nil && tt.err != err.Error() {
+			t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, err)
+		}
+		if !reflect.DeepEqual(tt.s, tt.e) {
+			t.Errorf("%d\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.e)
+		}
+	}
+}

+ 0 - 1
internal/topo/neuron/source.go

@@ -25,7 +25,6 @@ import (
 type source struct {
 	url          string
 	bufferLength int
-	connected    bool
 }
 
 func (s *source) Configure(_ string, props map[string]interface{}) error {

+ 3 - 3
internal/topo/node/sink_node_test.go

@@ -366,7 +366,7 @@ func TestConfig(t *testing.T) {
 				"maxDiskCache":         6,
 				"resendInterval":       10,
 			},
-			err: errors.New("invalid cache properties: \nmaxDiskCacheTooSmall:maxDiskCache must be greater than bufferPageSize"),
+			err: errors.New("invalid cache properties: maxDiskCacheTooSmall:maxDiskCache must be greater than bufferPageSize"),
 		}, {
 			config: map[string]interface{}{
 				"enableCache":          true,
@@ -376,7 +376,7 @@ func TestConfig(t *testing.T) {
 				"maxDiskCache":         21,
 				"resendInterval":       10,
 			},
-			err: errors.New("invalid cache properties: \nmemoryCacheThresholdNotMultiple:memoryCacheThreshold must be a multiple of bufferPageSize"),
+			err: errors.New("invalid cache properties: memoryCacheThresholdNotMultiple:memoryCacheThreshold must be a multiple of bufferPageSize"),
 		}, {
 			config: map[string]interface{}{
 				"enableCache":          true,
@@ -386,7 +386,7 @@ func TestConfig(t *testing.T) {
 				"maxDiskCache":         22,
 				"resendInterval":       10,
 			},
-			err: errors.New("invalid cache properties: \nmaxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize"),
+			err: errors.New("invalid cache properties: maxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize"),
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 31 - 54
internal/topo/source/httppush_source.go

@@ -15,10 +15,10 @@
 package source
 
 import (
-	"encoding/json"
 	"fmt"
-	"github.com/gorilla/mux"
 	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/memory"
+	"github.com/lf-edge/ekuiper/internal/topo/source/httpserver"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/infra"
@@ -27,8 +27,10 @@ import (
 )
 
 type HTTPPushConf struct {
-	Server   string `json:"server"`
-	Endpoint string `json:"endpoint"`
+	Method       string `json:"method"`
+	ContentType  string `json:"contentType"`
+	BufferLength int    `json:"bufferLength"`
+	Endpoint     string `json:"endpoint"`
 }
 
 type HTTPPushSource struct {
@@ -37,15 +39,19 @@ type HTTPPushSource struct {
 
 func (hps *HTTPPushSource) Configure(endpoint string, props map[string]interface{}) error {
 	cfg := &HTTPPushConf{
-		Server:   ":9999",
-		Endpoint: "",
+		Method:       http.MethodPost,
+		ContentType:  "application/json",
+		BufferLength: 1024,
 	}
 	err := cast.MapToStruct(props, cfg)
 	if err != nil {
 		return err
 	}
-	if strings.Trim(cfg.Server, " ") == "" {
-		return fmt.Errorf("property `server` is required")
+	if cfg.Method != http.MethodPost && cfg.Method != http.MethodPut {
+		return fmt.Errorf("method %s is not supported, must be POST or PUT", cfg.Method)
+	}
+	if cfg.ContentType != "application/json" {
+		return fmt.Errorf("property `contentType` must be application/json")
 	}
 	if !strings.HasPrefix(endpoint, "/") {
 		return fmt.Errorf("property `endpoint` must start with /")
@@ -58,47 +64,27 @@ func (hps *HTTPPushSource) Configure(endpoint string, props map[string]interface
 }
 
 func (hps *HTTPPushSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
-	r := mux.NewRouter()
-	meta := make(map[string]interface{})
-	r.HandleFunc(hps.conf.Endpoint, func(w http.ResponseWriter, r *http.Request) {
-		ctx.GetLogger().Debugf("receive getGPS request")
-		defer r.Body.Close()
-		m := make(map[string]interface{})
-		err := json.NewDecoder(r.Body).Decode(&m)
-		if err != nil {
-			handleError(w, err, "Fail to decode data")
-			return
-		}
-		ctx.GetLogger().Debugf("message: %v", m)
+	t, done, err := httpserver.RegisterEndpoint(hps.conf.Endpoint, hps.conf.Method, hps.conf.ContentType)
+	if err != nil {
+		infra.DrainError(ctx, err, errCh)
+		return
+	}
+	defer httpserver.UnregisterEndpoint(hps.conf.Endpoint)
+	ch := memory.CreateSub(t, nil, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), hps.conf.BufferLength)
+	defer memory.CloseSourceConsumerChannel(t, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
+	for {
 		select {
-		case consumer <- api.NewDefaultSourceTuple(m, meta):
-			ctx.GetLogger().Debugf("send data from http push source")
+		case <-done: // http data server error
+			infra.DrainError(ctx, fmt.Errorf("http data server shutdown"), errCh)
+			return
+		case v, opened := <-ch:
+			if !opened {
+				return
+			}
+			consumer <- v
 		case <-ctx.Done():
-			handleError(w, err, "stopped")
 			return
 		}
-		w.WriteHeader(http.StatusOK)
-		w.Write([]byte("ok"))
-	})
-	// TODO global server
-	srv := &http.Server{
-		Addr:    hps.conf.Server,
-		Handler: r,
-	}
-	go func() {
-		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
-			ctx.GetLogger().Errorf("listen: %s", err)
-			infra.DrainError(ctx, err, errCh)
-		}
-	}()
-	ctx.GetLogger().Infof("http server source listen at: %s", hps.conf.Server)
-	select {
-	case <-ctx.Done():
-		ctx.GetLogger().Infof("shutting down server...")
-		if err := srv.Shutdown(ctx); err != nil {
-			ctx.GetLogger().Errorf("shutdown: %s\n", err)
-		}
-		ctx.GetLogger().Infof("server exiting")
 	}
 }
 
@@ -107,12 +93,3 @@ func (hps *HTTPPushSource) Close(ctx api.StreamContext) error {
 	logger.Infof("Closing HTTP push source")
 	return nil
 }
-
-func handleError(w http.ResponseWriter, err error, prefix string) {
-	message := prefix
-	if message != "" {
-		message += ": "
-	}
-	message += err.Error()
-	http.Error(w, message, http.StatusBadRequest)
-}

+ 149 - 0
internal/topo/source/httpserver/data_server.go

@@ -0,0 +1,149 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package httpserver
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/gorilla/handlers"
+	"github.com/gorilla/mux"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/memory"
+	"github.com/lf-edge/ekuiper/internal/topo/state"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"net/http"
+	"sync"
+	"time"
+)
+
+// manage the global http data server
+
+var (
+	refCount int32
+	server   *http.Server
+	router   *mux.Router
+	done     chan struct{}
+	sctx     api.StreamContext
+	lock     sync.RWMutex
+)
+
+const TopicPrefix = "$$httppush/"
+
+func init() {
+	contextLogger := conf.Log.WithField("httppush_connection", 0)
+	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
+	ruleId := "$$httppush_connection"
+	opId := "$$httppush_connection"
+	store, err := state.CreateStore(ruleId, 0)
+	if err != nil {
+		ctx.GetLogger().Errorf("neuron connection create store error %v", err)
+		panic(err)
+	}
+	sctx = ctx.WithMeta(ruleId, opId, store)
+}
+
+func registerInit() error {
+	lock.Lock()
+	defer lock.Unlock()
+	if server == nil {
+		var err error
+		server, router, err = createDataServer()
+		if err != nil {
+			return err
+		}
+	}
+	refCount++
+	return nil
+}
+
+func RegisterEndpoint(endpoint string, method string, _ string) (string, chan struct{}, error) {
+	err := registerInit()
+	if err != nil {
+		return "", nil, err
+	}
+	topic := TopicPrefix + endpoint
+	memory.CreatePub(topic)
+	router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
+		sctx.GetLogger().Debugf("receive http request: %s", r.URL.String())
+		defer r.Body.Close()
+		m := make(map[string]interface{})
+		err := json.NewDecoder(r.Body).Decode(&m)
+		if err != nil {
+			handleError(w, err, "Fail to decode data")
+			memory.ProduceError(sctx, topic, fmt.Errorf("fail to decode data %s: %v", r.Body, err))
+			return
+		}
+		sctx.GetLogger().Debugf("httppush received message %s", m)
+		memory.Produce(sctx, topic, m)
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write([]byte("ok"))
+	}).Methods(method)
+	return topic, done, nil
+}
+
+func UnregisterEndpoint(endpoint string) {
+	lock.Lock()
+	defer lock.Unlock()
+	memory.RemovePub(TopicPrefix + endpoint)
+	refCount--
+	// TODO async close server
+	if refCount == 0 {
+		sctx.GetLogger().Infof("shutting down http data server...")
+		if err := server.Shutdown(sctx); err != nil {
+			sctx.GetLogger().Errorf("shutdown: %s", err)
+		}
+		sctx.GetLogger().Infof("http data server exiting")
+		server = nil
+		router = nil
+	}
+}
+
+// createDataServer creates a new http data server. Must run inside lock
+func createDataServer() (*http.Server, *mux.Router, error) {
+	r := mux.NewRouter()
+	s := &http.Server{
+		Addr: fmt.Sprintf("%s:%d", conf.Config.Source.HttpServerIp, conf.Config.Source.HttpServerPort),
+		// Good practice to set timeouts to avoid Slowloris attacks.
+		WriteTimeout: time.Second * 60 * 5,
+		ReadTimeout:  time.Second * 60 * 5,
+		IdleTimeout:  time.Second * 60,
+		Handler:      handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin", "Authorization"}), handlers.AllowedMethods([]string{"POST", "GET", "PUT", "DELETE", "HEAD"}))(r),
+	}
+	done = make(chan struct{})
+	go func() {
+		var err error
+		if conf.Config.Source.HttpServerTls == nil {
+			err = s.ListenAndServe()
+		} else {
+			err = s.ListenAndServeTLS(conf.Config.Source.HttpServerTls.Certfile, conf.Config.Source.HttpServerTls.Keyfile)
+		}
+		if err != nil {
+			sctx.GetLogger().Errorf("http data server error: %v", err)
+			close(done)
+		}
+	}()
+	sctx.GetLogger().Infof("Serving http data server on port http://%s:%d", conf.Config.Source.HttpServerIp, conf.Config.Source.HttpServerPort)
+	return s, r, nil
+}
+
+func handleError(w http.ResponseWriter, err error, prefix string) {
+	message := prefix
+	if message != "" {
+		message += ": "
+	}
+	message += err.Error()
+	http.Error(w, message, http.StatusBadRequest)
+}

+ 99 - 0
internal/topo/source/httpserver/data_server_test.go

@@ -0,0 +1,99 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package httpserver
+
+import (
+	"bytes"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/testx"
+	"net/http"
+	"testing"
+)
+
+var body = []byte(`{
+        "title": "Post title",
+        "body": "Post description",
+        "userId": 1
+    }`)
+
+func TestEndpoints(t *testing.T) {
+	testx.InitEnv()
+	endpoints := []string{
+		"/ee1", "/eb2", "/ec3",
+	}
+	RegisterEndpoint(endpoints[0], "POST", "application/json")
+	RegisterEndpoint(endpoints[1], "PUT", "application/json")
+	RegisterEndpoint(endpoints[2], "POST", "application/json")
+
+	if server == nil || router == nil {
+		t.Error("server or router is nil after registering")
+		return
+	}
+	if refCount != 3 {
+		t.Error("refCount is not 3 after registering")
+		return
+	}
+	UnregisterEndpoint(endpoints[0])
+	UnregisterEndpoint(endpoints[1])
+	UnregisterEndpoint(endpoints[2])
+	if refCount != 0 {
+		t.Error("refCount is not 0 after unregistering")
+		return
+	}
+	if server != nil || router != nil {
+		t.Error("server or router is not nil after unregistering")
+		return
+	}
+	urlPrefix := "http://localhost:10081"
+
+	client := &http.Client{}
+
+	RegisterEndpoint(endpoints[0], "POST", "application/json")
+	_, _, err := RegisterEndpoint(endpoints[0], "PUT", "application/json")
+	if err != nil {
+		t.Error("RegisterEndpoint should not return error for same endpoint")
+	}
+	RegisterEndpoint(endpoints[1], "PUT", "application/json")
+
+	err = testHttp(client, urlPrefix+endpoints[0], "POST")
+	if err != nil {
+		t.Error(err)
+	}
+	err = testHttp(client, urlPrefix+endpoints[1], "PUT")
+	if err != nil {
+		t.Error(err)
+	}
+
+	RegisterEndpoint(endpoints[2], "POST", "application/json")
+	err = testHttp(client, urlPrefix+endpoints[2], "POST")
+	if err != nil {
+		t.Error(err)
+	}
+}
+
+func testHttp(client *http.Client, url string, method string) error {
+	r, err := http.NewRequest(method, url, bytes.NewBuffer(body))
+	if err != nil {
+		return err
+	}
+	resp, err := client.Do(r)
+	if err != nil {
+		return err
+	}
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("status code is not 200 for %s", url)
+	}
+	return nil
+}

+ 3 - 3
pkg/errorx/errors.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -56,10 +56,10 @@ func (e MultiError) Error() string {
 	case 0, 1:
 		s = ""
 	default:
-		s = "Get multiple errors: "
+		s = "Get multiple errors: \n"
 	}
 	for k, v := range e {
-		s = fmt.Sprintf("%s\n%s:%s", s, k, v.Error())
+		s = fmt.Sprintf("%s%s:%s", s, k, v.Error())
 	}
 	return s
 }