registry_test.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package portable
  15. import (
  16. "github.com/lf-edge/ekuiper/internal/plugin"
  17. "github.com/lf-edge/ekuiper/internal/plugin/portable/runtime"
  18. "reflect"
  19. "sync"
  20. "testing"
  21. )
  22. func TestConcurrent(t *testing.T) {
  23. r := &registry{
  24. RWMutex: sync.RWMutex{},
  25. plugins: make(map[string]*PluginInfo),
  26. sources: make(map[string]string),
  27. sinks: make(map[string]string),
  28. functions: make(map[string]string),
  29. }
  30. allPlugins := []*PluginInfo{
  31. {
  32. PluginMeta: runtime.PluginMeta{
  33. Name: "mirror",
  34. Version: "1.3.0",
  35. Language: "go",
  36. Executable: "mirror",
  37. },
  38. Sources: []string{"random"},
  39. Sinks: []string{"file"},
  40. Functions: []string{"echo"},
  41. }, {
  42. PluginMeta: runtime.PluginMeta{
  43. Name: "next",
  44. Version: "1.3.0",
  45. Language: "python",
  46. Executable: "next",
  47. },
  48. Sinks: []string{"udp", "follower"},
  49. }, {
  50. PluginMeta: runtime.PluginMeta{
  51. Name: "dummy",
  52. Version: "v0.2",
  53. Language: "go",
  54. Executable: "dummy",
  55. },
  56. Sources: []string{"new", "can"},
  57. Functions: []string{"abc"},
  58. },
  59. }
  60. expectedPlugins := map[string]*PluginInfo{
  61. "mirror": allPlugins[0],
  62. "next": allPlugins[1], "dummy": allPlugins[2],
  63. }
  64. expectedSources := map[string]string{
  65. "can": "dummy", "new": "dummy", "random": "mirror",
  66. }
  67. expectedFunctions := map[string]string{
  68. "abc": "dummy", "echo": "mirror",
  69. }
  70. expectedSinks := map[string]string{
  71. "file": "mirror", "follower": "next", "udp": "next",
  72. }
  73. // set concurrently
  74. var wg sync.WaitGroup
  75. for n, pi := range expectedPlugins {
  76. wg.Add(1)
  77. go func(name string, pluginInfo *PluginInfo) {
  78. defer wg.Done()
  79. r.Set(name, pluginInfo)
  80. }(n, pi)
  81. }
  82. wg.Wait()
  83. if !reflect.DeepEqual(expectedPlugins, r.plugins) {
  84. t.Errorf("plugins mismatch: expected %v, got %v", expectedPlugins, r.plugins)
  85. return
  86. }
  87. result := r.List()
  88. if !reflect.DeepEqual(len(allPlugins), len(result)) {
  89. t.Errorf("list plugins count mismatch: expected %v, got %v", allPlugins, result)
  90. return
  91. }
  92. outer:
  93. for _, res := range result {
  94. for _, p := range allPlugins {
  95. if reflect.DeepEqual(p, res) {
  96. continue outer
  97. }
  98. }
  99. t.Errorf("list plugins mismatch: expected %v, got %v", allPlugins, result)
  100. return
  101. }
  102. if !reflect.DeepEqual(expectedSources, r.sources) {
  103. t.Errorf("sources mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", expectedSources, r.sources)
  104. return
  105. }
  106. if !reflect.DeepEqual(expectedFunctions, r.functions) {
  107. t.Errorf("functions mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", expectedFunctions, r.functions)
  108. return
  109. }
  110. if !reflect.DeepEqual(expectedSinks, r.sinks) {
  111. t.Errorf("sinks mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", expectedSinks, r.functions)
  112. return
  113. }
  114. pn, ok := r.GetSymbol(plugin.SOURCE, "new")
  115. if !ok {
  116. t.Error("can't find symbol new")
  117. return
  118. }
  119. if pn != "dummy" {
  120. t.Errorf("GetSymbol wrong, expect dummy but got %s", pn)
  121. }
  122. // Delete concurrently
  123. for n := range expectedPlugins {
  124. wg.Add(1)
  125. go func(name string) {
  126. defer wg.Done()
  127. r.Delete(name)
  128. }(n)
  129. }
  130. wg.Wait()
  131. result = r.List()
  132. if !reflect.DeepEqual(0, len(result)) {
  133. t.Errorf("list plugins count mismatch: expected no plugins, got %v", result)
  134. return
  135. }
  136. }