manager_test.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package plugin
  15. import (
  16. "errors"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/testx"
  19. "github.com/lf-edge/ekuiper/internal/xsql"
  20. "net/http"
  21. "net/http/httptest"
  22. "os"
  23. "path"
  24. "reflect"
  25. "sort"
  26. "testing"
  27. )
  28. var manager *Manager
  29. func init() {
  30. var err error
  31. testx.InitEnv()
  32. manager, err = NewPluginManager()
  33. if err != nil {
  34. panic(err)
  35. }
  36. xsql.InitFuncRegisters(manager)
  37. }
  38. func TestManager_Register(t *testing.T) {
  39. s := httptest.NewServer(
  40. http.FileServer(http.Dir("testzips")),
  41. )
  42. defer s.Close()
  43. endpoint := s.URL
  44. data := []struct {
  45. t PluginType
  46. n string
  47. u string
  48. v string
  49. f []string
  50. lowerSo bool
  51. err error
  52. }{
  53. {
  54. t: SOURCE,
  55. n: "",
  56. u: "",
  57. err: errors.New("invalid name : should not be empty"),
  58. }, {
  59. t: SOURCE,
  60. n: "zipMissConf",
  61. u: endpoint + "/sources/zipMissConf.zip",
  62. err: errors.New("fail to install plugin: invalid zip file: so file or conf file is missing"),
  63. }, {
  64. t: SINK,
  65. n: "urlerror",
  66. u: endpoint + "/sinks/nozip",
  67. err: errors.New("invalid uri " + endpoint + "/sinks/nozip"),
  68. }, {
  69. t: SINK,
  70. n: "zipWrongname",
  71. u: endpoint + "/sinks/zipWrongName.zip",
  72. err: errors.New("fail to install plugin: invalid zip file: so file or conf file is missing"),
  73. }, {
  74. t: FUNCTION,
  75. n: "zipMissSo",
  76. u: endpoint + "/functions/zipMissSo.zip",
  77. err: errors.New("fail to install plugin: invalid zip file: so file or conf file is missing"),
  78. }, {
  79. t: SOURCE,
  80. n: "random2",
  81. u: endpoint + "/sources/random2.zip",
  82. }, {
  83. t: SOURCE,
  84. n: "random3",
  85. u: endpoint + "/sources/random3.zip",
  86. v: "1.0.0",
  87. }, {
  88. t: SINK,
  89. n: "file2",
  90. u: endpoint + "/sinks/file2.zip",
  91. lowerSo: true,
  92. }, {
  93. t: FUNCTION,
  94. n: "echo2",
  95. u: endpoint + "/functions/echo2.zip",
  96. f: []string{"echo2", "echo3"},
  97. }, {
  98. t: FUNCTION,
  99. n: "echo2",
  100. u: endpoint + "/functions/echo2.zip",
  101. err: errors.New("invalid name echo2: duplicate"),
  102. }, {
  103. t: FUNCTION,
  104. n: "misc",
  105. u: endpoint + "/functions/echo2.zip",
  106. f: []string{"misc", "echo3"},
  107. err: errors.New("function name echo3 already exists"),
  108. }, {
  109. t: FUNCTION,
  110. n: "comp",
  111. u: endpoint + "/functions/comp.zip",
  112. },
  113. }
  114. fmt.Printf("The test bucket size is %d.\n\n", len(data))
  115. for i, tt := range data {
  116. var p Plugin
  117. if tt.t == FUNCTION {
  118. p = &FuncPlugin{
  119. IOPlugin: IOPlugin{
  120. Name: tt.n,
  121. File: tt.u,
  122. },
  123. Functions: tt.f,
  124. }
  125. } else {
  126. p = &IOPlugin{
  127. Name: tt.n,
  128. File: tt.u,
  129. }
  130. }
  131. err := manager.Register(tt.t, p)
  132. if !reflect.DeepEqual(tt.err, err) {
  133. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.err, err)
  134. } else if tt.err == nil {
  135. err := checkFile(manager.pluginDir, manager.etcDir, tt.t, tt.n, tt.v, tt.lowerSo)
  136. if err != nil {
  137. t.Errorf("%d: error : %s\n\n", i, err)
  138. }
  139. }
  140. }
  141. }
  142. func TestManager_List(t *testing.T) {
  143. data := []struct {
  144. t PluginType
  145. r []string
  146. }{
  147. {
  148. t: SOURCE,
  149. r: []string{"random", "random2", "random3"},
  150. }, {
  151. t: SINK,
  152. r: []string{"file", "file2"},
  153. }, {
  154. t: FUNCTION,
  155. r: []string{"accumulateWordCount", "comp", "countPlusOne", "echo", "echo2"},
  156. },
  157. }
  158. fmt.Printf("The test bucket size is %d.\n\n", len(data))
  159. for i, p := range data {
  160. result, err := manager.List(p.t)
  161. if err != nil {
  162. t.Errorf("%d: list error : %s\n\n", i, err)
  163. return
  164. }
  165. sort.Strings(result)
  166. if !reflect.DeepEqual(p.r, result) {
  167. t.Errorf("%d: result mismatch:\n exp=%v\n got=%v\n\n", i, p.r, result)
  168. }
  169. }
  170. }
  171. func TestManager_Symbols(t *testing.T) {
  172. r := []string{"accumulateWordCount", "comp", "countPlusOne", "echo", "echo2", "echo3", "misc"}
  173. result, err := manager.ListSymbols()
  174. if err != nil {
  175. t.Errorf("list symbols error : %s\n\n", err)
  176. return
  177. }
  178. sort.Strings(result)
  179. if !reflect.DeepEqual(r, result) {
  180. t.Errorf("result mismatch:\n exp=%v\n got=%v\n\n", r, result)
  181. }
  182. p, ok := manager.GetSymbol("echo3")
  183. if !ok {
  184. t.Errorf("cannot find echo3 symbol")
  185. }
  186. if p != "echo2" {
  187. t.Errorf("wrong plugin %s for echo3 symbol", p)
  188. }
  189. }
  190. func TestManager_Desc(t *testing.T) {
  191. data := []struct {
  192. t PluginType
  193. n string
  194. r map[string]interface{}
  195. }{
  196. {
  197. t: SOURCE,
  198. n: "random2",
  199. r: map[string]interface{}{
  200. "name": "random2",
  201. "version": "",
  202. },
  203. }, {
  204. t: SOURCE,
  205. n: "random3",
  206. r: map[string]interface{}{
  207. "name": "random3",
  208. "version": "1.0.0",
  209. },
  210. }, {
  211. t: FUNCTION,
  212. n: "echo2",
  213. r: map[string]interface{}{
  214. "name": "echo2",
  215. "version": "",
  216. "functions": []string{"echo2", "echo3"},
  217. },
  218. },
  219. }
  220. fmt.Printf("The test bucket size is %d.\n\n", len(data))
  221. for i, p := range data {
  222. result, ok := manager.Get(p.t, p.n)
  223. if !ok {
  224. t.Errorf("%d: get error : not found\n\n", i)
  225. return
  226. }
  227. if !reflect.DeepEqual(p.r, result) {
  228. t.Errorf("%d: result mismatch:\n exp=%v\n got=%v\n\n", i, p.r, result)
  229. }
  230. }
  231. }
  232. func TestManager_Delete(t *testing.T) {
  233. data := []struct {
  234. t PluginType
  235. n string
  236. err error
  237. }{
  238. {
  239. t: SOURCE,
  240. n: "random2",
  241. }, {
  242. t: SINK,
  243. n: "file2",
  244. }, {
  245. t: FUNCTION,
  246. n: "echo2",
  247. }, {
  248. t: SOURCE,
  249. n: "random3",
  250. }, {
  251. t: FUNCTION,
  252. n: "comp",
  253. },
  254. }
  255. fmt.Printf("The test bucket size is %d.\n\n", len(data))
  256. for i, p := range data {
  257. err := manager.Delete(p.t, p.n, false)
  258. if err != nil {
  259. t.Errorf("%d: delete error : %s\n\n", i, err)
  260. }
  261. }
  262. }
  263. func checkFile(pluginDir string, etcDir string, t PluginType, name string, version string, lowerSo bool) error {
  264. var soName string
  265. if !lowerSo {
  266. soName = ucFirst(name) + ".so"
  267. if version != "" {
  268. soName = fmt.Sprintf("%s@v%s.so", ucFirst(name), version)
  269. }
  270. } else {
  271. soName = name + ".so"
  272. if version != "" {
  273. soName = fmt.Sprintf("%s@v%s.so", name, version)
  274. }
  275. }
  276. soPath := path.Join(pluginDir, PluginTypes[t], soName)
  277. _, err := os.Stat(soPath)
  278. if err != nil {
  279. return err
  280. }
  281. if t == SOURCE {
  282. etcPath := path.Join(etcDir, PluginTypes[t], name+".yaml")
  283. _, err = os.Stat(etcPath)
  284. if err != nil {
  285. return err
  286. }
  287. }
  288. return nil
  289. }