manager_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "fmt"
  17. "reflect"
  18. "regexp"
  19. "strings"
  20. "testing"
  21. "github.com/gdexlab/go-render/render"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. )
  24. func TestCreateAndClose(t *testing.T) {
  25. Reset()
  26. var (
  27. sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
  28. sinkTopics = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1", "h/d1/c1/s1"}
  29. chans []chan api.SourceTuple
  30. )
  31. for i, topic := range sinkTopics {
  32. CreatePub(topic)
  33. var (
  34. r *regexp.Regexp
  35. err error
  36. )
  37. if strings.ContainsAny(sourceTopics[i], "+#") {
  38. r, err = getRegexp(sourceTopics[i])
  39. if err != nil {
  40. t.Error(err)
  41. return
  42. }
  43. }
  44. c := CreateSub(sourceTopics[i], r, fmt.Sprintf("%d", i), 100)
  45. chans = append(chans, c)
  46. }
  47. expPub := map[string]*pubConsumers{
  48. "h/d1/c1/s1": {
  49. count: 2,
  50. consumers: map[string]chan api.SourceTuple{
  51. "1": chans[1],
  52. "4": chans[4],
  53. },
  54. },
  55. "h/d1/c1/s2": {
  56. count: 1,
  57. consumers: map[string]chan api.SourceTuple{
  58. "0": chans[0],
  59. "3": chans[3],
  60. },
  61. },
  62. "h/d2/c2/s1": {
  63. count: 1,
  64. consumers: map[string]chan api.SourceTuple{
  65. "1": chans[1],
  66. },
  67. },
  68. "h/d3/c3/s1": {
  69. count: 1,
  70. consumers: map[string]chan api.SourceTuple{
  71. "1": chans[1],
  72. "2": chans[2],
  73. },
  74. },
  75. }
  76. if !reflect.DeepEqual(expPub, pubTopics) {
  77. t.Errorf("Error adding: Expect\n\t%v\nbut got\n\t%v", render.AsCode(expPub), render.AsCode(pubTopics))
  78. return
  79. }
  80. i := 0
  81. for i < 3 {
  82. CloseSourceConsumerChannel(sourceTopics[i], fmt.Sprintf("%d", i))
  83. RemovePub(sinkTopics[i])
  84. i++
  85. }
  86. expPub = map[string]*pubConsumers{
  87. "h/d1/c1/s1": {
  88. count: 1,
  89. consumers: map[string]chan api.SourceTuple{
  90. "4": chans[4],
  91. },
  92. },
  93. "h/d1/c1/s2": {
  94. count: 0,
  95. consumers: map[string]chan api.SourceTuple{
  96. "3": chans[3],
  97. },
  98. },
  99. "h/d3/c3/s1": {
  100. count: 1,
  101. consumers: map[string]chan api.SourceTuple{},
  102. },
  103. }
  104. if !reflect.DeepEqual(expPub, pubTopics) {
  105. t.Errorf("Error closing: Expect\n\t%v\nbut got\n\t %v", render.AsCode(expPub), render.AsCode(pubTopics))
  106. }
  107. }
  // getRegexp compiles an MQTT-style topic filter into a regular expression that
  // must match a whole topic (the pattern is anchored with ^ and $).
  //
  // The filter is split on "/" and translated level by level:
  //   - a level that is exactly "+" becomes the capture group "([^/]+)", i.e. one
  //     non-empty level;
  //   - a final level that is exactly "#" becomes ".+", i.e. any non-empty rest;
  //   - every other level is matched literally, with regexp metacharacters
  //     escaped via regexp.QuoteMeta.
  //
  // It returns an error when topic is empty or when a "#" level is not the last
  // level of the filter.
  func getRegexp(topic string) (*regexp.Regexp, error) {
  	if len(topic) == 0 {
  		return nil, fmt.Errorf("invalid empty topic")
  	}

  	levels := strings.Split(topic, "/")
  	last := len(levels) - 1
  	parts := make([]string, len(levels))
  	for i, level := range levels {
  		switch level {
  		case "#":
  			if i != last {
  				return nil, fmt.Errorf("invalid topic %s: # must at the last level", topic)
  			}
  			parts[i] = ".+"
  		case "+":
  			parts[i] = "([^/]+)"
  		default:
  			// Escape literal levels so characters such as "." or "$" in a
  			// topic name are not interpreted as regexp syntax.
  			parts[i] = regexp.QuoteMeta(level)
  		}
  	}

  	// Anchor the pattern: without ^ and $ the filter would also match any
  	// topic that merely contains a matching substring.
  	return regexp.Compile("^" + strings.Join(parts, "/") + "$")
  }