rpc_test.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "os"
  17. "testing"
  18. "time"
  19. "github.com/stretchr/testify/assert"
  20. "github.com/stretchr/testify/suite"
  21. "github.com/lf-edge/ekuiper/internal/meta"
  22. "github.com/lf-edge/ekuiper/internal/pkg/model"
  23. "github.com/lf-edge/ekuiper/internal/plugin/native"
  24. "github.com/lf-edge/ekuiper/internal/plugin/portable"
  25. "github.com/lf-edge/ekuiper/internal/schema"
  26. "github.com/lf-edge/ekuiper/internal/service"
  27. )
  28. type ServerTestSuite struct {
  29. suite.Suite
  30. s *Server
  31. }
  32. func (suite *ServerTestSuite) SetupTest() {
  33. suite.s = new(Server)
  34. nativeManager, _ = native.InitManager()
  35. portableManager, _ = portable.InitManager()
  36. serviceManager, _ = service.InitManager()
  37. _ = schema.InitRegistry()
  38. meta.InitYamlConfigManager()
  39. }
  40. func (suite *ServerTestSuite) TestStream() {
  41. sql := `Create Stream test () WITH (DATASOURCE="../internal/server/rpc_test_data/test.json", FORMAT="JSON", type="file");`
  42. var reply string
  43. err := suite.s.Stream(sql, &reply)
  44. assert.Nil(suite.T(), err)
  45. assert.Equal(suite.T(), "Stream test is created.\n", reply)
  46. reply = ""
  47. sql = "show streams;"
  48. err = suite.s.Stream(sql, &reply)
  49. assert.Nil(suite.T(), err)
  50. reply = ""
  51. sql = "SELECT * FROM test;"
  52. err = suite.s.CreateQuery(sql, &reply)
  53. assert.Nil(suite.T(), err)
  54. assert.Equal(suite.T(), "Query was submit successfully.", reply)
  55. var result string = ""
  56. for i := 0; i < 5; i++ {
  57. var queryresult string
  58. time.Sleep(time.Second)
  59. err = suite.s.GetQueryResult("test", &queryresult)
  60. assert.Nil(suite.T(), err)
  61. result += queryresult
  62. }
  63. assert.Equal(suite.T(), "[{\"humidity\":50,\"id\":1,\"temperature\":20}]\n[{\"humidity\":51,\"id\":2,\"temperature\":21}]\n[{\"humidity\":52,\"id\":3,\"temperature\":22}]\n[{\"humidity\":53,\"id\":4,\"temperature\":23}]", result)
  64. stopQuery()
  65. }
  66. func (suite *ServerTestSuite) TestRule() {
  67. sql := `Create Stream test () WITH (DATASOURCE="../internal/server/rpc_test_data/test.json", FORMAT="JSON", type="file");`
  68. var reply string
  69. err := suite.s.Stream(sql, &reply)
  70. assert.Nil(suite.T(), err)
  71. assert.Equal(suite.T(), "Stream test is created.\n", reply)
  72. reply = ""
  73. rule := `{
  74. "sql": "SELECT * from test;",
  75. "actions": [{
  76. "file": {
  77. "path": "../internal/server/rpc_test_data/data/result.txt",
  78. "interval": 5000,
  79. "fileType": "lines",
  80. "format": "json"
  81. }
  82. }]
  83. }`
  84. ruleId := "myRule"
  85. args := &model.RPCArgDesc{Name: ruleId, Json: rule}
  86. err = suite.s.ValidateRule(args, &reply)
  87. assert.Nil(suite.T(), err)
  88. assert.Equal(suite.T(), "The rule has been successfully validated and is confirmed to be correct.", reply)
  89. reply = ""
  90. rule = `{
  91. "sql": "SELECT * from test;"
  92. }`
  93. args = &model.RPCArgDesc{Name: ruleId, Json: rule}
  94. err = suite.s.ValidateRule(args, &reply)
  95. assert.Nil(suite.T(), err)
  96. assert.Equal(suite.T(), "invalid rule json: Missing rule actions.", reply)
  97. reply = ""
  98. rule = `{
  99. "sql": "SELECT * from test;",
  100. "actions": [{
  101. "file": {
  102. "path": "../internal/server/rpc_test_data/data/result.txt",
  103. "interval": 5000,
  104. "fileType": "lines",
  105. "format": "json"
  106. }
  107. }]
  108. }`
  109. args = &model.RPCArgDesc{Name: ruleId, Json: rule}
  110. err = suite.s.CreateRule(args, &reply)
  111. assert.Nil(suite.T(), err)
  112. assert.Equal(suite.T(), "Rule myRule was created successfully, please use 'bin/kuiper getstatus rule myRule' command to get rule status.", reply)
  113. reply = ""
  114. err = suite.s.GetStatusRule(ruleId, &reply)
  115. assert.Nil(suite.T(), err)
  116. reply = ""
  117. err = suite.s.ShowRules(1, &reply)
  118. assert.Nil(suite.T(), err)
  119. reply = ""
  120. err = suite.s.DescRule(ruleId, &reply)
  121. assert.Nil(suite.T(), err)
  122. assert.Equal(suite.T(), "{\n \"sql\": \"SELECT * from test;\",\n \"actions\": [\n {\n \"file\": {\n \"path\": \"../internal/server/rpc_test_data/data/result.txt\",\n \"interval\": 5000,\n \"fileType\": \"lines\",\n \"format\": \"json\"\n }\n }\n ]\n}\n", reply)
  123. reply = ""
  124. err = suite.s.GetTopoRule(ruleId, &reply)
  125. assert.Nil(suite.T(), err)
  126. assert.Equal(suite.T(), "{\n \"sources\": [\n \"source_test\"\n ],\n \"edges\": {\n \"op_2_project\": [\n \"sink_file_0\"\n ],\n \"source_test\": [\n \"op_2_project\"\n ]\n }\n}", reply)
  127. reply = ""
  128. err = suite.s.StopRule(ruleId, &reply)
  129. assert.Nil(suite.T(), err)
  130. assert.Equal(suite.T(), "Rule myRule was stopped.", reply)
  131. reply = ""
  132. err = suite.s.StartRule(ruleId, &reply)
  133. assert.Nil(suite.T(), err)
  134. assert.Equal(suite.T(), "Rule myRule was started", reply)
  135. reply = ""
  136. err = suite.s.RestartRule(ruleId, &reply)
  137. assert.Nil(suite.T(), err)
  138. assert.Equal(suite.T(), "Rule myRule was restarted.", reply)
  139. reply = ""
  140. err = suite.s.DropRule(ruleId, &reply)
  141. assert.Nil(suite.T(), err)
  142. assert.Equal(suite.T(), "Rule myRule is dropped.", reply)
  143. }
  144. func (suite *ServerTestSuite) TestImportAndExport() {
  145. file := "rpc_test_data/import.json"
  146. var reply string
  147. err := suite.s.Import(file, &reply)
  148. assert.Nil(suite.T(), err)
  149. assert.Equal(suite.T(), "imported 1 streams, 0 tables and 1 rules", reply)
  150. reply = ""
  151. file = "rpc_test_data/export.json"
  152. err = suite.s.Export(file, &reply)
  153. assert.Nil(suite.T(), err)
  154. os.Remove(file)
  155. }
  156. func (suite *ServerTestSuite) TestConfigurarion() {
  157. importArg := model.ImportDataDesc{
  158. FileName: "rpc_test_data/import_configuration.json",
  159. Stop: false,
  160. Partial: false,
  161. }
  162. var reply string
  163. err := suite.s.ImportConfiguration(&importArg, &reply)
  164. assert.Nil(suite.T(), err)
  165. assert.Equal(suite.T(), "{\n \"ErrorMsg\": \"\",\n \"ConfigResponse\": {\n \"streams\": {},\n \"tables\": {},\n \"rules\": {},\n \"nativePlugins\": {},\n \"portablePlugins\": {},\n \"sourceConfig\": {},\n \"sinkConfig\": {},\n \"connectionConfig\": {},\n \"Service\": {},\n \"Schema\": {},\n \"uploads\": {}\n }\n}", reply)
  166. reply = ""
  167. err = suite.s.GetStatusImport(1, &reply)
  168. assert.Nil(suite.T(), err)
  169. assert.Equal(suite.T(), "{\n \"streams\": {},\n \"tables\": {},\n \"rules\": {},\n \"nativePlugins\": {},\n \"portablePlugins\": {},\n \"sourceConfig\": {},\n \"sinkConfig\": {},\n \"connectionConfig\": {},\n \"Service\": {},\n \"Schema\": {},\n \"uploads\": {}\n}", reply)
  170. reply = ""
  171. exportArg := model.ExportDataDesc{
  172. FileName: "rpc_test_data/export_configuration.json",
  173. Rules: []string{},
  174. }
  175. err = suite.s.ExportConfiguration(&exportArg, &reply)
  176. assert.Nil(suite.T(), err)
  177. assert.Equal(suite.T(), "export configuration success", reply)
  178. os.Remove("rpc_test_data/export_configuration.json")
  179. }
  180. func (suite *ServerTestSuite) TearDownTest() {
  181. // Clean up
  182. sql := "DROP STREAM test;"
  183. var reply string
  184. _ = suite.s.Stream(sql, &reply)
  185. _ = suite.s.DropRule("myRule", &reply)
  186. }
  187. func TestServerTestSuite(t *testing.T) {
  188. suite.Run(t, new(ServerTestSuite))
  189. }