rpc_test.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "os"
  17. "testing"
  18. "time"
  19. "github.com/stretchr/testify/assert"
  20. "github.com/stretchr/testify/suite"
  21. "github.com/lf-edge/ekuiper/internal/meta"
  22. "github.com/lf-edge/ekuiper/internal/pkg/model"
  23. "github.com/lf-edge/ekuiper/internal/plugin/native"
  24. "github.com/lf-edge/ekuiper/internal/plugin/portable"
  25. "github.com/lf-edge/ekuiper/internal/schema"
  26. "github.com/lf-edge/ekuiper/internal/service"
  27. )
  28. type ServerTestSuite struct {
  29. suite.Suite
  30. s *Server
  31. }
  32. func (suite *ServerTestSuite) SetupTest() {
  33. suite.s = new(Server)
  34. nativeManager, _ = native.InitManager()
  35. portableManager, _ = portable.InitManager()
  36. serviceManager, _ = service.InitManager()
  37. _ = schema.InitRegistry()
  38. meta.InitYamlConfigManager()
  39. }
  40. func (suite *ServerTestSuite) TestStream() {
  41. sql := `Create Stream test () WITH (DATASOURCE="../internal/server/rpc_test_data/test.json", FORMAT="JSON", type="file");`
  42. var reply string
  43. err := suite.s.Stream(sql, &reply)
  44. assert.Nil(suite.T(), err)
  45. assert.Equal(suite.T(), "Stream test is created.\n", reply)
  46. reply = ""
  47. sql = "show streams;"
  48. err = suite.s.Stream(sql, &reply)
  49. assert.Nil(suite.T(), err)
  50. reply = ""
  51. sql = "SELECT * FROM test;"
  52. err = suite.s.CreateQuery(sql, &reply)
  53. assert.Nil(suite.T(), err)
  54. assert.Equal(suite.T(), "Query was submit successfully.", reply)
  55. var result string = ""
  56. for i := 0; i < 5; i++ {
  57. var queryresult string
  58. time.Sleep(time.Second)
  59. err = suite.s.GetQueryResult("test", &queryresult)
  60. assert.Nil(suite.T(), err)
  61. result += queryresult
  62. }
  63. assert.Equal(suite.T(), "[{\"humidity\":50,\"id\":1,\"temperature\":20}]\n[{\"humidity\":51,\"id\":2,\"temperature\":21}]\n[{\"humidity\":52,\"id\":3,\"temperature\":22}]\n[{\"humidity\":53,\"id\":4,\"temperature\":23}]", result)
  64. stopQuery()
  65. }
  66. func (suite *ServerTestSuite) TestRule() {
  67. sql := `Create Stream test () WITH (DATASOURCE="../internal/server/rpc_test_data/test.json", FORMAT="JSON", type="file");`
  68. var reply string
  69. err := suite.s.Stream(sql, &reply)
  70. assert.Nil(suite.T(), err)
  71. assert.Equal(suite.T(), "Stream test is created.\n", reply)
  72. reply = ""
  73. rule := `{
  74. "sql": "SELECT * from test;",
  75. "actions": [{
  76. "file": {
  77. "path": "../internal/server/rpc_test_data/data/result.txt",
  78. "interval": 5000,
  79. "fileType": "lines",
  80. "format": "json"
  81. }
  82. }]
  83. }`
  84. ruleId := "myRule"
  85. args := &model.RPCArgDesc{Name: ruleId, Json: rule}
  86. err = suite.s.CreateRule(args, &reply)
  87. assert.Nil(suite.T(), err)
  88. assert.Equal(suite.T(), "Rule myRule was created successfully, please use 'bin/kuiper getstatus rule myRule' command to get rule status.", reply)
  89. reply = ""
  90. err = suite.s.GetStatusRule(ruleId, &reply)
  91. assert.Nil(suite.T(), err)
  92. reply = ""
  93. err = suite.s.ShowRules(1, &reply)
  94. assert.Nil(suite.T(), err)
  95. reply = ""
  96. err = suite.s.DescRule(ruleId, &reply)
  97. assert.Nil(suite.T(), err)
  98. assert.Equal(suite.T(), "{\n \"sql\": \"SELECT * from test;\",\n \"actions\": [\n {\n \"file\": {\n \"path\": \"../internal/server/rpc_test_data/data/result.txt\",\n \"interval\": 5000,\n \"fileType\": \"lines\",\n \"format\": \"json\"\n }\n }\n ]\n}\n", reply)
  99. reply = ""
  100. err = suite.s.GetTopoRule(ruleId, &reply)
  101. assert.Nil(suite.T(), err)
  102. assert.Equal(suite.T(), "{\n \"sources\": [\n \"source_test\"\n ],\n \"edges\": {\n \"op_2_project\": [\n \"sink_file_0\"\n ],\n \"source_test\": [\n \"op_2_project\"\n ]\n }\n}", reply)
  103. reply = ""
  104. err = suite.s.StopRule(ruleId, &reply)
  105. assert.Nil(suite.T(), err)
  106. assert.Equal(suite.T(), "Rule myRule was stopped.", reply)
  107. reply = ""
  108. err = suite.s.StartRule(ruleId, &reply)
  109. assert.Nil(suite.T(), err)
  110. assert.Equal(suite.T(), "Rule myRule was started", reply)
  111. reply = ""
  112. err = suite.s.RestartRule(ruleId, &reply)
  113. assert.Nil(suite.T(), err)
  114. assert.Equal(suite.T(), "Rule myRule was restarted.", reply)
  115. reply = ""
  116. err = suite.s.DropRule(ruleId, &reply)
  117. assert.Nil(suite.T(), err)
  118. assert.Equal(suite.T(), "Rule myRule is dropped.", reply)
  119. }
  120. func (suite *ServerTestSuite) TestImportAndExport() {
  121. file := "rpc_test_data/import.json"
  122. var reply string
  123. err := suite.s.Import(file, &reply)
  124. assert.Nil(suite.T(), err)
  125. assert.Equal(suite.T(), "imported 1 streams, 0 tables and 1 rules", reply)
  126. reply = ""
  127. file = "rpc_test_data/export.json"
  128. err = suite.s.Export(file, &reply)
  129. assert.Nil(suite.T(), err)
  130. os.Remove(file)
  131. }
  132. func (suite *ServerTestSuite) TestConfigurarion() {
  133. importArg := model.ImportDataDesc{
  134. FileName: "rpc_test_data/import_configuration.json",
  135. Stop: false,
  136. Partial: false,
  137. }
  138. var reply string
  139. err := suite.s.ImportConfiguration(&importArg, &reply)
  140. assert.Nil(suite.T(), err)
  141. assert.Equal(suite.T(), "{\n \"ErrorMsg\": \"\",\n \"ConfigResponse\": {\n \"streams\": {},\n \"tables\": {},\n \"rules\": {},\n \"nativePlugins\": {},\n \"portablePlugins\": {},\n \"sourceConfig\": {},\n \"sinkConfig\": {},\n \"connectionConfig\": {},\n \"Service\": {},\n \"Schema\": {}\n }\n}", reply)
  142. reply = ""
  143. err = suite.s.GetStatusImport(1, &reply)
  144. assert.Nil(suite.T(), err)
  145. assert.Equal(suite.T(), "{\n \"streams\": {},\n \"tables\": {},\n \"rules\": {},\n \"nativePlugins\": {},\n \"portablePlugins\": {},\n \"sourceConfig\": {},\n \"sinkConfig\": {},\n \"connectionConfig\": {},\n \"Service\": {},\n \"Schema\": {}\n}", reply)
  146. reply = ""
  147. exportArg := model.ExportDataDesc{
  148. FileName: "rpc_test_data/export_configuration.json",
  149. Rules: []string{},
  150. }
  151. err = suite.s.ExportConfiguration(&exportArg, &reply)
  152. assert.Nil(suite.T(), err)
  153. assert.Equal(suite.T(), "export configuration success", reply)
  154. os.Remove("rpc_test_data/export_configuration.json")
  155. }
  156. func (suite *ServerTestSuite) TearDownTest() {
  157. // Clean up
  158. sql := "DROP STREAM test;"
  159. var reply string
  160. _ = suite.s.Stream(sql, &reply)
  161. _ = suite.s.DropRule("myRule", &reply)
  162. }
  163. func TestServerTestSuite(t *testing.T) {
  164. suite.Run(t, new(ServerTestSuite))
  165. }