rest.go 29 KB


  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "crypto/tls"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/gorilla/handlers"
  20. "github.com/gorilla/mux"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/internal/meta"
  23. "github.com/lf-edge/ekuiper/internal/plugin"
  24. "github.com/lf-edge/ekuiper/internal/plugin/native"
  25. "github.com/lf-edge/ekuiper/internal/plugin/portable"
  26. "github.com/lf-edge/ekuiper/internal/service"
  27. "github.com/lf-edge/ekuiper/pkg/api"
  28. "github.com/lf-edge/ekuiper/pkg/ast"
  29. "github.com/lf-edge/ekuiper/pkg/errorx"
  30. "golang.org/x/net/html"
  31. "io"
  32. "io/ioutil"
  33. "net/http"
  34. "os"
  35. "runtime"
  36. "strings"
  37. "time"
  38. )
  39. const (
  40. ContentType = "Content-Type"
  41. ContentTypeJSON = "application/json"
  42. )
  43. type statementDescriptor struct {
  44. Sql string `json:"sql,omitempty"`
  45. }
  46. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  47. sd := statementDescriptor{}
  48. err := json.NewDecoder(reader).Decode(&sd)
  49. // Problems decoding
  50. if err != nil {
  51. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  52. }
  53. return sd, nil
  54. }
  55. // Handle applies the specified error and error concept tot he HTTP response writer
  56. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  57. message := prefix
  58. if message != "" {
  59. message += ": "
  60. }
  61. message += err.Error()
  62. logger.Error(message)
  63. var ec int
  64. switch e := err.(type) {
  65. case *errorx.Error:
  66. switch e.Code() {
  67. case errorx.NOT_FOUND:
  68. ec = http.StatusNotFound
  69. default:
  70. ec = http.StatusBadRequest
  71. }
  72. default:
  73. ec = http.StatusBadRequest
  74. }
  75. http.Error(w, message, ec)
  76. }
  77. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  78. w.Header().Add(ContentType, ContentTypeJSON)
  79. enc := json.NewEncoder(w)
  80. err := enc.Encode(i)
  81. // Problems encoding
  82. if err != nil {
  83. handleError(w, err, "", logger)
  84. }
  85. }
  86. func createRestServer(ip string, port int) *http.Server {
  87. r := mux.NewRouter()
  88. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  89. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  90. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  91. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  92. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  93. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  94. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  95. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  96. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  97. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  98. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  99. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  100. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  101. r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
  102. r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
  103. r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
  104. r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
  105. r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
  106. r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
  107. r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
  108. r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
  109. r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
  110. r.HandleFunc("/plugins/functions/{name}/register", functionRegisterHandler).Methods(http.MethodPost)
  111. r.HandleFunc("/plugins/udfs", functionsListHandler).Methods(http.MethodGet)
  112. r.HandleFunc("/plugins/udfs/{name}", functionsGetHandler).Methods(http.MethodGet)
  113. r.HandleFunc("/plugins/portables", portablesHandler).Methods(http.MethodGet, http.MethodPost)
  114. r.HandleFunc("/plugins/portables/{name}", portableHandler).Methods(http.MethodGet, http.MethodDelete)
  115. r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
  116. r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
  117. r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
  118. r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
  119. r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
  120. r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
  121. r.HandleFunc("/metadata/sources/{name}/confKeys", sourceConfKeysHandler).Methods(http.MethodGet)
  122. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPost)
  123. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}/field", sourceConfKeyFieldsHandler).Methods(http.MethodDelete, http.MethodPost)
  124. r.HandleFunc("/services", servicesHandler).Methods(http.MethodGet, http.MethodPost)
  125. r.HandleFunc("/services/functions", serviceFunctionsHandler).Methods(http.MethodGet)
  126. r.HandleFunc("/services/functions/{name}", serviceFunctionHandler).Methods(http.MethodGet)
  127. r.HandleFunc("/services/{name}", serviceHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  128. server := &http.Server{
  129. Addr: fmt.Sprintf("%s:%d", ip, port),
  130. // Good practice to set timeouts to avoid Slowloris attacks.
  131. WriteTimeout: time.Second * 60 * 5,
  132. ReadTimeout: time.Second * 60 * 5,
  133. IdleTimeout: time.Second * 60,
  134. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
  135. }
  136. server.SetKeepAlivesEnabled(false)
  137. return server
  138. }
  139. type information struct {
  140. Version string `json:"version"`
  141. Os string `json:"os"`
  142. UpTimeSeconds int64 `json:"upTimeSeconds"`
  143. }
  144. //The handler for root
  145. func rootHandler(w http.ResponseWriter, r *http.Request) {
  146. defer r.Body.Close()
  147. switch r.Method {
  148. case http.MethodGet, http.MethodPost:
  149. w.WriteHeader(http.StatusOK)
  150. info := new(information)
  151. info.Version = version
  152. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  153. info.Os = runtime.GOOS
  154. byteInfo, _ := json.Marshal(info)
  155. w.Write(byteInfo)
  156. }
  157. }
  158. func pingHandler(w http.ResponseWriter, r *http.Request) {
  159. w.WriteHeader(http.StatusOK)
  160. }
  161. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  162. defer r.Body.Close()
  163. switch r.Method {
  164. case http.MethodGet:
  165. content, err := streamProcessor.ShowStream(st)
  166. if err != nil {
  167. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  168. return
  169. }
  170. jsonResponse(content, w, logger)
  171. case http.MethodPost:
  172. v, err := decodeStatementDescriptor(r.Body)
  173. if err != nil {
  174. handleError(w, err, "Invalid body", logger)
  175. return
  176. }
  177. content, err := streamProcessor.ExecStreamSql(v.Sql)
  178. if err != nil {
  179. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  180. return
  181. }
  182. w.WriteHeader(http.StatusCreated)
  183. w.Write([]byte(content))
  184. }
  185. }
  186. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  187. defer r.Body.Close()
  188. vars := mux.Vars(r)
  189. name := vars["name"]
  190. switch r.Method {
  191. case http.MethodGet:
  192. content, err := streamProcessor.DescStream(name, st)
  193. if err != nil {
  194. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  195. return
  196. }
  197. jsonResponse(content, w, logger)
  198. case http.MethodDelete:
  199. content, err := streamProcessor.DropStream(name, st)
  200. if err != nil {
  201. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  202. return
  203. }
  204. w.WriteHeader(http.StatusOK)
  205. w.Write([]byte(content))
  206. case http.MethodPut:
  207. v, err := decodeStatementDescriptor(r.Body)
  208. if err != nil {
  209. handleError(w, err, "Invalid body", logger)
  210. return
  211. }
  212. content, err := streamProcessor.ExecReplaceStream(v.Sql, st)
  213. if err != nil {
  214. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  215. return
  216. }
  217. w.WriteHeader(http.StatusOK)
  218. w.Write([]byte(content))
  219. }
  220. }
  221. //list or create streams
  222. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  223. sourcesManageHandler(w, r, ast.TypeStream)
  224. }
  225. //describe or delete a stream
  226. func streamHandler(w http.ResponseWriter, r *http.Request) {
  227. sourceManageHandler(w, r, ast.TypeStream)
  228. }
  229. //list or create tables
  230. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  231. sourcesManageHandler(w, r, ast.TypeTable)
  232. }
  233. func tableHandler(w http.ResponseWriter, r *http.Request) {
  234. sourceManageHandler(w, r, ast.TypeTable)
  235. }
  236. //list or create rules
  237. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  238. defer r.Body.Close()
  239. switch r.Method {
  240. case http.MethodPost:
  241. body, err := ioutil.ReadAll(r.Body)
  242. if err != nil {
  243. handleError(w, err, "Invalid body", logger)
  244. return
  245. }
  246. r, err := ruleProcessor.ExecCreate("", string(body))
  247. var result string
  248. if err != nil {
  249. handleError(w, err, "Create rule error", logger)
  250. return
  251. } else {
  252. result = fmt.Sprintf("Rule %s was created successfully.", r.Id)
  253. }
  254. //Start the rule
  255. rs, err := createRuleState(r)
  256. if err != nil {
  257. result = err.Error()
  258. } else {
  259. err = doStartRule(rs)
  260. if err != nil {
  261. result = err.Error()
  262. }
  263. }
  264. w.WriteHeader(http.StatusCreated)
  265. w.Write([]byte(result))
  266. case http.MethodGet:
  267. content, err := getAllRulesWithStatus()
  268. if err != nil {
  269. handleError(w, err, "Show rules error", logger)
  270. return
  271. }
  272. jsonResponse(content, w, logger)
  273. }
  274. }
  275. //describe or delete a rule
  276. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  277. defer r.Body.Close()
  278. vars := mux.Vars(r)
  279. name := vars["name"]
  280. switch r.Method {
  281. case http.MethodGet:
  282. rule, err := ruleProcessor.GetRuleByName(name)
  283. if err != nil {
  284. handleError(w, err, "describe rule error", logger)
  285. return
  286. }
  287. jsonResponse(rule, w, logger)
  288. case http.MethodDelete:
  289. deleteRule(name)
  290. content, err := ruleProcessor.ExecDrop(name)
  291. if err != nil {
  292. handleError(w, err, "delete rule error", logger)
  293. return
  294. }
  295. w.WriteHeader(http.StatusOK)
  296. w.Write([]byte(content))
  297. case http.MethodPut:
  298. _, err := ruleProcessor.GetRuleByName(name)
  299. if err != nil {
  300. handleError(w, err, "not found this rule", logger)
  301. return
  302. }
  303. body, err := ioutil.ReadAll(r.Body)
  304. if err != nil {
  305. handleError(w, err, "Invalid body", logger)
  306. return
  307. }
  308. r, err := ruleProcessor.ExecUpdate(name, string(body))
  309. var result string
  310. if err != nil {
  311. handleError(w, err, "Update rule error", logger)
  312. return
  313. } else {
  314. result = fmt.Sprintf("Rule %s was updated successfully.", r.Id)
  315. }
  316. err = restartRule(name)
  317. if err != nil {
  318. handleError(w, err, "restart rule error", logger)
  319. return
  320. }
  321. w.WriteHeader(http.StatusOK)
  322. w.Write([]byte(result))
  323. }
  324. }
  325. //get status of a rule
  326. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  327. defer r.Body.Close()
  328. vars := mux.Vars(r)
  329. name := vars["name"]
  330. content, err := getRuleStatus(name)
  331. if err != nil {
  332. handleError(w, err, "get rule status error", logger)
  333. return
  334. }
  335. w.WriteHeader(http.StatusOK)
  336. w.Write([]byte(content))
  337. }
  338. //start a rule
  339. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  340. defer r.Body.Close()
  341. vars := mux.Vars(r)
  342. name := vars["name"]
  343. err := startRule(name)
  344. if err != nil {
  345. handleError(w, err, "start rule error", logger)
  346. return
  347. }
  348. w.WriteHeader(http.StatusOK)
  349. w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
  350. }
  351. //stop a rule
  352. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  353. defer r.Body.Close()
  354. vars := mux.Vars(r)
  355. name := vars["name"]
  356. result := stopRule(name)
  357. w.WriteHeader(http.StatusOK)
  358. w.Write([]byte(result))
  359. }
  360. //restart a rule
  361. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  362. defer r.Body.Close()
  363. vars := mux.Vars(r)
  364. name := vars["name"]
  365. err := restartRule(name)
  366. if err != nil {
  367. handleError(w, err, "restart rule error", logger)
  368. return
  369. }
  370. w.WriteHeader(http.StatusOK)
  371. w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
  372. }
  373. //get topo of a rule
  374. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  375. defer r.Body.Close()
  376. vars := mux.Vars(r)
  377. name := vars["name"]
  378. content, err := getRuleTopo(name)
  379. if err != nil {
  380. handleError(w, err, "get rule topo error", logger)
  381. return
  382. }
  383. w.Header().Set(ContentType, ContentTypeJSON)
  384. w.Write([]byte(content))
  385. }
  386. func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  387. pluginManager := native.GetManager()
  388. defer r.Body.Close()
  389. switch r.Method {
  390. case http.MethodGet:
  391. content := pluginManager.List(t)
  392. jsonResponse(content, w, logger)
  393. case http.MethodPost:
  394. sd := plugin.NewPluginByType(t)
  395. err := json.NewDecoder(r.Body).Decode(sd)
  396. // Problems decoding
  397. if err != nil {
  398. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugin.PluginTypes[t]), logger)
  399. return
  400. }
  401. err = pluginManager.Register(t, sd)
  402. if err != nil {
  403. handleError(w, err, fmt.Sprintf("%s plugins create command error", plugin.PluginTypes[t]), logger)
  404. return
  405. }
  406. w.WriteHeader(http.StatusCreated)
  407. w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugin.PluginTypes[t], sd.GetName())))
  408. }
  409. }
  410. func pluginHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  411. defer r.Body.Close()
  412. vars := mux.Vars(r)
  413. name := vars["name"]
  414. cb := r.URL.Query().Get("stop")
  415. pluginManager := native.GetManager()
  416. switch r.Method {
  417. case http.MethodDelete:
  418. r := cb == "1"
  419. err := pluginManager.Delete(t, name, r)
  420. if err != nil {
  421. handleError(w, err, fmt.Sprintf("delete %s plugin %s error", plugin.PluginTypes[t], name), logger)
  422. return
  423. }
  424. w.WriteHeader(http.StatusOK)
  425. result := fmt.Sprintf("%s plugin %s is deleted", plugin.PluginTypes[t], name)
  426. if r {
  427. result = fmt.Sprintf("%s and Kuiper will be stopped", result)
  428. } else {
  429. result = fmt.Sprintf("%s and Kuiper must restart for the change to take effect.", result)
  430. }
  431. w.Write([]byte(result))
  432. case http.MethodGet:
  433. j, ok := pluginManager.GetPluginInfo(t, name)
  434. if !ok {
  435. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", plugin.PluginTypes[t], name), logger)
  436. return
  437. }
  438. jsonResponse(j, w, logger)
  439. }
  440. }
  441. //list or create source plugin
  442. func sourcesHandler(w http.ResponseWriter, r *http.Request) {
  443. pluginsHandler(w, r, plugin.SOURCE)
  444. }
  445. //delete a source plugin
  446. func sourceHandler(w http.ResponseWriter, r *http.Request) {
  447. pluginHandler(w, r, plugin.SOURCE)
  448. }
  449. //list or create sink plugin
  450. func sinksHandler(w http.ResponseWriter, r *http.Request) {
  451. pluginsHandler(w, r, plugin.SINK)
  452. }
  453. //delete a sink plugin
  454. func sinkHandler(w http.ResponseWriter, r *http.Request) {
  455. pluginHandler(w, r, plugin.SINK)
  456. }
  457. //list or create function plugin
  458. func functionsHandler(w http.ResponseWriter, r *http.Request) {
  459. pluginsHandler(w, r, plugin.FUNCTION)
  460. }
  461. //list all user defined functions in all function plugins
  462. func functionsListHandler(w http.ResponseWriter, r *http.Request) {
  463. pluginManager := native.GetManager()
  464. content := pluginManager.ListSymbols()
  465. jsonResponse(content, w, logger)
  466. }
  467. func functionsGetHandler(w http.ResponseWriter, r *http.Request) {
  468. vars := mux.Vars(r)
  469. name := vars["name"]
  470. pluginManager := native.GetManager()
  471. j, ok := pluginManager.GetPluginBySymbol(plugin.FUNCTION, name)
  472. if !ok {
  473. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  474. return
  475. }
  476. jsonResponse(map[string]string{"name": name, "plugin": j}, w, logger)
  477. }
  478. //delete a function plugin
  479. func functionHandler(w http.ResponseWriter, r *http.Request) {
  480. pluginHandler(w, r, plugin.FUNCTION)
  481. }
  482. type functionList struct {
  483. Functions []string `json:"functions,omitempty"`
  484. }
  485. // register function list for function plugin. If a plugin exports multiple functions, the function list must be registered
  486. // either by create or register. If the function plugin has been loaded because of auto load through so file, the function
  487. // list MUST be registered by this API or only the function with the same name as the plugin can be used.
  488. func functionRegisterHandler(w http.ResponseWriter, r *http.Request) {
  489. defer r.Body.Close()
  490. vars := mux.Vars(r)
  491. name := vars["name"]
  492. pluginManager := native.GetManager()
  493. _, ok := pluginManager.GetPluginInfo(plugin.FUNCTION, name)
  494. if !ok {
  495. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("register %s plugin %s error", plugin.PluginTypes[plugin.FUNCTION], name), logger)
  496. return
  497. }
  498. sd := functionList{}
  499. err := json.NewDecoder(r.Body).Decode(&sd)
  500. // Problems decoding
  501. if err != nil {
  502. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the function list json %s", r.Body), logger)
  503. return
  504. }
  505. err = pluginManager.RegisterFuncs(name, sd.Functions)
  506. if err != nil {
  507. handleError(w, err, fmt.Sprintf("function plugins %s regiser functions error", name), logger)
  508. return
  509. }
  510. w.WriteHeader(http.StatusOK)
  511. w.Write([]byte(fmt.Sprintf("function plugin %s function list is registered", name)))
  512. }
  513. func portablesHandler(w http.ResponseWriter, r *http.Request) {
  514. m := portable.GetManager()
  515. defer r.Body.Close()
  516. switch r.Method {
  517. case http.MethodGet:
  518. content := m.List()
  519. jsonResponse(content, w, logger)
  520. case http.MethodPost:
  521. sd := plugin.NewPluginByType(plugin.PORTABLE)
  522. err := json.NewDecoder(r.Body).Decode(sd)
  523. // Problems decoding
  524. if err != nil {
  525. handleError(w, err, "Invalid body: Error decoding the portable plugin json", logger)
  526. return
  527. }
  528. err = m.Register(sd)
  529. if err != nil {
  530. handleError(w, err, "portable plugin create command error", logger)
  531. return
  532. }
  533. w.WriteHeader(http.StatusCreated)
  534. w.Write([]byte(fmt.Sprintf("portable plugin %s is created", sd.GetName())))
  535. }
  536. }
  537. func portableHandler(w http.ResponseWriter, r *http.Request) {
  538. defer r.Body.Close()
  539. vars := mux.Vars(r)
  540. name := vars["name"]
  541. m := portable.GetManager()
  542. switch r.Method {
  543. case http.MethodDelete:
  544. err := m.Delete(name)
  545. if err != nil {
  546. handleError(w, err, fmt.Sprintf("delete portable plugin %s error", name), logger)
  547. return
  548. }
  549. w.WriteHeader(http.StatusOK)
  550. result := fmt.Sprintf("portable plugin %s is deleted", name)
  551. w.Write([]byte(result))
  552. case http.MethodGet:
  553. j, ok := m.GetPluginInfo(name)
  554. if !ok {
  555. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe portable plugin %s error", name), logger)
  556. return
  557. }
  558. jsonResponse(j, w, logger)
  559. }
  560. }
  561. func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
  562. prebuildPluginsHandler(w, r, plugin.SOURCE)
  563. }
  564. func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
  565. prebuildPluginsHandler(w, r, plugin.SINK)
  566. }
  567. func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
  568. prebuildPluginsHandler(w, r, plugin.FUNCTION)
  569. }
  570. func isOffcialDockerImage() bool {
  571. if strings.ToLower(os.Getenv("MAINTAINER")) != "emqx.io" {
  572. return false
  573. }
  574. return true
  575. }
  576. func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  577. emsg := "It's strongly recommended to install plugins at official released Debian Docker images. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
  578. if !isOffcialDockerImage() {
  579. handleError(w, fmt.Errorf(emsg), "", logger)
  580. return
  581. } else if runtime.GOOS == "linux" {
  582. osrelease, err := Read()
  583. if err != nil {
  584. logger.Infof("")
  585. return
  586. }
  587. prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
  588. os := "debian"
  589. if strings.Contains(prettyName, "DEBIAN") {
  590. hosts := conf.Config.Basic.PluginHosts
  591. ptype := "sources"
  592. if t == plugin.SINK {
  593. ptype = "sinks"
  594. } else if t == plugin.FUNCTION {
  595. ptype = "functions"
  596. }
  597. if err, plugins := fetchPluginList(hosts, ptype, os, runtime.GOARCH); err != nil {
  598. handleError(w, err, "", logger)
  599. } else {
  600. jsonResponse(plugins, w, logger)
  601. }
  602. } else {
  603. handleError(w, fmt.Errorf(emsg), "", logger)
  604. return
  605. }
  606. } else {
  607. handleError(w, fmt.Errorf(emsg), "", logger)
  608. }
  609. }
  610. func fetchPluginList(hosts, ptype, os, arch string) (err error, result map[string]string) {
  611. if hosts == "" || ptype == "" || os == "" {
  612. logger.Errorf("Invalid parameter value: hosts %s, ptype %s or os: %s should not be empty.", hosts, ptype, os)
  613. return fmt.Errorf("Invalid configruation for plugin host in kuiper.yaml."), nil
  614. }
  615. result = make(map[string]string)
  616. hostsArr := strings.Split(hosts, ",")
  617. for _, host := range hostsArr {
  618. host := strings.Trim(host, " ")
  619. tmp := []string{host, "kuiper-plugins", version, os, ptype}
  620. //The url is similar to http://host:port/kuiper-plugins/0.9.1/debian/sinks/
  621. url := strings.Join(tmp, "/")
  622. timeout := time.Duration(30 * time.Second)
  623. client := &http.Client{
  624. Timeout: timeout,
  625. Transport: &http.Transport{
  626. TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  627. },
  628. }
  629. resp, err := client.Get(url)
  630. logger.Infof("Trying to fetch plugins from url: %s\n", url)
  631. if err != nil {
  632. return err, nil
  633. }
  634. defer resp.Body.Close()
  635. if resp.StatusCode != http.StatusOK {
  636. return fmt.Errorf("Cannot fetch plugin list from %s, with status error: %v", url, resp.StatusCode), nil
  637. }
  638. data, err := ioutil.ReadAll(resp.Body)
  639. if err != nil {
  640. return err, nil
  641. }
  642. plugins := extractFromHtml(string(data), arch)
  643. for _, p := range plugins {
  644. //If already existed, using the existed.
  645. if _, ok := result[p]; !ok {
  646. result[p] = url + "/" + p + "_" + arch + ".zip"
  647. }
  648. logger.Debugf("Plugin %s, download address is %s\n", p, result[p])
  649. }
  650. }
  651. return
  652. }
  653. func extractFromHtml(content, arch string) []string {
  654. plugins := []string{}
  655. htmlTokens := html.NewTokenizer(strings.NewReader(content))
  656. loop:
  657. for {
  658. tt := htmlTokens.Next()
  659. switch tt {
  660. case html.ErrorToken:
  661. break loop
  662. case html.StartTagToken:
  663. t := htmlTokens.Token()
  664. isAnchor := t.Data == "a"
  665. if isAnchor {
  666. found := false
  667. for _, prop := range t.Attr {
  668. if strings.ToUpper(prop.Key) == "HREF" {
  669. if strings.HasSuffix(prop.Val, "_"+arch+".zip") {
  670. if index := strings.LastIndex(prop.Val, "_"); index != -1 {
  671. plugins = append(plugins, prop.Val[0:index])
  672. }
  673. }
  674. found = true
  675. }
  676. }
  677. if !found {
  678. logger.Infof("Invalid plugin download link %s", t)
  679. }
  680. }
  681. }
  682. }
  683. return plugins
  684. }
  685. //list sink plugin
  686. func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
  687. defer r.Body.Close()
  688. sinks := meta.GetSinks()
  689. jsonResponse(sinks, w, logger)
  690. return
  691. }
  692. //Get sink metadata when creating rules
  693. func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
  694. defer r.Body.Close()
  695. vars := mux.Vars(r)
  696. pluginName := vars["name"]
  697. language := getLanguage(r)
  698. ptrMetadata, err := meta.GetSinkMeta(pluginName, language)
  699. if err != nil {
  700. handleError(w, err, "", logger)
  701. return
  702. }
  703. jsonResponse(ptrMetadata, w, logger)
  704. }
  705. //list functions
  706. func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  707. defer r.Body.Close()
  708. sinks := meta.GetFunctions()
  709. jsonResponse(sinks, w, logger)
  710. return
  711. }
  712. //list source plugin
  713. func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
  714. defer r.Body.Close()
  715. ret := meta.GetSources()
  716. if nil != ret {
  717. jsonResponse(ret, w, logger)
  718. return
  719. }
  720. }
  721. //Get source metadata when creating stream
  722. func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
  723. defer r.Body.Close()
  724. vars := mux.Vars(r)
  725. pluginName := vars["name"]
  726. language := getLanguage(r)
  727. ret, err := meta.GetSourceMeta(pluginName, language)
  728. if err != nil {
  729. handleError(w, err, "", logger)
  730. return
  731. }
  732. if nil != ret {
  733. jsonResponse(ret, w, logger)
  734. return
  735. }
  736. }
  737. //Get source yaml
  738. func sourceConfHandler(w http.ResponseWriter, r *http.Request) {
  739. defer r.Body.Close()
  740. vars := mux.Vars(r)
  741. pluginName := vars["name"]
  742. language := getLanguage(r)
  743. ret, err := meta.GetSourceConf(pluginName, language)
  744. if err != nil {
  745. handleError(w, err, "", logger)
  746. return
  747. } else {
  748. w.Write(ret)
  749. }
  750. }
  751. //Get confKeys of the source metadata
  752. func sourceConfKeysHandler(w http.ResponseWriter, r *http.Request) {
  753. defer r.Body.Close()
  754. vars := mux.Vars(r)
  755. pluginName := vars["name"]
  756. ret := meta.GetSourceConfKeys(pluginName)
  757. if nil != ret {
  758. jsonResponse(ret, w, logger)
  759. return
  760. }
  761. }
  762. //Add del confkey
  763. func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  764. defer r.Body.Close()
  765. var ret interface{}
  766. var err error
  767. vars := mux.Vars(r)
  768. pluginName := vars["name"]
  769. confKey := vars["confKey"]
  770. language := getLanguage(r)
  771. switch r.Method {
  772. case http.MethodDelete:
  773. err = meta.DelSourceConfKey(pluginName, confKey, language)
  774. case http.MethodPost:
  775. v, err := ioutil.ReadAll(r.Body)
  776. if err != nil {
  777. handleError(w, err, "Invalid body", logger)
  778. return
  779. }
  780. err = meta.AddSourceConfKey(pluginName, confKey, language, v)
  781. }
  782. if err != nil {
  783. handleError(w, err, "", logger)
  784. return
  785. }
  786. if nil != ret {
  787. jsonResponse(ret, w, logger)
  788. return
  789. }
  790. }
  791. //Del and Update field of confkey
  792. func sourceConfKeyFieldsHandler(w http.ResponseWriter, r *http.Request) {
  793. defer r.Body.Close()
  794. var ret interface{}
  795. var err error
  796. vars := mux.Vars(r)
  797. pluginName := vars["name"]
  798. confKey := vars["confKey"]
  799. v, err := ioutil.ReadAll(r.Body)
  800. if err != nil {
  801. handleError(w, err, "Invalid body", logger)
  802. return
  803. }
  804. language := getLanguage(r)
  805. switch r.Method {
  806. case http.MethodDelete:
  807. err = meta.DelSourceConfKeyField(pluginName, confKey, language, v)
  808. case http.MethodPost:
  809. err = meta.AddSourceConfKeyField(pluginName, confKey, language, v)
  810. }
  811. if err != nil {
  812. handleError(w, err, "", logger)
  813. return
  814. }
  815. if nil != ret {
  816. jsonResponse(ret, w, logger)
  817. return
  818. }
  819. }
  820. func getLanguage(r *http.Request) string {
  821. language := r.Header.Get("Content-Language")
  822. if 0 == len(language) {
  823. language = "en_US"
  824. }
  825. return language
  826. }
  827. func servicesHandler(w http.ResponseWriter, r *http.Request) {
  828. defer r.Body.Close()
  829. serviceManager := service.GetManager()
  830. switch r.Method {
  831. case http.MethodGet:
  832. content, err := serviceManager.List()
  833. if err != nil {
  834. handleError(w, err, "service list command error", logger)
  835. return
  836. }
  837. jsonResponse(content, w, logger)
  838. case http.MethodPost:
  839. sd := &service.ServiceCreationRequest{}
  840. err := json.NewDecoder(r.Body).Decode(sd)
  841. // Problems decoding
  842. if err != nil {
  843. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  844. return
  845. }
  846. err = serviceManager.Create(sd)
  847. if err != nil {
  848. handleError(w, err, "service create command error", logger)
  849. return
  850. }
  851. w.WriteHeader(http.StatusCreated)
  852. w.Write([]byte(fmt.Sprintf("service %s is created", sd.Name)))
  853. }
  854. }
  855. func serviceHandler(w http.ResponseWriter, r *http.Request) {
  856. defer r.Body.Close()
  857. vars := mux.Vars(r)
  858. name := vars["name"]
  859. serviceManager := service.GetManager()
  860. switch r.Method {
  861. case http.MethodDelete:
  862. err := serviceManager.Delete(name)
  863. if err != nil {
  864. handleError(w, err, fmt.Sprintf("delete service %s error", name), logger)
  865. return
  866. }
  867. w.WriteHeader(http.StatusOK)
  868. result := fmt.Sprintf("service %s is deleted", name)
  869. w.Write([]byte(result))
  870. case http.MethodGet:
  871. j, err := serviceManager.Get(name)
  872. if err != nil {
  873. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe service %s error", name), logger)
  874. return
  875. }
  876. jsonResponse(j, w, logger)
  877. case http.MethodPut:
  878. sd := &service.ServiceCreationRequest{}
  879. err := json.NewDecoder(r.Body).Decode(sd)
  880. // Problems decoding
  881. if err != nil {
  882. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  883. return
  884. }
  885. sd.Name = name
  886. err = serviceManager.Update(sd)
  887. if err != nil {
  888. handleError(w, err, "service update command error", logger)
  889. return
  890. }
  891. w.WriteHeader(http.StatusOK)
  892. w.Write([]byte(fmt.Sprintf("service %s is updated", sd.Name)))
  893. }
  894. }
  895. func serviceFunctionsHandler(w http.ResponseWriter, r *http.Request) {
  896. serviceManager := service.GetManager()
  897. content, err := serviceManager.ListFunctions()
  898. if err != nil {
  899. handleError(w, err, "service list command error", logger)
  900. return
  901. }
  902. jsonResponse(content, w, logger)
  903. }
  904. func serviceFunctionHandler(w http.ResponseWriter, r *http.Request) {
  905. vars := mux.Vars(r)
  906. name := vars["name"]
  907. serviceManager := service.GetManager()
  908. j, err := serviceManager.GetFunction(name)
  909. if err != nil {
  910. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  911. return
  912. }
  913. jsonResponse(j, w, logger)
  914. }