stream_test.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package processor
  2. import (
  3. "fmt"
  4. "github.com/lf-edge/ekuiper/internal/testx"
  5. "path"
  6. "reflect"
  7. "testing"
  8. )
  9. var (
  10. DbDir = testx.GetDbDir()
  11. )
  12. func TestStreamCreateProcessor(t *testing.T) {
  13. var tests = []struct {
  14. s string
  15. r []string
  16. err string
  17. }{
  18. {
  19. s: `SHOW STREAMS;`,
  20. r: []string{"No stream definitions are found."},
  21. },
  22. {
  23. s: `EXPLAIN STREAM topic1;`,
  24. err: "Explain stream fails, topic1 is not found.",
  25. },
  26. {
  27. s: `CREATE STREAM topic1 (
  28. USERID BIGINT,
  29. FIRST_NAME STRING,
  30. LAST_NAME STRING,
  31. NICKNAMES ARRAY(STRING),
  32. Gender BOOLEAN,
  33. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT, BUILDING STRUCT(NAME STRING, ROOM BIGINT)),
  34. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  35. r: []string{"Stream topic1 is created."},
  36. },
  37. {
  38. s: `CREATE STREAM ` + "`stream`" + ` (
  39. USERID BIGINT,
  40. FIRST_NAME STRING,
  41. LAST_NAME STRING,
  42. NICKNAMES ARRAY(STRING),
  43. Gender BOOLEAN,
  44. ` + "`地址`" + ` STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  45. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  46. r: []string{"Stream stream is created."},
  47. },
  48. {
  49. s: `CREATE STREAM topic1 (
  50. USERID BIGINT,
  51. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  52. err: "Create stream fails: Item topic1 already exists.",
  53. },
  54. {
  55. s: `EXPLAIN STREAM topic1;`,
  56. r: []string{"TO BE SUPPORTED"},
  57. },
  58. {
  59. s: `DESCRIBE STREAM topic1;`,
  60. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  61. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint, BUILDING struct(NAME string, ROOM bigint))\n\n" +
  62. "DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\n"},
  63. },
  64. {
  65. s: `DROP STREAM topic1;`,
  66. r: []string{"Stream topic1 is dropped."},
  67. },
  68. {
  69. s: `SHOW STREAMS;`,
  70. r: []string{"stream"},
  71. },
  72. {
  73. s: `DESCRIBE STREAM topic1;`,
  74. err: "Describe stream fails, topic1 is not found.",
  75. },
  76. {
  77. s: `DROP STREAM topic1;`,
  78. err: "Drop stream fails: topic1 is not found.",
  79. },
  80. {
  81. s: "DROP STREAM `stream`;",
  82. r: []string{"Stream stream is dropped."},
  83. },
  84. }
  85. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  86. streamDB := path.Join(DbDir, "streamTest")
  87. for i, tt := range tests {
  88. results, err := NewStreamProcessor(streamDB).ExecStmt(tt.s)
  89. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  90. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  91. } else if tt.err == "" {
  92. if !reflect.DeepEqual(tt.r, results) {
  93. t.Errorf("%d. %q\n\nstmt mismatch:\nexp=%s\ngot=%#v\n\n", i, tt.s, tt.r, results)
  94. }
  95. }
  96. }
  97. }
  98. func TestTableProcessor(t *testing.T) {
  99. var tests = []struct {
  100. s string
  101. r []string
  102. err string
  103. }{
  104. {
  105. s: `SHOW TABLES;`,
  106. r: []string{"No table definitions are found."},
  107. },
  108. {
  109. s: `EXPLAIN TABLE topic1;`,
  110. err: "Explain table fails, topic1 is not found.",
  111. },
  112. {
  113. s: `CREATE TABLE topic1 (
  114. USERID BIGINT,
  115. FIRST_NAME STRING,
  116. LAST_NAME STRING,
  117. NICKNAMES ARRAY(STRING),
  118. Gender BOOLEAN,
  119. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  120. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  121. r: []string{"Table topic1 is created."},
  122. },
  123. {
  124. s: `CREATE TABLE ` + "`stream`" + ` (
  125. USERID BIGINT,
  126. FIRST_NAME STRING,
  127. LAST_NAME STRING,
  128. NICKNAMES ARRAY(STRING),
  129. Gender BOOLEAN,
  130. ` + "`地址`" + ` STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  131. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  132. r: []string{"Table stream is created."},
  133. },
  134. {
  135. s: `CREATE TABLE topic1 (
  136. USERID BIGINT,
  137. ) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
  138. err: "Create table fails: Item topic1 already exists.",
  139. },
  140. {
  141. s: `EXPLAIN TABLE topic1;`,
  142. r: []string{"TO BE SUPPORTED"},
  143. },
  144. {
  145. s: `DESCRIBE TABLE topic1;`,
  146. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  147. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  148. "DATASOURCE: users\nFORMAT: JSON\nKEY: USERID\n"},
  149. },
  150. {
  151. s: `DROP TABLE topic1;`,
  152. r: []string{"Table topic1 is dropped."},
  153. },
  154. {
  155. s: `SHOW TABLES;`,
  156. r: []string{"stream"},
  157. },
  158. {
  159. s: `DESCRIBE TABLE topic1;`,
  160. err: "Describe table fails, topic1 is not found.",
  161. },
  162. {
  163. s: `DROP TABLE topic1;`,
  164. err: "Drop table fails: topic1 is not found.",
  165. },
  166. {
  167. s: "DROP TABLE `stream`;",
  168. r: []string{"Table stream is dropped."},
  169. },
  170. }
  171. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  172. streamDB := path.Join(testx.GetDbDir(), "streamTest")
  173. for i, tt := range tests {
  174. results, err := NewStreamProcessor(streamDB).ExecStmt(tt.s)
  175. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  176. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  177. } else if tt.err == "" {
  178. if !reflect.DeepEqual(tt.r, results) {
  179. t.Errorf("%d. %q\n\nstmt mismatch:\nexp=%s\ngot=%#v\n\n", i, tt.s, tt.r, results)
  180. }
  181. }
  182. }
  183. }