rest.go 31 KB


  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "crypto/tls"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/gorilla/handlers"
  20. "github.com/gorilla/mux"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/internal/meta"
  23. "github.com/lf-edge/ekuiper/internal/plugin"
  24. "github.com/lf-edge/ekuiper/internal/plugin/native"
  25. "github.com/lf-edge/ekuiper/internal/plugin/portable"
  26. "github.com/lf-edge/ekuiper/internal/server/middleware"
  27. "github.com/lf-edge/ekuiper/internal/service"
  28. "github.com/lf-edge/ekuiper/pkg/api"
  29. "github.com/lf-edge/ekuiper/pkg/ast"
  30. "github.com/lf-edge/ekuiper/pkg/errorx"
  31. "golang.org/x/net/html"
  32. "io"
  33. "io/ioutil"
  34. "net/http"
  35. "os"
  36. "runtime"
  37. "strings"
  38. "time"
  39. )
  40. const (
  41. ContentType = "Content-Type"
  42. ContentTypeJSON = "application/json"
  43. )
  44. type statementDescriptor struct {
  45. Sql string `json:"sql,omitempty"`
  46. }
  47. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  48. sd := statementDescriptor{}
  49. err := json.NewDecoder(reader).Decode(&sd)
  50. // Problems decoding
  51. if err != nil {
  52. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  53. }
  54. return sd, nil
  55. }
  56. // Handle applies the specified error and error concept tot he HTTP response writer
  57. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  58. message := prefix
  59. if message != "" {
  60. message += ": "
  61. }
  62. message += err.Error()
  63. logger.Error(message)
  64. var ec int
  65. switch e := err.(type) {
  66. case *errorx.Error:
  67. switch e.Code() {
  68. case errorx.NOT_FOUND:
  69. ec = http.StatusNotFound
  70. default:
  71. ec = http.StatusBadRequest
  72. }
  73. default:
  74. ec = http.StatusBadRequest
  75. }
  76. http.Error(w, message, ec)
  77. }
  78. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  79. w.Header().Add(ContentType, ContentTypeJSON)
  80. enc := json.NewEncoder(w)
  81. err := enc.Encode(i)
  82. // Problems encoding
  83. if err != nil {
  84. handleError(w, err, "", logger)
  85. }
  86. }
  87. func createRestServer(ip string, port int, needToken bool) *http.Server {
  88. r := mux.NewRouter()
  89. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  90. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  91. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  92. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  93. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  94. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  95. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  96. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  97. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  98. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  99. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  100. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  101. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  102. r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
  103. r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
  104. r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
  105. r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
  106. r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
  107. r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
  108. r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
  109. r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
  110. r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
  111. r.HandleFunc("/plugins/functions/{name}/register", functionRegisterHandler).Methods(http.MethodPost)
  112. r.HandleFunc("/plugins/udfs", functionsListHandler).Methods(http.MethodGet)
  113. r.HandleFunc("/plugins/udfs/{name}", functionsGetHandler).Methods(http.MethodGet)
  114. r.HandleFunc("/plugins/portables", portablesHandler).Methods(http.MethodGet, http.MethodPost)
  115. r.HandleFunc("/plugins/portables/{name}", portableHandler).Methods(http.MethodGet, http.MethodDelete)
  116. r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
  117. r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
  118. r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
  119. r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
  120. r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
  121. r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
  122. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPost)
  123. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}/field", sourceConfKeyFieldsHandler).Methods(http.MethodDelete, http.MethodPost)
  124. r.HandleFunc("/metadata/connections", connectionsMetaHandler).Methods(http.MethodGet)
  125. r.HandleFunc("/metadata/connections/{name}", connectionMetaHandler).Methods(http.MethodGet)
  126. r.HandleFunc("/metadata/connections/yaml/{name}", connectionConfHandler).Methods(http.MethodGet)
  127. r.HandleFunc("/metadata/connections/{name}/confKeys/{confKey}", connectionConfKeyHandler).Methods(http.MethodDelete, http.MethodPost)
  128. r.HandleFunc("/metadata/connections/{name}/confKeys/{confKey}/field", connectionConfKeyFieldsHandler).Methods(http.MethodDelete, http.MethodPost)
  129. r.HandleFunc("/services", servicesHandler).Methods(http.MethodGet, http.MethodPost)
  130. r.HandleFunc("/services/functions", serviceFunctionsHandler).Methods(http.MethodGet)
  131. r.HandleFunc("/services/functions/{name}", serviceFunctionHandler).Methods(http.MethodGet)
  132. r.HandleFunc("/services/{name}", serviceHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  133. if needToken {
  134. r.Use(middleware.Auth)
  135. }
  136. server := &http.Server{
  137. Addr: fmt.Sprintf("%s:%d", ip, port),
  138. // Good practice to set timeouts to avoid Slowloris attacks.
  139. WriteTimeout: time.Second * 60 * 5,
  140. ReadTimeout: time.Second * 60 * 5,
  141. IdleTimeout: time.Second * 60,
  142. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
  143. }
  144. server.SetKeepAlivesEnabled(false)
  145. return server
  146. }
  147. type information struct {
  148. Version string `json:"version"`
  149. Os string `json:"os"`
  150. Arch string `json:"arch"`
  151. UpTimeSeconds int64 `json:"upTimeSeconds"`
  152. }
  153. //The handler for root
  154. func rootHandler(w http.ResponseWriter, r *http.Request) {
  155. defer r.Body.Close()
  156. switch r.Method {
  157. case http.MethodGet, http.MethodPost:
  158. w.WriteHeader(http.StatusOK)
  159. info := new(information)
  160. info.Version = version
  161. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  162. info.Os = runtime.GOOS
  163. info.Arch = runtime.GOARCH
  164. byteInfo, _ := json.Marshal(info)
  165. w.Write(byteInfo)
  166. }
  167. }
  168. func pingHandler(w http.ResponseWriter, r *http.Request) {
  169. w.WriteHeader(http.StatusOK)
  170. }
  171. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  172. defer r.Body.Close()
  173. switch r.Method {
  174. case http.MethodGet:
  175. content, err := streamProcessor.ShowStream(st)
  176. if err != nil {
  177. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  178. return
  179. }
  180. jsonResponse(content, w, logger)
  181. case http.MethodPost:
  182. v, err := decodeStatementDescriptor(r.Body)
  183. if err != nil {
  184. handleError(w, err, "Invalid body", logger)
  185. return
  186. }
  187. content, err := streamProcessor.ExecStreamSql(v.Sql)
  188. if err != nil {
  189. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  190. return
  191. }
  192. w.WriteHeader(http.StatusCreated)
  193. w.Write([]byte(content))
  194. }
  195. }
  196. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  197. defer r.Body.Close()
  198. vars := mux.Vars(r)
  199. name := vars["name"]
  200. switch r.Method {
  201. case http.MethodGet:
  202. content, err := streamProcessor.DescStream(name, st)
  203. if err != nil {
  204. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  205. return
  206. }
  207. jsonResponse(content, w, logger)
  208. case http.MethodDelete:
  209. content, err := streamProcessor.DropStream(name, st)
  210. if err != nil {
  211. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  212. return
  213. }
  214. w.WriteHeader(http.StatusOK)
  215. w.Write([]byte(content))
  216. case http.MethodPut:
  217. v, err := decodeStatementDescriptor(r.Body)
  218. if err != nil {
  219. handleError(w, err, "Invalid body", logger)
  220. return
  221. }
  222. content, err := streamProcessor.ExecReplaceStream(v.Sql, st)
  223. if err != nil {
  224. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  225. return
  226. }
  227. w.WriteHeader(http.StatusOK)
  228. w.Write([]byte(content))
  229. }
  230. }
  231. //list or create streams
  232. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  233. sourcesManageHandler(w, r, ast.TypeStream)
  234. }
  235. //describe or delete a stream
  236. func streamHandler(w http.ResponseWriter, r *http.Request) {
  237. sourceManageHandler(w, r, ast.TypeStream)
  238. }
  239. //list or create tables
  240. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  241. sourcesManageHandler(w, r, ast.TypeTable)
  242. }
  243. func tableHandler(w http.ResponseWriter, r *http.Request) {
  244. sourceManageHandler(w, r, ast.TypeTable)
  245. }
  246. //list or create rules
  247. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  248. defer r.Body.Close()
  249. switch r.Method {
  250. case http.MethodPost:
  251. body, err := ioutil.ReadAll(r.Body)
  252. if err != nil {
  253. handleError(w, err, "Invalid body", logger)
  254. return
  255. }
  256. r, err := ruleProcessor.ExecCreate("", string(body))
  257. var result string
  258. if err != nil {
  259. handleError(w, err, "Create rule error", logger)
  260. return
  261. } else {
  262. result = fmt.Sprintf("Rule %s was created successfully.", r.Id)
  263. }
  264. //Start the rule
  265. rs, err := createRuleState(r)
  266. if err != nil {
  267. result = err.Error()
  268. } else {
  269. err = doStartRule(rs)
  270. if err != nil {
  271. result = err.Error()
  272. }
  273. }
  274. w.WriteHeader(http.StatusCreated)
  275. w.Write([]byte(result))
  276. case http.MethodGet:
  277. content, err := getAllRulesWithStatus()
  278. if err != nil {
  279. handleError(w, err, "Show rules error", logger)
  280. return
  281. }
  282. jsonResponse(content, w, logger)
  283. }
  284. }
  285. //describe or delete a rule
  286. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  287. defer r.Body.Close()
  288. vars := mux.Vars(r)
  289. name := vars["name"]
  290. switch r.Method {
  291. case http.MethodGet:
  292. rule, err := ruleProcessor.GetRuleByName(name)
  293. if err != nil {
  294. handleError(w, err, "describe rule error", logger)
  295. return
  296. }
  297. jsonResponse(rule, w, logger)
  298. case http.MethodDelete:
  299. deleteRule(name)
  300. content, err := ruleProcessor.ExecDrop(name)
  301. if err != nil {
  302. handleError(w, err, "delete rule error", logger)
  303. return
  304. }
  305. w.WriteHeader(http.StatusOK)
  306. w.Write([]byte(content))
  307. case http.MethodPut:
  308. _, err := ruleProcessor.GetRuleByName(name)
  309. if err != nil {
  310. handleError(w, err, "not found this rule", logger)
  311. return
  312. }
  313. body, err := ioutil.ReadAll(r.Body)
  314. if err != nil {
  315. handleError(w, err, "Invalid body", logger)
  316. return
  317. }
  318. r, err := ruleProcessor.ExecUpdate(name, string(body))
  319. var result string
  320. if err != nil {
  321. handleError(w, err, "Update rule error", logger)
  322. return
  323. } else {
  324. result = fmt.Sprintf("Rule %s was updated successfully.", r.Id)
  325. }
  326. err = restartRule(name)
  327. if err != nil {
  328. handleError(w, err, "restart rule error", logger)
  329. return
  330. }
  331. w.WriteHeader(http.StatusOK)
  332. w.Write([]byte(result))
  333. }
  334. }
  335. //get status of a rule
  336. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  337. defer r.Body.Close()
  338. vars := mux.Vars(r)
  339. name := vars["name"]
  340. content, err := getRuleStatus(name)
  341. if err != nil {
  342. handleError(w, err, "get rule status error", logger)
  343. return
  344. }
  345. w.WriteHeader(http.StatusOK)
  346. w.Write([]byte(content))
  347. }
  348. //start a rule
  349. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  350. defer r.Body.Close()
  351. vars := mux.Vars(r)
  352. name := vars["name"]
  353. err := startRule(name)
  354. if err != nil {
  355. handleError(w, err, "start rule error", logger)
  356. return
  357. }
  358. w.WriteHeader(http.StatusOK)
  359. w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
  360. }
  361. //stop a rule
  362. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  363. defer r.Body.Close()
  364. vars := mux.Vars(r)
  365. name := vars["name"]
  366. result := stopRule(name)
  367. w.WriteHeader(http.StatusOK)
  368. w.Write([]byte(result))
  369. }
  370. //restart a rule
  371. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  372. defer r.Body.Close()
  373. vars := mux.Vars(r)
  374. name := vars["name"]
  375. err := restartRule(name)
  376. if err != nil {
  377. handleError(w, err, "restart rule error", logger)
  378. return
  379. }
  380. w.WriteHeader(http.StatusOK)
  381. w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
  382. }
  383. //get topo of a rule
  384. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  385. defer r.Body.Close()
  386. vars := mux.Vars(r)
  387. name := vars["name"]
  388. content, err := getRuleTopo(name)
  389. if err != nil {
  390. handleError(w, err, "get rule topo error", logger)
  391. return
  392. }
  393. w.Header().Set(ContentType, ContentTypeJSON)
  394. w.Write([]byte(content))
  395. }
  396. func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  397. pluginManager := native.GetManager()
  398. defer r.Body.Close()
  399. switch r.Method {
  400. case http.MethodGet:
  401. content := pluginManager.List(t)
  402. jsonResponse(content, w, logger)
  403. case http.MethodPost:
  404. sd := plugin.NewPluginByType(t)
  405. err := json.NewDecoder(r.Body).Decode(sd)
  406. // Problems decoding
  407. if err != nil {
  408. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugin.PluginTypes[t]), logger)
  409. return
  410. }
  411. err = pluginManager.Register(t, sd)
  412. if err != nil {
  413. handleError(w, err, fmt.Sprintf("%s plugins create command error", plugin.PluginTypes[t]), logger)
  414. return
  415. }
  416. w.WriteHeader(http.StatusCreated)
  417. w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugin.PluginTypes[t], sd.GetName())))
  418. }
  419. }
  420. func pluginHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  421. defer r.Body.Close()
  422. vars := mux.Vars(r)
  423. name := vars["name"]
  424. cb := r.URL.Query().Get("stop")
  425. pluginManager := native.GetManager()
  426. switch r.Method {
  427. case http.MethodDelete:
  428. r := cb == "1"
  429. err := pluginManager.Delete(t, name, r)
  430. if err != nil {
  431. handleError(w, err, fmt.Sprintf("delete %s plugin %s error", plugin.PluginTypes[t], name), logger)
  432. return
  433. }
  434. w.WriteHeader(http.StatusOK)
  435. result := fmt.Sprintf("%s plugin %s is deleted", plugin.PluginTypes[t], name)
  436. if r {
  437. result = fmt.Sprintf("%s and Kuiper will be stopped", result)
  438. } else {
  439. result = fmt.Sprintf("%s and Kuiper must restart for the change to take effect.", result)
  440. }
  441. w.Write([]byte(result))
  442. case http.MethodGet:
  443. j, ok := pluginManager.GetPluginInfo(t, name)
  444. if !ok {
  445. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", plugin.PluginTypes[t], name), logger)
  446. return
  447. }
  448. jsonResponse(j, w, logger)
  449. }
  450. }
  451. //list or create source plugin
  452. func sourcesHandler(w http.ResponseWriter, r *http.Request) {
  453. pluginsHandler(w, r, plugin.SOURCE)
  454. }
  455. //delete a source plugin
  456. func sourceHandler(w http.ResponseWriter, r *http.Request) {
  457. pluginHandler(w, r, plugin.SOURCE)
  458. }
  459. //list or create sink plugin
  460. func sinksHandler(w http.ResponseWriter, r *http.Request) {
  461. pluginsHandler(w, r, plugin.SINK)
  462. }
  463. //delete a sink plugin
  464. func sinkHandler(w http.ResponseWriter, r *http.Request) {
  465. pluginHandler(w, r, plugin.SINK)
  466. }
  467. //list or create function plugin
  468. func functionsHandler(w http.ResponseWriter, r *http.Request) {
  469. pluginsHandler(w, r, plugin.FUNCTION)
  470. }
  471. //list all user defined functions in all function plugins
  472. func functionsListHandler(w http.ResponseWriter, r *http.Request) {
  473. pluginManager := native.GetManager()
  474. content := pluginManager.ListSymbols()
  475. jsonResponse(content, w, logger)
  476. }
  477. func functionsGetHandler(w http.ResponseWriter, r *http.Request) {
  478. vars := mux.Vars(r)
  479. name := vars["name"]
  480. pluginManager := native.GetManager()
  481. j, ok := pluginManager.GetPluginBySymbol(plugin.FUNCTION, name)
  482. if !ok {
  483. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  484. return
  485. }
  486. jsonResponse(map[string]string{"name": name, "plugin": j}, w, logger)
  487. }
  488. //delete a function plugin
  489. func functionHandler(w http.ResponseWriter, r *http.Request) {
  490. pluginHandler(w, r, plugin.FUNCTION)
  491. }
  492. type functionList struct {
  493. Functions []string `json:"functions,omitempty"`
  494. }
  495. // register function list for function plugin. If a plugin exports multiple functions, the function list must be registered
  496. // either by create or register. If the function plugin has been loaded because of auto load through so file, the function
  497. // list MUST be registered by this API or only the function with the same name as the plugin can be used.
  498. func functionRegisterHandler(w http.ResponseWriter, r *http.Request) {
  499. defer r.Body.Close()
  500. vars := mux.Vars(r)
  501. name := vars["name"]
  502. pluginManager := native.GetManager()
  503. _, ok := pluginManager.GetPluginInfo(plugin.FUNCTION, name)
  504. if !ok {
  505. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("register %s plugin %s error", plugin.PluginTypes[plugin.FUNCTION], name), logger)
  506. return
  507. }
  508. sd := functionList{}
  509. err := json.NewDecoder(r.Body).Decode(&sd)
  510. // Problems decoding
  511. if err != nil {
  512. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the function list json %s", r.Body), logger)
  513. return
  514. }
  515. err = pluginManager.RegisterFuncs(name, sd.Functions)
  516. if err != nil {
  517. handleError(w, err, fmt.Sprintf("function plugins %s regiser functions error", name), logger)
  518. return
  519. }
  520. w.WriteHeader(http.StatusOK)
  521. w.Write([]byte(fmt.Sprintf("function plugin %s function list is registered", name)))
  522. }
  523. func portablesHandler(w http.ResponseWriter, r *http.Request) {
  524. m := portable.GetManager()
  525. defer r.Body.Close()
  526. switch r.Method {
  527. case http.MethodGet:
  528. content := m.List()
  529. jsonResponse(content, w, logger)
  530. case http.MethodPost:
  531. sd := plugin.NewPluginByType(plugin.PORTABLE)
  532. err := json.NewDecoder(r.Body).Decode(sd)
  533. // Problems decoding
  534. if err != nil {
  535. handleError(w, err, "Invalid body: Error decoding the portable plugin json", logger)
  536. return
  537. }
  538. err = m.Register(sd)
  539. if err != nil {
  540. handleError(w, err, "portable plugin create command error", logger)
  541. return
  542. }
  543. w.WriteHeader(http.StatusCreated)
  544. w.Write([]byte(fmt.Sprintf("portable plugin %s is created", sd.GetName())))
  545. }
  546. }
  547. func portableHandler(w http.ResponseWriter, r *http.Request) {
  548. defer r.Body.Close()
  549. vars := mux.Vars(r)
  550. name := vars["name"]
  551. m := portable.GetManager()
  552. switch r.Method {
  553. case http.MethodDelete:
  554. err := m.Delete(name)
  555. if err != nil {
  556. handleError(w, err, fmt.Sprintf("delete portable plugin %s error", name), logger)
  557. return
  558. }
  559. w.WriteHeader(http.StatusOK)
  560. result := fmt.Sprintf("portable plugin %s is deleted", name)
  561. w.Write([]byte(result))
  562. case http.MethodGet:
  563. j, ok := m.GetPluginInfo(name)
  564. if !ok {
  565. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe portable plugin %s error", name), logger)
  566. return
  567. }
  568. jsonResponse(j, w, logger)
  569. }
  570. }
  571. func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
  572. prebuildPluginsHandler(w, r, plugin.SOURCE)
  573. }
  574. func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
  575. prebuildPluginsHandler(w, r, plugin.SINK)
  576. }
  577. func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
  578. prebuildPluginsHandler(w, r, plugin.FUNCTION)
  579. }
  580. func isOffcialDockerImage() bool {
  581. if !strings.EqualFold(os.Getenv("MAINTAINER"), "emqx.io") {
  582. return false
  583. }
  584. return true
  585. }
  586. func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  587. emsg := "It's strongly recommended to install plugins at official released Debian Docker images. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
  588. if !isOffcialDockerImage() {
  589. handleError(w, fmt.Errorf(emsg), "", logger)
  590. return
  591. } else if runtime.GOOS == "linux" {
  592. osrelease, err := Read()
  593. if err != nil {
  594. logger.Infof("")
  595. return
  596. }
  597. prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
  598. os := "debian"
  599. if strings.Contains(prettyName, "DEBIAN") {
  600. hosts := conf.Config.Basic.PluginHosts
  601. ptype := "sources"
  602. if t == plugin.SINK {
  603. ptype = "sinks"
  604. } else if t == plugin.FUNCTION {
  605. ptype = "functions"
  606. }
  607. if err, plugins := fetchPluginList(hosts, ptype, os, runtime.GOARCH); err != nil {
  608. handleError(w, err, "", logger)
  609. } else {
  610. jsonResponse(plugins, w, logger)
  611. }
  612. } else {
  613. handleError(w, fmt.Errorf(emsg), "", logger)
  614. return
  615. }
  616. } else {
  617. handleError(w, fmt.Errorf(emsg), "", logger)
  618. }
  619. }
  620. func fetchPluginList(hosts, ptype, os, arch string) (err error, result map[string]string) {
  621. if hosts == "" || ptype == "" || os == "" {
  622. logger.Errorf("Invalid parameter value: hosts %s, ptype %s or os: %s should not be empty.", hosts, ptype, os)
  623. return fmt.Errorf("Invalid configruation for plugin host in kuiper.yaml."), nil
  624. }
  625. result = make(map[string]string)
  626. hostsArr := strings.Split(hosts, ",")
  627. for _, host := range hostsArr {
  628. host := strings.Trim(host, " ")
  629. tmp := []string{host, "kuiper-plugins", version, os, ptype}
  630. //The url is similar to http://host:port/kuiper-plugins/0.9.1/debian/sinks/
  631. url := strings.Join(tmp, "/")
  632. timeout := time.Duration(30 * time.Second)
  633. client := &http.Client{
  634. Timeout: timeout,
  635. Transport: &http.Transport{
  636. TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  637. },
  638. }
  639. resp, err := client.Get(url)
  640. logger.Infof("Trying to fetch plugins from url: %s\n", url)
  641. if err != nil {
  642. return err, nil
  643. }
  644. defer resp.Body.Close()
  645. if resp.StatusCode != http.StatusOK {
  646. return fmt.Errorf("Cannot fetch plugin list from %s, with status error: %v", url, resp.StatusCode), nil
  647. }
  648. data, err := ioutil.ReadAll(resp.Body)
  649. if err != nil {
  650. return err, nil
  651. }
  652. plugins := extractFromHtml(string(data), arch)
  653. for _, p := range plugins {
  654. //If already existed, using the existed.
  655. if _, ok := result[p]; !ok {
  656. result[p] = url + "/" + p + "_" + arch + ".zip"
  657. }
  658. logger.Debugf("Plugin %s, download address is %s\n", p, result[p])
  659. }
  660. }
  661. return
  662. }
  663. func extractFromHtml(content, arch string) []string {
  664. plugins := []string{}
  665. htmlTokens := html.NewTokenizer(strings.NewReader(content))
  666. loop:
  667. for {
  668. tt := htmlTokens.Next()
  669. switch tt {
  670. case html.ErrorToken:
  671. break loop
  672. case html.StartTagToken:
  673. t := htmlTokens.Token()
  674. isAnchor := t.Data == "a"
  675. if isAnchor {
  676. found := false
  677. for _, prop := range t.Attr {
  678. if strings.ToUpper(prop.Key) == "HREF" {
  679. if strings.HasSuffix(prop.Val, "_"+arch+".zip") {
  680. if index := strings.LastIndex(prop.Val, "_"); index != -1 {
  681. plugins = append(plugins, prop.Val[0:index])
  682. }
  683. }
  684. found = true
  685. }
  686. }
  687. if !found {
  688. logger.Infof("Invalid plugin download link %s", t)
  689. }
  690. }
  691. }
  692. }
  693. return plugins
  694. }
  695. //list sink plugin
  696. func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
  697. defer r.Body.Close()
  698. sinks := meta.GetSinks()
  699. jsonResponse(sinks, w, logger)
  700. return
  701. }
  702. //Get sink metadata when creating rules
  703. func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
  704. defer r.Body.Close()
  705. vars := mux.Vars(r)
  706. pluginName := vars["name"]
  707. language := getLanguage(r)
  708. ptrMetadata, err := meta.GetSinkMeta(pluginName, language)
  709. if err != nil {
  710. handleError(w, err, "", logger)
  711. return
  712. }
  713. jsonResponse(ptrMetadata, w, logger)
  714. }
  715. //list functions
  716. func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  717. defer r.Body.Close()
  718. sinks := meta.GetFunctions()
  719. jsonResponse(sinks, w, logger)
  720. return
  721. }
  722. //list source plugin
  723. func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
  724. defer r.Body.Close()
  725. ret := meta.GetSourcesPlugins()
  726. if nil != ret {
  727. jsonResponse(ret, w, logger)
  728. return
  729. }
  730. }
  731. //list shareMeta
  732. func connectionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  733. defer r.Body.Close()
  734. ret := meta.GetConnectionPlugins()
  735. if nil != ret {
  736. jsonResponse(ret, w, logger)
  737. return
  738. }
  739. }
  740. //Get source metadata when creating stream
  741. func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
  742. defer r.Body.Close()
  743. vars := mux.Vars(r)
  744. pluginName := vars["name"]
  745. language := getLanguage(r)
  746. ret, err := meta.GetSourceMeta(pluginName, language)
  747. if err != nil {
  748. handleError(w, err, "", logger)
  749. return
  750. }
  751. if nil != ret {
  752. jsonResponse(ret, w, logger)
  753. return
  754. }
  755. }
  756. //Get source metadata when creating stream
  757. func connectionMetaHandler(w http.ResponseWriter, r *http.Request) {
  758. defer r.Body.Close()
  759. vars := mux.Vars(r)
  760. pluginName := vars["name"]
  761. language := getLanguage(r)
  762. ret, err := meta.GetConnectionMeta(pluginName, language)
  763. if err != nil {
  764. handleError(w, err, "", logger)
  765. return
  766. }
  767. if nil != ret {
  768. jsonResponse(ret, w, logger)
  769. return
  770. }
  771. }
  772. //Get source yaml
  773. func sourceConfHandler(w http.ResponseWriter, r *http.Request) {
  774. defer r.Body.Close()
  775. vars := mux.Vars(r)
  776. pluginName := vars["name"]
  777. language := getLanguage(r)
  778. configOperatorKey := fmt.Sprintf(meta.SourceCfgOperatorKeyTemplate, pluginName)
  779. ret, err := meta.GetYamlConf(configOperatorKey, language)
  780. if err != nil {
  781. handleError(w, err, "", logger)
  782. return
  783. } else {
  784. w.Write(ret)
  785. }
  786. }
  787. //Get share yaml
  788. func connectionConfHandler(w http.ResponseWriter, r *http.Request) {
  789. defer r.Body.Close()
  790. vars := mux.Vars(r)
  791. pluginName := vars["name"]
  792. language := getLanguage(r)
  793. configOperatorKey := fmt.Sprintf(meta.ConnectionCfgOperatorKeyTemplate, pluginName)
  794. ret, err := meta.GetYamlConf(configOperatorKey, language)
  795. if err != nil {
  796. handleError(w, err, "", logger)
  797. return
  798. } else {
  799. w.Write(ret)
  800. }
  801. }
  802. //Add del confkey
  803. func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  804. defer r.Body.Close()
  805. var err error
  806. vars := mux.Vars(r)
  807. pluginName := vars["name"]
  808. confKey := vars["confKey"]
  809. language := getLanguage(r)
  810. switch r.Method {
  811. case http.MethodDelete:
  812. err = meta.DelSourceConfKey(pluginName, confKey, language)
  813. case http.MethodPost:
  814. v, err1 := ioutil.ReadAll(r.Body)
  815. if err1 != nil {
  816. handleError(w, err, "Invalid body", logger)
  817. return
  818. }
  819. err = meta.AddSourceConfKey(pluginName, confKey, language, v)
  820. }
  821. if err != nil {
  822. handleError(w, err, "", logger)
  823. return
  824. }
  825. }
  826. //Add del confkey
  827. func connectionConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  828. defer r.Body.Close()
  829. var err error
  830. vars := mux.Vars(r)
  831. pluginName := vars["name"]
  832. confKey := vars["confKey"]
  833. language := getLanguage(r)
  834. switch r.Method {
  835. case http.MethodDelete:
  836. err = meta.DelConnectionConfKey(pluginName, confKey, language)
  837. case http.MethodPost:
  838. v, err1 := ioutil.ReadAll(r.Body)
  839. if err1 != nil {
  840. handleError(w, err1, "Invalid body", logger)
  841. return
  842. }
  843. err = meta.AddConnectionConfKey(pluginName, confKey, language, v)
  844. }
  845. if err != nil {
  846. handleError(w, err, "", logger)
  847. return
  848. }
  849. }
  850. //Del and Update field of confkey
  851. func sourceConfKeyFieldsHandler(w http.ResponseWriter, r *http.Request) {
  852. defer r.Body.Close()
  853. var err error
  854. vars := mux.Vars(r)
  855. pluginName := vars["name"]
  856. confKey := vars["confKey"]
  857. v, err := ioutil.ReadAll(r.Body)
  858. if err != nil {
  859. handleError(w, err, "Invalid body", logger)
  860. return
  861. }
  862. language := getLanguage(r)
  863. switch r.Method {
  864. case http.MethodDelete:
  865. err = meta.DelSourceConfKeyField(pluginName, confKey, language, v)
  866. case http.MethodPost:
  867. err = meta.AddSourceConfKeyField(pluginName, confKey, language, v)
  868. }
  869. if err != nil {
  870. handleError(w, err, "", logger)
  871. return
  872. }
  873. }
  874. //Del and Update field of confkey
  875. func connectionConfKeyFieldsHandler(w http.ResponseWriter, r *http.Request) {
  876. defer r.Body.Close()
  877. var err error
  878. vars := mux.Vars(r)
  879. pluginName := vars["name"]
  880. confKey := vars["confKey"]
  881. v, err := ioutil.ReadAll(r.Body)
  882. if err != nil {
  883. handleError(w, err, "Invalid body", logger)
  884. return
  885. }
  886. language := getLanguage(r)
  887. switch r.Method {
  888. case http.MethodDelete:
  889. err = meta.DelConnectionConfKeyField(pluginName, confKey, language, v)
  890. case http.MethodPost:
  891. err = meta.AddConnectionConfKeyField(pluginName, confKey, language, v)
  892. }
  893. if err != nil {
  894. handleError(w, err, "", logger)
  895. return
  896. }
  897. }
  898. func getLanguage(r *http.Request) string {
  899. language := r.Header.Get("Content-Language")
  900. if 0 == len(language) {
  901. language = "en_US"
  902. }
  903. return language
  904. }
  905. func servicesHandler(w http.ResponseWriter, r *http.Request) {
  906. defer r.Body.Close()
  907. serviceManager := service.GetManager()
  908. switch r.Method {
  909. case http.MethodGet:
  910. content, err := serviceManager.List()
  911. if err != nil {
  912. handleError(w, err, "service list command error", logger)
  913. return
  914. }
  915. jsonResponse(content, w, logger)
  916. case http.MethodPost:
  917. sd := &service.ServiceCreationRequest{}
  918. err := json.NewDecoder(r.Body).Decode(sd)
  919. // Problems decoding
  920. if err != nil {
  921. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  922. return
  923. }
  924. err = serviceManager.Create(sd)
  925. if err != nil {
  926. handleError(w, err, "service create command error", logger)
  927. return
  928. }
  929. w.WriteHeader(http.StatusCreated)
  930. w.Write([]byte(fmt.Sprintf("service %s is created", sd.Name)))
  931. }
  932. }
  933. func serviceHandler(w http.ResponseWriter, r *http.Request) {
  934. defer r.Body.Close()
  935. vars := mux.Vars(r)
  936. name := vars["name"]
  937. serviceManager := service.GetManager()
  938. switch r.Method {
  939. case http.MethodDelete:
  940. err := serviceManager.Delete(name)
  941. if err != nil {
  942. handleError(w, err, fmt.Sprintf("delete service %s error", name), logger)
  943. return
  944. }
  945. w.WriteHeader(http.StatusOK)
  946. result := fmt.Sprintf("service %s is deleted", name)
  947. w.Write([]byte(result))
  948. case http.MethodGet:
  949. j, err := serviceManager.Get(name)
  950. if err != nil {
  951. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe service %s error", name), logger)
  952. return
  953. }
  954. jsonResponse(j, w, logger)
  955. case http.MethodPut:
  956. sd := &service.ServiceCreationRequest{}
  957. err := json.NewDecoder(r.Body).Decode(sd)
  958. // Problems decoding
  959. if err != nil {
  960. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  961. return
  962. }
  963. sd.Name = name
  964. err = serviceManager.Update(sd)
  965. if err != nil {
  966. handleError(w, err, "service update command error", logger)
  967. return
  968. }
  969. w.WriteHeader(http.StatusOK)
  970. w.Write([]byte(fmt.Sprintf("service %s is updated", sd.Name)))
  971. }
  972. }
  973. func serviceFunctionsHandler(w http.ResponseWriter, r *http.Request) {
  974. serviceManager := service.GetManager()
  975. content, err := serviceManager.ListFunctions()
  976. if err != nil {
  977. handleError(w, err, "service list command error", logger)
  978. return
  979. }
  980. jsonResponse(content, w, logger)
  981. }
  982. func serviceFunctionHandler(w http.ResponseWriter, r *http.Request) {
  983. vars := mux.Vars(r)
  984. name := vars["name"]
  985. serviceManager := service.GetManager()
  986. j, err := serviceManager.GetFunction(name)
  987. if err != nil {
  988. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  989. return
  990. }
  991. jsonResponse(j, w, logger)
  992. }