stream_processor_test.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package processors
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/xstream"
  5. "path"
  6. "reflect"
  7. "testing"
  8. )
  9. func TestStreamCreateProcessor(t *testing.T) {
  10. var tests = []struct {
  11. s string
  12. r []string
  13. err string
  14. }{
  15. {
  16. s: `SHOW STREAMS;`,
  17. r: []string{"No stream definitions are found."},
  18. },
  19. {
  20. s: `EXPLAIN STREAM topic1;`,
  21. err: "Stream topic1 is not found.",
  22. },
  23. {
  24. s: `CREATE STREAM topic1 (
  25. USERID BIGINT,
  26. FIRST_NAME STRING,
  27. LAST_NAME STRING,
  28. NICKNAMES ARRAY(STRING),
  29. Gender BOOLEAN,
  30. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  31. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  32. r: []string{"Stream topic1 is created."},
  33. },
  34. {
  35. s: `CREATE STREAM ` + "`stream`" + ` (
  36. USERID BIGINT,
  37. FIRST_NAME STRING,
  38. LAST_NAME STRING,
  39. NICKNAMES ARRAY(STRING),
  40. Gender BOOLEAN,
  41. ` + "`地址`" + ` STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  42. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  43. r: []string{"Stream stream is created."},
  44. },
  45. {
  46. s: `CREATE STREAM topic1 (
  47. USERID BIGINT,
  48. ) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
  49. err: "Create stream fails: Item topic1 already exists.",
  50. },
  51. {
  52. s: `EXPLAIN STREAM topic1;`,
  53. r: []string{"TO BE SUPPORTED"},
  54. },
  55. {
  56. s: `DESCRIBE STREAM topic1;`,
  57. r: []string{"Fields\n--------------------------------------------------------------------------------\nUSERID\tbigint\nFIRST_NAME\tstring\nLAST_NAME\tstring\nNICKNAMES\t" +
  58. "array(string)\nGender\tboolean\nADDRESS\tstruct(STREET_NAME string, NUMBER bigint)\n\n" +
  59. "DATASOURCE: users\nFORMAT: AVRO\nKEY: USERID\n"},
  60. },
  61. {
  62. s: `DROP STREAM topic1;`,
  63. r: []string{"Stream topic1 is dropped."},
  64. },
  65. {
  66. s: `SHOW STREAMS;`,
  67. r: []string{"stream"},
  68. },
  69. {
  70. s: `DESCRIBE STREAM topic1;`,
  71. err: "Stream topic1 is not found.",
  72. },
  73. {
  74. s: `DROP STREAM topic1;`,
  75. err: "Drop stream fails: topic1 is not found.",
  76. },
  77. {
  78. s: "DROP STREAM `stream`;",
  79. r: []string{"Stream stream is dropped."},
  80. },
  81. }
  82. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  83. streamDB := path.Join(getDbDir(), "streamTest")
  84. for i, tt := range tests {
  85. results, err := NewStreamProcessor(streamDB).ExecStmt(tt.s)
  86. if !reflect.DeepEqual(tt.err, errstring(err)) {
  87. t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
  88. } else if tt.err == "" {
  89. if !reflect.DeepEqual(tt.r, results) {
  90. t.Errorf("%d. %q\n\nstmt mismatch:\nexp=%s\ngot=%#v\n\n", i, tt.s, tt.r, results)
  91. }
  92. }
  93. }
  94. }
  95. func getMetric(tp *xstream.TopologyNew, name string) int {
  96. keys, values := tp.GetMetrics()
  97. for index, key := range keys {
  98. if key == name {
  99. return int(values[index].(int64))
  100. }
  101. }
  102. fmt.Println("can't find " + name)
  103. return 0
  104. }