rest.go 26 KB


  1. package server
  2. import (
  3. "crypto/tls"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gorilla/handlers"
  7. "github.com/gorilla/mux"
  8. "github.com/lf-edge/ekuiper/internal/conf"
  9. "github.com/lf-edge/ekuiper/internal/plugin"
  10. "github.com/lf-edge/ekuiper/internal/service"
  11. "github.com/lf-edge/ekuiper/pkg/api"
  12. "github.com/lf-edge/ekuiper/pkg/ast"
  13. "github.com/lf-edge/ekuiper/pkg/errorx"
  14. "golang.org/x/net/html"
  15. "io"
  16. "io/ioutil"
  17. "net/http"
  18. "os"
  19. "runtime"
  20. "strings"
  21. "time"
  22. )
  // HTTP header name and media type used by the JSON-producing handlers in this file.
  const (
  	// ContentType is the name of the HTTP Content-Type header.
  	ContentType = "Content-Type"
  	// ContentTypeJSON is the media type written for JSON response bodies.
  	ContentTypeJSON = "application/json"
  )
  // statementDescriptor is the JSON request body that carries one SQL statement.
  type statementDescriptor struct {
  	// Sql is the statement text; it is left out of encoded output when empty.
  	Sql string `json:"sql,omitempty"`
  }

  // decodeStatementDescriptor decodes a single JSON value from reader into a
  // statementDescriptor. The reader is not closed here; that is left to the caller.
  //
  // On failure the zero descriptor is returned together with an error that wraps
  // the decoder's error, so callers can inspect the cause with errors.Is/As.
  func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  	var sd statementDescriptor
  	if err := json.NewDecoder(reader).Decode(&sd); err != nil {
  		// Message text is kept as-is because it ends up in HTTP responses.
  		return statementDescriptor{}, fmt.Errorf("Error decoding the statement descriptor: %w", err)
  	}
  	return sd, nil
  }
  39. // Handle applies the specified error and error concept tot he HTTP response writer
  40. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  41. message := prefix
  42. if message != "" {
  43. message += ": "
  44. }
  45. message += err.Error()
  46. logger.Error(message)
  47. var ec int
  48. switch e := err.(type) {
  49. case *errorx.Error:
  50. switch e.Code() {
  51. case errorx.NOT_FOUND:
  52. ec = http.StatusNotFound
  53. default:
  54. ec = http.StatusBadRequest
  55. }
  56. default:
  57. ec = http.StatusBadRequest
  58. }
  59. http.Error(w, message, ec)
  60. }
  61. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  62. w.Header().Add(ContentType, ContentTypeJSON)
  63. enc := json.NewEncoder(w)
  64. err := enc.Encode(i)
  65. // Problems encoding
  66. if err != nil {
  67. handleError(w, err, "", logger)
  68. }
  69. }
  70. func createRestServer(ip string, port int) *http.Server {
  71. r := mux.NewRouter()
  72. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  73. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  74. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  75. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  76. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  77. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  78. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  79. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  80. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  81. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  82. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  83. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  84. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  85. r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
  86. r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
  87. r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
  88. r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
  89. r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
  90. r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
  91. r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
  92. r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
  93. r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
  94. r.HandleFunc("/plugins/functions/{name}/register", functionRegisterHandler).Methods(http.MethodPost)
  95. r.HandleFunc("/plugins/udfs", functionsListHandler).Methods(http.MethodGet)
  96. r.HandleFunc("/plugins/udfs/{name}", functionsGetHandler).Methods(http.MethodGet)
  97. r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
  98. r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
  99. r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
  100. r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
  101. r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
  102. r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
  103. r.HandleFunc("/metadata/sources/{name}/confKeys", sourceConfKeysHandler).Methods(http.MethodGet)
  104. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPost)
  105. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}/field", sourceConfKeyFieldsHandler).Methods(http.MethodDelete, http.MethodPost)
  106. r.HandleFunc("/services", servicesHandler).Methods(http.MethodGet, http.MethodPost)
  107. r.HandleFunc("/services/functions", serviceFunctionsHandler).Methods(http.MethodGet)
  108. r.HandleFunc("/services/functions/{name}", serviceFunctionHandler).Methods(http.MethodGet)
  109. r.HandleFunc("/services/{name}", serviceHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  110. server := &http.Server{
  111. Addr: fmt.Sprintf("%s:%d", ip, port),
  112. // Good practice to set timeouts to avoid Slowloris attacks.
  113. WriteTimeout: time.Second * 60 * 5,
  114. ReadTimeout: time.Second * 60 * 5,
  115. IdleTimeout: time.Second * 60,
  116. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
  117. }
  118. server.SetKeepAlivesEnabled(false)
  119. return server
  120. }
  // information is the JSON document served by the root endpoint.
  type information struct {
  	// Version is encoded under the "version" key.
  	Version string `json:"version"`
  	// Os is encoded under the "os" key.
  	Os string `json:"os"`
  	// UpTimeSeconds is encoded under the "upTimeSeconds" key.
  	UpTimeSeconds int64 `json:"upTimeSeconds"`
  }
  126. //The handler for root
  127. func rootHandler(w http.ResponseWriter, r *http.Request) {
  128. defer r.Body.Close()
  129. switch r.Method {
  130. case http.MethodGet, http.MethodPost:
  131. w.WriteHeader(http.StatusOK)
  132. info := new(information)
  133. info.Version = version
  134. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  135. info.Os = runtime.GOOS
  136. byteInfo, _ := json.Marshal(info)
  137. w.Write(byteInfo)
  138. }
  139. }
  // pingHandler answers every request with status 200 and no body. It does not
  // read the request.
  func pingHandler(w http.ResponseWriter, r *http.Request) {
  	w.WriteHeader(http.StatusOK)
  }
  143. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  144. defer r.Body.Close()
  145. switch r.Method {
  146. case http.MethodGet:
  147. content, err := streamProcessor.ShowStream(st)
  148. if err != nil {
  149. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  150. return
  151. }
  152. jsonResponse(content, w, logger)
  153. case http.MethodPost:
  154. v, err := decodeStatementDescriptor(r.Body)
  155. if err != nil {
  156. handleError(w, err, "Invalid body", logger)
  157. return
  158. }
  159. content, err := streamProcessor.ExecStreamSql(v.Sql)
  160. if err != nil {
  161. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  162. return
  163. }
  164. w.WriteHeader(http.StatusCreated)
  165. w.Write([]byte(content))
  166. }
  167. }
  168. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  169. defer r.Body.Close()
  170. vars := mux.Vars(r)
  171. name := vars["name"]
  172. switch r.Method {
  173. case http.MethodGet:
  174. content, err := streamProcessor.DescStream(name, st)
  175. if err != nil {
  176. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  177. return
  178. }
  179. jsonResponse(content, w, logger)
  180. case http.MethodDelete:
  181. content, err := streamProcessor.DropStream(name, st)
  182. if err != nil {
  183. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  184. return
  185. }
  186. w.WriteHeader(http.StatusOK)
  187. w.Write([]byte(content))
  188. case http.MethodPut:
  189. v, err := decodeStatementDescriptor(r.Body)
  190. if err != nil {
  191. handleError(w, err, "Invalid body", logger)
  192. return
  193. }
  194. content, err := streamProcessor.ExecReplaceStream(v.Sql, st)
  195. if err != nil {
  196. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  197. return
  198. }
  199. w.WriteHeader(http.StatusOK)
  200. w.Write([]byte(content))
  201. }
  202. }
  203. //list or create streams
  204. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  205. sourcesManageHandler(w, r, ast.TypeStream)
  206. }
  207. //describe or delete a stream
  208. func streamHandler(w http.ResponseWriter, r *http.Request) {
  209. sourceManageHandler(w, r, ast.TypeStream)
  210. }
  211. //list or create tables
  212. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  213. sourcesManageHandler(w, r, ast.TypeTable)
  214. }
  215. func tableHandler(w http.ResponseWriter, r *http.Request) {
  216. sourceManageHandler(w, r, ast.TypeTable)
  217. }
  218. //list or create rules
  219. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  220. defer r.Body.Close()
  221. switch r.Method {
  222. case http.MethodPost:
  223. body, err := ioutil.ReadAll(r.Body)
  224. if err != nil {
  225. handleError(w, err, "Invalid body", logger)
  226. return
  227. }
  228. r, err := ruleProcessor.ExecCreate("", string(body))
  229. var result string
  230. if err != nil {
  231. handleError(w, err, "Create rule error", logger)
  232. return
  233. } else {
  234. result = fmt.Sprintf("Rule %s was created successfully.", r.Id)
  235. }
  236. //Start the rule
  237. rs, err := createRuleState(r)
  238. if err != nil {
  239. result = err.Error()
  240. } else {
  241. err = doStartRule(rs)
  242. if err != nil {
  243. result = err.Error()
  244. }
  245. }
  246. w.WriteHeader(http.StatusCreated)
  247. w.Write([]byte(result))
  248. case http.MethodGet:
  249. content, err := getAllRulesWithStatus()
  250. if err != nil {
  251. handleError(w, err, "Show rules error", logger)
  252. return
  253. }
  254. jsonResponse(content, w, logger)
  255. }
  256. }
  257. //describe or delete a rule
  258. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  259. defer r.Body.Close()
  260. vars := mux.Vars(r)
  261. name := vars["name"]
  262. switch r.Method {
  263. case http.MethodGet:
  264. rule, err := ruleProcessor.GetRuleByName(name)
  265. if err != nil {
  266. handleError(w, err, "describe rule error", logger)
  267. return
  268. }
  269. jsonResponse(rule, w, logger)
  270. case http.MethodDelete:
  271. deleteRule(name)
  272. content, err := ruleProcessor.ExecDrop(name)
  273. if err != nil {
  274. handleError(w, err, "delete rule error", logger)
  275. return
  276. }
  277. w.WriteHeader(http.StatusOK)
  278. w.Write([]byte(content))
  279. case http.MethodPut:
  280. _, err := ruleProcessor.GetRuleByName(name)
  281. if err != nil {
  282. handleError(w, err, "not found this rule", logger)
  283. return
  284. }
  285. body, err := ioutil.ReadAll(r.Body)
  286. if err != nil {
  287. handleError(w, err, "Invalid body", logger)
  288. return
  289. }
  290. r, err := ruleProcessor.ExecUpdate(name, string(body))
  291. var result string
  292. if err != nil {
  293. handleError(w, err, "Update rule error", logger)
  294. return
  295. } else {
  296. result = fmt.Sprintf("Rule %s was updated successfully.", r.Id)
  297. }
  298. err = restartRule(name)
  299. if err != nil {
  300. handleError(w, err, "restart rule error", logger)
  301. return
  302. }
  303. w.WriteHeader(http.StatusOK)
  304. w.Write([]byte(result))
  305. }
  306. }
  307. //get status of a rule
  308. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  309. defer r.Body.Close()
  310. vars := mux.Vars(r)
  311. name := vars["name"]
  312. content, err := getRuleStatus(name)
  313. if err != nil {
  314. handleError(w, err, "get rule status error", logger)
  315. return
  316. }
  317. w.WriteHeader(http.StatusOK)
  318. w.Write([]byte(content))
  319. }
  320. //start a rule
  321. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  322. defer r.Body.Close()
  323. vars := mux.Vars(r)
  324. name := vars["name"]
  325. err := startRule(name)
  326. if err != nil {
  327. handleError(w, err, "start rule error", logger)
  328. return
  329. }
  330. w.WriteHeader(http.StatusOK)
  331. w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
  332. }
  333. //stop a rule
  334. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  335. defer r.Body.Close()
  336. vars := mux.Vars(r)
  337. name := vars["name"]
  338. result := stopRule(name)
  339. w.WriteHeader(http.StatusOK)
  340. w.Write([]byte(result))
  341. }
  342. //restart a rule
  343. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  344. defer r.Body.Close()
  345. vars := mux.Vars(r)
  346. name := vars["name"]
  347. err := restartRule(name)
  348. if err != nil {
  349. handleError(w, err, "restart rule error", logger)
  350. return
  351. }
  352. w.WriteHeader(http.StatusOK)
  353. w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
  354. }
  355. //get topo of a rule
  356. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  357. defer r.Body.Close()
  358. vars := mux.Vars(r)
  359. name := vars["name"]
  360. content, err := getRuleTopo(name)
  361. if err != nil {
  362. handleError(w, err, "get rule topo error", logger)
  363. return
  364. }
  365. w.Header().Set(ContentType, ContentTypeJSON)
  366. w.Write([]byte(content))
  367. }
  368. func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  369. defer r.Body.Close()
  370. switch r.Method {
  371. case http.MethodGet:
  372. content, err := pluginManager.List(t)
  373. if err != nil {
  374. handleError(w, err, fmt.Sprintf("%s plugins list command error", plugin.PluginTypes[t]), logger)
  375. return
  376. }
  377. jsonResponse(content, w, logger)
  378. case http.MethodPost:
  379. sd := plugin.NewPluginByType(t)
  380. err := json.NewDecoder(r.Body).Decode(sd)
  381. // Problems decoding
  382. if err != nil {
  383. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugin.PluginTypes[t]), logger)
  384. return
  385. }
  386. err = pluginManager.Register(t, sd)
  387. if err != nil {
  388. handleError(w, err, fmt.Sprintf("%s plugins create command error", plugin.PluginTypes[t]), logger)
  389. return
  390. }
  391. w.WriteHeader(http.StatusCreated)
  392. w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugin.PluginTypes[t], sd.GetName())))
  393. }
  394. }
  395. func pluginHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  396. defer r.Body.Close()
  397. vars := mux.Vars(r)
  398. name := vars["name"]
  399. cb := r.URL.Query().Get("stop")
  400. switch r.Method {
  401. case http.MethodDelete:
  402. r := cb == "1"
  403. err := pluginManager.Delete(t, name, r)
  404. if err != nil {
  405. handleError(w, err, fmt.Sprintf("delete %s plugin %s error", plugin.PluginTypes[t], name), logger)
  406. return
  407. }
  408. w.WriteHeader(http.StatusOK)
  409. result := fmt.Sprintf("%s plugin %s is deleted", plugin.PluginTypes[t], name)
  410. if r {
  411. result = fmt.Sprintf("%s and Kuiper will be stopped", result)
  412. } else {
  413. result = fmt.Sprintf("%s and Kuiper must restart for the change to take effect.", result)
  414. }
  415. w.Write([]byte(result))
  416. case http.MethodGet:
  417. j, ok := pluginManager.Get(t, name)
  418. if !ok {
  419. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", plugin.PluginTypes[t], name), logger)
  420. return
  421. }
  422. jsonResponse(j, w, logger)
  423. }
  424. }
  425. //list or create source plugin
  426. func sourcesHandler(w http.ResponseWriter, r *http.Request) {
  427. pluginsHandler(w, r, plugin.SOURCE)
  428. }
  429. //delete a source plugin
  430. func sourceHandler(w http.ResponseWriter, r *http.Request) {
  431. pluginHandler(w, r, plugin.SOURCE)
  432. }
  433. //list or create sink plugin
  434. func sinksHandler(w http.ResponseWriter, r *http.Request) {
  435. pluginsHandler(w, r, plugin.SINK)
  436. }
  437. //delete a sink plugin
  438. func sinkHandler(w http.ResponseWriter, r *http.Request) {
  439. pluginHandler(w, r, plugin.SINK)
  440. }
  441. //list or create function plugin
  442. func functionsHandler(w http.ResponseWriter, r *http.Request) {
  443. pluginsHandler(w, r, plugin.FUNCTION)
  444. }
  445. //list all user defined functions in all function plugins
  446. func functionsListHandler(w http.ResponseWriter, r *http.Request) {
  447. content, err := pluginManager.ListSymbols()
  448. if err != nil {
  449. handleError(w, err, "udfs list command error", logger)
  450. return
  451. }
  452. jsonResponse(content, w, logger)
  453. }
  454. func functionsGetHandler(w http.ResponseWriter, r *http.Request) {
  455. vars := mux.Vars(r)
  456. name := vars["name"]
  457. j, ok := pluginManager.GetSymbol(name)
  458. if !ok {
  459. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  460. return
  461. }
  462. jsonResponse(map[string]string{"name": name, "plugin": j}, w, logger)
  463. }
  464. //delete a function plugin
  465. func functionHandler(w http.ResponseWriter, r *http.Request) {
  466. pluginHandler(w, r, plugin.FUNCTION)
  467. }
  // functionList is the JSON request body accepted by functionRegisterHandler.
  type functionList struct {
  	// Functions holds the function names; omitted from encoded output when empty.
  	Functions []string `json:"functions,omitempty"`
  }
  471. // register function list for function plugin. If a plugin exports multiple functions, the function list must be registered
  472. // either by create or register. If the function plugin has been loaded because of auto load through so file, the function
  473. // list MUST be registered by this API or only the function with the same name as the plugin can be used.
  474. func functionRegisterHandler(w http.ResponseWriter, r *http.Request) {
  475. defer r.Body.Close()
  476. vars := mux.Vars(r)
  477. name := vars["name"]
  478. _, ok := pluginManager.Get(plugin.FUNCTION, name)
  479. if !ok {
  480. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("register %s plugin %s error", plugin.PluginTypes[plugin.FUNCTION], name), logger)
  481. return
  482. }
  483. sd := functionList{}
  484. err := json.NewDecoder(r.Body).Decode(&sd)
  485. // Problems decoding
  486. if err != nil {
  487. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the function list json %s", r.Body), logger)
  488. return
  489. }
  490. err = pluginManager.RegisterFuncs(name, sd.Functions)
  491. if err != nil {
  492. handleError(w, err, fmt.Sprintf("function plugins %s regiser functions error", name), logger)
  493. return
  494. }
  495. w.WriteHeader(http.StatusOK)
  496. w.Write([]byte(fmt.Sprintf("function plugin %s function list is registered", name)))
  497. }
  498. func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
  499. prebuildPluginsHandler(w, r, plugin.SOURCE)
  500. }
  501. func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
  502. prebuildPluginsHandler(w, r, plugin.SINK)
  503. }
  504. func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
  505. prebuildPluginsHandler(w, r, plugin.FUNCTION)
  506. }
  // isOffcialDockerImage reports whether the MAINTAINER environment variable,
  // compared case-insensitively, equals "emqx.io".
  func isOffcialDockerImage() bool {
  	maintainer := strings.ToLower(os.Getenv("MAINTAINER"))
  	return maintainer == "emqx.io"
  }
  513. func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  514. emsg := "It's strongly recommended to install plugins at official released Debian Docker images. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
  515. if !isOffcialDockerImage() {
  516. handleError(w, fmt.Errorf(emsg), "", logger)
  517. return
  518. } else if runtime.GOOS == "linux" {
  519. osrelease, err := Read()
  520. if err != nil {
  521. logger.Infof("")
  522. return
  523. }
  524. prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
  525. os := "debian"
  526. if strings.Contains(prettyName, "DEBIAN") {
  527. hosts := conf.Config.Basic.PluginHosts
  528. ptype := "sources"
  529. if t == plugin.SINK {
  530. ptype = "sinks"
  531. } else if t == plugin.FUNCTION {
  532. ptype = "functions"
  533. }
  534. if err, plugins := fetchPluginList(hosts, ptype, os, runtime.GOARCH); err != nil {
  535. handleError(w, err, "", logger)
  536. } else {
  537. jsonResponse(plugins, w, logger)
  538. }
  539. } else {
  540. handleError(w, fmt.Errorf(emsg), "", logger)
  541. return
  542. }
  543. } else {
  544. handleError(w, fmt.Errorf(emsg), "", logger)
  545. }
  546. }
  547. func fetchPluginList(hosts, ptype, os, arch string) (err error, result map[string]string) {
  548. if hosts == "" || ptype == "" || os == "" {
  549. logger.Errorf("Invalid parameter value: hosts %s, ptype %s or os: %s should not be empty.", hosts, ptype, os)
  550. return fmt.Errorf("Invalid configruation for plugin host in kuiper.yaml."), nil
  551. }
  552. result = make(map[string]string)
  553. hostsArr := strings.Split(hosts, ",")
  554. for _, host := range hostsArr {
  555. host := strings.Trim(host, " ")
  556. tmp := []string{host, "kuiper-plugins", version, os, ptype}
  557. //The url is similar to http://host:port/kuiper-plugins/0.9.1/debian/sinks/
  558. url := strings.Join(tmp, "/")
  559. timeout := time.Duration(30 * time.Second)
  560. client := &http.Client{
  561. Timeout: timeout,
  562. Transport: &http.Transport{
  563. TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  564. },
  565. }
  566. resp, err := client.Get(url)
  567. logger.Infof("Trying to fetch plugins from url: %s\n", url)
  568. if err != nil {
  569. return err, nil
  570. }
  571. defer resp.Body.Close()
  572. if resp.StatusCode != http.StatusOK {
  573. return fmt.Errorf("Cannot fetch plugin list from %s, with status error: %v", url, resp.StatusCode), nil
  574. }
  575. data, err := ioutil.ReadAll(resp.Body)
  576. if err != nil {
  577. return err, nil
  578. }
  579. plugins := extractFromHtml(string(data), arch)
  580. for _, p := range plugins {
  581. //If already existed, using the existed.
  582. if _, ok := result[p]; !ok {
  583. result[p] = url + "/" + p + "_" + arch + ".zip"
  584. }
  585. logger.Debugf("Plugin %s, download address is %s\n", p, result[p])
  586. }
  587. }
  588. return
  589. }
  590. func extractFromHtml(content, arch string) []string {
  591. plugins := []string{}
  592. htmlTokens := html.NewTokenizer(strings.NewReader(content))
  593. loop:
  594. for {
  595. tt := htmlTokens.Next()
  596. switch tt {
  597. case html.ErrorToken:
  598. break loop
  599. case html.StartTagToken:
  600. t := htmlTokens.Token()
  601. isAnchor := t.Data == "a"
  602. if isAnchor {
  603. found := false
  604. for _, prop := range t.Attr {
  605. if strings.ToUpper(prop.Key) == "HREF" {
  606. if strings.HasSuffix(prop.Val, "_"+arch+".zip") {
  607. if index := strings.LastIndex(prop.Val, "_"); index != -1 {
  608. plugins = append(plugins, prop.Val[0:index])
  609. }
  610. }
  611. found = true
  612. }
  613. }
  614. if !found {
  615. logger.Infof("Invalid plugin download link %s", t)
  616. }
  617. }
  618. }
  619. }
  620. return plugins
  621. }
  622. //list sink plugin
  623. func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
  624. defer r.Body.Close()
  625. sinks := plugin.GetSinks()
  626. jsonResponse(sinks, w, logger)
  627. return
  628. }
  629. //Get sink metadata when creating rules
  630. func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
  631. defer r.Body.Close()
  632. vars := mux.Vars(r)
  633. pluginName := vars["name"]
  634. language := getLanguage(r)
  635. ptrMetadata, err := plugin.GetSinkMeta(pluginName, language)
  636. if err != nil {
  637. handleError(w, err, "", logger)
  638. return
  639. }
  640. jsonResponse(ptrMetadata, w, logger)
  641. }
  642. //list functions
  643. func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  644. defer r.Body.Close()
  645. sinks := plugin.GetFunctions()
  646. jsonResponse(sinks, w, logger)
  647. return
  648. }
  649. //list source plugin
  650. func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
  651. defer r.Body.Close()
  652. ret := plugin.GetSources()
  653. if nil != ret {
  654. jsonResponse(ret, w, logger)
  655. return
  656. }
  657. }
  658. //Get source metadata when creating stream
  659. func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
  660. defer r.Body.Close()
  661. vars := mux.Vars(r)
  662. pluginName := vars["name"]
  663. language := getLanguage(r)
  664. ret, err := plugin.GetSourceMeta(pluginName, language)
  665. if err != nil {
  666. handleError(w, err, "", logger)
  667. return
  668. }
  669. if nil != ret {
  670. jsonResponse(ret, w, logger)
  671. return
  672. }
  673. }
  674. //Get source yaml
  675. func sourceConfHandler(w http.ResponseWriter, r *http.Request) {
  676. defer r.Body.Close()
  677. vars := mux.Vars(r)
  678. pluginName := vars["name"]
  679. language := getLanguage(r)
  680. ret, err := plugin.GetSourceConf(pluginName, language)
  681. if err != nil {
  682. handleError(w, err, "", logger)
  683. return
  684. } else {
  685. w.Write(ret)
  686. }
  687. }
  688. //Get confKeys of the source metadata
  689. func sourceConfKeysHandler(w http.ResponseWriter, r *http.Request) {
  690. defer r.Body.Close()
  691. vars := mux.Vars(r)
  692. pluginName := vars["name"]
  693. ret := plugin.GetSourceConfKeys(pluginName)
  694. if nil != ret {
  695. jsonResponse(ret, w, logger)
  696. return
  697. }
  698. }
  699. //Add del confkey
  700. func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  701. defer r.Body.Close()
  702. var ret interface{}
  703. var err error
  704. vars := mux.Vars(r)
  705. pluginName := vars["name"]
  706. confKey := vars["confKey"]
  707. language := getLanguage(r)
  708. switch r.Method {
  709. case http.MethodDelete:
  710. err = plugin.DelSourceConfKey(pluginName, confKey, language)
  711. case http.MethodPost:
  712. v, err := ioutil.ReadAll(r.Body)
  713. if err != nil {
  714. handleError(w, err, "Invalid body", logger)
  715. return
  716. }
  717. err = plugin.AddSourceConfKey(pluginName, confKey, language, v)
  718. }
  719. if err != nil {
  720. handleError(w, err, "", logger)
  721. return
  722. }
  723. if nil != ret {
  724. jsonResponse(ret, w, logger)
  725. return
  726. }
  727. }
  728. //Del and Update field of confkey
  729. func sourceConfKeyFieldsHandler(w http.ResponseWriter, r *http.Request) {
  730. defer r.Body.Close()
  731. var ret interface{}
  732. var err error
  733. vars := mux.Vars(r)
  734. pluginName := vars["name"]
  735. confKey := vars["confKey"]
  736. v, err := ioutil.ReadAll(r.Body)
  737. if err != nil {
  738. handleError(w, err, "Invalid body", logger)
  739. return
  740. }
  741. language := getLanguage(r)
  742. switch r.Method {
  743. case http.MethodDelete:
  744. err = plugin.DelSourceConfKeyField(pluginName, confKey, language, v)
  745. case http.MethodPost:
  746. err = plugin.AddSourceConfKeyField(pluginName, confKey, language, v)
  747. }
  748. if err != nil {
  749. handleError(w, err, "", logger)
  750. return
  751. }
  752. if nil != ret {
  753. jsonResponse(ret, w, logger)
  754. return
  755. }
  756. }
  // getLanguage returns the value of the request's Content-Language header, or
  // "en_US" when the header is absent or empty.
  func getLanguage(r *http.Request) string {
  	if language := r.Header.Get("Content-Language"); language != "" {
  		return language
  	}
  	return "en_US"
  }
  764. func servicesHandler(w http.ResponseWriter, r *http.Request) {
  765. defer r.Body.Close()
  766. switch r.Method {
  767. case http.MethodGet:
  768. content, err := serviceManager.List()
  769. if err != nil {
  770. handleError(w, err, "service list command error", logger)
  771. return
  772. }
  773. jsonResponse(content, w, logger)
  774. case http.MethodPost:
  775. sd := &service.ServiceCreationRequest{}
  776. err := json.NewDecoder(r.Body).Decode(sd)
  777. // Problems decoding
  778. if err != nil {
  779. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  780. return
  781. }
  782. err = serviceManager.Create(sd)
  783. if err != nil {
  784. handleError(w, err, "service create command error", logger)
  785. return
  786. }
  787. w.WriteHeader(http.StatusCreated)
  788. w.Write([]byte(fmt.Sprintf("service %s is created", sd.Name)))
  789. }
  790. }
  791. func serviceHandler(w http.ResponseWriter, r *http.Request) {
  792. defer r.Body.Close()
  793. vars := mux.Vars(r)
  794. name := vars["name"]
  795. switch r.Method {
  796. case http.MethodDelete:
  797. err := serviceManager.Delete(name)
  798. if err != nil {
  799. handleError(w, err, fmt.Sprintf("delete service %s error", name), logger)
  800. return
  801. }
  802. w.WriteHeader(http.StatusOK)
  803. result := fmt.Sprintf("service %s is deleted", name)
  804. w.Write([]byte(result))
  805. case http.MethodGet:
  806. j, err := serviceManager.Get(name)
  807. if err != nil {
  808. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe service %s error", name), logger)
  809. return
  810. }
  811. jsonResponse(j, w, logger)
  812. case http.MethodPut:
  813. sd := &service.ServiceCreationRequest{}
  814. err := json.NewDecoder(r.Body).Decode(sd)
  815. // Problems decoding
  816. if err != nil {
  817. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  818. return
  819. }
  820. sd.Name = name
  821. err = serviceManager.Update(sd)
  822. if err != nil {
  823. handleError(w, err, "service update command error", logger)
  824. return
  825. }
  826. w.WriteHeader(http.StatusOK)
  827. w.Write([]byte(fmt.Sprintf("service %s is updated", sd.Name)))
  828. }
  829. }
  830. func serviceFunctionsHandler(w http.ResponseWriter, r *http.Request) {
  831. content, err := serviceManager.ListFunctions()
  832. if err != nil {
  833. handleError(w, err, "service list command error", logger)
  834. return
  835. }
  836. jsonResponse(content, w, logger)
  837. }
  838. func serviceFunctionHandler(w http.ResponseWriter, r *http.Request) {
  839. vars := mux.Vars(r)
  840. name := vars["name"]
  841. j, err := serviceManager.GetFunction(name)
  842. if err != nil {
  843. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  844. return
  845. }
  846. jsonResponse(j, w, logger)
  847. }