stream_test.go 5.7 KB


  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package processor
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/testx"
  18. "path"
  19. "reflect"
  20. "testing"
  21. )
  22. var (
  23. DbDir = testx.GetDbDir()
  24. )
  25. func TestStreamCreateProcessor(t *testing.T) {
  26. var tests = []struct {
  27. s string
  28. r []string
  29. err string
  30. }{
  31. {
  32. s: `SHOW STREAMS;`,
  33. r: []string{"No stream definitions are found."},
  34. },
  35. {
  36. s: `EXPLAIN STREAM topic1;`,
  37. err: "Explain stream fails, topic1 is not found.",
  38. },
  39. {
  40. s: `CREATE STREAM topic1 (
  41. USERID BIGINT,
  42. FIRST_NAME STRING,
  43. LAST_NAME STRING,
  44. NICKNAMES ARRAY(STRING),
  45. Gender BOOLEAN,
  46. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT, BUILDING STRUCT(NAME STRING, ROOM BIGINT)),
  47. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  48. r: []string{"Stream topic1 is created."},
  49. },
  50. {
  51. s: `CREATE STREAM ` + "`stream`" + ` (
  52. USERID BIGINT,
  53. FIRST_NAME STRING,
  54. LAST_NAME STRING,
  55. NICKNAMES ARRAY(STRING),
  56. Gender BOOLEAN,
  57. ` + "`地址`" + ` STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  58. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  59. r: []string{"Stream stream is created."},
  60. },
  61. {
  62. s: `CREATE STREAM topic1 (
  63. USERID BIGINT,
  64. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  65. err: "Create stream fails: Item topic1 already exists.",
  66. },
  67. {
  68. s: `EXPLAIN STREAM topic1;`,
  69. r: []string{"TO BE SUPPORTED"},
  70. },
  71. {
  72. s: `DESCRIBE STREAM topic1;`,
  73. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  74. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint, BUILDING struct(NAME string, ROOM bigint))\n\n" +
  75. "DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\n"},
  76. },
  77. {
  78. s: `DROP STREAM topic1;`,
  79. r: []string{"Stream topic1 is dropped."},
  80. },
  81. {
  82. s: `SHOW STREAMS;`,
  83. r: []string{"stream"},
  84. },
  85. {
  86. s: `DESCRIBE STREAM topic1;`,
  87. err: "Describe stream fails, topic1 is not found.",
  88. },
  89. {
  90. s: `DROP STREAM topic1;`,
  91. err: "Drop stream fails: topic1 is not found.",
  92. },
  93. {
  94. s: "DROP STREAM `stream`;",
  95. r: []string{"Stream stream is dropped."},
  96. },
  97. }
  98. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  99. streamDB := path.Join(DbDir, "streamTest")
  100. for i, tt := range tests {
  101. results, err := NewStreamProcessor(streamDB).ExecStmt(tt.s)
  102. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  103. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  104. } else if tt.err == "" {
  105. if !reflect.DeepEqual(tt.r, results) {
  106. t.Errorf("%d. %q\n\nstmt mismatch:\nexp=%s\ngot=%#v\n\n", i, tt.s, tt.r, results)
  107. }
  108. }
  109. }
  110. }
  111. func TestTableProcessor(t *testing.T) {
  112. var tests = []struct {
  113. s string
  114. r []string
  115. err string
  116. }{
  117. {
  118. s: `SHOW TABLES;`,
  119. r: []string{"No table definitions are found."},
  120. },
  121. {
  122. s: `EXPLAIN TABLE topic1;`,
  123. err: "Explain table fails, topic1 is not found.",
  124. },
  125. {
  126. s: `CREATE TABLE topic1 (
  127. USERID BIGINT,
  128. FIRST_NAME STRING,
  129. LAST_NAME STRING,
  130. NICKNAMES ARRAY(STRING),
  131. Gender BOOLEAN,
  132. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  133. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  134. r: []string{"Table topic1 is created."},
  135. },
  136. {
  137. s: `CREATE TABLE ` + "`stream`" + ` (
  138. USERID BIGINT,
  139. FIRST_NAME STRING,
  140. LAST_NAME STRING,
  141. NICKNAMES ARRAY(STRING),
  142. Gender BOOLEAN,
  143. ` + "`地址`" + ` STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  144. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  145. r: []string{"Table stream is created."},
  146. },
  147. {
  148. s: `CREATE TABLE topic1 (
  149. USERID BIGINT,
  150. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  151. err: "Create table fails: Item topic1 already exists.",
  152. },
  153. {
  154. s: `EXPLAIN TABLE topic1;`,
  155. r: []string{"TO BE SUPPORTED"},
  156. },
  157. {
  158. s: `DESCRIBE TABLE topic1;`,
  159. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  160. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  161. "DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\n"},
  162. },
  163. {
  164. s: `DROP TABLE topic1;`,
  165. r: []string{"Table topic1 is dropped."},
  166. },
  167. {
  168. s: `SHOW TABLES;`,
  169. r: []string{"stream"},
  170. },
  171. {
  172. s: `DESCRIBE TABLE topic1;`,
  173. err: "Describe table fails, topic1 is not found.",
  174. },
  175. {
  176. s: `DROP TABLE topic1;`,
  177. err: "Drop table fails: topic1 is not found.",
  178. },
  179. {
  180. s: "DROP TABLE `stream`;",
  181. r: []string{"Table stream is dropped."},
  182. },
  183. }
  184. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  185. streamDB := path.Join(testx.GetDbDir(), "streamTest")
  186. for i, tt := range tests {
  187. results, err := NewStreamProcessor(streamDB).ExecStmt(tt.s)
  188. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  189. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  190. } else if tt.err == "" {
  191. if !reflect.DeepEqual(tt.r, results) {
  192. t.Errorf("%d. %q\n\nstmt mismatch:\nexp=%s\ngot=%#v\n\n", i, tt.s, tt.r, results)
  193. }
  194. }
  195. }
  196. }