Browse Source

test: add ut for server (#1942)

Signed-off-by: Rui-Gan <1171530954@qq.com>
Regina 1 year ago
parent
commit
718340092f

+ 185 - 0
internal/server/meta_init_test.go

@@ -0,0 +1,185 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"bytes"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"path"
+	"testing"
+
+	"github.com/gorilla/mux"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/meta"
+)
+
+type MetaTestSuite struct {
+	suite.Suite
+	m metaComp
+	r *mux.Router
+}
+
+func (suite *MetaTestSuite) SetupTest() {
+	suite.m = metaComp{}
+	suite.r = mux.NewRouter()
+	suite.m.rest(suite.r)
+	meta.InitYamlConfigManager()
+	confDir, err := conf.GetConfLoc()
+	if err != nil {
+		fmt.Println(err)
+	}
+	if err := meta.ReadSinkMetaFile(path.Join(confDir, "sinks", "mqtt.json"), true); nil != err {
+		fmt.Println(err)
+	}
+	if err := meta.ReadSourceMetaFile(path.Join(confDir, "mqtt_source.json"), true, false); nil != err {
+		fmt.Println(err)
+	}
+}
+
+func (suite *MetaTestSuite) TestSinksMetaHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/metadata/sinks", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *MetaTestSuite) TestNewSinkMetaHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/metadata/sinks/mqtt", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *MetaTestSuite) TestFunctionsMetaHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/metadata/functions", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *MetaTestSuite) TestOperatorsMetaHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/metadata/operators", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *MetaTestSuite) TestSourcesMetaHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/metadata/sources", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *MetaTestSuite) TestConnectionsMetaHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/metadata/connections", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *MetaTestSuite) TestSourceMetaHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/metadata/sources/mqtt", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *MetaTestSuite) TestConnectionMetaHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/metadata/connections/mqtt", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *MetaTestSuite) TestSourceConfHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/metadata/sources/yaml/mqtt", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *MetaTestSuite) TestConnectionConfHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/metadata/connections/yaml/mqtt", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *MetaTestSuite) TestSinkConfHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/metadata/sinks/yaml/mqtt", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *MetaTestSuite) TestSourceConfKeyHandler() {
+	req, _ := http.NewRequest(http.MethodPut, "/metadata/sources/mqtt/confKeys/test", bytes.NewBufferString(`{"qos": 0, "server": "tcp://10.211.55.6:1883"}`))
+	w := httptest.NewRecorder()
+	DataDir, _ := conf.GetDataLoc()
+	os.MkdirAll(path.Join(DataDir, "sources"), 0o755)
+	if _, err := os.Create(path.Join(DataDir, "sources", "mqtt.yaml")); err != nil {
+		fmt.Println(err)
+	}
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+	os.Remove(path.Join(DataDir, "sources", "mqtt.yaml"))
+	os.Remove(path.Join(DataDir, "sources"))
+}
+
+func (suite *MetaTestSuite) TestConnectionConfKeyHandler() {
+	req, _ := http.NewRequest(http.MethodPut, "/metadata/connections/mqtt/confKeys/test", bytes.NewBufferString(`{"qos": 0, "server": "tcp://10.211.55.6:1883"}`))
+	w := httptest.NewRecorder()
+	DataDir, _ := conf.GetDataLoc()
+	os.MkdirAll(path.Join(DataDir, "connections"), 0o755)
+	if _, err := os.Create(path.Join(DataDir, "connections", "connection.yaml")); err != nil {
+		fmt.Println(err)
+	}
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+	os.Remove(path.Join(DataDir, "connections", "connection.yaml"))
+	os.Remove(path.Join(DataDir, "connections"))
+}
+
+func (suite *MetaTestSuite) TestSinkConfKeyHandler() {
+	req, _ := http.NewRequest(http.MethodPut, "/metadata/sinks/mqtt/confKeys/test", bytes.NewBufferString(`{"qos": 0, "server": "tcp://10.211.55.6:1883"}`))
+	DataDir, _ := conf.GetDataLoc()
+	os.MkdirAll(path.Join(DataDir, "sinks"), 0o755)
+	if _, err := os.Create(path.Join(DataDir, "sinks", "mqtt.yaml")); err != nil {
+		fmt.Println(err)
+	}
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+	os.Remove(path.Join(DataDir, "sinks", "mqtt.yaml"))
+	os.Remove(path.Join(DataDir, "sinks"))
+}
+
+func (suite *MetaTestSuite) TestResourcesHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/metadata/resources", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func TestMetaTestSuite(t *testing.T) {
+	suite.Run(t, new(MetaTestSuite))
+}

+ 56 - 11
internal/server/plugin_init_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -18,14 +18,32 @@
 package server
 
 import (
-	"fmt"
-	"reflect"
+	"bytes"
+	"net/http"
+	"net/http/httptest"
 	"testing"
 
+	"github.com/gorilla/mux"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/suite"
+
 	"github.com/lf-edge/ekuiper/internal/plugin"
 )
 
-func Test_fetchPluginList(t *testing.T) {
+type PluginTestSuite struct {
+	suite.Suite
+	m pluginComp
+	r *mux.Router
+}
+
+func (suite *PluginTestSuite) SetupTest() {
+	suite.m = pluginComp{}
+	suite.r = mux.NewRouter()
+	suite.m.rest(suite.r)
+	suite.m.register()
+}
+
+func (suite *PluginTestSuite) Test_fetchPluginList() {
 	version = "1.4.0"
 	type args struct {
 		t     plugin.PluginType
@@ -70,12 +88,39 @@ func Test_fetchPluginList(t *testing.T) {
 		},
 	}
 	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			gotResult, gotErr := fetchPluginList(tt.args.t, tt.args.hosts, tt.args.os, tt.args.arch)
-			if !reflect.DeepEqual(gotErr, tt.wantErr) {
-				t.Errorf("fetchPluginList() gotErr = %v, want %v", gotErr, tt.wantErr)
-			}
-			fmt.Printf("%v", gotResult)
-		})
+		_, gotErr := fetchPluginList(tt.args.t, tt.args.hosts, tt.args.os, tt.args.arch)
+		assert.Equal(suite.T(), tt.wantErr, gotErr)
 	}
 }
+
+func (suite *PluginTestSuite) TestSourcesHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/plugins/sources", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *PluginTestSuite) TestSinksHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/plugins/sinks", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *PluginTestSuite) TestFunctionsHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/plugins/functions", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *PluginTestSuite) TestUdfsHandler() {
+	req, _ := http.NewRequest(http.MethodGet, "/plugins/udfs", bytes.NewBufferString("any"))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func TestPluginTestSuite(t *testing.T) {
+	suite.Run(t, new(PluginTestSuite))
+}

+ 169 - 140
internal/server/rest_test.go

@@ -21,15 +21,18 @@ import (
 	"io"
 	"net/http"
 	"net/http/httptest"
-	"reflect"
+	"os"
+	"path/filepath"
 	"testing"
 
 	"github.com/gorilla/mux"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/suite"
 
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/processor"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/topo/rule"
-	"github.com/lf-edge/ekuiper/pkg/ast"
 )
 
 func init() {
@@ -40,57 +43,75 @@ func init() {
 	registry = &RuleRegistry{internal: make(map[string]*rule.RuleState)}
 }
 
-func Test_rootHandler(t *testing.T) {
+type RestTestSuite struct {
+	suite.Suite
+	r *mux.Router
+}
+
+func (suite *RestTestSuite) SetupTest() {
+	dataDir, err := conf.GetDataLoc()
+	if err != nil {
+		panic(err)
+	}
+	uploadDir = filepath.Join(dataDir, "uploads")
+
 	r := mux.NewRouter()
 	r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
+	r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
+	r.HandleFunc("/streams/{name}/schema", streamSchemaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
+	r.HandleFunc("/tables/{name}/schema", tableSchemaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
+	r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
+	r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
+	r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
+	r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
+	r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
+	r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
+	r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
+	r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
+	r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
+	r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
+	r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
+	suite.r = r
+}
 
+func (suite *RestTestSuite) Test_rootHandler() {
 	req, _ := http.NewRequest(http.MethodPost, "/", bytes.NewBufferString("any"))
 	w := httptest.NewRecorder()
-	r.ServeHTTP(w, req)
-	resp := w.Result()
-
-	if !reflect.DeepEqual(resp.StatusCode, 200) {
-		t.Errorf("Expect\t %v\nBut got\t%v", 200, resp.StatusCode)
-	}
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
 }
 
-func Test_sourcesManageHandler(t *testing.T) {
+func (suite *RestTestSuite) Test_sourcesManageHandler() {
 	req, _ := http.NewRequest(http.MethodGet, "/", bytes.NewBufferString("any"))
 	w := httptest.NewRecorder()
-
-	sourcesManageHandler(w, req, ast.TypeStream)
-
-	if !reflect.DeepEqual(w.Result().StatusCode, 200) {
-		t.Errorf("Expect\t %v\nBut got\t%v", 200, w.Result().StatusCode)
-	}
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
 
 	// get scan table
 	req, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/streams?kind=scan", bytes.NewBufferString("any"))
 	w = httptest.NewRecorder()
-
-	sourcesManageHandler(w, req, ast.TypeTable)
-
-	if !reflect.DeepEqual(w.Result().StatusCode, 200) {
-		t.Errorf("Expect\t %v\nBut got\t%v", 200, w.Result().StatusCode)
-	}
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
 
 	// get lookup table
 	req, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/streams?kind=lookup", bytes.NewBufferString("any"))
 	w = httptest.NewRecorder()
-
-	sourcesManageHandler(w, req, ast.TypeTable)
-
-	if !reflect.DeepEqual(w.Result().StatusCode, 200) {
-		t.Errorf("Expect\t %v\nBut got\t%v", 200, w.Result().StatusCode)
-	}
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
 
 	// create table
 	buf := bytes.NewBuffer([]byte(` {"sql":"CREATE TABLE alertTable() WITH (DATASOURCE=\"0\", TYPE=\"memory\", KEY=\"id\", KIND=\"lookup\")"}`))
 	req, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/streams?kind=lookup", buf)
 	w = httptest.NewRecorder()
-
-	sourcesManageHandler(w, req, ast.TypeTable)
-
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusCreated, w.Code)
 	var returnVal []byte
 	returnVal, _ = io.ReadAll(w.Result().Body)
 	fmt.Printf("returnVal %s\n", string(returnVal))
@@ -99,102 +120,73 @@ func Test_sourcesManageHandler(t *testing.T) {
 	buf = bytes.NewBuffer([]byte(`{"sql":"CREATE stream alert() WITH (DATASOURCE=\"0\", TYPE=\"mqtt\")"}`))
 	req, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/streams", buf)
 	w = httptest.NewRecorder()
-
-	sourcesManageHandler(w, req, ast.TypeStream)
-
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusCreated, w.Code)
 	returnVal, _ = io.ReadAll(w.Result().Body)
-
 	fmt.Printf("returnVal %s\n", string(returnVal))
 
 	// get stream
-	r := mux.NewRouter()
-	r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
-
 	req, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/streams/alert", bytes.NewBufferString("any"))
 	w = httptest.NewRecorder()
-	r.ServeHTTP(w, req)
-
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
 	expect := []byte(`{"Name":"alert","Options":{"datasource":"0","type":"mqtt"},"Statement":null,"StreamFields":null,"StreamType":0}`)
 	exp := map[string]interface{}{}
 	_ = json.NewDecoder(bytes.NewBuffer(expect)).Decode(&exp)
 
 	res := map[string]interface{}{}
 	_ = json.NewDecoder(w.Result().Body).Decode(&res)
-	if !reflect.DeepEqual(exp, res) {
-		t.Errorf("Expect\t%v\nBut got\t%v", exp, res)
-	}
+	assert.Equal(suite.T(), exp, res)
+	req, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/streams/alert/schema", bytes.NewBufferString("any"))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
 
 	// get table
-	r = mux.NewRouter()
-	r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
-
 	req, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/tables/alertTable", bytes.NewBufferString("any"))
 	w = httptest.NewRecorder()
-	r.ServeHTTP(w, req)
-
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
 	expect = []byte(`{"Name":"alertTable","Options":{"datasource":"0","type":"memory", "key":"id","kind":"lookup"},"Statement":null,"StreamFields":null,"StreamType":1}`)
 	exp = map[string]interface{}{}
 	_ = json.NewDecoder(bytes.NewBuffer(expect)).Decode(&exp)
-
 	res = map[string]interface{}{}
 	_ = json.NewDecoder(w.Result().Body).Decode(&res)
+	assert.Equal(suite.T(), exp, res)
 
-	if !reflect.DeepEqual(exp, res) {
-		t.Errorf("Expect\t%v\nBut got\t%v", exp, res)
-	}
+	req, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/tables/alertTable/schema", bytes.NewBufferString("any"))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
 
 	// put table
 	buf = bytes.NewBuffer([]byte(` {"sql":"CREATE TABLE alertTable() WITH (DATASOURCE=\"0\", TYPE=\"memory\", KEY=\"id\", KIND=\"lookup\")"}`))
-	r = mux.NewRouter()
-	r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
-
 	req, _ = http.NewRequest(http.MethodPut, "http://localhost:8080/tables/alertTable", buf)
 	w = httptest.NewRecorder()
-	r.ServeHTTP(w, req)
-
-	if !reflect.DeepEqual(w.Result().StatusCode, 200) {
-		t.Errorf("Expect\t%v\nBut got\t%v", 200, w.Result().StatusCode)
-	}
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
 
 	// put stream
 	buf = bytes.NewBuffer([]byte(`{"sql":"CREATE stream alert() WITH (DATASOURCE=\"0\", TYPE=\"httppull\")"}`))
-	r = mux.NewRouter()
-	r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
-
 	req, _ = http.NewRequest(http.MethodPut, "http://localhost:8080/streams/alert", buf)
 	w = httptest.NewRecorder()
-	r.ServeHTTP(w, req)
-
-	if !reflect.DeepEqual(w.Result().StatusCode, 200) {
-		t.Errorf("Expect\t%v\nBut got\t%v", 200, w.Result().StatusCode)
-	}
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
 
 	// drop table
-	r = mux.NewRouter()
-	r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
-
 	req, _ = http.NewRequest(http.MethodDelete, "http://localhost:8080/tables/alertTable", bytes.NewBufferString("any"))
 	w = httptest.NewRecorder()
-	r.ServeHTTP(w, req)
-
-	if !reflect.DeepEqual(w.Result().StatusCode, 200) {
-		t.Errorf("Expect\t%v\nBut got\t%v", 200, w.Result().StatusCode)
-	}
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
 
 	// drop stream
-	r = mux.NewRouter()
-	r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
-
 	req, _ = http.NewRequest(http.MethodDelete, "http://localhost:8080/streams/alert", bytes.NewBufferString("any"))
 	w = httptest.NewRecorder()
-	r.ServeHTTP(w, req)
-
-	if !reflect.DeepEqual(w.Result().StatusCode, 200) {
-		t.Errorf("Expect\t%v\nBut got\t%v", 200, w.Result().StatusCode)
-	}
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
 }
 
-func Test_rulesManageHandler(t *testing.T) {
+func (suite *RestTestSuite) Test_rulesManageHandler() {
 	// Start rules
 	if rules, err := ruleProcessor.GetAllRules(); err != nil {
 		logger.Infof("Start rules error: %s", err)
@@ -215,21 +207,10 @@ func Test_rulesManageHandler(t *testing.T) {
 		}
 	}
 
-	r := mux.NewRouter()
-	r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
-	r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
-	r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
-	r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
-	r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
-	r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
-	r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
-
 	buf1 := bytes.NewBuffer([]byte(`{"sql":"CREATE stream alert() WITH (DATASOURCE=\"0\", TYPE=\"mqtt\")"}`))
 	req1, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/streams", buf1)
 	w1 := httptest.NewRecorder()
-	r.ServeHTTP(w1, req1)
+	suite.r.ServeHTTP(w1, req1)
 
 	// create rule with trigger false
 	ruleJson := `{"id": "rule1","triggered": false,"sql": "select * from alert","actions": [{"log": {}}]}`
@@ -237,12 +218,12 @@ func Test_rulesManageHandler(t *testing.T) {
 	buf2 := bytes.NewBuffer([]byte(ruleJson))
 	req2, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/rules", buf2)
 	w2 := httptest.NewRecorder()
-	r.ServeHTTP(w2, req2)
+	suite.r.ServeHTTP(w2, req2)
 
 	// get all rules
 	req3, _ := http.NewRequest(http.MethodGet, "http://localhost:8080/rules", bytes.NewBufferString("any"))
 	w3 := httptest.NewRecorder()
-	r.ServeHTTP(w3, req3)
+	suite.r.ServeHTTP(w3, req3)
 
 	_, _ = io.ReadAll(w3.Result().Body)
 
@@ -252,11 +233,9 @@ func Test_rulesManageHandler(t *testing.T) {
 	buf2 = bytes.NewBuffer([]byte(ruleJson))
 	req1, _ = http.NewRequest(http.MethodPut, "http://localhost:8080/rules/rule1", buf2)
 	w1 = httptest.NewRecorder()
-	r.ServeHTTP(w1, req1)
+	suite.r.ServeHTTP(w1, req1)
 
-	if w1.Result().StatusCode != http.StatusOK {
-		t.Errorf("Expect\t%v\nBut got\t%v", 200, w1.Result().StatusCode)
-	}
+	assert.Equal(suite.T(), http.StatusOK, w1.Code)
 
 	// update wron rule
 	ruleJson = `{"id": "rule1","sql": "select * from alert1","actions": [{"nop": {}}]}`
@@ -264,91 +243,74 @@ func Test_rulesManageHandler(t *testing.T) {
 	buf2 = bytes.NewBuffer([]byte(ruleJson))
 	req1, _ = http.NewRequest(http.MethodPut, "http://localhost:8080/rules/rule1", buf2)
 	w1 = httptest.NewRecorder()
-	r.ServeHTTP(w1, req1)
+	suite.r.ServeHTTP(w1, req1)
 
-	if w1.Result().StatusCode != http.StatusBadRequest {
-		t.Errorf("Expect\t%v\nBut got\t%v", 200, w1.Result().StatusCode)
-	}
+	assert.Equal(suite.T(), http.StatusBadRequest, w1.Code)
 
 	// get rule
 	req1, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/rules/rule1", bytes.NewBufferString("any"))
 	w1 = httptest.NewRecorder()
-	r.ServeHTTP(w1, req1)
+	suite.r.ServeHTTP(w1, req1)
 
 	returnVal, _ := io.ReadAll(w1.Result().Body)
 	expect := `{"id": "rule1","triggered": false,"sql": "select * from alert","actions": [{"nop": {}}]}`
-	if string(returnVal) != expect {
-		t.Errorf("Expect\t%v\nBut got\t%v", expect, string(returnVal))
-	}
+	assert.Equal(suite.T(), expect, string(returnVal))
 
 	// get rule status
 	req1, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/rules/rule1/status", bytes.NewBufferString("any"))
 	w1 = httptest.NewRecorder()
-	r.ServeHTTP(w1, req1)
+	suite.r.ServeHTTP(w1, req1)
 	returnVal, _ = io.ReadAll(w1.Result().Body) //nolint
 
 	// get rule topo
 	req1, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/rules/rule1/topo", bytes.NewBufferString("any"))
 	w1 = httptest.NewRecorder()
-	r.ServeHTTP(w1, req1)
+	suite.r.ServeHTTP(w1, req1)
 	returnVal, _ = io.ReadAll(w1.Result().Body)
 
 	expect = `{"sources":["source_alert"],"edges":{"op_2_project":["sink_nop_0"],"source_alert":["op_2_project"]}}`
-	if string(returnVal) != expect {
-		t.Errorf("Expect\t%v\nBut got\t%v", expect, string(returnVal))
-	}
+	assert.Equal(suite.T(), expect, string(returnVal))
 
 	// start rule
 	req1, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/rules/rule1/start", bytes.NewBufferString("any"))
 	w1 = httptest.NewRecorder()
-	r.ServeHTTP(w1, req1)
+	suite.r.ServeHTTP(w1, req1)
 	returnVal, _ = io.ReadAll(w1.Result().Body)
 
 	expect = `Rule rule1 was started`
-	if string(returnVal) != expect {
-		t.Errorf("Expect\t%v\nBut got\t%v", expect, string(returnVal))
-	}
+	assert.Equal(suite.T(), expect, string(returnVal))
 
 	// stop rule
 	req1, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/rules/rule1/stop", bytes.NewBufferString("any"))
 	w1 = httptest.NewRecorder()
-	r.ServeHTTP(w1, req1)
+	suite.r.ServeHTTP(w1, req1)
 	returnVal, _ = io.ReadAll(w1.Result().Body)
 
 	expect = `Rule rule1 was stopped.`
-	if string(returnVal) != expect {
-		t.Errorf("Expect\t%v\nBut got\t%v", expect, string(returnVal))
-	}
+	assert.Equal(suite.T(), expect, string(returnVal))
 
 	// restart rule
 	req1, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/rules/rule1/restart", bytes.NewBufferString("any"))
 	w1 = httptest.NewRecorder()
-	r.ServeHTTP(w1, req1)
+	suite.r.ServeHTTP(w1, req1)
 	returnVal, _ = io.ReadAll(w1.Result().Body)
 
 	expect = `Rule rule1 was restarted`
-	if string(returnVal) != expect {
-		t.Errorf("Expect\t%v\nBut got\t%v", expect, string(returnVal))
-	}
+	assert.Equal(suite.T(), expect, string(returnVal))
 
 	// delete rule
 	req1, _ = http.NewRequest(http.MethodDelete, "http://localhost:8080/rules/rule1", bytes.NewBufferString("any"))
 	w1 = httptest.NewRecorder()
-	r.ServeHTTP(w1, req1)
+	suite.r.ServeHTTP(w1, req1)
 
 	// drop stream
 	req, _ := http.NewRequest(http.MethodDelete, "http://localhost:8080/streams/alert", bytes.NewBufferString("any"))
 	w := httptest.NewRecorder()
-	r.ServeHTTP(w, req)
+	suite.r.ServeHTTP(w, req)
 }
 
-func Test_ruleSetImport(t *testing.T) {
-	r := mux.NewRouter()
-	r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
-	r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
-
+func (suite *RestTestSuite) Test_ruleSetImport() {
 	ruleJson := `{"streams":{"plugin":"\n              CREATE STREAM plugin\n              ()\n              WITH (FORMAT=\"json\", CONF_KEY=\"default\", TYPE=\"mqtt\", SHARED=\"false\", );\n          "},"tables":{},"rules":{"rule1":"{\"id\":\"rule1\",\"name\":\"\",\"sql\":\"select name from plugin\",\"actions\":[{\"log\":{\"runAsync\":false,\"omitIfEmpty\":false,\"sendSingle\":true,\"bufferLength\":1024,\"enableCache\":false,\"format\":\"json\"}}],\"options\":{\"restartStrategy\":{}}}"}}`
-
 	ruleSetJson := map[string]string{
 		"content": ruleJson,
 	}
@@ -356,11 +318,78 @@ func Test_ruleSetImport(t *testing.T) {
 	buf2 := bytes.NewBuffer(buf)
 	req1, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/ruleset/import", buf2)
 	w1 := httptest.NewRecorder()
-	r.ServeHTTP(w1, req1)
+	suite.r.ServeHTTP(w1, req1)
+	assert.Equal(suite.T(), http.StatusOK, w1.Code)
 
 	req1, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/ruleset/export", bytes.NewBufferString("any"))
 	w1 = httptest.NewRecorder()
-	r.ServeHTTP(w1, req1)
-	returnVal, _ := io.ReadAll(w1.Result().Body)
-	fmt.Printf("%s\n", string(returnVal))
+	suite.r.ServeHTTP(w1, req1)
+	assert.Equal(suite.T(), http.StatusOK, w1.Code)
+}
+
+func (suite *RestTestSuite) Test_dataImport() {
+	file := "rpc_test_data/data/import_configuration.json"
+	f, err := os.Open(file)
+	if err != nil {
+		fmt.Printf("fail to open file %s: %v", file, err)
+		return
+	}
+	defer f.Close()
+	buffer := new(bytes.Buffer)
+	_, err = io.Copy(buffer, f)
+	if err != nil {
+		fmt.Printf("fail to convert file %s: %v", file, err)
+		return
+	}
+	content := buffer.Bytes()
+	ruleSetJson := map[string]string{
+		"content": string(content),
+	}
+	buf, _ := json.Marshal(ruleSetJson)
+	buf2 := bytes.NewBuffer(buf)
+	req, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/data/import", buf2)
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+
+	req, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/data/import/status", bytes.NewBufferString("any"))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+
+	req, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/data/export", bytes.NewBufferString("any"))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+
+	req, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/data/import?partial=1", bytes.NewBuffer(buf))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+}
+
+func (suite *RestTestSuite) Test_fileUpload() {
+	fileJson := `{"Name": "test.txt", "Content": "test"}`
+	req, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/config/uploads", bytes.NewBufferString(fileJson))
+	req.Header["Content-Type"] = []string{"application/json"}
+	os.Mkdir(uploadDir, 0o777)
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusCreated, w.Code)
+
+	req, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/config/uploads", bytes.NewBufferString("any"))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+
+	req, _ = http.NewRequest(http.MethodDelete, "http://localhost:8080/config/uploads/test.txt", bytes.NewBufferString("any"))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	assert.Equal(suite.T(), http.StatusOK, w.Code)
+	os.Remove(uploadDir)
+}
+
+func TestRestTestSuite(t *testing.T) {
+	suite.Run(t, new(RestTestSuite))
 }

+ 192 - 0
internal/server/rpc_test.go

@@ -0,0 +1,192 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"os"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/lf-edge/ekuiper/internal/meta"
+	"github.com/lf-edge/ekuiper/internal/pkg/model"
+	"github.com/lf-edge/ekuiper/internal/plugin/native"
+	"github.com/lf-edge/ekuiper/internal/plugin/portable"
+	"github.com/lf-edge/ekuiper/internal/schema"
+	"github.com/lf-edge/ekuiper/internal/service"
+)
+
+type ServerTestSuite struct {
+	suite.Suite
+	s *Server
+}
+
+func (suite *ServerTestSuite) SetupTest() {
+	suite.s = new(Server)
+	nativeManager, _ = native.InitManager()
+	portableManager, _ = portable.InitManager()
+	serviceManager, _ = service.InitManager()
+	_ = schema.InitRegistry()
+	meta.InitYamlConfigManager()
+}
+
+func (suite *ServerTestSuite) TestStream() {
+	sql := `Create Stream test () WITH (DATASOURCE="../internal/server/rpc_test_data/test.json", FORMAT="JSON", type="file");`
+	var reply string
+	err := suite.s.Stream(sql, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "Stream test is created.\n", reply)
+
+	reply = ""
+	sql = "show streams;"
+	err = suite.s.Stream(sql, &reply)
+	assert.Nil(suite.T(), err)
+
+	reply = ""
+	sql = "SELECT * FROM test;"
+	err = suite.s.CreateQuery(sql, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "Query was submit successfully.", reply)
+
+	var result string = ""
+	for i := 0; i < 5; i++ {
+		var queryresult string
+		time.Sleep(time.Second)
+		err = suite.s.GetQueryResult("test", &queryresult)
+		assert.Nil(suite.T(), err)
+		result += queryresult
+	}
+	assert.Equal(suite.T(), "[{\"humidity\":50,\"id\":1,\"temperature\":20}]\n[{\"humidity\":51,\"id\":2,\"temperature\":21}]\n[{\"humidity\":52,\"id\":3,\"temperature\":22}]\n[{\"humidity\":53,\"id\":4,\"temperature\":23}]", result)
+	stopQuery()
+}
+
+func (suite *ServerTestSuite) TestRule() {
+	sql := `Create Stream test () WITH (DATASOURCE="../internal/server/rpc_test_data/test.json", FORMAT="JSON", type="file");`
+	var reply string
+	err := suite.s.Stream(sql, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "Stream test is created.\n", reply)
+
+	reply = ""
+	rule := `{
+			  "sql": "SELECT * from test;",
+			  "actions": [{
+				"file": {
+				  "path": "../internal/server/rpc_test_data/data/result.txt",
+				  "interval": 5000,
+				  "fileType": "lines",
+				  "format": "json"
+				}
+			  }]
+	}`
+	ruleId := "myRule"
+	args := &model.RPCArgDesc{Name: ruleId, Json: rule}
+	err = suite.s.CreateRule(args, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "Rule myRule was created successfully, please use 'bin/kuiper getstatus rule myRule' command to get rule status.", reply)
+
+	reply = ""
+	err = suite.s.GetStatusRule(ruleId, &reply)
+	assert.Nil(suite.T(), err)
+
+	reply = ""
+	err = suite.s.ShowRules(1, &reply)
+	assert.Nil(suite.T(), err)
+
+	reply = ""
+	err = suite.s.DescRule(ruleId, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "{\n  \"sql\": \"SELECT * from test;\",\n  \"actions\": [\n    {\n      \"file\": {\n        \"path\": \"../internal/server/rpc_test_data/data/result.txt\",\n        \"interval\": 5000,\n        \"fileType\": \"lines\",\n        \"format\": \"json\"\n      }\n    }\n  ]\n}\n", reply)
+
+	reply = ""
+	err = suite.s.GetTopoRule(ruleId, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "{\n  \"sources\": [\n    \"source_test\"\n  ],\n  \"edges\": {\n    \"op_2_project\": [\n      \"sink_file_0\"\n    ],\n    \"source_test\": [\n      \"op_2_project\"\n    ]\n  }\n}", reply)
+
+	reply = ""
+	err = suite.s.StopRule(ruleId, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "Rule myRule was stopped.", reply)
+
+	reply = ""
+	err = suite.s.StartRule(ruleId, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "Rule myRule was started", reply)
+
+	reply = ""
+	err = suite.s.RestartRule(ruleId, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "Rule myRule was restarted.", reply)
+
+	reply = ""
+	err = suite.s.DropRule(ruleId, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "Rule myRule is dropped.", reply)
+}
+
+func (suite *ServerTestSuite) TestImportAndExport() {
+	file := "rpc_test_data/import.json"
+	var reply string
+	err := suite.s.Import(file, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "imported 1 streams, 0 tables and 1 rules", reply)
+
+	reply = ""
+	file = "rpc_test_data/export.json"
+	err = suite.s.Export(file, &reply)
+	assert.Nil(suite.T(), err)
+	os.Remove(file)
+}
+
+func (suite *ServerTestSuite) TestConfigurarion() {
+	importArg := model.ImportDataDesc{
+		FileName: "rpc_test_data/import_configuration.json",
+		Stop:     false,
+		Partial:  false,
+	}
+	var reply string
+	err := suite.s.ImportConfiguration(&importArg, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "{\n  \"ErrorMsg\": \"\",\n  \"ConfigResponse\": {\n    \"streams\": {},\n    \"tables\": {},\n    \"rules\": {},\n    \"nativePlugins\": {},\n    \"portablePlugins\": {},\n    \"sourceConfig\": {},\n    \"sinkConfig\": {},\n    \"connectionConfig\": {},\n    \"Service\": {},\n    \"Schema\": {}\n  }\n}", reply)
+
+	reply = ""
+	err = suite.s.GetStatusImport(1, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "{\n  \"streams\": {},\n  \"tables\": {},\n  \"rules\": {},\n  \"nativePlugins\": {},\n  \"portablePlugins\": {},\n  \"sourceConfig\": {},\n  \"sinkConfig\": {},\n  \"connectionConfig\": {},\n  \"Service\": {},\n  \"Schema\": {}\n}", reply)
+
+	reply = ""
+	exportArg := model.ExportDataDesc{
+		FileName: "rpc_test_data/export_configuration.json",
+		Rules:    []string{},
+	}
+	err = suite.s.ExportConfiguration(&exportArg, &reply)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), "export configuration success", reply)
+	os.Remove("rpc_test_data/export_configuration.json")
+}
+
+func (suite *ServerTestSuite) TearDownTest() {
+	// Clean up
+	sql := "DROP STREAM test;"
+	var reply string
+	_ = suite.s.Stream(sql, &reply)
+	_ = suite.s.DropRule("myRule", &reply)
+}
+
+func TestServerTestSuite(t *testing.T) {
+	suite.Run(t, new(ServerTestSuite))
+}

+ 9 - 0
internal/server/rpc_test_data/import.json

@@ -0,0 +1,9 @@
+{
+  "streams": {
+    "test": "CREATE STREAM test () WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
+  },
+  "tables": {},
+  "rules": {
+    "myRule": "{\"id\": \"myRule\",\"sql\": \"SELECT * FROM test\",\"actions\": [{\"log\": {}}]}"
+  }
+}

+ 24 - 0
internal/server/rpc_test_data/import_configuration.json

@@ -0,0 +1,24 @@
+{
+  "streams": {
+    "test": "CREATE STREAM test () WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
+  },
+  "tables": {
+  },
+  "rules": {
+    "myRule": "{\"id\": \"myRule\",\"sql\": \"SELECT * FROM test\",\"actions\": [{\"log\": {}}]}"
+  },
+  "nativePlugins":{
+  },
+  "portablePlugins":{
+  },
+  "sourceConfig":{
+  },
+  "sinkConfig":{
+  },
+  "connectionConfig":{
+  },
+  "Service":{
+  },
+  "Schema":{
+  }
+}

+ 22 - 0
internal/server/rpc_test_data/test.json

@@ -0,0 +1,22 @@
+[
+  {
+    "id": 1,
+    "temperature": 20,
+    "humidity": 50
+  },
+  {
+    "id": 2,
+    "temperature": 21,
+    "humidity": 51
+  },
+  {
+    "id": 3,
+    "temperature": 22,
+    "humidity": 52
+  },
+  {
+    "id": 4,
+    "temperature": 23,
+    "humidity": 53
+  }
+]

+ 65 - 0
internal/server/schema_init_test.go

@@ -0,0 +1,65 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"bytes"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/gorilla/mux"
+	"github.com/stretchr/testify/suite"
+)
+
+type SchemaTestSuite struct {
+	suite.Suite
+	sc schemaComp
+	r  *mux.Router
+}
+
+func (suite *SchemaTestSuite) SetupTest() {
+	suite.sc = schemaComp{}
+	suite.r = mux.NewRouter()
+	suite.sc.register()
+	suite.sc.rest(suite.r)
+}
+
+func (suite *SchemaTestSuite) TestSchema() {
+	proto := `{"name": "test", "Content": "message ListOfDoubles {repeated double doubles=1;}"}`
+	req, _ := http.NewRequest(http.MethodPost, "/schemas/protobuf", bytes.NewBufferString(proto))
+	w := httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	suite.Equal(http.StatusCreated, w.Code)
+
+	req, _ = http.NewRequest(http.MethodGet, "/schemas/protobuf", bytes.NewBufferString("any"))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	suite.Equal(http.StatusOK, w.Code)
+
+	req, _ = http.NewRequest(http.MethodGet, "/schemas/protobuf/test", bytes.NewBufferString("any"))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	suite.Equal(http.StatusOK, w.Code)
+
+	req, _ = http.NewRequest(http.MethodDelete, "/schemas/protobuf/test", bytes.NewBufferString("any"))
+	w = httptest.NewRecorder()
+	suite.r.ServeHTTP(w, req)
+	suite.Equal(http.StatusOK, w.Code)
+}
+
+func TestSchemaTestSuite(t *testing.T) {
+	suite.Run(t, new(SchemaTestSuite))
+}