manager_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package service
  15. import (
  16. "archive/zip"
  17. "fmt"
  18. "net/url"
  19. "os"
  20. "path/filepath"
  21. "reflect"
  22. "sort"
  23. "strings"
  24. "testing"
  25. "github.com/lf-edge/ekuiper/internal/binder"
  26. "github.com/lf-edge/ekuiper/internal/binder/function"
  27. )
  28. var m *Manager
  29. func init() {
  30. serviceManager, err := InitManager()
  31. if err != nil {
  32. panic(err)
  33. }
  34. err = function.Initialize([]binder.FactoryEntry{{Name: "external service", Factory: serviceManager}})
  35. if err != nil {
  36. panic(err)
  37. }
  38. m = GetManager()
  39. m.InitByFiles()
  40. }
  41. func TestInitByFiles(t *testing.T) {
  42. // expects
  43. name := "sample"
  44. info := &serviceInfo{
  45. About: &about{
  46. Author: &author{
  47. Name: "EMQ",
  48. Email: "contact@emqx.io",
  49. Company: "EMQ Technologies Co., Ltd",
  50. Website: "https://www.emqx.io",
  51. },
  52. HelpUrl: &fileLanguage{
  53. English: "https://github.com/lf-edge/ekuiper/blob/master/docs/en_US/plugins/functions/functions.md",
  54. Chinese: "https://github.com/lf-edge/ekuiper/blob/master/docs/zh_CN/plugins/functions/functions.md",
  55. },
  56. Description: &fileLanguage{
  57. English: "Sample external services for test only",
  58. Chinese: "示例外部函数配置,仅供测试",
  59. },
  60. },
  61. Interfaces: map[string]*interfaceInfo{
  62. "tsrpc": {
  63. Addr: "tcp://localhost:50051",
  64. Protocol: GRPC,
  65. Schema: &schemaInfo{
  66. SchemaType: PROTOBUFF,
  67. SchemaFile: "hw.proto",
  68. },
  69. Functions: []string{
  70. "helloFromGrpc",
  71. "ComputeFromGrpc",
  72. "getFeatureFromGrpc",
  73. "objectDetectFromGrpc",
  74. "getStatusFromGrpc",
  75. "notUsedRpc",
  76. },
  77. },
  78. "tsrest": {
  79. Addr: "http://localhost:51234",
  80. Protocol: REST,
  81. Schema: &schemaInfo{
  82. SchemaType: PROTOBUFF,
  83. SchemaFile: "hw.proto",
  84. },
  85. Options: map[string]interface{}{
  86. "insecureSkipVerify": true,
  87. "headers": map[string]interface{}{
  88. "Accept-Charset": "utf-8",
  89. },
  90. },
  91. Functions: []string{
  92. "helloFromRest",
  93. "ComputeFromRest",
  94. "getFeatureFromRest",
  95. "objectDetectFromRest",
  96. "getStatusFromRest",
  97. "restEncodedJson",
  98. },
  99. },
  100. "tsmsgpack": {
  101. Addr: "tcp://localhost:50000",
  102. Protocol: MSGPACK,
  103. Schema: &schemaInfo{
  104. SchemaType: PROTOBUFF,
  105. SchemaFile: "hw.proto",
  106. },
  107. Functions: []string{
  108. "helloFromMsgpack",
  109. "ComputeFromMsgpack",
  110. "getFeatureFromMsgpack",
  111. "objectDetectFromMsgpack",
  112. "getStatusFromMsgpack",
  113. "notUsedMsgpack",
  114. },
  115. },
  116. },
  117. }
  118. funcs := map[string]*functionContainer{
  119. "ListShelves": {
  120. ServiceName: "httpSample",
  121. InterfaceName: "bookshelf",
  122. MethodName: "ListShelves",
  123. },
  124. "CreateShelf": {
  125. ServiceName: "httpSample",
  126. InterfaceName: "bookshelf",
  127. MethodName: "CreateShelf",
  128. },
  129. "GetShelf": {
  130. ServiceName: "httpSample",
  131. InterfaceName: "bookshelf",
  132. MethodName: "GetShelf",
  133. },
  134. "DeleteShelf": {
  135. ServiceName: "httpSample",
  136. InterfaceName: "bookshelf",
  137. MethodName: "DeleteShelf",
  138. },
  139. "ListBooks": {
  140. ServiceName: "httpSample",
  141. InterfaceName: "bookshelf",
  142. MethodName: "ListBooks",
  143. },
  144. "createBook": {
  145. ServiceName: "httpSample",
  146. InterfaceName: "bookshelf",
  147. MethodName: "CreateBook",
  148. },
  149. "GetBook": {
  150. ServiceName: "httpSample",
  151. InterfaceName: "bookshelf",
  152. MethodName: "GetBook",
  153. },
  154. "DeleteBook": {
  155. ServiceName: "httpSample",
  156. InterfaceName: "bookshelf",
  157. MethodName: "DeleteBook",
  158. },
  159. "GetMessage": {
  160. ServiceName: "httpSample",
  161. InterfaceName: "messaging",
  162. MethodName: "GetMessage",
  163. },
  164. "SearchMessage": {
  165. ServiceName: "httpSample",
  166. InterfaceName: "messaging",
  167. MethodName: "SearchMessage",
  168. },
  169. "UpdateMessage": {
  170. ServiceName: "httpSample",
  171. InterfaceName: "messaging",
  172. MethodName: "UpdateMessage",
  173. },
  174. "PatchMessage": {
  175. ServiceName: "httpSample",
  176. InterfaceName: "messaging",
  177. MethodName: "PatchMessage",
  178. },
  179. "helloFromGrpc": {
  180. ServiceName: "sample",
  181. InterfaceName: "tsrpc",
  182. MethodName: "SayHello",
  183. },
  184. "helloFromRest": {
  185. ServiceName: "sample",
  186. InterfaceName: "tsrest",
  187. MethodName: "SayHello",
  188. },
  189. "helloFromMsgpack": {
  190. ServiceName: "sample",
  191. InterfaceName: "tsmsgpack",
  192. MethodName: "SayHello",
  193. },
  194. "objectDetectFromGrpc": {
  195. ServiceName: "sample",
  196. InterfaceName: "tsrpc",
  197. MethodName: "object_detection",
  198. },
  199. "objectDetectFromRest": {
  200. ServiceName: "sample",
  201. InterfaceName: "tsrest",
  202. MethodName: "object_detection",
  203. },
  204. "objectDetectFromMsgpack": {
  205. ServiceName: "sample",
  206. InterfaceName: "tsmsgpack",
  207. MethodName: "object_detection",
  208. },
  209. "getFeatureFromGrpc": {
  210. ServiceName: "sample",
  211. InterfaceName: "tsrpc",
  212. MethodName: "get_feature",
  213. },
  214. "getFeatureFromRest": {
  215. ServiceName: "sample",
  216. InterfaceName: "tsrest",
  217. MethodName: "get_feature",
  218. },
  219. "getFeatureFromMsgpack": {
  220. ServiceName: "sample",
  221. InterfaceName: "tsmsgpack",
  222. MethodName: "get_feature",
  223. },
  224. "getStatusFromGrpc": {
  225. ServiceName: "sample",
  226. InterfaceName: "tsrpc",
  227. MethodName: "getStatus",
  228. },
  229. "getStatusFromRest": {
  230. ServiceName: "sample",
  231. InterfaceName: "tsrest",
  232. MethodName: "getStatus",
  233. },
  234. "getStatusFromMsgpack": {
  235. ServiceName: "sample",
  236. InterfaceName: "tsmsgpack",
  237. MethodName: "getStatus",
  238. },
  239. "ComputeFromGrpc": {
  240. ServiceName: "sample",
  241. InterfaceName: "tsrpc",
  242. MethodName: "Compute",
  243. },
  244. "ComputeFromRest": {
  245. ServiceName: "sample",
  246. InterfaceName: "tsrest",
  247. MethodName: "Compute",
  248. },
  249. "ComputeFromMsgpack": {
  250. ServiceName: "sample",
  251. InterfaceName: "tsmsgpack",
  252. MethodName: "Compute",
  253. },
  254. "notUsedRpc": {
  255. ServiceName: "sample",
  256. InterfaceName: "tsrpc",
  257. MethodName: "RestEncodedJson",
  258. },
  259. "restEncodedJson": {
  260. ServiceName: "sample",
  261. InterfaceName: "tsrest",
  262. MethodName: "RestEncodedJson",
  263. },
  264. "notUsedMsgpack": {
  265. ServiceName: "sample",
  266. InterfaceName: "tsmsgpack",
  267. MethodName: "RestEncodedJson",
  268. },
  269. }
  270. actualService := &serviceInfo{}
  271. ok, err := m.serviceKV.Get(name, actualService)
  272. if err != nil {
  273. t.Error(err)
  274. t.FailNow()
  275. }
  276. if !ok {
  277. t.Errorf("service %s not found", name)
  278. t.FailNow()
  279. }
  280. if !reflect.DeepEqual(info, actualService) {
  281. t.Errorf("service info mismatch, expect %v but got %v", info, actualService)
  282. }
  283. actualKeys, _ := m.functionKV.Keys()
  284. if len(funcs) != len(actualKeys) {
  285. t.Errorf("functions info mismatch: expect %d funcs but got %v", len(funcs), actualKeys)
  286. }
  287. for f, c := range funcs {
  288. actualFunc := &functionContainer{}
  289. ok, err := m.functionKV.Get(f, actualFunc)
  290. if err != nil {
  291. t.Error(err)
  292. break
  293. }
  294. if !ok {
  295. t.Errorf("function %s not found", f)
  296. break
  297. }
  298. if !reflect.DeepEqual(c, actualFunc) {
  299. t.Errorf("func info mismatch, expect %v but got %v", c, actualFunc)
  300. }
  301. }
  302. }
  303. func TestManage(t *testing.T) {
  304. // Test HasFunctionSet
  305. if m.HasFunctionSet("sample") {
  306. t.Error("HasFunctionSet failed, got true should be false")
  307. }
  308. if !m.HasService("sample") {
  309. t.Error("service sample not found")
  310. }
  311. _, err := m.Function("ListShelves")
  312. if err != nil {
  313. t.Errorf("Function ListShelves failed: %v", err)
  314. }
  315. _, ok := m.ConvName("ListShelves")
  316. if !ok {
  317. t.Error("ConvName for ListShelves failed")
  318. }
  319. _, ok = m.ConvName("NotExist")
  320. if ok {
  321. t.Error("ConvName for NotExist should failed")
  322. }
  323. initServices := []string{"httpSample", "sample"}
  324. list, err := m.List()
  325. if err != nil {
  326. t.Error(err)
  327. }
  328. if !reflect.DeepEqual(initServices, list) {
  329. t.Errorf("Get service list error, \nexpect\t\t%v, \nbut got\t\t%v", initServices, list)
  330. }
  331. // Create the zip file
  332. baseFolder := filepath.Join(m.etcDir, "toadd")
  333. os.MkdirAll(filepath.Join(m.etcDir, "temp"), 0o755)
  334. outPath := filepath.Join(m.etcDir, "temp", "dynamic.zip")
  335. outFile, err := os.Create(outPath)
  336. if err != nil {
  337. fmt.Println(err)
  338. }
  339. defer os.Remove(outPath)
  340. // Create a new zip archive.
  341. w := zip.NewWriter(outFile)
  342. addFiles(w, baseFolder, "")
  343. err = w.Close()
  344. if err != nil {
  345. fmt.Println(err)
  346. }
  347. // Install the dynamic zip
  348. url, err := urlFromFilePath(outPath)
  349. if err != nil {
  350. t.Errorf("Create URL from file path %s: %v", outPath, err)
  351. return
  352. }
  353. err = m.Create(&ServiceCreationRequest{
  354. Name: "dynamic",
  355. File: url.String(),
  356. })
  357. if err != nil {
  358. t.Errorf("Create dynamic service failed: %v", err)
  359. return
  360. }
  361. dService, err := m.Get("dynamic")
  362. if err != nil {
  363. t.Errorf("Get dynamic service error: %v", err)
  364. } else if len(dService.Interfaces) != 1 {
  365. t.Errorf("dynamic service should have 1 interface, but got %d", len(dService.Interfaces))
  366. }
  367. expectedService := map[string]string{
  368. "dynamic": `{"name":"dynamic","file":"` + url.String() + `"}`,
  369. }
  370. allServices := m.GetAllServices()
  371. if !reflect.DeepEqual(expectedService, allServices) {
  372. t.Errorf("Get all installed service faile \nexpect\t\t%v, \nbut got\t\t%v", expectedService, allServices)
  373. }
  374. allServicesStatus := m.GetAllServicesStatus()
  375. if len(allServicesStatus) != 0 {
  376. t.Errorf("Get all installed service status faile, expect 0 but got %d", len(allServicesStatus))
  377. }
  378. expectedFunctions := []string{"ListShelves", "CreateShelf", "GetShelf", "DeleteShelf", "ListBooks", "createBook", "GetBook", "DeleteBook", "GetMessage", "SearchMessage", "UpdateMessage", "PatchMessage", "helloFromGrpc", "ComputeFromGrpc", "getFeatureFromGrpc", "objectDetectFromGrpc", "getStatusFromGrpc", "notUsedRpc", "helloFromRest", "ComputeFromRest", "getFeatureFromRest", "objectDetectFromRest", "getStatusFromRest", "restEncodedJson", "helloFromMsgpack", "ComputeFromMsgpack", "getFeatureFromMsgpack", "objectDetectFromMsgpack", "getStatusFromMsgpack", "notUsedMsgpack", "SayHello2"}
  379. sort.Strings(expectedFunctions)
  380. functions, _ := m.ListFunctions()
  381. sort.Strings(functions)
  382. if !reflect.DeepEqual(expectedFunctions, functions) {
  383. t.Errorf("Get all installed functions faile \nexpect\t\t%v, \nbut got\t\t%v", expectedFunctions, functions)
  384. }
  385. err = m.Update(&ServiceCreationRequest{
  386. Name: "dynamic",
  387. File: url.String(),
  388. })
  389. if err != nil {
  390. t.Errorf("Create dynamic service failed: %v", err)
  391. return
  392. }
  393. m.UninstallAllServices()
  394. allServices = m.GetAllServices()
  395. if len(allServices) != 0 {
  396. t.Errorf("Uninstall all services failed, expect 0 but got %d", len(allServices))
  397. }
  398. importedService := map[string]string{
  399. "wrongFormat": "nnn",
  400. "dynamic": `{"name":"dynamic","file":"` + url.String() + `"}`,
  401. "wrongPath": `{"name":"dynamic","file":"wrongpath"}`,
  402. }
  403. m.ImportServices(importedService)
  404. allServicesStatus = m.GetAllServicesStatus()
  405. if len(allServicesStatus) != 2 {
  406. t.Errorf("Get all installed service status faile, expect 2 error but got %v", allServicesStatus)
  407. }
  408. expectedList := []string{"httpSample", "sample", "dynamic"}
  409. list, err = m.List()
  410. if err != nil {
  411. t.Error(err)
  412. }
  413. if !reflect.DeepEqual(expectedList, list) {
  414. t.Errorf("Get service list error, \nexpect\t\t%v, \nbut got\t\t%v", expectedList, list)
  415. }
  416. err = m.Delete("dynamic")
  417. if err != nil {
  418. t.Errorf("Delete dynamic service error: %v", err)
  419. }
  420. list, err = m.List()
  421. if err != nil {
  422. t.Error(err)
  423. }
  424. if !reflect.DeepEqual(initServices, list) {
  425. t.Errorf("Get service list error, \nexpect\t\t%v, \nbut got\t\t%v", initServices, list)
  426. }
  427. }
  428. func addFiles(w *zip.Writer, basePath, baseInZip string) {
  429. // Open the Directory
  430. files, err := os.ReadDir(basePath)
  431. if err != nil {
  432. fmt.Println(err)
  433. }
  434. for _, file := range files {
  435. if !file.IsDir() {
  436. dat, err := os.ReadFile(filepath.Join(basePath, file.Name()))
  437. if err != nil {
  438. fmt.Println(err)
  439. }
  440. // Add some files to the archive.
  441. f, err := w.Create(filepath.Join(baseInZip, file.Name()))
  442. if err != nil {
  443. fmt.Println(err)
  444. }
  445. _, err = f.Write(dat)
  446. if err != nil {
  447. fmt.Println(err)
  448. }
  449. } else if file.IsDir() {
  450. // Recurse
  451. newBase := filepath.Join(basePath, file.Name())
  452. fmt.Println("Recursing and Adding SubDir: " + file.Name())
  453. fmt.Println("Recursing and Adding SubDir: " + newBase)
  454. addFiles(w, newBase, filepath.Join(baseInZip, file.Name()))
  455. }
  456. }
  457. }
  458. func urlFromFilePath(path string) (*url.URL, error) {
  459. if !filepath.IsAbs(path) {
  460. return nil, fmt.Errorf("path %s is not absolute", path)
  461. }
  462. // If path has a Windows volume name, convert the volume to a host and prefix
  463. // per https://blogs.msdn.microsoft.com/ie/2006/12/06/file-uris-in-windows/.
  464. if vol := filepath.VolumeName(path); vol != "" {
  465. if strings.HasPrefix(vol, `\\`) {
  466. path = filepath.ToSlash(path[2:])
  467. i := strings.IndexByte(path, '/')
  468. if i < 0 {
  469. // A degenerate case.
  470. // \\host.example.com (without a share name)
  471. // becomes
  472. // file://host.example.com/
  473. return &url.URL{
  474. Scheme: "file",
  475. Host: path,
  476. Path: "/",
  477. }, nil
  478. }
  479. // \\host.example.com\Share\path\to\file
  480. // becomes
  481. // file://host.example.com/Share/path/to/file
  482. return &url.URL{
  483. Scheme: "file",
  484. Host: path[:i],
  485. Path: filepath.ToSlash(path[i:]),
  486. }, nil
  487. }
  488. // C:\path\to\file
  489. // becomes
  490. // file:///C:/path/to/file
  491. return &url.URL{
  492. Scheme: "file",
  493. Path: "/" + filepath.ToSlash(path),
  494. }, nil
  495. }
  496. // /path/to/file
  497. // becomes
  498. // file:///path/to/file
  499. return &url.URL{
  500. Scheme: "file",
  501. Path: filepath.ToSlash(path),
  502. }, nil
  503. }