rest.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682
  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/plugins"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/gorilla/handlers"
  9. "github.com/gorilla/mux"
  10. "golang.org/x/net/html"
  11. "io"
  12. "io/ioutil"
  13. "net/http"
  14. "os"
  15. "runtime"
  16. "strings"
  17. "time"
  18. )
  19. const (
  20. ContentType = "Content-Type"
  21. ContentTypeJSON = "application/json"
  22. )
  23. type statementDescriptor struct {
  24. Sql string `json:"sql,omitempty"`
  25. }
  26. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  27. sd := statementDescriptor{}
  28. err := json.NewDecoder(reader).Decode(&sd)
  29. // Problems decoding
  30. if err != nil {
  31. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  32. }
  33. return sd, nil
  34. }
  35. // Handle applies the specified error and error concept tot he HTTP response writer
  36. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  37. message := prefix
  38. if message != "" {
  39. message += ": "
  40. }
  41. message += err.Error()
  42. logger.Error(message)
  43. var ec int
  44. switch e := err.(type) {
  45. case *common.Error:
  46. switch e.Code() {
  47. case common.NOT_FOUND:
  48. ec = http.StatusNotFound
  49. default:
  50. ec = http.StatusBadRequest
  51. }
  52. default:
  53. ec = http.StatusBadRequest
  54. }
  55. http.Error(w, message, ec)
  56. }
  57. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  58. w.Header().Add(ContentType, ContentTypeJSON)
  59. enc := json.NewEncoder(w)
  60. err := enc.Encode(i)
  61. // Problems encoding
  62. if err != nil {
  63. handleError(w, err, "", logger)
  64. return
  65. }
  66. }
  67. func createRestServer(port int) *http.Server {
  68. r := mux.NewRouter()
  69. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  70. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  71. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete)
  72. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  73. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet)
  74. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  75. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  76. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  77. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  78. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  79. r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
  80. r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
  81. r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
  82. r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
  83. r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
  84. r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
  85. r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
  86. r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
  87. r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
  88. r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
  89. r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
  90. r.HandleFunc("/metadata/sinks/rule/{id}", showSinkMetaHandler).Methods(http.MethodGet)
  91. r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
  92. r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
  93. r.HandleFunc("/metadata/sources/{name}/confKeys", sourceConfKeysHandler).Methods(http.MethodGet)
  94. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPost)
  95. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}/field", sourceConfKeyFieldsHandler).Methods(http.MethodDelete, http.MethodPost)
  96. server := &http.Server{
  97. Addr: fmt.Sprintf("0.0.0.0:%d", port),
  98. // Good practice to set timeouts to avoid Slowloris attacks.
  99. WriteTimeout: time.Second * 15,
  100. ReadTimeout: time.Second * 15,
  101. IdleTimeout: time.Second * 60,
  102. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
  103. }
  104. server.SetKeepAlivesEnabled(false)
  105. return server
  106. }
  107. type information struct {
  108. Version string `json:"version"`
  109. Os string `json:"os"`
  110. UpTimeSeconds int64 `json:"upTimeSeconds"`
  111. }
  112. //The handler for root
  113. func rootHandler(w http.ResponseWriter, r *http.Request) {
  114. defer r.Body.Close()
  115. switch r.Method {
  116. case http.MethodGet, http.MethodPost:
  117. w.WriteHeader(http.StatusOK)
  118. info := new(information)
  119. info.Version = version
  120. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  121. info.Os = runtime.GOOS
  122. byteInfo, _ := json.Marshal(info)
  123. w.Write(byteInfo)
  124. }
  125. }
  126. //list or create streams
  127. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  128. defer r.Body.Close()
  129. switch r.Method {
  130. case http.MethodGet:
  131. content, err := streamProcessor.ShowStream()
  132. if err != nil {
  133. handleError(w, err, "Stream command error", logger)
  134. return
  135. }
  136. jsonResponse(content, w, logger)
  137. case http.MethodPost:
  138. v, err := decodeStatementDescriptor(r.Body)
  139. if err != nil {
  140. handleError(w, err, "Invalid body", logger)
  141. return
  142. }
  143. content, err := streamProcessor.ExecStreamSql(v.Sql)
  144. if err != nil {
  145. handleError(w, err, "Stream command error", logger)
  146. return
  147. }
  148. w.WriteHeader(http.StatusCreated)
  149. w.Write([]byte(content))
  150. }
  151. }
  152. //describe or delete a stream
  153. func streamHandler(w http.ResponseWriter, r *http.Request) {
  154. defer r.Body.Close()
  155. vars := mux.Vars(r)
  156. name := vars["name"]
  157. switch r.Method {
  158. case http.MethodGet:
  159. content, err := streamProcessor.DescStream(name)
  160. if err != nil {
  161. handleError(w, err, "describe stream error", logger)
  162. return
  163. }
  164. jsonResponse(content, w, logger)
  165. case http.MethodDelete:
  166. content, err := streamProcessor.DropStream(name)
  167. if err != nil {
  168. handleError(w, err, "delete stream error", logger)
  169. return
  170. }
  171. w.WriteHeader(http.StatusOK)
  172. w.Write([]byte(content))
  173. }
  174. }
  175. //list or create rules
  176. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  177. defer r.Body.Close()
  178. switch r.Method {
  179. case http.MethodPost:
  180. body, err := ioutil.ReadAll(r.Body)
  181. if err != nil {
  182. handleError(w, err, "Invalid body", logger)
  183. return
  184. }
  185. r, err := ruleProcessor.ExecCreate("", string(body))
  186. var result string
  187. if err != nil {
  188. handleError(w, err, "Create rule error", logger)
  189. return
  190. } else {
  191. result = fmt.Sprintf("Rule %s was created successfully.", r.Id)
  192. }
  193. //Start the rule
  194. rs, err := createRuleState(r)
  195. if err != nil {
  196. result = err.Error()
  197. } else {
  198. err = doStartRule(rs)
  199. if err != nil {
  200. result = err.Error()
  201. }
  202. }
  203. w.WriteHeader(http.StatusCreated)
  204. w.Write([]byte(result))
  205. case http.MethodGet:
  206. content, err := getAllRulesWithStatus()
  207. if err != nil {
  208. handleError(w, err, "Show rules error", logger)
  209. return
  210. }
  211. jsonResponse(content, w, logger)
  212. }
  213. }
  214. //describe or delete a rule
  215. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  216. defer r.Body.Close()
  217. vars := mux.Vars(r)
  218. name := vars["name"]
  219. switch r.Method {
  220. case http.MethodGet:
  221. rule, err := ruleProcessor.GetRuleByName(name)
  222. if err != nil {
  223. handleError(w, err, "describe rule error", logger)
  224. return
  225. }
  226. jsonResponse(rule, w, logger)
  227. case http.MethodDelete:
  228. stopRule(name)
  229. content, err := ruleProcessor.ExecDrop(name)
  230. if err != nil {
  231. handleError(w, err, "delete rule error", logger)
  232. return
  233. }
  234. w.WriteHeader(http.StatusOK)
  235. w.Write([]byte(content))
  236. }
  237. }
  238. //get status of a rule
  239. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  240. defer r.Body.Close()
  241. vars := mux.Vars(r)
  242. name := vars["name"]
  243. content, err := getRuleStatus(name)
  244. if err != nil {
  245. handleError(w, err, "get rule status error", logger)
  246. return
  247. }
  248. w.WriteHeader(http.StatusOK)
  249. w.Write([]byte(content))
  250. }
  251. //start a rule
  252. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  253. defer r.Body.Close()
  254. vars := mux.Vars(r)
  255. name := vars["name"]
  256. err := startRule(name)
  257. if err != nil {
  258. handleError(w, err, "start rule error", logger)
  259. return
  260. }
  261. w.WriteHeader(http.StatusOK)
  262. w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
  263. }
  264. //stop a rule
  265. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  266. defer r.Body.Close()
  267. vars := mux.Vars(r)
  268. name := vars["name"]
  269. result := stopRule(name)
  270. w.WriteHeader(http.StatusOK)
  271. w.Write([]byte(result))
  272. }
  273. //restart a rule
  274. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  275. defer r.Body.Close()
  276. vars := mux.Vars(r)
  277. name := vars["name"]
  278. err := restartRule(name)
  279. if err != nil {
  280. handleError(w, err, "restart rule error", logger)
  281. return
  282. }
  283. w.WriteHeader(http.StatusOK)
  284. w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
  285. }
  286. //get topo of a rule
  287. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  288. defer r.Body.Close()
  289. vars := mux.Vars(r)
  290. name := vars["name"]
  291. content, err := getRuleTopo(name)
  292. if err != nil {
  293. handleError(w, err, "get rule topo error", logger)
  294. return
  295. }
  296. w.Header().Set(ContentType, ContentTypeJSON)
  297. w.Write([]byte(content))
  298. }
  299. func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
  300. defer r.Body.Close()
  301. switch r.Method {
  302. case http.MethodGet:
  303. content, err := pluginManager.List(t)
  304. if err != nil {
  305. handleError(w, err, fmt.Sprintf("%s plugins list command error", plugins.PluginTypes[t]), logger)
  306. return
  307. }
  308. jsonResponse(content, w, logger)
  309. case http.MethodPost:
  310. sd := plugins.Plugin{}
  311. err := json.NewDecoder(r.Body).Decode(&sd)
  312. // Problems decoding
  313. if err != nil {
  314. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugins.PluginTypes[t]), logger)
  315. return
  316. }
  317. err = pluginManager.Register(t, &sd)
  318. if err != nil {
  319. handleError(w, err, fmt.Sprintf("%s plugins create command error", plugins.PluginTypes[t]), logger)
  320. return
  321. }
  322. w.WriteHeader(http.StatusCreated)
  323. w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugins.PluginTypes[t], sd.Name)))
  324. }
  325. }
  326. func pluginHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
  327. defer r.Body.Close()
  328. vars := mux.Vars(r)
  329. name := vars["name"]
  330. cb := r.URL.Query().Get("stop")
  331. switch r.Method {
  332. case http.MethodDelete:
  333. r := cb == "1"
  334. err := pluginManager.Delete(t, name, r)
  335. if err != nil {
  336. handleError(w, err, fmt.Sprintf("delete %s plugin %s error", plugins.PluginTypes[t], name), logger)
  337. return
  338. }
  339. w.WriteHeader(http.StatusOK)
  340. result := fmt.Sprintf("%s plugin %s is deleted", plugins.PluginTypes[t], name)
  341. if r {
  342. result = fmt.Sprintf("%s and Kuiper will be stopped", result)
  343. } else {
  344. result = fmt.Sprintf("%s and Kuiper must restart for the change to take effect.", result)
  345. }
  346. w.Write([]byte(result))
  347. case http.MethodGet:
  348. j, ok := pluginManager.Get(t, name)
  349. if !ok {
  350. handleError(w, common.NewErrorWithCode(common.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", plugins.PluginTypes[t], name), logger)
  351. return
  352. }
  353. jsonResponse(j, w, logger)
  354. }
  355. }
  356. //list or create source plugin
  357. func sourcesHandler(w http.ResponseWriter, r *http.Request) {
  358. pluginsHandler(w, r, plugins.SOURCE)
  359. }
  360. //delete a source plugin
  361. func sourceHandler(w http.ResponseWriter, r *http.Request) {
  362. pluginHandler(w, r, plugins.SOURCE)
  363. }
  364. //list or create sink plugin
  365. func sinksHandler(w http.ResponseWriter, r *http.Request) {
  366. pluginsHandler(w, r, plugins.SINK)
  367. }
  368. //delete a sink plugin
  369. func sinkHandler(w http.ResponseWriter, r *http.Request) {
  370. pluginHandler(w, r, plugins.SINK)
  371. }
  372. //list or create function plugin
  373. func functionsHandler(w http.ResponseWriter, r *http.Request) {
  374. pluginsHandler(w, r, plugins.FUNCTION)
  375. }
  376. //delete a function plugin
  377. func functionHandler(w http.ResponseWriter, r *http.Request) {
  378. pluginHandler(w, r, plugins.FUNCTION)
  379. }
  380. func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
  381. prebuildPluginsHandler(w, r, plugins.SOURCE)
  382. }
  383. func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
  384. prebuildPluginsHandler(w, r, plugins.SINK)
  385. }
  386. func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
  387. prebuildPluginsHandler(w, r, plugins.FUNCTION)
  388. }
  389. func isOffcialDockerImage() bool {
  390. if strings.ToLower(os.Getenv("MAINTAINER")) != "emqx.io" {
  391. return false
  392. }
  393. return true
  394. }
  395. func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
  396. if runtime.GOOS != "linux" {
  397. handleError(w, fmt.Errorf("Plugins can be only installed at Linux."), "", logger)
  398. return
  399. } else if !isOffcialDockerImage() {
  400. handleError(w, fmt.Errorf("Plugins can be only installed at official released Docker images."), "", logger)
  401. return
  402. } else if runtime.GOOS == "linux" {
  403. osrelease, err := common.Read()
  404. if err != nil {
  405. logger.Infof("")
  406. return
  407. }
  408. prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
  409. if strings.Contains(prettyName, "ALPINE") || strings.Contains(prettyName, "DEBIAN") {
  410. hosts := common.Config.Basic.PluginHosts
  411. ptype := "sources"
  412. if t == plugins.SINK {
  413. ptype = "sinks"
  414. } else if t == plugins.FUNCTION {
  415. ptype = "functions"
  416. }
  417. if err, plugins := fetchPluginList(hosts, ptype, strings.ToLower(prettyName), runtime.GOARCH); err != nil {
  418. handleError(w, err, "", logger)
  419. } else {
  420. jsonResponse(plugins, w, logger)
  421. }
  422. } else {
  423. handleError(w, fmt.Errorf("Only ALPINE & DEBIAN docker images are supported."), "", logger)
  424. return
  425. }
  426. } else {
  427. handleError(w, fmt.Errorf("Please use official Kuiper docker images to install the plugins."), "", logger)
  428. }
  429. }
  430. func fetchPluginList(hosts, ptype, os, arch string) (err error, result map[string]string) {
  431. if hosts == "" || ptype == "" || os == "" {
  432. return fmt.Errorf("Invalid parameter value: hosts, ptype and os value should not be empty."), nil
  433. }
  434. result = make(map[string]string)
  435. hostsArr := strings.Split(hosts, ",")
  436. for _, host := range hostsArr {
  437. host := strings.Trim(host, " ")
  438. tmp := []string{host, "kuiper-plugins", version, ptype, os}
  439. //The url is similar to http://host:port/kuiper-plugins/0.9.1/sinks/alpine
  440. url := strings.Join(tmp, "/")
  441. resp, err := http.Get(url)
  442. if err != nil {
  443. return err, nil
  444. }
  445. defer resp.Body.Close()
  446. if resp.StatusCode != http.StatusOK {
  447. return fmt.Errorf("Status error: %v", resp.StatusCode), nil
  448. }
  449. data, err := ioutil.ReadAll(resp.Body)
  450. if err != nil {
  451. return err, nil
  452. }
  453. plugins := extractFromHtml(string(data), arch)
  454. for _, p := range plugins {
  455. //If already existed, using the existed.
  456. if _, ok := result[p]; !ok {
  457. result[p] = url + "/" + p + "_" + arch + ".zip"
  458. }
  459. }
  460. }
  461. return
  462. }
  463. func extractFromHtml(content, arch string) []string {
  464. plugins := []string{}
  465. htmlTokens := html.NewTokenizer(strings.NewReader(content))
  466. loop:
  467. for {
  468. tt := htmlTokens.Next()
  469. switch tt {
  470. case html.ErrorToken:
  471. break loop
  472. case html.StartTagToken:
  473. t := htmlTokens.Token()
  474. isAnchor := t.Data == "a"
  475. if isAnchor {
  476. found := false
  477. for _, prop := range t.Attr {
  478. if strings.ToUpper(prop.Key) == "HREF" {
  479. if strings.HasSuffix(prop.Val, "_"+arch+".zip") {
  480. if index := strings.LastIndex(prop.Val, "_"); index != -1 {
  481. plugins = append(plugins, prop.Val[0:index])
  482. }
  483. }
  484. found = true
  485. }
  486. }
  487. if !found {
  488. logger.Infof("Invalid plugin download link %s", t)
  489. }
  490. }
  491. }
  492. }
  493. return plugins
  494. }
  495. //list sink plugin
  496. func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
  497. defer r.Body.Close()
  498. sinks := plugins.GetSinks()
  499. jsonResponse(sinks, w, logger)
  500. return
  501. }
  502. //Get sink metadata when creating rules
  503. func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
  504. defer r.Body.Close()
  505. vars := mux.Vars(r)
  506. pluginName := vars["name"]
  507. ptrMetadata, err := plugins.GetSinkMeta(pluginName, nil)
  508. if err != nil {
  509. handleError(w, err, "metadata error", logger)
  510. return
  511. }
  512. jsonResponse(ptrMetadata, w, logger)
  513. }
  514. //Get sink metadata when displaying rules
  515. func showSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
  516. defer r.Body.Close()
  517. vars := mux.Vars(r)
  518. ruleid := vars["id"]
  519. rule, err := ruleProcessor.GetRuleByName(ruleid)
  520. if err != nil {
  521. handleError(w, err, "describe rule error", logger)
  522. return
  523. }
  524. ptrMetadata, err := plugins.GetSinkMeta("", rule)
  525. if err != nil {
  526. handleError(w, err, "metadata error", logger)
  527. return
  528. }
  529. jsonResponse(ptrMetadata, w, logger)
  530. }
  531. //list source plugin
  532. func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
  533. defer r.Body.Close()
  534. ret := plugins.GetSources()
  535. if nil != ret {
  536. jsonResponse(ret, w, logger)
  537. return
  538. }
  539. }
  540. //Get source metadata when creating stream
  541. func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
  542. defer r.Body.Close()
  543. vars := mux.Vars(r)
  544. pluginName := vars["name"]
  545. ret, err := plugins.GetSourceMeta(pluginName)
  546. if err != nil {
  547. handleError(w, err, "metadata error", logger)
  548. return
  549. }
  550. if nil != ret {
  551. jsonResponse(ret, w, logger)
  552. return
  553. }
  554. }
  555. //Get confKeys of the source metadata
  556. func sourceConfKeysHandler(w http.ResponseWriter, r *http.Request) {
  557. defer r.Body.Close()
  558. vars := mux.Vars(r)
  559. pluginName := vars["name"]
  560. ret := plugins.GetSourceConfKeys(pluginName)
  561. if nil != ret {
  562. jsonResponse(ret, w, logger)
  563. return
  564. }
  565. }
  566. //Add del confkey
  567. func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  568. defer r.Body.Close()
  569. var ret interface{}
  570. var err error
  571. vars := mux.Vars(r)
  572. pluginName := vars["name"]
  573. confKey := vars["confKey"]
  574. switch r.Method {
  575. case http.MethodDelete:
  576. err = plugins.DelSourceConfKey(pluginName, confKey)
  577. case http.MethodPost:
  578. v, err := ioutil.ReadAll(r.Body)
  579. if err != nil {
  580. handleError(w, err, "Invalid body", logger)
  581. return
  582. }
  583. err = plugins.AddSourceConfKey(pluginName, confKey, v)
  584. }
  585. if err != nil {
  586. handleError(w, err, "metadata error", logger)
  587. return
  588. }
  589. if nil != ret {
  590. jsonResponse(ret, w, logger)
  591. return
  592. }
  593. }
  594. //Del and Update field of confkey
  595. func sourceConfKeyFieldsHandler(w http.ResponseWriter, r *http.Request) {
  596. defer r.Body.Close()
  597. var ret interface{}
  598. var err error
  599. vars := mux.Vars(r)
  600. pluginName := vars["name"]
  601. confKey := vars["confKey"]
  602. v, err := ioutil.ReadAll(r.Body)
  603. if err != nil {
  604. handleError(w, err, "Invalid body", logger)
  605. return
  606. }
  607. switch r.Method {
  608. case http.MethodDelete:
  609. err = plugins.DelSourceConfKeyField(pluginName, confKey, v)
  610. case http.MethodPost:
  611. err = plugins.AddSourceConfKeyField(pluginName, confKey, v)
  612. }
  613. if err != nil {
  614. handleError(w, err, "metadata error", logger)
  615. return
  616. }
  617. if nil != ret {
  618. jsonResponse(ret, w, logger)
  619. return
  620. }
  621. }