rest.go 27 KB


  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "crypto/tls"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/gorilla/handlers"
  20. "github.com/gorilla/mux"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/internal/meta"
  23. "github.com/lf-edge/ekuiper/internal/plugin/native"
  24. "github.com/lf-edge/ekuiper/internal/service"
  25. "github.com/lf-edge/ekuiper/pkg/api"
  26. "github.com/lf-edge/ekuiper/pkg/ast"
  27. "github.com/lf-edge/ekuiper/pkg/errorx"
  28. "golang.org/x/net/html"
  29. "io"
  30. "io/ioutil"
  31. "net/http"
  32. "os"
  33. "runtime"
  34. "strings"
  35. "time"
  36. )
  37. const (
  38. ContentType = "Content-Type"
  39. ContentTypeJSON = "application/json"
  40. )
  41. type statementDescriptor struct {
  42. Sql string `json:"sql,omitempty"`
  43. }
  44. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  45. sd := statementDescriptor{}
  46. err := json.NewDecoder(reader).Decode(&sd)
  47. // Problems decoding
  48. if err != nil {
  49. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  50. }
  51. return sd, nil
  52. }
  53. // Handle applies the specified error and error concept tot he HTTP response writer
  54. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  55. message := prefix
  56. if message != "" {
  57. message += ": "
  58. }
  59. message += err.Error()
  60. logger.Error(message)
  61. var ec int
  62. switch e := err.(type) {
  63. case *errorx.Error:
  64. switch e.Code() {
  65. case errorx.NOT_FOUND:
  66. ec = http.StatusNotFound
  67. default:
  68. ec = http.StatusBadRequest
  69. }
  70. default:
  71. ec = http.StatusBadRequest
  72. }
  73. http.Error(w, message, ec)
  74. }
  75. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  76. w.Header().Add(ContentType, ContentTypeJSON)
  77. enc := json.NewEncoder(w)
  78. err := enc.Encode(i)
  79. // Problems encoding
  80. if err != nil {
  81. handleError(w, err, "", logger)
  82. }
  83. }
  84. func createRestServer(ip string, port int) *http.Server {
  85. r := mux.NewRouter()
  86. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  87. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  88. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  89. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  90. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  91. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  92. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  93. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  94. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  95. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  96. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  97. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  98. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  99. r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
  100. r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
  101. r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
  102. r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
  103. r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
  104. r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
  105. r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
  106. r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
  107. r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
  108. r.HandleFunc("/plugins/functions/{name}/register", functionRegisterHandler).Methods(http.MethodPost)
  109. r.HandleFunc("/plugins/udfs", functionsListHandler).Methods(http.MethodGet)
  110. r.HandleFunc("/plugins/udfs/{name}", functionsGetHandler).Methods(http.MethodGet)
  111. r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
  112. r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
  113. r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
  114. r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
  115. r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
  116. r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
  117. r.HandleFunc("/metadata/sources/{name}/confKeys", sourceConfKeysHandler).Methods(http.MethodGet)
  118. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPost)
  119. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}/field", sourceConfKeyFieldsHandler).Methods(http.MethodDelete, http.MethodPost)
  120. r.HandleFunc("/services", servicesHandler).Methods(http.MethodGet, http.MethodPost)
  121. r.HandleFunc("/services/functions", serviceFunctionsHandler).Methods(http.MethodGet)
  122. r.HandleFunc("/services/functions/{name}", serviceFunctionHandler).Methods(http.MethodGet)
  123. r.HandleFunc("/services/{name}", serviceHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  124. server := &http.Server{
  125. Addr: fmt.Sprintf("%s:%d", ip, port),
  126. // Good practice to set timeouts to avoid Slowloris attacks.
  127. WriteTimeout: time.Second * 60 * 5,
  128. ReadTimeout: time.Second * 60 * 5,
  129. IdleTimeout: time.Second * 60,
  130. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
  131. }
  132. server.SetKeepAlivesEnabled(false)
  133. return server
  134. }
  135. type information struct {
  136. Version string `json:"version"`
  137. Os string `json:"os"`
  138. UpTimeSeconds int64 `json:"upTimeSeconds"`
  139. }
  140. //The handler for root
  141. func rootHandler(w http.ResponseWriter, r *http.Request) {
  142. defer r.Body.Close()
  143. switch r.Method {
  144. case http.MethodGet, http.MethodPost:
  145. w.WriteHeader(http.StatusOK)
  146. info := new(information)
  147. info.Version = version
  148. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  149. info.Os = runtime.GOOS
  150. byteInfo, _ := json.Marshal(info)
  151. w.Write(byteInfo)
  152. }
  153. }
  154. func pingHandler(w http.ResponseWriter, r *http.Request) {
  155. w.WriteHeader(http.StatusOK)
  156. }
  157. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  158. defer r.Body.Close()
  159. switch r.Method {
  160. case http.MethodGet:
  161. content, err := streamProcessor.ShowStream(st)
  162. if err != nil {
  163. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  164. return
  165. }
  166. jsonResponse(content, w, logger)
  167. case http.MethodPost:
  168. v, err := decodeStatementDescriptor(r.Body)
  169. if err != nil {
  170. handleError(w, err, "Invalid body", logger)
  171. return
  172. }
  173. content, err := streamProcessor.ExecStreamSql(v.Sql)
  174. if err != nil {
  175. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  176. return
  177. }
  178. w.WriteHeader(http.StatusCreated)
  179. w.Write([]byte(content))
  180. }
  181. }
  182. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  183. defer r.Body.Close()
  184. vars := mux.Vars(r)
  185. name := vars["name"]
  186. switch r.Method {
  187. case http.MethodGet:
  188. content, err := streamProcessor.DescStream(name, st)
  189. if err != nil {
  190. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  191. return
  192. }
  193. jsonResponse(content, w, logger)
  194. case http.MethodDelete:
  195. content, err := streamProcessor.DropStream(name, st)
  196. if err != nil {
  197. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  198. return
  199. }
  200. w.WriteHeader(http.StatusOK)
  201. w.Write([]byte(content))
  202. case http.MethodPut:
  203. v, err := decodeStatementDescriptor(r.Body)
  204. if err != nil {
  205. handleError(w, err, "Invalid body", logger)
  206. return
  207. }
  208. content, err := streamProcessor.ExecReplaceStream(v.Sql, st)
  209. if err != nil {
  210. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  211. return
  212. }
  213. w.WriteHeader(http.StatusOK)
  214. w.Write([]byte(content))
  215. }
  216. }
  217. //list or create streams
  218. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  219. sourcesManageHandler(w, r, ast.TypeStream)
  220. }
  221. //describe or delete a stream
  222. func streamHandler(w http.ResponseWriter, r *http.Request) {
  223. sourceManageHandler(w, r, ast.TypeStream)
  224. }
  225. //list or create tables
  226. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  227. sourcesManageHandler(w, r, ast.TypeTable)
  228. }
  229. func tableHandler(w http.ResponseWriter, r *http.Request) {
  230. sourceManageHandler(w, r, ast.TypeTable)
  231. }
  232. //list or create rules
  233. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  234. defer r.Body.Close()
  235. switch r.Method {
  236. case http.MethodPost:
  237. body, err := ioutil.ReadAll(r.Body)
  238. if err != nil {
  239. handleError(w, err, "Invalid body", logger)
  240. return
  241. }
  242. r, err := ruleProcessor.ExecCreate("", string(body))
  243. var result string
  244. if err != nil {
  245. handleError(w, err, "Create rule error", logger)
  246. return
  247. } else {
  248. result = fmt.Sprintf("Rule %s was created successfully.", r.Id)
  249. }
  250. //Start the rule
  251. rs, err := createRuleState(r)
  252. if err != nil {
  253. result = err.Error()
  254. } else {
  255. err = doStartRule(rs)
  256. if err != nil {
  257. result = err.Error()
  258. }
  259. }
  260. w.WriteHeader(http.StatusCreated)
  261. w.Write([]byte(result))
  262. case http.MethodGet:
  263. content, err := getAllRulesWithStatus()
  264. if err != nil {
  265. handleError(w, err, "Show rules error", logger)
  266. return
  267. }
  268. jsonResponse(content, w, logger)
  269. }
  270. }
  271. //describe or delete a rule
  272. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  273. defer r.Body.Close()
  274. vars := mux.Vars(r)
  275. name := vars["name"]
  276. switch r.Method {
  277. case http.MethodGet:
  278. rule, err := ruleProcessor.GetRuleByName(name)
  279. if err != nil {
  280. handleError(w, err, "describe rule error", logger)
  281. return
  282. }
  283. jsonResponse(rule, w, logger)
  284. case http.MethodDelete:
  285. deleteRule(name)
  286. content, err := ruleProcessor.ExecDrop(name)
  287. if err != nil {
  288. handleError(w, err, "delete rule error", logger)
  289. return
  290. }
  291. w.WriteHeader(http.StatusOK)
  292. w.Write([]byte(content))
  293. case http.MethodPut:
  294. _, err := ruleProcessor.GetRuleByName(name)
  295. if err != nil {
  296. handleError(w, err, "not found this rule", logger)
  297. return
  298. }
  299. body, err := ioutil.ReadAll(r.Body)
  300. if err != nil {
  301. handleError(w, err, "Invalid body", logger)
  302. return
  303. }
  304. r, err := ruleProcessor.ExecUpdate(name, string(body))
  305. var result string
  306. if err != nil {
  307. handleError(w, err, "Update rule error", logger)
  308. return
  309. } else {
  310. result = fmt.Sprintf("Rule %s was updated successfully.", r.Id)
  311. }
  312. err = restartRule(name)
  313. if err != nil {
  314. handleError(w, err, "restart rule error", logger)
  315. return
  316. }
  317. w.WriteHeader(http.StatusOK)
  318. w.Write([]byte(result))
  319. }
  320. }
  321. //get status of a rule
  322. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  323. defer r.Body.Close()
  324. vars := mux.Vars(r)
  325. name := vars["name"]
  326. content, err := getRuleStatus(name)
  327. if err != nil {
  328. handleError(w, err, "get rule status error", logger)
  329. return
  330. }
  331. w.WriteHeader(http.StatusOK)
  332. w.Write([]byte(content))
  333. }
  334. //start a rule
  335. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  336. defer r.Body.Close()
  337. vars := mux.Vars(r)
  338. name := vars["name"]
  339. err := startRule(name)
  340. if err != nil {
  341. handleError(w, err, "start rule error", logger)
  342. return
  343. }
  344. w.WriteHeader(http.StatusOK)
  345. w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
  346. }
  347. //stop a rule
  348. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  349. defer r.Body.Close()
  350. vars := mux.Vars(r)
  351. name := vars["name"]
  352. result := stopRule(name)
  353. w.WriteHeader(http.StatusOK)
  354. w.Write([]byte(result))
  355. }
  356. //restart a rule
  357. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  358. defer r.Body.Close()
  359. vars := mux.Vars(r)
  360. name := vars["name"]
  361. err := restartRule(name)
  362. if err != nil {
  363. handleError(w, err, "restart rule error", logger)
  364. return
  365. }
  366. w.WriteHeader(http.StatusOK)
  367. w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
  368. }
  369. //get topo of a rule
  370. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  371. defer r.Body.Close()
  372. vars := mux.Vars(r)
  373. name := vars["name"]
  374. content, err := getRuleTopo(name)
  375. if err != nil {
  376. handleError(w, err, "get rule topo error", logger)
  377. return
  378. }
  379. w.Header().Set(ContentType, ContentTypeJSON)
  380. w.Write([]byte(content))
  381. }
  382. func pluginsHandler(w http.ResponseWriter, r *http.Request, t native.PluginType) {
  383. pluginManager := native.GetManager()
  384. defer r.Body.Close()
  385. switch r.Method {
  386. case http.MethodGet:
  387. content := pluginManager.List(t)
  388. jsonResponse(content, w, logger)
  389. case http.MethodPost:
  390. sd := native.NewPluginByType(t)
  391. err := json.NewDecoder(r.Body).Decode(sd)
  392. // Problems decoding
  393. if err != nil {
  394. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", native.PluginTypes[t]), logger)
  395. return
  396. }
  397. err = pluginManager.Register(t, sd)
  398. if err != nil {
  399. handleError(w, err, fmt.Sprintf("%s plugins create command error", native.PluginTypes[t]), logger)
  400. return
  401. }
  402. w.WriteHeader(http.StatusCreated)
  403. w.Write([]byte(fmt.Sprintf("%s plugin %s is created", native.PluginTypes[t], sd.GetName())))
  404. }
  405. }
  406. func pluginHandler(w http.ResponseWriter, r *http.Request, t native.PluginType) {
  407. defer r.Body.Close()
  408. vars := mux.Vars(r)
  409. name := vars["name"]
  410. cb := r.URL.Query().Get("stop")
  411. pluginManager := native.GetManager()
  412. switch r.Method {
  413. case http.MethodDelete:
  414. r := cb == "1"
  415. err := pluginManager.Delete(t, name, r)
  416. if err != nil {
  417. handleError(w, err, fmt.Sprintf("delete %s plugin %s error", native.PluginTypes[t], name), logger)
  418. return
  419. }
  420. w.WriteHeader(http.StatusOK)
  421. result := fmt.Sprintf("%s plugin %s is deleted", native.PluginTypes[t], name)
  422. if r {
  423. result = fmt.Sprintf("%s and Kuiper will be stopped", result)
  424. } else {
  425. result = fmt.Sprintf("%s and Kuiper must restart for the change to take effect.", result)
  426. }
  427. w.Write([]byte(result))
  428. case http.MethodGet:
  429. j, ok := pluginManager.GetPluginInfo(t, name)
  430. if !ok {
  431. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", native.PluginTypes[t], name), logger)
  432. return
  433. }
  434. jsonResponse(j, w, logger)
  435. }
  436. }
  437. //list or create source plugin
  438. func sourcesHandler(w http.ResponseWriter, r *http.Request) {
  439. pluginsHandler(w, r, native.SOURCE)
  440. }
  441. //delete a source plugin
  442. func sourceHandler(w http.ResponseWriter, r *http.Request) {
  443. pluginHandler(w, r, native.SOURCE)
  444. }
  445. //list or create sink plugin
  446. func sinksHandler(w http.ResponseWriter, r *http.Request) {
  447. pluginsHandler(w, r, native.SINK)
  448. }
  449. //delete a sink plugin
  450. func sinkHandler(w http.ResponseWriter, r *http.Request) {
  451. pluginHandler(w, r, native.SINK)
  452. }
  453. //list or create function plugin
  454. func functionsHandler(w http.ResponseWriter, r *http.Request) {
  455. pluginsHandler(w, r, native.FUNCTION)
  456. }
  457. //list all user defined functions in all function plugins
  458. func functionsListHandler(w http.ResponseWriter, r *http.Request) {
  459. pluginManager := native.GetManager()
  460. content := pluginManager.ListSymbols()
  461. jsonResponse(content, w, logger)
  462. }
  463. func functionsGetHandler(w http.ResponseWriter, r *http.Request) {
  464. vars := mux.Vars(r)
  465. name := vars["name"]
  466. pluginManager := native.GetManager()
  467. j, ok := pluginManager.GetPluginBySymbol(native.FUNCTION, name)
  468. if !ok {
  469. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  470. return
  471. }
  472. jsonResponse(map[string]string{"name": name, "plugin": j}, w, logger)
  473. }
  474. //delete a function plugin
  475. func functionHandler(w http.ResponseWriter, r *http.Request) {
  476. pluginHandler(w, r, native.FUNCTION)
  477. }
  478. type functionList struct {
  479. Functions []string `json:"functions,omitempty"`
  480. }
  481. // register function list for function plugin. If a plugin exports multiple functions, the function list must be registered
  482. // either by create or register. If the function plugin has been loaded because of auto load through so file, the function
  483. // list MUST be registered by this API or only the function with the same name as the plugin can be used.
  484. func functionRegisterHandler(w http.ResponseWriter, r *http.Request) {
  485. defer r.Body.Close()
  486. vars := mux.Vars(r)
  487. name := vars["name"]
  488. pluginManager := native.GetManager()
  489. _, ok := pluginManager.GetPluginInfo(native.FUNCTION, name)
  490. if !ok {
  491. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("register %s plugin %s error", native.PluginTypes[native.FUNCTION], name), logger)
  492. return
  493. }
  494. sd := functionList{}
  495. err := json.NewDecoder(r.Body).Decode(&sd)
  496. // Problems decoding
  497. if err != nil {
  498. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the function list json %s", r.Body), logger)
  499. return
  500. }
  501. err = pluginManager.RegisterFuncs(name, sd.Functions)
  502. if err != nil {
  503. handleError(w, err, fmt.Sprintf("function plugins %s regiser functions error", name), logger)
  504. return
  505. }
  506. w.WriteHeader(http.StatusOK)
  507. w.Write([]byte(fmt.Sprintf("function plugin %s function list is registered", name)))
  508. }
  509. func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
  510. prebuildPluginsHandler(w, r, native.SOURCE)
  511. }
  512. func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
  513. prebuildPluginsHandler(w, r, native.SINK)
  514. }
  515. func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
  516. prebuildPluginsHandler(w, r, native.FUNCTION)
  517. }
  518. func isOffcialDockerImage() bool {
  519. if strings.ToLower(os.Getenv("MAINTAINER")) != "emqx.io" {
  520. return false
  521. }
  522. return true
  523. }
  524. func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t native.PluginType) {
  525. emsg := "It's strongly recommended to install plugins at official released Debian Docker images. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
  526. if !isOffcialDockerImage() {
  527. handleError(w, fmt.Errorf(emsg), "", logger)
  528. return
  529. } else if runtime.GOOS == "linux" {
  530. osrelease, err := Read()
  531. if err != nil {
  532. logger.Infof("")
  533. return
  534. }
  535. prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
  536. os := "debian"
  537. if strings.Contains(prettyName, "DEBIAN") {
  538. hosts := conf.Config.Basic.PluginHosts
  539. ptype := "sources"
  540. if t == native.SINK {
  541. ptype = "sinks"
  542. } else if t == native.FUNCTION {
  543. ptype = "functions"
  544. }
  545. if err, plugins := fetchPluginList(hosts, ptype, os, runtime.GOARCH); err != nil {
  546. handleError(w, err, "", logger)
  547. } else {
  548. jsonResponse(plugins, w, logger)
  549. }
  550. } else {
  551. handleError(w, fmt.Errorf(emsg), "", logger)
  552. return
  553. }
  554. } else {
  555. handleError(w, fmt.Errorf(emsg), "", logger)
  556. }
  557. }
  558. func fetchPluginList(hosts, ptype, os, arch string) (err error, result map[string]string) {
  559. if hosts == "" || ptype == "" || os == "" {
  560. logger.Errorf("Invalid parameter value: hosts %s, ptype %s or os: %s should not be empty.", hosts, ptype, os)
  561. return fmt.Errorf("Invalid configruation for plugin host in kuiper.yaml."), nil
  562. }
  563. result = make(map[string]string)
  564. hostsArr := strings.Split(hosts, ",")
  565. for _, host := range hostsArr {
  566. host := strings.Trim(host, " ")
  567. tmp := []string{host, "kuiper-plugins", version, os, ptype}
  568. //The url is similar to http://host:port/kuiper-plugins/0.9.1/debian/sinks/
  569. url := strings.Join(tmp, "/")
  570. timeout := time.Duration(30 * time.Second)
  571. client := &http.Client{
  572. Timeout: timeout,
  573. Transport: &http.Transport{
  574. TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  575. },
  576. }
  577. resp, err := client.Get(url)
  578. logger.Infof("Trying to fetch plugins from url: %s\n", url)
  579. if err != nil {
  580. return err, nil
  581. }
  582. defer resp.Body.Close()
  583. if resp.StatusCode != http.StatusOK {
  584. return fmt.Errorf("Cannot fetch plugin list from %s, with status error: %v", url, resp.StatusCode), nil
  585. }
  586. data, err := ioutil.ReadAll(resp.Body)
  587. if err != nil {
  588. return err, nil
  589. }
  590. plugins := extractFromHtml(string(data), arch)
  591. for _, p := range plugins {
  592. //If already existed, using the existed.
  593. if _, ok := result[p]; !ok {
  594. result[p] = url + "/" + p + "_" + arch + ".zip"
  595. }
  596. logger.Debugf("Plugin %s, download address is %s\n", p, result[p])
  597. }
  598. }
  599. return
  600. }
  601. func extractFromHtml(content, arch string) []string {
  602. plugins := []string{}
  603. htmlTokens := html.NewTokenizer(strings.NewReader(content))
  604. loop:
  605. for {
  606. tt := htmlTokens.Next()
  607. switch tt {
  608. case html.ErrorToken:
  609. break loop
  610. case html.StartTagToken:
  611. t := htmlTokens.Token()
  612. isAnchor := t.Data == "a"
  613. if isAnchor {
  614. found := false
  615. for _, prop := range t.Attr {
  616. if strings.ToUpper(prop.Key) == "HREF" {
  617. if strings.HasSuffix(prop.Val, "_"+arch+".zip") {
  618. if index := strings.LastIndex(prop.Val, "_"); index != -1 {
  619. plugins = append(plugins, prop.Val[0:index])
  620. }
  621. }
  622. found = true
  623. }
  624. }
  625. if !found {
  626. logger.Infof("Invalid plugin download link %s", t)
  627. }
  628. }
  629. }
  630. }
  631. return plugins
  632. }
  633. //list sink plugin
  634. func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
  635. defer r.Body.Close()
  636. sinks := meta.GetSinks()
  637. jsonResponse(sinks, w, logger)
  638. return
  639. }
  640. //Get sink metadata when creating rules
  641. func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
  642. defer r.Body.Close()
  643. vars := mux.Vars(r)
  644. pluginName := vars["name"]
  645. language := getLanguage(r)
  646. ptrMetadata, err := meta.GetSinkMeta(pluginName, language)
  647. if err != nil {
  648. handleError(w, err, "", logger)
  649. return
  650. }
  651. jsonResponse(ptrMetadata, w, logger)
  652. }
  653. //list functions
  654. func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  655. defer r.Body.Close()
  656. sinks := meta.GetFunctions()
  657. jsonResponse(sinks, w, logger)
  658. return
  659. }
  660. //list source plugin
  661. func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
  662. defer r.Body.Close()
  663. ret := meta.GetSources()
  664. if nil != ret {
  665. jsonResponse(ret, w, logger)
  666. return
  667. }
  668. }
  669. //Get source metadata when creating stream
  670. func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
  671. defer r.Body.Close()
  672. vars := mux.Vars(r)
  673. pluginName := vars["name"]
  674. language := getLanguage(r)
  675. ret, err := meta.GetSourceMeta(pluginName, language)
  676. if err != nil {
  677. handleError(w, err, "", logger)
  678. return
  679. }
  680. if nil != ret {
  681. jsonResponse(ret, w, logger)
  682. return
  683. }
  684. }
  685. //Get source yaml
  686. func sourceConfHandler(w http.ResponseWriter, r *http.Request) {
  687. defer r.Body.Close()
  688. vars := mux.Vars(r)
  689. pluginName := vars["name"]
  690. language := getLanguage(r)
  691. ret, err := meta.GetSourceConf(pluginName, language)
  692. if err != nil {
  693. handleError(w, err, "", logger)
  694. return
  695. } else {
  696. w.Write(ret)
  697. }
  698. }
  699. //Get confKeys of the source metadata
  700. func sourceConfKeysHandler(w http.ResponseWriter, r *http.Request) {
  701. defer r.Body.Close()
  702. vars := mux.Vars(r)
  703. pluginName := vars["name"]
  704. ret := meta.GetSourceConfKeys(pluginName)
  705. if nil != ret {
  706. jsonResponse(ret, w, logger)
  707. return
  708. }
  709. }
  710. //Add del confkey
  711. func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  712. defer r.Body.Close()
  713. var ret interface{}
  714. var err error
  715. vars := mux.Vars(r)
  716. pluginName := vars["name"]
  717. confKey := vars["confKey"]
  718. language := getLanguage(r)
  719. switch r.Method {
  720. case http.MethodDelete:
  721. err = meta.DelSourceConfKey(pluginName, confKey, language)
  722. case http.MethodPost:
  723. v, err := ioutil.ReadAll(r.Body)
  724. if err != nil {
  725. handleError(w, err, "Invalid body", logger)
  726. return
  727. }
  728. err = meta.AddSourceConfKey(pluginName, confKey, language, v)
  729. }
  730. if err != nil {
  731. handleError(w, err, "", logger)
  732. return
  733. }
  734. if nil != ret {
  735. jsonResponse(ret, w, logger)
  736. return
  737. }
  738. }
  739. //Del and Update field of confkey
  740. func sourceConfKeyFieldsHandler(w http.ResponseWriter, r *http.Request) {
  741. defer r.Body.Close()
  742. var ret interface{}
  743. var err error
  744. vars := mux.Vars(r)
  745. pluginName := vars["name"]
  746. confKey := vars["confKey"]
  747. v, err := ioutil.ReadAll(r.Body)
  748. if err != nil {
  749. handleError(w, err, "Invalid body", logger)
  750. return
  751. }
  752. language := getLanguage(r)
  753. switch r.Method {
  754. case http.MethodDelete:
  755. err = meta.DelSourceConfKeyField(pluginName, confKey, language, v)
  756. case http.MethodPost:
  757. err = meta.AddSourceConfKeyField(pluginName, confKey, language, v)
  758. }
  759. if err != nil {
  760. handleError(w, err, "", logger)
  761. return
  762. }
  763. if nil != ret {
  764. jsonResponse(ret, w, logger)
  765. return
  766. }
  767. }
  768. func getLanguage(r *http.Request) string {
  769. language := r.Header.Get("Content-Language")
  770. if 0 == len(language) {
  771. language = "en_US"
  772. }
  773. return language
  774. }
  775. func servicesHandler(w http.ResponseWriter, r *http.Request) {
  776. defer r.Body.Close()
  777. serviceManager := service.GetManager()
  778. switch r.Method {
  779. case http.MethodGet:
  780. content, err := serviceManager.List()
  781. if err != nil {
  782. handleError(w, err, "service list command error", logger)
  783. return
  784. }
  785. jsonResponse(content, w, logger)
  786. case http.MethodPost:
  787. sd := &service.ServiceCreationRequest{}
  788. err := json.NewDecoder(r.Body).Decode(sd)
  789. // Problems decoding
  790. if err != nil {
  791. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  792. return
  793. }
  794. err = serviceManager.Create(sd)
  795. if err != nil {
  796. handleError(w, err, "service create command error", logger)
  797. return
  798. }
  799. w.WriteHeader(http.StatusCreated)
  800. w.Write([]byte(fmt.Sprintf("service %s is created", sd.Name)))
  801. }
  802. }
  803. func serviceHandler(w http.ResponseWriter, r *http.Request) {
  804. defer r.Body.Close()
  805. vars := mux.Vars(r)
  806. name := vars["name"]
  807. serviceManager := service.GetManager()
  808. switch r.Method {
  809. case http.MethodDelete:
  810. err := serviceManager.Delete(name)
  811. if err != nil {
  812. handleError(w, err, fmt.Sprintf("delete service %s error", name), logger)
  813. return
  814. }
  815. w.WriteHeader(http.StatusOK)
  816. result := fmt.Sprintf("service %s is deleted", name)
  817. w.Write([]byte(result))
  818. case http.MethodGet:
  819. j, err := serviceManager.Get(name)
  820. if err != nil {
  821. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe service %s error", name), logger)
  822. return
  823. }
  824. jsonResponse(j, w, logger)
  825. case http.MethodPut:
  826. sd := &service.ServiceCreationRequest{}
  827. err := json.NewDecoder(r.Body).Decode(sd)
  828. // Problems decoding
  829. if err != nil {
  830. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  831. return
  832. }
  833. sd.Name = name
  834. err = serviceManager.Update(sd)
  835. if err != nil {
  836. handleError(w, err, "service update command error", logger)
  837. return
  838. }
  839. w.WriteHeader(http.StatusOK)
  840. w.Write([]byte(fmt.Sprintf("service %s is updated", sd.Name)))
  841. }
  842. }
  843. func serviceFunctionsHandler(w http.ResponseWriter, r *http.Request) {
  844. serviceManager := service.GetManager()
  845. content, err := serviceManager.ListFunctions()
  846. if err != nil {
  847. handleError(w, err, "service list command error", logger)
  848. return
  849. }
  850. jsonResponse(content, w, logger)
  851. }
  852. func serviceFunctionHandler(w http.ResponseWriter, r *http.Request) {
  853. vars := mux.Vars(r)
  854. name := vars["name"]
  855. serviceManager := service.GetManager()
  856. j, err := serviceManager.GetFunction(name)
  857. if err != nil {
  858. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  859. return
  860. }
  861. jsonResponse(j, w, logger)
  862. }