rest.go 29 KB


  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "crypto/tls"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/gorilla/handlers"
  20. "github.com/gorilla/mux"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/internal/meta"
  23. "github.com/lf-edge/ekuiper/internal/plugin"
  24. "github.com/lf-edge/ekuiper/internal/plugin/native"
  25. "github.com/lf-edge/ekuiper/internal/plugin/portable"
  26. "github.com/lf-edge/ekuiper/internal/server/middleware"
  27. "github.com/lf-edge/ekuiper/internal/service"
  28. "github.com/lf-edge/ekuiper/pkg/api"
  29. "github.com/lf-edge/ekuiper/pkg/ast"
  30. "github.com/lf-edge/ekuiper/pkg/errorx"
  31. "golang.org/x/net/html"
  32. "io"
  33. "io/ioutil"
  34. "net/http"
  35. "os"
  36. "runtime"
  37. "strings"
  38. "time"
  39. )
const (
	// ContentType is the HTTP header name used when declaring the media type of a response body.
	ContentType = "Content-Type"
	// ContentTypeJSON is the media type value used for JSON response bodies.
	ContentTypeJSON = "application/json"
)
  44. type statementDescriptor struct {
  45. Sql string `json:"sql,omitempty"`
  46. }
  47. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  48. sd := statementDescriptor{}
  49. err := json.NewDecoder(reader).Decode(&sd)
  50. // Problems decoding
  51. if err != nil {
  52. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  53. }
  54. return sd, nil
  55. }
  56. // Handle applies the specified error and error concept tot he HTTP response writer
  57. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  58. message := prefix
  59. if message != "" {
  60. message += ": "
  61. }
  62. message += err.Error()
  63. logger.Error(message)
  64. var ec int
  65. switch e := err.(type) {
  66. case *errorx.Error:
  67. switch e.Code() {
  68. case errorx.NOT_FOUND:
  69. ec = http.StatusNotFound
  70. default:
  71. ec = http.StatusBadRequest
  72. }
  73. default:
  74. ec = http.StatusBadRequest
  75. }
  76. http.Error(w, message, ec)
  77. }
  78. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  79. w.Header().Add(ContentType, ContentTypeJSON)
  80. enc := json.NewEncoder(w)
  81. err := enc.Encode(i)
  82. // Problems encoding
  83. if err != nil {
  84. handleError(w, err, "", logger)
  85. }
  86. }
  87. func createRestServer(ip string, port int, needToken bool) *http.Server {
  88. r := mux.NewRouter()
  89. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  90. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  91. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  92. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  93. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  94. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  95. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  96. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  97. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  98. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  99. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  100. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  101. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  102. r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
  103. r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
  104. r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
  105. r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
  106. r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
  107. r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
  108. r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
  109. r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
  110. r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
  111. r.HandleFunc("/plugins/functions/{name}/register", functionRegisterHandler).Methods(http.MethodPost)
  112. r.HandleFunc("/plugins/udfs", functionsListHandler).Methods(http.MethodGet)
  113. r.HandleFunc("/plugins/udfs/{name}", functionsGetHandler).Methods(http.MethodGet)
  114. r.HandleFunc("/plugins/portables", portablesHandler).Methods(http.MethodGet, http.MethodPost)
  115. r.HandleFunc("/plugins/portables/{name}", portableHandler).Methods(http.MethodGet, http.MethodDelete)
  116. r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
  117. r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
  118. r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
  119. r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
  120. r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
  121. r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
  122. r.HandleFunc("/metadata/sources/{name}/confKeys", sourceConfKeysHandler).Methods(http.MethodGet)
  123. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPost)
  124. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}/field", sourceConfKeyFieldsHandler).Methods(http.MethodDelete, http.MethodPost)
  125. r.HandleFunc("/services", servicesHandler).Methods(http.MethodGet, http.MethodPost)
  126. r.HandleFunc("/services/functions", serviceFunctionsHandler).Methods(http.MethodGet)
  127. r.HandleFunc("/services/functions/{name}", serviceFunctionHandler).Methods(http.MethodGet)
  128. r.HandleFunc("/services/{name}", serviceHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  129. if needToken {
  130. r.Use(middleware.Auth)
  131. }
  132. server := &http.Server{
  133. Addr: fmt.Sprintf("%s:%d", ip, port),
  134. // Good practice to set timeouts to avoid Slowloris attacks.
  135. WriteTimeout: time.Second * 60 * 5,
  136. ReadTimeout: time.Second * 60 * 5,
  137. IdleTimeout: time.Second * 60,
  138. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
  139. }
  140. server.SetKeepAlivesEnabled(false)
  141. return server
  142. }
// information is the JSON document written by rootHandler.
type information struct {
	// Version is filled from the package-level version value.
	Version string `json:"version"`
	// Os is filled from runtime.GOOS.
	Os string `json:"os"`
	// UpTimeSeconds is the current Unix time minus startTimeStamp.
	UpTimeSeconds int64 `json:"upTimeSeconds"`
}
  148. //The handler for root
  149. func rootHandler(w http.ResponseWriter, r *http.Request) {
  150. defer r.Body.Close()
  151. switch r.Method {
  152. case http.MethodGet, http.MethodPost:
  153. w.WriteHeader(http.StatusOK)
  154. info := new(information)
  155. info.Version = version
  156. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  157. info.Os = runtime.GOOS
  158. byteInfo, _ := json.Marshal(info)
  159. w.Write(byteInfo)
  160. }
  161. }
  162. func pingHandler(w http.ResponseWriter, r *http.Request) {
  163. w.WriteHeader(http.StatusOK)
  164. }
  165. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  166. defer r.Body.Close()
  167. switch r.Method {
  168. case http.MethodGet:
  169. content, err := streamProcessor.ShowStream(st)
  170. if err != nil {
  171. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  172. return
  173. }
  174. jsonResponse(content, w, logger)
  175. case http.MethodPost:
  176. v, err := decodeStatementDescriptor(r.Body)
  177. if err != nil {
  178. handleError(w, err, "Invalid body", logger)
  179. return
  180. }
  181. content, err := streamProcessor.ExecStreamSql(v.Sql)
  182. if err != nil {
  183. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  184. return
  185. }
  186. w.WriteHeader(http.StatusCreated)
  187. w.Write([]byte(content))
  188. }
  189. }
  190. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  191. defer r.Body.Close()
  192. vars := mux.Vars(r)
  193. name := vars["name"]
  194. switch r.Method {
  195. case http.MethodGet:
  196. content, err := streamProcessor.DescStream(name, st)
  197. if err != nil {
  198. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  199. return
  200. }
  201. jsonResponse(content, w, logger)
  202. case http.MethodDelete:
  203. content, err := streamProcessor.DropStream(name, st)
  204. if err != nil {
  205. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  206. return
  207. }
  208. w.WriteHeader(http.StatusOK)
  209. w.Write([]byte(content))
  210. case http.MethodPut:
  211. v, err := decodeStatementDescriptor(r.Body)
  212. if err != nil {
  213. handleError(w, err, "Invalid body", logger)
  214. return
  215. }
  216. content, err := streamProcessor.ExecReplaceStream(v.Sql, st)
  217. if err != nil {
  218. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  219. return
  220. }
  221. w.WriteHeader(http.StatusOK)
  222. w.Write([]byte(content))
  223. }
  224. }
  225. //list or create streams
  226. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  227. sourcesManageHandler(w, r, ast.TypeStream)
  228. }
  229. //describe or delete a stream
  230. func streamHandler(w http.ResponseWriter, r *http.Request) {
  231. sourceManageHandler(w, r, ast.TypeStream)
  232. }
  233. //list or create tables
  234. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  235. sourcesManageHandler(w, r, ast.TypeTable)
  236. }
  237. func tableHandler(w http.ResponseWriter, r *http.Request) {
  238. sourceManageHandler(w, r, ast.TypeTable)
  239. }
  240. //list or create rules
  241. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  242. defer r.Body.Close()
  243. switch r.Method {
  244. case http.MethodPost:
  245. body, err := ioutil.ReadAll(r.Body)
  246. if err != nil {
  247. handleError(w, err, "Invalid body", logger)
  248. return
  249. }
  250. r, err := ruleProcessor.ExecCreate("", string(body))
  251. var result string
  252. if err != nil {
  253. handleError(w, err, "Create rule error", logger)
  254. return
  255. } else {
  256. result = fmt.Sprintf("Rule %s was created successfully.", r.Id)
  257. }
  258. //Start the rule
  259. rs, err := createRuleState(r)
  260. if err != nil {
  261. result = err.Error()
  262. } else {
  263. err = doStartRule(rs)
  264. if err != nil {
  265. result = err.Error()
  266. }
  267. }
  268. w.WriteHeader(http.StatusCreated)
  269. w.Write([]byte(result))
  270. case http.MethodGet:
  271. content, err := getAllRulesWithStatus()
  272. if err != nil {
  273. handleError(w, err, "Show rules error", logger)
  274. return
  275. }
  276. jsonResponse(content, w, logger)
  277. }
  278. }
  279. //describe or delete a rule
  280. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  281. defer r.Body.Close()
  282. vars := mux.Vars(r)
  283. name := vars["name"]
  284. switch r.Method {
  285. case http.MethodGet:
  286. rule, err := ruleProcessor.GetRuleByName(name)
  287. if err != nil {
  288. handleError(w, err, "describe rule error", logger)
  289. return
  290. }
  291. jsonResponse(rule, w, logger)
  292. case http.MethodDelete:
  293. deleteRule(name)
  294. content, err := ruleProcessor.ExecDrop(name)
  295. if err != nil {
  296. handleError(w, err, "delete rule error", logger)
  297. return
  298. }
  299. w.WriteHeader(http.StatusOK)
  300. w.Write([]byte(content))
  301. case http.MethodPut:
  302. _, err := ruleProcessor.GetRuleByName(name)
  303. if err != nil {
  304. handleError(w, err, "not found this rule", logger)
  305. return
  306. }
  307. body, err := ioutil.ReadAll(r.Body)
  308. if err != nil {
  309. handleError(w, err, "Invalid body", logger)
  310. return
  311. }
  312. r, err := ruleProcessor.ExecUpdate(name, string(body))
  313. var result string
  314. if err != nil {
  315. handleError(w, err, "Update rule error", logger)
  316. return
  317. } else {
  318. result = fmt.Sprintf("Rule %s was updated successfully.", r.Id)
  319. }
  320. err = restartRule(name)
  321. if err != nil {
  322. handleError(w, err, "restart rule error", logger)
  323. return
  324. }
  325. w.WriteHeader(http.StatusOK)
  326. w.Write([]byte(result))
  327. }
  328. }
  329. //get status of a rule
  330. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  331. defer r.Body.Close()
  332. vars := mux.Vars(r)
  333. name := vars["name"]
  334. content, err := getRuleStatus(name)
  335. if err != nil {
  336. handleError(w, err, "get rule status error", logger)
  337. return
  338. }
  339. w.WriteHeader(http.StatusOK)
  340. w.Write([]byte(content))
  341. }
  342. //start a rule
  343. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  344. defer r.Body.Close()
  345. vars := mux.Vars(r)
  346. name := vars["name"]
  347. err := startRule(name)
  348. if err != nil {
  349. handleError(w, err, "start rule error", logger)
  350. return
  351. }
  352. w.WriteHeader(http.StatusOK)
  353. w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
  354. }
  355. //stop a rule
  356. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  357. defer r.Body.Close()
  358. vars := mux.Vars(r)
  359. name := vars["name"]
  360. result := stopRule(name)
  361. w.WriteHeader(http.StatusOK)
  362. w.Write([]byte(result))
  363. }
  364. //restart a rule
  365. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  366. defer r.Body.Close()
  367. vars := mux.Vars(r)
  368. name := vars["name"]
  369. err := restartRule(name)
  370. if err != nil {
  371. handleError(w, err, "restart rule error", logger)
  372. return
  373. }
  374. w.WriteHeader(http.StatusOK)
  375. w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
  376. }
  377. //get topo of a rule
  378. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  379. defer r.Body.Close()
  380. vars := mux.Vars(r)
  381. name := vars["name"]
  382. content, err := getRuleTopo(name)
  383. if err != nil {
  384. handleError(w, err, "get rule topo error", logger)
  385. return
  386. }
  387. w.Header().Set(ContentType, ContentTypeJSON)
  388. w.Write([]byte(content))
  389. }
  390. func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  391. pluginManager := native.GetManager()
  392. defer r.Body.Close()
  393. switch r.Method {
  394. case http.MethodGet:
  395. content := pluginManager.List(t)
  396. jsonResponse(content, w, logger)
  397. case http.MethodPost:
  398. sd := plugin.NewPluginByType(t)
  399. err := json.NewDecoder(r.Body).Decode(sd)
  400. // Problems decoding
  401. if err != nil {
  402. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugin.PluginTypes[t]), logger)
  403. return
  404. }
  405. err = pluginManager.Register(t, sd)
  406. if err != nil {
  407. handleError(w, err, fmt.Sprintf("%s plugins create command error", plugin.PluginTypes[t]), logger)
  408. return
  409. }
  410. w.WriteHeader(http.StatusCreated)
  411. w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugin.PluginTypes[t], sd.GetName())))
  412. }
  413. }
  414. func pluginHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  415. defer r.Body.Close()
  416. vars := mux.Vars(r)
  417. name := vars["name"]
  418. cb := r.URL.Query().Get("stop")
  419. pluginManager := native.GetManager()
  420. switch r.Method {
  421. case http.MethodDelete:
  422. r := cb == "1"
  423. err := pluginManager.Delete(t, name, r)
  424. if err != nil {
  425. handleError(w, err, fmt.Sprintf("delete %s plugin %s error", plugin.PluginTypes[t], name), logger)
  426. return
  427. }
  428. w.WriteHeader(http.StatusOK)
  429. result := fmt.Sprintf("%s plugin %s is deleted", plugin.PluginTypes[t], name)
  430. if r {
  431. result = fmt.Sprintf("%s and Kuiper will be stopped", result)
  432. } else {
  433. result = fmt.Sprintf("%s and Kuiper must restart for the change to take effect.", result)
  434. }
  435. w.Write([]byte(result))
  436. case http.MethodGet:
  437. j, ok := pluginManager.GetPluginInfo(t, name)
  438. if !ok {
  439. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", plugin.PluginTypes[t], name), logger)
  440. return
  441. }
  442. jsonResponse(j, w, logger)
  443. }
  444. }
  445. //list or create source plugin
  446. func sourcesHandler(w http.ResponseWriter, r *http.Request) {
  447. pluginsHandler(w, r, plugin.SOURCE)
  448. }
  449. //delete a source plugin
  450. func sourceHandler(w http.ResponseWriter, r *http.Request) {
  451. pluginHandler(w, r, plugin.SOURCE)
  452. }
  453. //list or create sink plugin
  454. func sinksHandler(w http.ResponseWriter, r *http.Request) {
  455. pluginsHandler(w, r, plugin.SINK)
  456. }
  457. //delete a sink plugin
  458. func sinkHandler(w http.ResponseWriter, r *http.Request) {
  459. pluginHandler(w, r, plugin.SINK)
  460. }
  461. //list or create function plugin
  462. func functionsHandler(w http.ResponseWriter, r *http.Request) {
  463. pluginsHandler(w, r, plugin.FUNCTION)
  464. }
  465. //list all user defined functions in all function plugins
  466. func functionsListHandler(w http.ResponseWriter, r *http.Request) {
  467. pluginManager := native.GetManager()
  468. content := pluginManager.ListSymbols()
  469. jsonResponse(content, w, logger)
  470. }
  471. func functionsGetHandler(w http.ResponseWriter, r *http.Request) {
  472. vars := mux.Vars(r)
  473. name := vars["name"]
  474. pluginManager := native.GetManager()
  475. j, ok := pluginManager.GetPluginBySymbol(plugin.FUNCTION, name)
  476. if !ok {
  477. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  478. return
  479. }
  480. jsonResponse(map[string]string{"name": name, "plugin": j}, w, logger)
  481. }
  482. //delete a function plugin
  483. func functionHandler(w http.ResponseWriter, r *http.Request) {
  484. pluginHandler(w, r, plugin.FUNCTION)
  485. }
// functionList is the JSON request body decoded by functionRegisterHandler.
type functionList struct {
	// Functions holds the function names passed to the plugin manager's RegisterFuncs.
	Functions []string `json:"functions,omitempty"`
}
  489. // register function list for function plugin. If a plugin exports multiple functions, the function list must be registered
  490. // either by create or register. If the function plugin has been loaded because of auto load through so file, the function
  491. // list MUST be registered by this API or only the function with the same name as the plugin can be used.
  492. func functionRegisterHandler(w http.ResponseWriter, r *http.Request) {
  493. defer r.Body.Close()
  494. vars := mux.Vars(r)
  495. name := vars["name"]
  496. pluginManager := native.GetManager()
  497. _, ok := pluginManager.GetPluginInfo(plugin.FUNCTION, name)
  498. if !ok {
  499. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("register %s plugin %s error", plugin.PluginTypes[plugin.FUNCTION], name), logger)
  500. return
  501. }
  502. sd := functionList{}
  503. err := json.NewDecoder(r.Body).Decode(&sd)
  504. // Problems decoding
  505. if err != nil {
  506. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the function list json %s", r.Body), logger)
  507. return
  508. }
  509. err = pluginManager.RegisterFuncs(name, sd.Functions)
  510. if err != nil {
  511. handleError(w, err, fmt.Sprintf("function plugins %s regiser functions error", name), logger)
  512. return
  513. }
  514. w.WriteHeader(http.StatusOK)
  515. w.Write([]byte(fmt.Sprintf("function plugin %s function list is registered", name)))
  516. }
  517. func portablesHandler(w http.ResponseWriter, r *http.Request) {
  518. m := portable.GetManager()
  519. defer r.Body.Close()
  520. switch r.Method {
  521. case http.MethodGet:
  522. content := m.List()
  523. jsonResponse(content, w, logger)
  524. case http.MethodPost:
  525. sd := plugin.NewPluginByType(plugin.PORTABLE)
  526. err := json.NewDecoder(r.Body).Decode(sd)
  527. // Problems decoding
  528. if err != nil {
  529. handleError(w, err, "Invalid body: Error decoding the portable plugin json", logger)
  530. return
  531. }
  532. err = m.Register(sd)
  533. if err != nil {
  534. handleError(w, err, "portable plugin create command error", logger)
  535. return
  536. }
  537. w.WriteHeader(http.StatusCreated)
  538. w.Write([]byte(fmt.Sprintf("portable plugin %s is created", sd.GetName())))
  539. }
  540. }
  541. func portableHandler(w http.ResponseWriter, r *http.Request) {
  542. defer r.Body.Close()
  543. vars := mux.Vars(r)
  544. name := vars["name"]
  545. m := portable.GetManager()
  546. switch r.Method {
  547. case http.MethodDelete:
  548. err := m.Delete(name)
  549. if err != nil {
  550. handleError(w, err, fmt.Sprintf("delete portable plugin %s error", name), logger)
  551. return
  552. }
  553. w.WriteHeader(http.StatusOK)
  554. result := fmt.Sprintf("portable plugin %s is deleted", name)
  555. w.Write([]byte(result))
  556. case http.MethodGet:
  557. j, ok := m.GetPluginInfo(name)
  558. if !ok {
  559. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe portable plugin %s error", name), logger)
  560. return
  561. }
  562. jsonResponse(j, w, logger)
  563. }
  564. }
  565. func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
  566. prebuildPluginsHandler(w, r, plugin.SOURCE)
  567. }
  568. func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
  569. prebuildPluginsHandler(w, r, plugin.SINK)
  570. }
  571. func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
  572. prebuildPluginsHandler(w, r, plugin.FUNCTION)
  573. }
  574. func isOffcialDockerImage() bool {
  575. if !strings.EqualFold(os.Getenv("MAINTAINER"), "emqx.io") {
  576. return false
  577. }
  578. return true
  579. }
  580. func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  581. emsg := "It's strongly recommended to install plugins at official released Debian Docker images. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
  582. if !isOffcialDockerImage() {
  583. handleError(w, fmt.Errorf(emsg), "", logger)
  584. return
  585. } else if runtime.GOOS == "linux" {
  586. osrelease, err := Read()
  587. if err != nil {
  588. logger.Infof("")
  589. return
  590. }
  591. prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
  592. os := "debian"
  593. if strings.Contains(prettyName, "DEBIAN") {
  594. hosts := conf.Config.Basic.PluginHosts
  595. ptype := "sources"
  596. if t == plugin.SINK {
  597. ptype = "sinks"
  598. } else if t == plugin.FUNCTION {
  599. ptype = "functions"
  600. }
  601. if err, plugins := fetchPluginList(hosts, ptype, os, runtime.GOARCH); err != nil {
  602. handleError(w, err, "", logger)
  603. } else {
  604. jsonResponse(plugins, w, logger)
  605. }
  606. } else {
  607. handleError(w, fmt.Errorf(emsg), "", logger)
  608. return
  609. }
  610. } else {
  611. handleError(w, fmt.Errorf(emsg), "", logger)
  612. }
  613. }
  614. func fetchPluginList(hosts, ptype, os, arch string) (err error, result map[string]string) {
  615. if hosts == "" || ptype == "" || os == "" {
  616. logger.Errorf("Invalid parameter value: hosts %s, ptype %s or os: %s should not be empty.", hosts, ptype, os)
  617. return fmt.Errorf("Invalid configruation for plugin host in kuiper.yaml."), nil
  618. }
  619. result = make(map[string]string)
  620. hostsArr := strings.Split(hosts, ",")
  621. for _, host := range hostsArr {
  622. host := strings.Trim(host, " ")
  623. tmp := []string{host, "kuiper-plugins", version, os, ptype}
  624. //The url is similar to http://host:port/kuiper-plugins/0.9.1/debian/sinks/
  625. url := strings.Join(tmp, "/")
  626. timeout := time.Duration(30 * time.Second)
  627. client := &http.Client{
  628. Timeout: timeout,
  629. Transport: &http.Transport{
  630. TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  631. },
  632. }
  633. resp, err := client.Get(url)
  634. logger.Infof("Trying to fetch plugins from url: %s\n", url)
  635. if err != nil {
  636. return err, nil
  637. }
  638. defer resp.Body.Close()
  639. if resp.StatusCode != http.StatusOK {
  640. return fmt.Errorf("Cannot fetch plugin list from %s, with status error: %v", url, resp.StatusCode), nil
  641. }
  642. data, err := ioutil.ReadAll(resp.Body)
  643. if err != nil {
  644. return err, nil
  645. }
  646. plugins := extractFromHtml(string(data), arch)
  647. for _, p := range plugins {
  648. //If already existed, using the existed.
  649. if _, ok := result[p]; !ok {
  650. result[p] = url + "/" + p + "_" + arch + ".zip"
  651. }
  652. logger.Debugf("Plugin %s, download address is %s\n", p, result[p])
  653. }
  654. }
  655. return
  656. }
  657. func extractFromHtml(content, arch string) []string {
  658. plugins := []string{}
  659. htmlTokens := html.NewTokenizer(strings.NewReader(content))
  660. loop:
  661. for {
  662. tt := htmlTokens.Next()
  663. switch tt {
  664. case html.ErrorToken:
  665. break loop
  666. case html.StartTagToken:
  667. t := htmlTokens.Token()
  668. isAnchor := t.Data == "a"
  669. if isAnchor {
  670. found := false
  671. for _, prop := range t.Attr {
  672. if strings.ToUpper(prop.Key) == "HREF" {
  673. if strings.HasSuffix(prop.Val, "_"+arch+".zip") {
  674. if index := strings.LastIndex(prop.Val, "_"); index != -1 {
  675. plugins = append(plugins, prop.Val[0:index])
  676. }
  677. }
  678. found = true
  679. }
  680. }
  681. if !found {
  682. logger.Infof("Invalid plugin download link %s", t)
  683. }
  684. }
  685. }
  686. }
  687. return plugins
  688. }
  689. //list sink plugin
  690. func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
  691. defer r.Body.Close()
  692. sinks := meta.GetSinks()
  693. jsonResponse(sinks, w, logger)
  694. return
  695. }
  696. //Get sink metadata when creating rules
  697. func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
  698. defer r.Body.Close()
  699. vars := mux.Vars(r)
  700. pluginName := vars["name"]
  701. language := getLanguage(r)
  702. ptrMetadata, err := meta.GetSinkMeta(pluginName, language)
  703. if err != nil {
  704. handleError(w, err, "", logger)
  705. return
  706. }
  707. jsonResponse(ptrMetadata, w, logger)
  708. }
  709. //list functions
  710. func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  711. defer r.Body.Close()
  712. sinks := meta.GetFunctions()
  713. jsonResponse(sinks, w, logger)
  714. return
  715. }
  716. //list source plugin
  717. func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
  718. defer r.Body.Close()
  719. ret := meta.GetSources()
  720. if nil != ret {
  721. jsonResponse(ret, w, logger)
  722. return
  723. }
  724. }
  725. //Get source metadata when creating stream
  726. func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
  727. defer r.Body.Close()
  728. vars := mux.Vars(r)
  729. pluginName := vars["name"]
  730. language := getLanguage(r)
  731. ret, err := meta.GetSourceMeta(pluginName, language)
  732. if err != nil {
  733. handleError(w, err, "", logger)
  734. return
  735. }
  736. if nil != ret {
  737. jsonResponse(ret, w, logger)
  738. return
  739. }
  740. }
  741. //Get source yaml
  742. func sourceConfHandler(w http.ResponseWriter, r *http.Request) {
  743. defer r.Body.Close()
  744. vars := mux.Vars(r)
  745. pluginName := vars["name"]
  746. language := getLanguage(r)
  747. ret, err := meta.GetSourceConf(pluginName, language)
  748. if err != nil {
  749. handleError(w, err, "", logger)
  750. return
  751. } else {
  752. w.Write(ret)
  753. }
  754. }
  755. //Get confKeys of the source metadata
  756. func sourceConfKeysHandler(w http.ResponseWriter, r *http.Request) {
  757. defer r.Body.Close()
  758. vars := mux.Vars(r)
  759. pluginName := vars["name"]
  760. ret := meta.GetSourceConfKeys(pluginName)
  761. if nil != ret {
  762. jsonResponse(ret, w, logger)
  763. return
  764. }
  765. }
  766. //Add del confkey
  767. func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  768. defer r.Body.Close()
  769. var ret interface{}
  770. var err error
  771. vars := mux.Vars(r)
  772. pluginName := vars["name"]
  773. confKey := vars["confKey"]
  774. language := getLanguage(r)
  775. switch r.Method {
  776. case http.MethodDelete:
  777. err = meta.DelSourceConfKey(pluginName, confKey, language)
  778. case http.MethodPost:
  779. v, err := ioutil.ReadAll(r.Body)
  780. if err != nil {
  781. handleError(w, err, "Invalid body", logger)
  782. return
  783. }
  784. err = meta.AddSourceConfKey(pluginName, confKey, language, v)
  785. }
  786. if err != nil {
  787. handleError(w, err, "", logger)
  788. return
  789. }
  790. if nil != ret {
  791. jsonResponse(ret, w, logger)
  792. return
  793. }
  794. }
  795. //Del and Update field of confkey
  796. func sourceConfKeyFieldsHandler(w http.ResponseWriter, r *http.Request) {
  797. defer r.Body.Close()
  798. var ret interface{}
  799. var err error
  800. vars := mux.Vars(r)
  801. pluginName := vars["name"]
  802. confKey := vars["confKey"]
  803. v, err := ioutil.ReadAll(r.Body)
  804. if err != nil {
  805. handleError(w, err, "Invalid body", logger)
  806. return
  807. }
  808. language := getLanguage(r)
  809. switch r.Method {
  810. case http.MethodDelete:
  811. err = meta.DelSourceConfKeyField(pluginName, confKey, language, v)
  812. case http.MethodPost:
  813. err = meta.AddSourceConfKeyField(pluginName, confKey, language, v)
  814. }
  815. if err != nil {
  816. handleError(w, err, "", logger)
  817. return
  818. }
  819. if nil != ret {
  820. jsonResponse(ret, w, logger)
  821. return
  822. }
  823. }
  824. func getLanguage(r *http.Request) string {
  825. language := r.Header.Get("Content-Language")
  826. if 0 == len(language) {
  827. language = "en_US"
  828. }
  829. return language
  830. }
  831. func servicesHandler(w http.ResponseWriter, r *http.Request) {
  832. defer r.Body.Close()
  833. serviceManager := service.GetManager()
  834. switch r.Method {
  835. case http.MethodGet:
  836. content, err := serviceManager.List()
  837. if err != nil {
  838. handleError(w, err, "service list command error", logger)
  839. return
  840. }
  841. jsonResponse(content, w, logger)
  842. case http.MethodPost:
  843. sd := &service.ServiceCreationRequest{}
  844. err := json.NewDecoder(r.Body).Decode(sd)
  845. // Problems decoding
  846. if err != nil {
  847. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  848. return
  849. }
  850. err = serviceManager.Create(sd)
  851. if err != nil {
  852. handleError(w, err, "service create command error", logger)
  853. return
  854. }
  855. w.WriteHeader(http.StatusCreated)
  856. w.Write([]byte(fmt.Sprintf("service %s is created", sd.Name)))
  857. }
  858. }
  859. func serviceHandler(w http.ResponseWriter, r *http.Request) {
  860. defer r.Body.Close()
  861. vars := mux.Vars(r)
  862. name := vars["name"]
  863. serviceManager := service.GetManager()
  864. switch r.Method {
  865. case http.MethodDelete:
  866. err := serviceManager.Delete(name)
  867. if err != nil {
  868. handleError(w, err, fmt.Sprintf("delete service %s error", name), logger)
  869. return
  870. }
  871. w.WriteHeader(http.StatusOK)
  872. result := fmt.Sprintf("service %s is deleted", name)
  873. w.Write([]byte(result))
  874. case http.MethodGet:
  875. j, err := serviceManager.Get(name)
  876. if err != nil {
  877. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe service %s error", name), logger)
  878. return
  879. }
  880. jsonResponse(j, w, logger)
  881. case http.MethodPut:
  882. sd := &service.ServiceCreationRequest{}
  883. err := json.NewDecoder(r.Body).Decode(sd)
  884. // Problems decoding
  885. if err != nil {
  886. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  887. return
  888. }
  889. sd.Name = name
  890. err = serviceManager.Update(sd)
  891. if err != nil {
  892. handleError(w, err, "service update command error", logger)
  893. return
  894. }
  895. w.WriteHeader(http.StatusOK)
  896. w.Write([]byte(fmt.Sprintf("service %s is updated", sd.Name)))
  897. }
  898. }
  899. func serviceFunctionsHandler(w http.ResponseWriter, r *http.Request) {
  900. serviceManager := service.GetManager()
  901. content, err := serviceManager.ListFunctions()
  902. if err != nil {
  903. handleError(w, err, "service list command error", logger)
  904. return
  905. }
  906. jsonResponse(content, w, logger)
  907. }
  908. func serviceFunctionHandler(w http.ResponseWriter, r *http.Request) {
  909. vars := mux.Vars(r)
  910. name := vars["name"]
  911. serviceManager := service.GetManager()
  912. j, err := serviceManager.GetFunction(name)
  913. if err != nil {
  914. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  915. return
  916. }
  917. jsonResponse(j, w, logger)
  918. }