rest.go 27 KB


  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "crypto/tls"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/gorilla/handlers"
  20. "github.com/gorilla/mux"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/internal/plugin"
  23. "github.com/lf-edge/ekuiper/internal/service"
  24. "github.com/lf-edge/ekuiper/pkg/api"
  25. "github.com/lf-edge/ekuiper/pkg/ast"
  26. "github.com/lf-edge/ekuiper/pkg/errorx"
  27. "golang.org/x/net/html"
  28. "io"
  29. "io/ioutil"
  30. "net/http"
  31. "os"
  32. "runtime"
  33. "strings"
  34. "time"
  35. )
  36. const (
  37. ContentType = "Content-Type"
  38. ContentTypeJSON = "application/json"
  39. )
  40. type statementDescriptor struct {
  41. Sql string `json:"sql,omitempty"`
  42. }
  43. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  44. sd := statementDescriptor{}
  45. err := json.NewDecoder(reader).Decode(&sd)
  46. // Problems decoding
  47. if err != nil {
  48. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  49. }
  50. return sd, nil
  51. }
  52. // Handle applies the specified error and error concept tot he HTTP response writer
  53. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  54. message := prefix
  55. if message != "" {
  56. message += ": "
  57. }
  58. message += err.Error()
  59. logger.Error(message)
  60. var ec int
  61. switch e := err.(type) {
  62. case *errorx.Error:
  63. switch e.Code() {
  64. case errorx.NOT_FOUND:
  65. ec = http.StatusNotFound
  66. default:
  67. ec = http.StatusBadRequest
  68. }
  69. default:
  70. ec = http.StatusBadRequest
  71. }
  72. http.Error(w, message, ec)
  73. }
  74. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  75. w.Header().Add(ContentType, ContentTypeJSON)
  76. enc := json.NewEncoder(w)
  77. err := enc.Encode(i)
  78. // Problems encoding
  79. if err != nil {
  80. handleError(w, err, "", logger)
  81. }
  82. }
  83. func createRestServer(ip string, port int) *http.Server {
  84. r := mux.NewRouter()
  85. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  86. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  87. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  88. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  89. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  90. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  91. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  92. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  93. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  94. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  95. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  96. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  97. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  98. r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
  99. r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
  100. r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
  101. r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
  102. r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
  103. r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
  104. r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
  105. r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
  106. r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
  107. r.HandleFunc("/plugins/functions/{name}/register", functionRegisterHandler).Methods(http.MethodPost)
  108. r.HandleFunc("/plugins/udfs", functionsListHandler).Methods(http.MethodGet)
  109. r.HandleFunc("/plugins/udfs/{name}", functionsGetHandler).Methods(http.MethodGet)
  110. r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
  111. r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
  112. r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
  113. r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
  114. r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
  115. r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
  116. r.HandleFunc("/metadata/sources/{name}/confKeys", sourceConfKeysHandler).Methods(http.MethodGet)
  117. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPost)
  118. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}/field", sourceConfKeyFieldsHandler).Methods(http.MethodDelete, http.MethodPost)
  119. r.HandleFunc("/services", servicesHandler).Methods(http.MethodGet, http.MethodPost)
  120. r.HandleFunc("/services/functions", serviceFunctionsHandler).Methods(http.MethodGet)
  121. r.HandleFunc("/services/functions/{name}", serviceFunctionHandler).Methods(http.MethodGet)
  122. r.HandleFunc("/services/{name}", serviceHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  123. server := &http.Server{
  124. Addr: fmt.Sprintf("%s:%d", ip, port),
  125. // Good practice to set timeouts to avoid Slowloris attacks.
  126. WriteTimeout: time.Second * 60 * 5,
  127. ReadTimeout: time.Second * 60 * 5,
  128. IdleTimeout: time.Second * 60,
  129. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
  130. }
  131. server.SetKeepAlivesEnabled(false)
  132. return server
  133. }
  134. type information struct {
  135. Version string `json:"version"`
  136. Os string `json:"os"`
  137. UpTimeSeconds int64 `json:"upTimeSeconds"`
  138. }
  139. //The handler for root
  140. func rootHandler(w http.ResponseWriter, r *http.Request) {
  141. defer r.Body.Close()
  142. switch r.Method {
  143. case http.MethodGet, http.MethodPost:
  144. w.WriteHeader(http.StatusOK)
  145. info := new(information)
  146. info.Version = version
  147. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  148. info.Os = runtime.GOOS
  149. byteInfo, _ := json.Marshal(info)
  150. w.Write(byteInfo)
  151. }
  152. }
  153. func pingHandler(w http.ResponseWriter, r *http.Request) {
  154. w.WriteHeader(http.StatusOK)
  155. }
  156. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  157. defer r.Body.Close()
  158. switch r.Method {
  159. case http.MethodGet:
  160. content, err := streamProcessor.ShowStream(st)
  161. if err != nil {
  162. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  163. return
  164. }
  165. jsonResponse(content, w, logger)
  166. case http.MethodPost:
  167. v, err := decodeStatementDescriptor(r.Body)
  168. if err != nil {
  169. handleError(w, err, "Invalid body", logger)
  170. return
  171. }
  172. content, err := streamProcessor.ExecStreamSql(v.Sql)
  173. if err != nil {
  174. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  175. return
  176. }
  177. w.WriteHeader(http.StatusCreated)
  178. w.Write([]byte(content))
  179. }
  180. }
  181. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  182. defer r.Body.Close()
  183. vars := mux.Vars(r)
  184. name := vars["name"]
  185. switch r.Method {
  186. case http.MethodGet:
  187. content, err := streamProcessor.DescStream(name, st)
  188. if err != nil {
  189. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  190. return
  191. }
  192. jsonResponse(content, w, logger)
  193. case http.MethodDelete:
  194. content, err := streamProcessor.DropStream(name, st)
  195. if err != nil {
  196. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  197. return
  198. }
  199. w.WriteHeader(http.StatusOK)
  200. w.Write([]byte(content))
  201. case http.MethodPut:
  202. v, err := decodeStatementDescriptor(r.Body)
  203. if err != nil {
  204. handleError(w, err, "Invalid body", logger)
  205. return
  206. }
  207. content, err := streamProcessor.ExecReplaceStream(v.Sql, st)
  208. if err != nil {
  209. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  210. return
  211. }
  212. w.WriteHeader(http.StatusOK)
  213. w.Write([]byte(content))
  214. }
  215. }
  216. //list or create streams
  217. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  218. sourcesManageHandler(w, r, ast.TypeStream)
  219. }
  220. //describe or delete a stream
  221. func streamHandler(w http.ResponseWriter, r *http.Request) {
  222. sourceManageHandler(w, r, ast.TypeStream)
  223. }
  224. //list or create tables
  225. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  226. sourcesManageHandler(w, r, ast.TypeTable)
  227. }
  228. func tableHandler(w http.ResponseWriter, r *http.Request) {
  229. sourceManageHandler(w, r, ast.TypeTable)
  230. }
  231. //list or create rules
  232. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  233. defer r.Body.Close()
  234. switch r.Method {
  235. case http.MethodPost:
  236. body, err := ioutil.ReadAll(r.Body)
  237. if err != nil {
  238. handleError(w, err, "Invalid body", logger)
  239. return
  240. }
  241. r, err := ruleProcessor.ExecCreate("", string(body))
  242. var result string
  243. if err != nil {
  244. handleError(w, err, "Create rule error", logger)
  245. return
  246. } else {
  247. result = fmt.Sprintf("Rule %s was created successfully.", r.Id)
  248. }
  249. //Start the rule
  250. rs, err := createRuleState(r)
  251. if err != nil {
  252. result = err.Error()
  253. } else {
  254. err = doStartRule(rs)
  255. if err != nil {
  256. result = err.Error()
  257. }
  258. }
  259. w.WriteHeader(http.StatusCreated)
  260. w.Write([]byte(result))
  261. case http.MethodGet:
  262. content, err := getAllRulesWithStatus()
  263. if err != nil {
  264. handleError(w, err, "Show rules error", logger)
  265. return
  266. }
  267. jsonResponse(content, w, logger)
  268. }
  269. }
  270. //describe or delete a rule
  271. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  272. defer r.Body.Close()
  273. vars := mux.Vars(r)
  274. name := vars["name"]
  275. switch r.Method {
  276. case http.MethodGet:
  277. rule, err := ruleProcessor.GetRuleByName(name)
  278. if err != nil {
  279. handleError(w, err, "describe rule error", logger)
  280. return
  281. }
  282. jsonResponse(rule, w, logger)
  283. case http.MethodDelete:
  284. deleteRule(name)
  285. content, err := ruleProcessor.ExecDrop(name)
  286. if err != nil {
  287. handleError(w, err, "delete rule error", logger)
  288. return
  289. }
  290. w.WriteHeader(http.StatusOK)
  291. w.Write([]byte(content))
  292. case http.MethodPut:
  293. _, err := ruleProcessor.GetRuleByName(name)
  294. if err != nil {
  295. handleError(w, err, "not found this rule", logger)
  296. return
  297. }
  298. body, err := ioutil.ReadAll(r.Body)
  299. if err != nil {
  300. handleError(w, err, "Invalid body", logger)
  301. return
  302. }
  303. r, err := ruleProcessor.ExecUpdate(name, string(body))
  304. var result string
  305. if err != nil {
  306. handleError(w, err, "Update rule error", logger)
  307. return
  308. } else {
  309. result = fmt.Sprintf("Rule %s was updated successfully.", r.Id)
  310. }
  311. err = restartRule(name)
  312. if err != nil {
  313. handleError(w, err, "restart rule error", logger)
  314. return
  315. }
  316. w.WriteHeader(http.StatusOK)
  317. w.Write([]byte(result))
  318. }
  319. }
  320. //get status of a rule
  321. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  322. defer r.Body.Close()
  323. vars := mux.Vars(r)
  324. name := vars["name"]
  325. content, err := getRuleStatus(name)
  326. if err != nil {
  327. handleError(w, err, "get rule status error", logger)
  328. return
  329. }
  330. w.WriteHeader(http.StatusOK)
  331. w.Write([]byte(content))
  332. }
  333. //start a rule
  334. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  335. defer r.Body.Close()
  336. vars := mux.Vars(r)
  337. name := vars["name"]
  338. err := startRule(name)
  339. if err != nil {
  340. handleError(w, err, "start rule error", logger)
  341. return
  342. }
  343. w.WriteHeader(http.StatusOK)
  344. w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
  345. }
  346. //stop a rule
  347. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  348. defer r.Body.Close()
  349. vars := mux.Vars(r)
  350. name := vars["name"]
  351. result := stopRule(name)
  352. w.WriteHeader(http.StatusOK)
  353. w.Write([]byte(result))
  354. }
  355. //restart a rule
  356. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  357. defer r.Body.Close()
  358. vars := mux.Vars(r)
  359. name := vars["name"]
  360. err := restartRule(name)
  361. if err != nil {
  362. handleError(w, err, "restart rule error", logger)
  363. return
  364. }
  365. w.WriteHeader(http.StatusOK)
  366. w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
  367. }
  368. //get topo of a rule
  369. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  370. defer r.Body.Close()
  371. vars := mux.Vars(r)
  372. name := vars["name"]
  373. content, err := getRuleTopo(name)
  374. if err != nil {
  375. handleError(w, err, "get rule topo error", logger)
  376. return
  377. }
  378. w.Header().Set(ContentType, ContentTypeJSON)
  379. w.Write([]byte(content))
  380. }
  381. func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  382. defer r.Body.Close()
  383. switch r.Method {
  384. case http.MethodGet:
  385. content, err := pluginManager.List(t)
  386. if err != nil {
  387. handleError(w, err, fmt.Sprintf("%s plugins list command error", plugin.PluginTypes[t]), logger)
  388. return
  389. }
  390. jsonResponse(content, w, logger)
  391. case http.MethodPost:
  392. sd := plugin.NewPluginByType(t)
  393. err := json.NewDecoder(r.Body).Decode(sd)
  394. // Problems decoding
  395. if err != nil {
  396. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugin.PluginTypes[t]), logger)
  397. return
  398. }
  399. err = pluginManager.Register(t, sd)
  400. if err != nil {
  401. handleError(w, err, fmt.Sprintf("%s plugins create command error", plugin.PluginTypes[t]), logger)
  402. return
  403. }
  404. w.WriteHeader(http.StatusCreated)
  405. w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugin.PluginTypes[t], sd.GetName())))
  406. }
  407. }
  408. func pluginHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  409. defer r.Body.Close()
  410. vars := mux.Vars(r)
  411. name := vars["name"]
  412. cb := r.URL.Query().Get("stop")
  413. switch r.Method {
  414. case http.MethodDelete:
  415. r := cb == "1"
  416. err := pluginManager.Delete(t, name, r)
  417. if err != nil {
  418. handleError(w, err, fmt.Sprintf("delete %s plugin %s error", plugin.PluginTypes[t], name), logger)
  419. return
  420. }
  421. w.WriteHeader(http.StatusOK)
  422. result := fmt.Sprintf("%s plugin %s is deleted", plugin.PluginTypes[t], name)
  423. if r {
  424. result = fmt.Sprintf("%s and Kuiper will be stopped", result)
  425. } else {
  426. result = fmt.Sprintf("%s and Kuiper must restart for the change to take effect.", result)
  427. }
  428. w.Write([]byte(result))
  429. case http.MethodGet:
  430. j, ok := pluginManager.Get(t, name)
  431. if !ok {
  432. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", plugin.PluginTypes[t], name), logger)
  433. return
  434. }
  435. jsonResponse(j, w, logger)
  436. }
  437. }
  438. //list or create source plugin
  439. func sourcesHandler(w http.ResponseWriter, r *http.Request) {
  440. pluginsHandler(w, r, plugin.SOURCE)
  441. }
  442. //delete a source plugin
  443. func sourceHandler(w http.ResponseWriter, r *http.Request) {
  444. pluginHandler(w, r, plugin.SOURCE)
  445. }
  446. //list or create sink plugin
  447. func sinksHandler(w http.ResponseWriter, r *http.Request) {
  448. pluginsHandler(w, r, plugin.SINK)
  449. }
  450. //delete a sink plugin
  451. func sinkHandler(w http.ResponseWriter, r *http.Request) {
  452. pluginHandler(w, r, plugin.SINK)
  453. }
  454. //list or create function plugin
  455. func functionsHandler(w http.ResponseWriter, r *http.Request) {
  456. pluginsHandler(w, r, plugin.FUNCTION)
  457. }
  458. //list all user defined functions in all function plugins
  459. func functionsListHandler(w http.ResponseWriter, r *http.Request) {
  460. content, err := pluginManager.ListSymbols()
  461. if err != nil {
  462. handleError(w, err, "udfs list command error", logger)
  463. return
  464. }
  465. jsonResponse(content, w, logger)
  466. }
  467. func functionsGetHandler(w http.ResponseWriter, r *http.Request) {
  468. vars := mux.Vars(r)
  469. name := vars["name"]
  470. j, ok := pluginManager.GetSymbol(name)
  471. if !ok {
  472. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  473. return
  474. }
  475. jsonResponse(map[string]string{"name": name, "plugin": j}, w, logger)
  476. }
  477. //delete a function plugin
  478. func functionHandler(w http.ResponseWriter, r *http.Request) {
  479. pluginHandler(w, r, plugin.FUNCTION)
  480. }
  481. type functionList struct {
  482. Functions []string `json:"functions,omitempty"`
  483. }
  484. // register function list for function plugin. If a plugin exports multiple functions, the function list must be registered
  485. // either by create or register. If the function plugin has been loaded because of auto load through so file, the function
  486. // list MUST be registered by this API or only the function with the same name as the plugin can be used.
  487. func functionRegisterHandler(w http.ResponseWriter, r *http.Request) {
  488. defer r.Body.Close()
  489. vars := mux.Vars(r)
  490. name := vars["name"]
  491. _, ok := pluginManager.Get(plugin.FUNCTION, name)
  492. if !ok {
  493. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("register %s plugin %s error", plugin.PluginTypes[plugin.FUNCTION], name), logger)
  494. return
  495. }
  496. sd := functionList{}
  497. err := json.NewDecoder(r.Body).Decode(&sd)
  498. // Problems decoding
  499. if err != nil {
  500. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the function list json %s", r.Body), logger)
  501. return
  502. }
  503. err = pluginManager.RegisterFuncs(name, sd.Functions)
  504. if err != nil {
  505. handleError(w, err, fmt.Sprintf("function plugins %s regiser functions error", name), logger)
  506. return
  507. }
  508. w.WriteHeader(http.StatusOK)
  509. w.Write([]byte(fmt.Sprintf("function plugin %s function list is registered", name)))
  510. }
  511. func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
  512. prebuildPluginsHandler(w, r, plugin.SOURCE)
  513. }
  514. func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
  515. prebuildPluginsHandler(w, r, plugin.SINK)
  516. }
  517. func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
  518. prebuildPluginsHandler(w, r, plugin.FUNCTION)
  519. }
  520. func isOffcialDockerImage() bool {
  521. if strings.ToLower(os.Getenv("MAINTAINER")) != "emqx.io" {
  522. return false
  523. }
  524. return true
  525. }
  526. func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  527. emsg := "It's strongly recommended to install plugins at official released Debian Docker images. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
  528. if !isOffcialDockerImage() {
  529. handleError(w, fmt.Errorf(emsg), "", logger)
  530. return
  531. } else if runtime.GOOS == "linux" {
  532. osrelease, err := Read()
  533. if err != nil {
  534. logger.Infof("")
  535. return
  536. }
  537. prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
  538. os := "debian"
  539. if strings.Contains(prettyName, "DEBIAN") {
  540. hosts := conf.Config.Basic.PluginHosts
  541. ptype := "sources"
  542. if t == plugin.SINK {
  543. ptype = "sinks"
  544. } else if t == plugin.FUNCTION {
  545. ptype = "functions"
  546. }
  547. if err, plugins := fetchPluginList(hosts, ptype, os, runtime.GOARCH); err != nil {
  548. handleError(w, err, "", logger)
  549. } else {
  550. jsonResponse(plugins, w, logger)
  551. }
  552. } else {
  553. handleError(w, fmt.Errorf(emsg), "", logger)
  554. return
  555. }
  556. } else {
  557. handleError(w, fmt.Errorf(emsg), "", logger)
  558. }
  559. }
  560. func fetchPluginList(hosts, ptype, os, arch string) (err error, result map[string]string) {
  561. if hosts == "" || ptype == "" || os == "" {
  562. logger.Errorf("Invalid parameter value: hosts %s, ptype %s or os: %s should not be empty.", hosts, ptype, os)
  563. return fmt.Errorf("Invalid configruation for plugin host in kuiper.yaml."), nil
  564. }
  565. result = make(map[string]string)
  566. hostsArr := strings.Split(hosts, ",")
  567. for _, host := range hostsArr {
  568. host := strings.Trim(host, " ")
  569. tmp := []string{host, "kuiper-plugins", version, os, ptype}
  570. //The url is similar to http://host:port/kuiper-plugins/0.9.1/debian/sinks/
  571. url := strings.Join(tmp, "/")
  572. timeout := time.Duration(30 * time.Second)
  573. client := &http.Client{
  574. Timeout: timeout,
  575. Transport: &http.Transport{
  576. TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  577. },
  578. }
  579. resp, err := client.Get(url)
  580. logger.Infof("Trying to fetch plugins from url: %s\n", url)
  581. if err != nil {
  582. return err, nil
  583. }
  584. defer resp.Body.Close()
  585. if resp.StatusCode != http.StatusOK {
  586. return fmt.Errorf("Cannot fetch plugin list from %s, with status error: %v", url, resp.StatusCode), nil
  587. }
  588. data, err := ioutil.ReadAll(resp.Body)
  589. if err != nil {
  590. return err, nil
  591. }
  592. plugins := extractFromHtml(string(data), arch)
  593. for _, p := range plugins {
  594. //If already existed, using the existed.
  595. if _, ok := result[p]; !ok {
  596. result[p] = url + "/" + p + "_" + arch + ".zip"
  597. }
  598. logger.Debugf("Plugin %s, download address is %s\n", p, result[p])
  599. }
  600. }
  601. return
  602. }
  603. func extractFromHtml(content, arch string) []string {
  604. plugins := []string{}
  605. htmlTokens := html.NewTokenizer(strings.NewReader(content))
  606. loop:
  607. for {
  608. tt := htmlTokens.Next()
  609. switch tt {
  610. case html.ErrorToken:
  611. break loop
  612. case html.StartTagToken:
  613. t := htmlTokens.Token()
  614. isAnchor := t.Data == "a"
  615. if isAnchor {
  616. found := false
  617. for _, prop := range t.Attr {
  618. if strings.ToUpper(prop.Key) == "HREF" {
  619. if strings.HasSuffix(prop.Val, "_"+arch+".zip") {
  620. if index := strings.LastIndex(prop.Val, "_"); index != -1 {
  621. plugins = append(plugins, prop.Val[0:index])
  622. }
  623. }
  624. found = true
  625. }
  626. }
  627. if !found {
  628. logger.Infof("Invalid plugin download link %s", t)
  629. }
  630. }
  631. }
  632. }
  633. return plugins
  634. }
  635. //list sink plugin
  636. func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
  637. defer r.Body.Close()
  638. sinks := plugin.GetSinks()
  639. jsonResponse(sinks, w, logger)
  640. return
  641. }
  642. //Get sink metadata when creating rules
  643. func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
  644. defer r.Body.Close()
  645. vars := mux.Vars(r)
  646. pluginName := vars["name"]
  647. language := getLanguage(r)
  648. ptrMetadata, err := plugin.GetSinkMeta(pluginName, language)
  649. if err != nil {
  650. handleError(w, err, "", logger)
  651. return
  652. }
  653. jsonResponse(ptrMetadata, w, logger)
  654. }
  655. //list functions
  656. func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  657. defer r.Body.Close()
  658. sinks := plugin.GetFunctions()
  659. jsonResponse(sinks, w, logger)
  660. return
  661. }
  662. //list source plugin
  663. func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
  664. defer r.Body.Close()
  665. ret := plugin.GetSources()
  666. if nil != ret {
  667. jsonResponse(ret, w, logger)
  668. return
  669. }
  670. }
  671. //Get source metadata when creating stream
  672. func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
  673. defer r.Body.Close()
  674. vars := mux.Vars(r)
  675. pluginName := vars["name"]
  676. language := getLanguage(r)
  677. ret, err := plugin.GetSourceMeta(pluginName, language)
  678. if err != nil {
  679. handleError(w, err, "", logger)
  680. return
  681. }
  682. if nil != ret {
  683. jsonResponse(ret, w, logger)
  684. return
  685. }
  686. }
  687. //Get source yaml
  688. func sourceConfHandler(w http.ResponseWriter, r *http.Request) {
  689. defer r.Body.Close()
  690. vars := mux.Vars(r)
  691. pluginName := vars["name"]
  692. language := getLanguage(r)
  693. ret, err := plugin.GetSourceConf(pluginName, language)
  694. if err != nil {
  695. handleError(w, err, "", logger)
  696. return
  697. } else {
  698. w.Write(ret)
  699. }
  700. }
  701. //Get confKeys of the source metadata
  702. func sourceConfKeysHandler(w http.ResponseWriter, r *http.Request) {
  703. defer r.Body.Close()
  704. vars := mux.Vars(r)
  705. pluginName := vars["name"]
  706. ret := plugin.GetSourceConfKeys(pluginName)
  707. if nil != ret {
  708. jsonResponse(ret, w, logger)
  709. return
  710. }
  711. }
  712. //Add del confkey
  713. func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  714. defer r.Body.Close()
  715. var ret interface{}
  716. var err error
  717. vars := mux.Vars(r)
  718. pluginName := vars["name"]
  719. confKey := vars["confKey"]
  720. language := getLanguage(r)
  721. switch r.Method {
  722. case http.MethodDelete:
  723. err = plugin.DelSourceConfKey(pluginName, confKey, language)
  724. case http.MethodPost:
  725. v, err := ioutil.ReadAll(r.Body)
  726. if err != nil {
  727. handleError(w, err, "Invalid body", logger)
  728. return
  729. }
  730. err = plugin.AddSourceConfKey(pluginName, confKey, language, v)
  731. }
  732. if err != nil {
  733. handleError(w, err, "", logger)
  734. return
  735. }
  736. if nil != ret {
  737. jsonResponse(ret, w, logger)
  738. return
  739. }
  740. }
  741. //Del and Update field of confkey
  742. func sourceConfKeyFieldsHandler(w http.ResponseWriter, r *http.Request) {
  743. defer r.Body.Close()
  744. var ret interface{}
  745. var err error
  746. vars := mux.Vars(r)
  747. pluginName := vars["name"]
  748. confKey := vars["confKey"]
  749. v, err := ioutil.ReadAll(r.Body)
  750. if err != nil {
  751. handleError(w, err, "Invalid body", logger)
  752. return
  753. }
  754. language := getLanguage(r)
  755. switch r.Method {
  756. case http.MethodDelete:
  757. err = plugin.DelSourceConfKeyField(pluginName, confKey, language, v)
  758. case http.MethodPost:
  759. err = plugin.AddSourceConfKeyField(pluginName, confKey, language, v)
  760. }
  761. if err != nil {
  762. handleError(w, err, "", logger)
  763. return
  764. }
  765. if nil != ret {
  766. jsonResponse(ret, w, logger)
  767. return
  768. }
  769. }
  770. func getLanguage(r *http.Request) string {
  771. language := r.Header.Get("Content-Language")
  772. if 0 == len(language) {
  773. language = "en_US"
  774. }
  775. return language
  776. }
  777. func servicesHandler(w http.ResponseWriter, r *http.Request) {
  778. defer r.Body.Close()
  779. switch r.Method {
  780. case http.MethodGet:
  781. content, err := serviceManager.List()
  782. if err != nil {
  783. handleError(w, err, "service list command error", logger)
  784. return
  785. }
  786. jsonResponse(content, w, logger)
  787. case http.MethodPost:
  788. sd := &service.ServiceCreationRequest{}
  789. err := json.NewDecoder(r.Body).Decode(sd)
  790. // Problems decoding
  791. if err != nil {
  792. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  793. return
  794. }
  795. err = serviceManager.Create(sd)
  796. if err != nil {
  797. handleError(w, err, "service create command error", logger)
  798. return
  799. }
  800. w.WriteHeader(http.StatusCreated)
  801. w.Write([]byte(fmt.Sprintf("service %s is created", sd.Name)))
  802. }
  803. }
  804. func serviceHandler(w http.ResponseWriter, r *http.Request) {
  805. defer r.Body.Close()
  806. vars := mux.Vars(r)
  807. name := vars["name"]
  808. switch r.Method {
  809. case http.MethodDelete:
  810. err := serviceManager.Delete(name)
  811. if err != nil {
  812. handleError(w, err, fmt.Sprintf("delete service %s error", name), logger)
  813. return
  814. }
  815. w.WriteHeader(http.StatusOK)
  816. result := fmt.Sprintf("service %s is deleted", name)
  817. w.Write([]byte(result))
  818. case http.MethodGet:
  819. j, err := serviceManager.Get(name)
  820. if err != nil {
  821. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe service %s error", name), logger)
  822. return
  823. }
  824. jsonResponse(j, w, logger)
  825. case http.MethodPut:
  826. sd := &service.ServiceCreationRequest{}
  827. err := json.NewDecoder(r.Body).Decode(sd)
  828. // Problems decoding
  829. if err != nil {
  830. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  831. return
  832. }
  833. sd.Name = name
  834. err = serviceManager.Update(sd)
  835. if err != nil {
  836. handleError(w, err, "service update command error", logger)
  837. return
  838. }
  839. w.WriteHeader(http.StatusOK)
  840. w.Write([]byte(fmt.Sprintf("service %s is updated", sd.Name)))
  841. }
  842. }
  843. func serviceFunctionsHandler(w http.ResponseWriter, r *http.Request) {
  844. content, err := serviceManager.ListFunctions()
  845. if err != nil {
  846. handleError(w, err, "service list command error", logger)
  847. return
  848. }
  849. jsonResponse(content, w, logger)
  850. }
  851. func serviceFunctionHandler(w http.ResponseWriter, r *http.Request) {
  852. vars := mux.Vars(r)
  853. name := vars["name"]
  854. j, err := serviceManager.GetFunction(name)
  855. if err != nil {
  856. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  857. return
  858. }
  859. jsonResponse(j, w, logger)
  860. }