stream_test.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package processor
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/testx"
  18. "reflect"
  19. "testing"
  20. )
  21. func init() {
  22. testx.InitEnv()
  23. }
  24. func TestStreamCreateProcessor(t *testing.T) {
  25. var tests = []struct {
  26. s string
  27. r []string
  28. err string
  29. }{
  30. {
  31. s: `SHOW STREAMS;`,
  32. r: []string{"No stream definitions are found."},
  33. },
  34. {
  35. s: `EXPLAIN STREAM topic1;`,
  36. err: "Explain stream fails, topic1 is not found.",
  37. },
  38. {
  39. s: `CREATE STREAM topic1 (
  40. USERID BIGINT,
  41. FIRST_NAME STRING,
  42. LAST_NAME STRING,
  43. NICKNAMES ARRAY(STRING),
  44. Gender BOOLEAN,
  45. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT, BUILDING STRUCT(NAME STRING, ROOM BIGINT)),
  46. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  47. r: []string{"Stream topic1 is created."},
  48. },
  49. {
  50. s: `CREATE STREAM ` + "`stream`" + ` (
  51. USERID BIGINT,
  52. FIRST_NAME STRING,
  53. LAST_NAME STRING,
  54. NICKNAMES ARRAY(STRING),
  55. Gender BOOLEAN,
  56. ` + "`地址`" + ` STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  57. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  58. r: []string{"Stream stream is created."},
  59. },
  60. {
  61. s: `CREATE STREAM topic1 (
  62. USERID BIGINT,
  63. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  64. err: "Create stream fails: Item topic1 already exists.",
  65. },
  66. {
  67. s: `EXPLAIN STREAM topic1;`,
  68. r: []string{"TO BE SUPPORTED"},
  69. },
  70. {
  71. s: `DESCRIBE STREAM topic1;`,
  72. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  73. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint, BUILDING struct(NAME string, ROOM bigint))\n\n" +
  74. "DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\n"},
  75. },
  76. {
  77. s: `DROP STREAM topic1;`,
  78. r: []string{"Stream topic1 is dropped."},
  79. },
  80. {
  81. s: `SHOW STREAMS;`,
  82. r: []string{"stream"},
  83. },
  84. {
  85. s: `DESCRIBE STREAM topic1;`,
  86. err: "Describe stream fails, topic1 is not found.",
  87. },
  88. {
  89. s: `DROP STREAM topic1;`,
  90. err: "Drop stream fails: topic1 is not found.",
  91. },
  92. {
  93. s: "DROP STREAM `stream`;",
  94. r: []string{"Stream stream is dropped."},
  95. },
  96. }
  97. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  98. p := NewStreamProcessor()
  99. for i, tt := range tests {
  100. results, err := p.ExecStmt(tt.s)
  101. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  102. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  103. } else if tt.err == "" {
  104. if !reflect.DeepEqual(tt.r, results) {
  105. t.Errorf("%d. %q\n\nstmt mismatch:\nexp=%s\ngot=%#v\n\n", i, tt.s, tt.r, results)
  106. }
  107. }
  108. }
  109. }
  110. func TestTableProcessor(t *testing.T) {
  111. var tests = []struct {
  112. s string
  113. r []string
  114. err string
  115. }{
  116. {
  117. s: `SHOW TABLES;`,
  118. r: []string{"No table definitions are found."},
  119. },
  120. {
  121. s: `EXPLAIN TABLE topic1;`,
  122. err: "Explain table fails, topic1 is not found.",
  123. },
  124. {
  125. s: `CREATE TABLE topic1 (
  126. USERID BIGINT,
  127. FIRST_NAME STRING,
  128. LAST_NAME STRING,
  129. NICKNAMES ARRAY(STRING),
  130. Gender BOOLEAN,
  131. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  132. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  133. r: []string{"Table topic1 is created."},
  134. },
  135. {
  136. s: `CREATE TABLE ` + "`stream`" + ` (
  137. USERID BIGINT,
  138. FIRST_NAME STRING,
  139. LAST_NAME STRING,
  140. NICKNAMES ARRAY(STRING),
  141. Gender BOOLEAN,
  142. ` + "`地址`" + ` STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  143. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  144. r: []string{"Table stream is created."},
  145. },
  146. {
  147. s: `CREATE TABLE topic1 (
  148. USERID BIGINT,
  149. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  150. err: "Create table fails: Item topic1 already exists.",
  151. },
  152. {
  153. s: `EXPLAIN TABLE topic1;`,
  154. r: []string{"TO BE SUPPORTED"},
  155. },
  156. {
  157. s: `DESCRIBE TABLE topic1;`,
  158. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  159. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  160. "DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\n"},
  161. },
  162. {
  163. s: `DROP TABLE topic1;`,
  164. r: []string{"Table topic1 is dropped."},
  165. },
  166. {
  167. s: `SHOW TABLES;`,
  168. r: []string{"stream"},
  169. },
  170. {
  171. s: `DESCRIBE TABLE topic1;`,
  172. err: "Describe table fails, topic1 is not found.",
  173. },
  174. {
  175. s: `DROP TABLE topic1;`,
  176. err: "Drop table fails: topic1 is not found.",
  177. },
  178. {
  179. s: "DROP TABLE `stream`;",
  180. r: []string{"Table stream is dropped."},
  181. },
  182. }
  183. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  184. p := NewStreamProcessor()
  185. for i, tt := range tests {
  186. results, err := p.ExecStmt(tt.s)
  187. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  188. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  189. } else if tt.err == "" {
  190. if !reflect.DeepEqual(tt.r, results) {
  191. t.Errorf("%d. %q\n\nstmt mismatch:\nexp=%s\ngot=%#v\n\n", i, tt.s, tt.r, results)
  192. }
  193. }
  194. }
  195. }