manager_test.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package native
  15. import (
  16. "errors"
  17. "fmt"
  18. "net/http"
  19. "net/http/httptest"
  20. "os"
  21. "path"
  22. "reflect"
  23. "sort"
  24. "testing"
  25. "github.com/lf-edge/ekuiper/internal/binder"
  26. "github.com/lf-edge/ekuiper/internal/binder/function"
  27. "github.com/lf-edge/ekuiper/internal/meta"
  28. "github.com/lf-edge/ekuiper/internal/plugin"
  29. "github.com/lf-edge/ekuiper/internal/testx"
  30. )
  31. func init() {
  32. testx.InitEnv()
  33. meta.InitYamlConfigManager()
  34. nativeManager, err := InitManager()
  35. if err != nil {
  36. panic(err)
  37. }
  38. err = function.Initialize([]binder.FactoryEntry{{Name: "native plugin", Factory: nativeManager}})
  39. if err != nil {
  40. panic(err)
  41. }
  42. }
  43. func TestManager_Register(t *testing.T) {
  44. s := httptest.NewServer(
  45. http.FileServer(http.Dir("../testzips")),
  46. )
  47. defer s.Close()
  48. endpoint := s.URL
  49. data := []struct {
  50. t plugin.PluginType
  51. n string
  52. u string
  53. v string
  54. f []string
  55. lowerSo bool
  56. err error
  57. }{
  58. {
  59. t: plugin.SOURCE,
  60. n: "",
  61. u: "",
  62. err: errors.New("invalid name : should not be empty"),
  63. }, {
  64. t: plugin.SOURCE,
  65. n: "zipMissConf",
  66. u: endpoint + "/sources/zipMissConf.zip",
  67. err: errors.New("fail to install plugin: invalid zip file: so file or conf file is missing"),
  68. }, {
  69. t: plugin.SINK,
  70. n: "urlerror",
  71. u: endpoint + "/sinks/nozip",
  72. err: errors.New("invalid uri " + endpoint + "/sinks/nozip"),
  73. }, {
  74. t: plugin.SINK,
  75. n: "zipWrongname",
  76. u: endpoint + "/sinks/zipWrongName.zip",
  77. err: errors.New("fail to install plugin: invalid zip file: so file or conf file is missing"),
  78. }, {
  79. t: plugin.FUNCTION,
  80. n: "zipMissSo",
  81. u: endpoint + "/functions/zipMissSo.zip",
  82. err: errors.New("fail to install plugin: invalid zip file: so file or conf file is missing"),
  83. }, {
  84. t: plugin.SOURCE,
  85. n: "random2",
  86. u: endpoint + "/sources/random2.zip",
  87. }, {
  88. t: plugin.SOURCE,
  89. n: "random3",
  90. u: endpoint + "/sources/random3.zip",
  91. v: "1.0.0",
  92. }, {
  93. t: plugin.SINK,
  94. n: "file2",
  95. u: endpoint + "/sinks/file2.zip",
  96. lowerSo: true,
  97. }, {
  98. t: plugin.FUNCTION,
  99. n: "echo2",
  100. u: endpoint + "/functions/echo2.zip",
  101. f: []string{"echo2", "echo3"},
  102. }, {
  103. t: plugin.FUNCTION,
  104. n: "echo2",
  105. u: endpoint + "/functions/echo2.zip",
  106. err: errors.New("invalid name echo2: duplicate"),
  107. }, {
  108. t: plugin.FUNCTION,
  109. n: "misc",
  110. u: endpoint + "/functions/echo2.zip",
  111. f: []string{"misc", "echo3"},
  112. err: errors.New("function name echo3 already exists"),
  113. }, {
  114. t: plugin.FUNCTION,
  115. n: "comp",
  116. u: endpoint + "/functions/comp.zip",
  117. },
  118. }
  119. fmt.Printf("The test bucket size is %d.\n\n", len(data))
  120. for i, tt := range data {
  121. var p plugin.Plugin
  122. if tt.t == plugin.FUNCTION {
  123. p = &plugin.FuncPlugin{
  124. IOPlugin: plugin.IOPlugin{
  125. Name: tt.n,
  126. File: tt.u,
  127. },
  128. Functions: tt.f,
  129. }
  130. } else {
  131. p = &plugin.IOPlugin{
  132. Name: tt.n,
  133. File: tt.u,
  134. }
  135. }
  136. err := manager.Register(tt.t, p)
  137. if !reflect.DeepEqual(tt.err, err) {
  138. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.err, err)
  139. } else if tt.err == nil {
  140. err := checkFile(manager.pluginDir, manager.pluginConfDir, tt.t, tt.n, tt.v, tt.lowerSo)
  141. if err != nil {
  142. t.Errorf("%d: error : %s\n\n", i, err)
  143. }
  144. }
  145. }
  146. }
  147. func TestManager_List(t *testing.T) {
  148. data := []struct {
  149. t plugin.PluginType
  150. r []string
  151. }{
  152. {
  153. t: plugin.SOURCE,
  154. r: []string{"random", "random2", "random3"},
  155. }, {
  156. t: plugin.SINK,
  157. r: []string{"file2"},
  158. }, {
  159. t: plugin.FUNCTION,
  160. r: []string{"accumulateWordCount", "comp", "countPlusOne", "echo", "echo2"},
  161. },
  162. }
  163. fmt.Printf("The test bucket size is %d.\n\n", len(data))
  164. for i, p := range data {
  165. result := manager.List(p.t)
  166. sort.Strings(result)
  167. if !reflect.DeepEqual(p.r, result) {
  168. t.Errorf("%d: result mismatch:\n exp=%v\n got=%v\n\n", i, p.r, result)
  169. }
  170. }
  171. }
  172. func TestManager_Symbols(t *testing.T) {
  173. r := []string{"accumulateWordCount", "comp", "countPlusOne", "echo", "echo2", "echo3", "misc"}
  174. result := manager.ListSymbols()
  175. sort.Strings(result)
  176. if !reflect.DeepEqual(r, result) {
  177. t.Errorf("result mismatch:\n exp=%v\n got=%v\n\n", r, result)
  178. }
  179. p, ok := manager.GetPluginBySymbol(plugin.FUNCTION, "echo3")
  180. if !ok {
  181. t.Errorf("cannot find echo3 symbol")
  182. }
  183. if p != "echo2" {
  184. t.Errorf("wrong plugin %s for echo3 symbol", p)
  185. }
  186. }
  187. func TestManager_Desc(t *testing.T) {
  188. data := []struct {
  189. t plugin.PluginType
  190. n string
  191. r map[string]interface{}
  192. }{
  193. {
  194. t: plugin.SOURCE,
  195. n: "random2",
  196. r: map[string]interface{}{
  197. "name": "random2",
  198. "version": "",
  199. },
  200. }, {
  201. t: plugin.SOURCE,
  202. n: "random3",
  203. r: map[string]interface{}{
  204. "name": "random3",
  205. "version": "1.0.0",
  206. },
  207. }, {
  208. t: plugin.FUNCTION,
  209. n: "echo2",
  210. r: map[string]interface{}{
  211. "name": "echo2",
  212. "version": "",
  213. "functions": []string{"echo2", "echo3"},
  214. },
  215. },
  216. }
  217. fmt.Printf("The test bucket size is %d.\n\n", len(data))
  218. for i, p := range data {
  219. result, ok := manager.GetPluginInfo(p.t, p.n)
  220. if !ok {
  221. t.Errorf("%d: get error : not found\n\n", i)
  222. return
  223. }
  224. if !reflect.DeepEqual(p.r, result) {
  225. t.Errorf("%d: result mismatch:\n exp=%v\n got=%v\n\n", i, p.r, result)
  226. }
  227. }
  228. }
  229. func TestManager_Delete(t *testing.T) {
  230. data := []struct {
  231. t plugin.PluginType
  232. n string
  233. err error
  234. }{
  235. {
  236. t: plugin.SOURCE,
  237. n: "random2",
  238. }, {
  239. t: plugin.SINK,
  240. n: "file2",
  241. }, {
  242. t: plugin.FUNCTION,
  243. n: "echo2",
  244. }, {
  245. t: plugin.SOURCE,
  246. n: "random3",
  247. }, {
  248. t: plugin.FUNCTION,
  249. n: "comp",
  250. },
  251. }
  252. fmt.Printf("The test bucket size is %d.\n\n", len(data))
  253. for i, p := range data {
  254. err := manager.Delete(p.t, p.n, false)
  255. if err != nil {
  256. t.Errorf("%d: delete error : %s\n\n", i, err)
  257. }
  258. }
  259. }
  260. func checkFile(pluginDir string, etcDir string, t plugin.PluginType, name string, version string, lowerSo bool) error {
  261. var soName string
  262. if !lowerSo {
  263. soName = ucFirst(name) + ".so"
  264. if version != "" {
  265. soName = fmt.Sprintf("%s@v%s.so", ucFirst(name), version)
  266. }
  267. } else {
  268. soName = name + ".so"
  269. if version != "" {
  270. soName = fmt.Sprintf("%s@v%s.so", name, version)
  271. }
  272. }
  273. soPath := path.Join(pluginDir, plugin.PluginTypes[t], soName)
  274. _, err := os.Stat(soPath)
  275. if err != nil {
  276. return err
  277. }
  278. if t == plugin.SOURCE {
  279. etcPath := path.Join(etcDir, plugin.PluginTypes[t], name+".yaml")
  280. _, err = os.Stat(etcPath)
  281. if err != nil {
  282. return err
  283. }
  284. }
  285. return nil
  286. }