rest.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/gorilla/handlers"
  19. "github.com/gorilla/mux"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/meta"
  22. "github.com/lf-edge/ekuiper/internal/plugin"
  23. "github.com/lf-edge/ekuiper/internal/plugin/native"
  24. "github.com/lf-edge/ekuiper/internal/plugin/portable"
  25. "github.com/lf-edge/ekuiper/internal/server/middleware"
  26. "github.com/lf-edge/ekuiper/internal/service"
  27. "github.com/lf-edge/ekuiper/pkg/api"
  28. "github.com/lf-edge/ekuiper/pkg/ast"
  29. "github.com/lf-edge/ekuiper/pkg/errorx"
  30. "io"
  31. "io/ioutil"
  32. "net/http"
  33. "os"
  34. "runtime"
  35. "strings"
  36. "time"
  37. )
  38. const (
  39. ContentType = "Content-Type"
  40. ContentTypeJSON = "application/json"
  41. )
  42. type statementDescriptor struct {
  43. Sql string `json:"sql,omitempty"`
  44. }
  45. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  46. sd := statementDescriptor{}
  47. err := json.NewDecoder(reader).Decode(&sd)
  48. // Problems decoding
  49. if err != nil {
  50. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  51. }
  52. return sd, nil
  53. }
  54. // Handle applies the specified error and error concept tot he HTTP response writer
  55. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  56. message := prefix
  57. if message != "" {
  58. message += ": "
  59. }
  60. message += err.Error()
  61. logger.Error(message)
  62. var ec int
  63. switch e := err.(type) {
  64. case *errorx.Error:
  65. switch e.Code() {
  66. case errorx.NOT_FOUND:
  67. ec = http.StatusNotFound
  68. default:
  69. ec = http.StatusBadRequest
  70. }
  71. default:
  72. ec = http.StatusBadRequest
  73. }
  74. http.Error(w, message, ec)
  75. }
  76. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  77. w.Header().Add(ContentType, ContentTypeJSON)
  78. enc := json.NewEncoder(w)
  79. err := enc.Encode(i)
  80. // Problems encoding
  81. if err != nil {
  82. handleError(w, err, "", logger)
  83. }
  84. }
  85. func createRestServer(ip string, port int, needToken bool) *http.Server {
  86. r := mux.NewRouter()
  87. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  88. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  89. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  90. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  91. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  92. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  93. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  94. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  95. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  96. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  97. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  98. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  99. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  100. r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
  101. r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
  102. r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
  103. r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
  104. r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
  105. r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
  106. r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
  107. r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
  108. r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
  109. r.HandleFunc("/plugins/functions/{name}/register", functionRegisterHandler).Methods(http.MethodPost)
  110. r.HandleFunc("/plugins/udfs", functionsListHandler).Methods(http.MethodGet)
  111. r.HandleFunc("/plugins/udfs/{name}", functionsGetHandler).Methods(http.MethodGet)
  112. r.HandleFunc("/plugins/portables", portablesHandler).Methods(http.MethodGet, http.MethodPost)
  113. r.HandleFunc("/plugins/portables/{name}", portableHandler).Methods(http.MethodGet, http.MethodDelete)
  114. r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
  115. r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
  116. r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
  117. r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
  118. r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
  119. r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
  120. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
  121. r.HandleFunc("/metadata/connections", connectionsMetaHandler).Methods(http.MethodGet)
  122. r.HandleFunc("/metadata/connections/{name}", connectionMetaHandler).Methods(http.MethodGet)
  123. r.HandleFunc("/metadata/connections/yaml/{name}", connectionConfHandler).Methods(http.MethodGet)
  124. r.HandleFunc("/metadata/connections/{name}/confKeys/{confKey}", connectionConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
  125. r.HandleFunc("/services", servicesHandler).Methods(http.MethodGet, http.MethodPost)
  126. r.HandleFunc("/services/functions", serviceFunctionsHandler).Methods(http.MethodGet)
  127. r.HandleFunc("/services/functions/{name}", serviceFunctionHandler).Methods(http.MethodGet)
  128. r.HandleFunc("/services/{name}", serviceHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  129. if needToken {
  130. r.Use(middleware.Auth)
  131. }
  132. server := &http.Server{
  133. Addr: fmt.Sprintf("%s:%d", ip, port),
  134. // Good practice to set timeouts to avoid Slowloris attacks.
  135. WriteTimeout: time.Second * 60 * 5,
  136. ReadTimeout: time.Second * 60 * 5,
  137. IdleTimeout: time.Second * 60,
  138. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
  139. }
  140. server.SetKeepAlivesEnabled(false)
  141. return server
  142. }
  143. type information struct {
  144. Version string `json:"version"`
  145. Os string `json:"os"`
  146. Arch string `json:"arch"`
  147. UpTimeSeconds int64 `json:"upTimeSeconds"`
  148. }
  149. //The handler for root
  150. func rootHandler(w http.ResponseWriter, r *http.Request) {
  151. defer r.Body.Close()
  152. switch r.Method {
  153. case http.MethodGet, http.MethodPost:
  154. w.WriteHeader(http.StatusOK)
  155. info := new(information)
  156. info.Version = version
  157. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  158. info.Os = runtime.GOOS
  159. info.Arch = runtime.GOARCH
  160. byteInfo, _ := json.Marshal(info)
  161. w.Write(byteInfo)
  162. }
  163. }
  164. func pingHandler(w http.ResponseWriter, _ *http.Request) {
  165. w.WriteHeader(http.StatusOK)
  166. }
  167. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  168. defer r.Body.Close()
  169. switch r.Method {
  170. case http.MethodGet:
  171. content, err := streamProcessor.ShowStream(st)
  172. if err != nil {
  173. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  174. return
  175. }
  176. jsonResponse(content, w, logger)
  177. case http.MethodPost:
  178. v, err := decodeStatementDescriptor(r.Body)
  179. if err != nil {
  180. handleError(w, err, "Invalid body", logger)
  181. return
  182. }
  183. content, err := streamProcessor.ExecStreamSql(v.Sql)
  184. if err != nil {
  185. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  186. return
  187. }
  188. w.WriteHeader(http.StatusCreated)
  189. w.Write([]byte(content))
  190. }
  191. }
  192. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  193. defer r.Body.Close()
  194. vars := mux.Vars(r)
  195. name := vars["name"]
  196. switch r.Method {
  197. case http.MethodGet:
  198. content, err := streamProcessor.DescStream(name, st)
  199. if err != nil {
  200. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  201. return
  202. }
  203. jsonResponse(content, w, logger)
  204. case http.MethodDelete:
  205. content, err := streamProcessor.DropStream(name, st)
  206. if err != nil {
  207. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  208. return
  209. }
  210. w.WriteHeader(http.StatusOK)
  211. w.Write([]byte(content))
  212. case http.MethodPut:
  213. v, err := decodeStatementDescriptor(r.Body)
  214. if err != nil {
  215. handleError(w, err, "Invalid body", logger)
  216. return
  217. }
  218. content, err := streamProcessor.ExecReplaceStream(name, v.Sql, st)
  219. if err != nil {
  220. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  221. return
  222. }
  223. w.WriteHeader(http.StatusOK)
  224. w.Write([]byte(content))
  225. }
  226. }
  227. //list or create streams
  228. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  229. sourcesManageHandler(w, r, ast.TypeStream)
  230. }
  231. //describe or delete a stream
  232. func streamHandler(w http.ResponseWriter, r *http.Request) {
  233. sourceManageHandler(w, r, ast.TypeStream)
  234. }
  235. //list or create tables
  236. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  237. sourcesManageHandler(w, r, ast.TypeTable)
  238. }
  239. func tableHandler(w http.ResponseWriter, r *http.Request) {
  240. sourceManageHandler(w, r, ast.TypeTable)
  241. }
  242. //list or create rules
  243. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  244. defer r.Body.Close()
  245. switch r.Method {
  246. case http.MethodPost:
  247. body, err := ioutil.ReadAll(r.Body)
  248. if err != nil {
  249. handleError(w, err, "Invalid body", logger)
  250. return
  251. }
  252. r, err := ruleProcessor.ExecCreate("", string(body))
  253. var result string
  254. if err != nil {
  255. handleError(w, err, "Create rule error", logger)
  256. return
  257. } else {
  258. result = fmt.Sprintf("Rule %s was created successfully.", r.Id)
  259. }
  260. go func() {
  261. //Start the rule
  262. rs, err := createRuleState(r)
  263. if err != nil {
  264. result = err.Error()
  265. } else {
  266. err = doStartRule(rs)
  267. if err != nil {
  268. result = err.Error()
  269. }
  270. }
  271. }()
  272. w.WriteHeader(http.StatusCreated)
  273. w.Write([]byte(result))
  274. case http.MethodGet:
  275. content, err := getAllRulesWithStatus()
  276. if err != nil {
  277. handleError(w, err, "Show rules error", logger)
  278. return
  279. }
  280. jsonResponse(content, w, logger)
  281. }
  282. }
  283. //describe or delete a rule
  284. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  285. defer r.Body.Close()
  286. vars := mux.Vars(r)
  287. name := vars["name"]
  288. switch r.Method {
  289. case http.MethodGet:
  290. rule, err := ruleProcessor.GetRuleByName(name)
  291. if err != nil {
  292. handleError(w, err, "describe rule error", logger)
  293. return
  294. }
  295. jsonResponse(rule, w, logger)
  296. case http.MethodDelete:
  297. deleteRule(name)
  298. content, err := ruleProcessor.ExecDrop(name)
  299. if err != nil {
  300. handleError(w, err, "delete rule error", logger)
  301. return
  302. }
  303. w.WriteHeader(http.StatusOK)
  304. w.Write([]byte(content))
  305. case http.MethodPut:
  306. _, err := ruleProcessor.GetRuleByName(name)
  307. if err != nil {
  308. handleError(w, err, "not found this rule", logger)
  309. return
  310. }
  311. body, err := ioutil.ReadAll(r.Body)
  312. if err != nil {
  313. handleError(w, err, "Invalid body", logger)
  314. return
  315. }
  316. r, err := ruleProcessor.ExecUpdate(name, string(body))
  317. var result string
  318. if err != nil {
  319. handleError(w, err, "Update rule error", logger)
  320. return
  321. } else {
  322. result = fmt.Sprintf("Rule %s was updated successfully.", r.Id)
  323. }
  324. err = restartRule(name)
  325. if err != nil {
  326. handleError(w, err, "restart rule error", logger)
  327. return
  328. }
  329. w.WriteHeader(http.StatusOK)
  330. w.Write([]byte(result))
  331. }
  332. }
  333. //get status of a rule
  334. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  335. defer r.Body.Close()
  336. vars := mux.Vars(r)
  337. name := vars["name"]
  338. content, err := getRuleStatus(name)
  339. if err != nil {
  340. handleError(w, err, "get rule status error", logger)
  341. return
  342. }
  343. w.WriteHeader(http.StatusOK)
  344. w.Write([]byte(content))
  345. }
  346. //start a rule
  347. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  348. defer r.Body.Close()
  349. vars := mux.Vars(r)
  350. name := vars["name"]
  351. err := startRule(name)
  352. if err != nil {
  353. handleError(w, err, "start rule error", logger)
  354. return
  355. }
  356. w.WriteHeader(http.StatusOK)
  357. w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
  358. }
  359. //stop a rule
  360. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  361. defer r.Body.Close()
  362. vars := mux.Vars(r)
  363. name := vars["name"]
  364. result := stopRule(name)
  365. w.WriteHeader(http.StatusOK)
  366. w.Write([]byte(result))
  367. }
  368. //restart a rule
  369. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  370. defer r.Body.Close()
  371. vars := mux.Vars(r)
  372. name := vars["name"]
  373. err := restartRule(name)
  374. if err != nil {
  375. handleError(w, err, "restart rule error", logger)
  376. return
  377. }
  378. w.WriteHeader(http.StatusOK)
  379. w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
  380. }
  381. //get topo of a rule
  382. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  383. defer r.Body.Close()
  384. vars := mux.Vars(r)
  385. name := vars["name"]
  386. content, err := getRuleTopo(name)
  387. if err != nil {
  388. handleError(w, err, "get rule topo error", logger)
  389. return
  390. }
  391. w.Header().Set(ContentType, ContentTypeJSON)
  392. w.Write([]byte(content))
  393. }
  394. func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  395. pluginManager := native.GetManager()
  396. defer r.Body.Close()
  397. switch r.Method {
  398. case http.MethodGet:
  399. content := pluginManager.List(t)
  400. jsonResponse(content, w, logger)
  401. case http.MethodPost:
  402. sd := plugin.NewPluginByType(t)
  403. err := json.NewDecoder(r.Body).Decode(sd)
  404. // Problems decoding
  405. if err != nil {
  406. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugin.PluginTypes[t]), logger)
  407. return
  408. }
  409. err = pluginManager.Register(t, sd)
  410. if err != nil {
  411. handleError(w, err, fmt.Sprintf("%s plugins create command error", plugin.PluginTypes[t]), logger)
  412. return
  413. }
  414. w.WriteHeader(http.StatusCreated)
  415. w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugin.PluginTypes[t], sd.GetName())))
  416. }
  417. }
  418. func pluginHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
  419. defer r.Body.Close()
  420. vars := mux.Vars(r)
  421. name := vars["name"]
  422. cb := r.URL.Query().Get("stop")
  423. pluginManager := native.GetManager()
  424. switch r.Method {
  425. case http.MethodDelete:
  426. r := cb == "1"
  427. err := pluginManager.Delete(t, name, r)
  428. if err != nil {
  429. handleError(w, err, fmt.Sprintf("delete %s plugin %s error", plugin.PluginTypes[t], name), logger)
  430. return
  431. }
  432. w.WriteHeader(http.StatusOK)
  433. result := fmt.Sprintf("%s plugin %s is deleted", plugin.PluginTypes[t], name)
  434. if r {
  435. result = fmt.Sprintf("%s and Kuiper will be stopped", result)
  436. } else {
  437. result = fmt.Sprintf("%s and Kuiper must restart for the change to take effect.", result)
  438. }
  439. w.Write([]byte(result))
  440. case http.MethodGet:
  441. j, ok := pluginManager.GetPluginInfo(t, name)
  442. if !ok {
  443. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", plugin.PluginTypes[t], name), logger)
  444. return
  445. }
  446. jsonResponse(j, w, logger)
  447. }
  448. }
  449. //list or create source plugin
  450. func sourcesHandler(w http.ResponseWriter, r *http.Request) {
  451. pluginsHandler(w, r, plugin.SOURCE)
  452. }
  453. //delete a source plugin
  454. func sourceHandler(w http.ResponseWriter, r *http.Request) {
  455. pluginHandler(w, r, plugin.SOURCE)
  456. }
  457. //list or create sink plugin
  458. func sinksHandler(w http.ResponseWriter, r *http.Request) {
  459. pluginsHandler(w, r, plugin.SINK)
  460. }
  461. //delete a sink plugin
  462. func sinkHandler(w http.ResponseWriter, r *http.Request) {
  463. pluginHandler(w, r, plugin.SINK)
  464. }
  465. //list or create function plugin
  466. func functionsHandler(w http.ResponseWriter, r *http.Request) {
  467. pluginsHandler(w, r, plugin.FUNCTION)
  468. }
  469. //list all user defined functions in all function plugins
  470. func functionsListHandler(w http.ResponseWriter, _ *http.Request) {
  471. pluginManager := native.GetManager()
  472. content := pluginManager.ListSymbols()
  473. jsonResponse(content, w, logger)
  474. }
  475. func functionsGetHandler(w http.ResponseWriter, r *http.Request) {
  476. vars := mux.Vars(r)
  477. name := vars["name"]
  478. pluginManager := native.GetManager()
  479. j, ok := pluginManager.GetPluginBySymbol(plugin.FUNCTION, name)
  480. if !ok {
  481. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  482. return
  483. }
  484. jsonResponse(map[string]string{"name": name, "plugin": j}, w, logger)
  485. }
  486. //delete a function plugin
  487. func functionHandler(w http.ResponseWriter, r *http.Request) {
  488. pluginHandler(w, r, plugin.FUNCTION)
  489. }
  490. type functionList struct {
  491. Functions []string `json:"functions,omitempty"`
  492. }
  493. // register function list for function plugin. If a plugin exports multiple functions, the function list must be registered
  494. // either by create or register. If the function plugin has been loaded because of auto load through so file, the function
  495. // list MUST be registered by this API or only the function with the same name as the plugin can be used.
  496. func functionRegisterHandler(w http.ResponseWriter, r *http.Request) {
  497. defer r.Body.Close()
  498. vars := mux.Vars(r)
  499. name := vars["name"]
  500. pluginManager := native.GetManager()
  501. _, ok := pluginManager.GetPluginInfo(plugin.FUNCTION, name)
  502. if !ok {
  503. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("register %s plugin %s error", plugin.PluginTypes[plugin.FUNCTION], name), logger)
  504. return
  505. }
  506. sd := functionList{}
  507. err := json.NewDecoder(r.Body).Decode(&sd)
  508. // Problems decoding
  509. if err != nil {
  510. handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the function list json %s", r.Body), logger)
  511. return
  512. }
  513. err = pluginManager.RegisterFuncs(name, sd.Functions)
  514. if err != nil {
  515. handleError(w, err, fmt.Sprintf("function plugins %s regiser functions error", name), logger)
  516. return
  517. }
  518. w.WriteHeader(http.StatusOK)
  519. w.Write([]byte(fmt.Sprintf("function plugin %s function list is registered", name)))
  520. }
  521. func portablesHandler(w http.ResponseWriter, r *http.Request) {
  522. m := portable.GetManager()
  523. defer r.Body.Close()
  524. switch r.Method {
  525. case http.MethodGet:
  526. content := m.List()
  527. jsonResponse(content, w, logger)
  528. case http.MethodPost:
  529. sd := plugin.NewPluginByType(plugin.PORTABLE)
  530. err := json.NewDecoder(r.Body).Decode(sd)
  531. // Problems decoding
  532. if err != nil {
  533. handleError(w, err, "Invalid body: Error decoding the portable plugin json", logger)
  534. return
  535. }
  536. err = m.Register(sd)
  537. if err != nil {
  538. handleError(w, err, "portable plugin create command error", logger)
  539. return
  540. }
  541. w.WriteHeader(http.StatusCreated)
  542. w.Write([]byte(fmt.Sprintf("portable plugin %s is created", sd.GetName())))
  543. }
  544. }
  545. func portableHandler(w http.ResponseWriter, r *http.Request) {
  546. defer r.Body.Close()
  547. vars := mux.Vars(r)
  548. name := vars["name"]
  549. m := portable.GetManager()
  550. switch r.Method {
  551. case http.MethodDelete:
  552. err := m.Delete(name)
  553. if err != nil {
  554. handleError(w, err, fmt.Sprintf("delete portable plugin %s error", name), logger)
  555. return
  556. }
  557. w.WriteHeader(http.StatusOK)
  558. result := fmt.Sprintf("portable plugin %s is deleted", name)
  559. w.Write([]byte(result))
  560. case http.MethodGet:
  561. j, ok := m.GetPluginInfo(name)
  562. if !ok {
  563. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe portable plugin %s error", name), logger)
  564. return
  565. }
  566. jsonResponse(j, w, logger)
  567. }
  568. }
  569. func prebuildSourcePlugins(w http.ResponseWriter, _ *http.Request) {
  570. prebuildPluginsHandler(w, plugin.SOURCE)
  571. }
  572. func prebuildSinkPlugins(w http.ResponseWriter, _ *http.Request) {
  573. prebuildPluginsHandler(w, plugin.SINK)
  574. }
  575. func prebuildFuncsPlugins(w http.ResponseWriter, _ *http.Request) {
  576. prebuildPluginsHandler(w, plugin.FUNCTION)
  577. }
  578. func isOffcialDockerImage() bool {
  579. if !strings.EqualFold(os.Getenv("MAINTAINER"), "emqx.io") {
  580. return false
  581. }
  582. return true
  583. }
  584. func prebuildPluginsHandler(w http.ResponseWriter, t plugin.PluginType) {
  585. emsg := "It's strongly recommended to install plugins at official released Debian Docker images. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
  586. if !isOffcialDockerImage() {
  587. handleError(w, fmt.Errorf(emsg), "", logger)
  588. return
  589. } else if runtime.GOOS == "linux" {
  590. osrelease, err := Read()
  591. if err != nil {
  592. logger.Infof("")
  593. return
  594. }
  595. prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
  596. o := "debian"
  597. if strings.Contains(prettyName, "DEBIAN") {
  598. hosts := conf.Config.Basic.PluginHosts
  599. if err, plugins := fetchPluginList(t, hosts, o, runtime.GOARCH); err != nil {
  600. handleError(w, err, "", logger)
  601. } else {
  602. jsonResponse(plugins, w, logger)
  603. }
  604. } else {
  605. handleError(w, fmt.Errorf(emsg), "", logger)
  606. return
  607. }
  608. } else {
  609. handleError(w, fmt.Errorf(emsg), "", logger)
  610. }
  611. }
  612. var NativeSourcePlugin = []string{"random", "zmq"}
  613. var NativeSinkPlugin = []string{"file", "image", "influx", "redis", "tdengine", "zmq"}
  614. var NativeFunctionPlugin = []string{"accumulateWordCount", "countPlusOne", "echo", "geohash", "image", "labelImage"}
  615. func fetchPluginList(t plugin.PluginType, hosts, os, arch string) (err error, result map[string]string) {
  616. ptype := "sources"
  617. plugins := NativeSourcePlugin
  618. if t == plugin.SINK {
  619. ptype = "sinks"
  620. plugins = NativeSinkPlugin
  621. } else if t == plugin.FUNCTION {
  622. ptype = "functions"
  623. plugins = NativeFunctionPlugin
  624. }
  625. if hosts == "" || ptype == "" || os == "" {
  626. logger.Errorf("Invalid parameter value: hosts %s, ptype %s or os: %s should not be empty.", hosts, ptype, os)
  627. return fmt.Errorf("invalid configruation for plugin host in kuiper.yaml"), nil
  628. }
  629. result = make(map[string]string)
  630. hostsArr := strings.Split(hosts, ",")
  631. for _, host := range hostsArr {
  632. host := strings.Trim(host, " ")
  633. tmp := []string{host, "kuiper-plugins", version, os, ptype}
  634. //The url is similar to http://host:port/kuiper-plugins/0.9.1/debian/sinks/
  635. url := strings.Join(tmp, "/")
  636. for _, p := range plugins {
  637. result[p] = url + "/" + p + "_" + arch + ".zip"
  638. }
  639. }
  640. return
  641. }
  642. //list sink plugin
  643. func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
  644. defer r.Body.Close()
  645. sinks := meta.GetSinks()
  646. jsonResponse(sinks, w, logger)
  647. return
  648. }
  649. //Get sink metadata when creating rules
  650. func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
  651. defer r.Body.Close()
  652. vars := mux.Vars(r)
  653. pluginName := vars["name"]
  654. language := getLanguage(r)
  655. ptrMetadata, err := meta.GetSinkMeta(pluginName, language)
  656. if err != nil {
  657. handleError(w, err, "", logger)
  658. return
  659. }
  660. jsonResponse(ptrMetadata, w, logger)
  661. }
  662. //list functions
  663. func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  664. defer r.Body.Close()
  665. sinks := meta.GetFunctions()
  666. jsonResponse(sinks, w, logger)
  667. return
  668. }
  669. //list source plugin
  670. func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
  671. defer r.Body.Close()
  672. ret := meta.GetSourcesPlugins()
  673. if nil != ret {
  674. jsonResponse(ret, w, logger)
  675. return
  676. }
  677. }
  678. //list shareMeta
  679. func connectionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  680. defer r.Body.Close()
  681. ret := meta.GetConnectionPlugins()
  682. if nil != ret {
  683. jsonResponse(ret, w, logger)
  684. return
  685. }
  686. }
  687. //Get source metadata when creating stream
  688. func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
  689. defer r.Body.Close()
  690. vars := mux.Vars(r)
  691. pluginName := vars["name"]
  692. language := getLanguage(r)
  693. ret, err := meta.GetSourceMeta(pluginName, language)
  694. if err != nil {
  695. handleError(w, err, "", logger)
  696. return
  697. }
  698. if nil != ret {
  699. jsonResponse(ret, w, logger)
  700. return
  701. }
  702. }
  703. //Get source metadata when creating stream
  704. func connectionMetaHandler(w http.ResponseWriter, r *http.Request) {
  705. defer r.Body.Close()
  706. vars := mux.Vars(r)
  707. pluginName := vars["name"]
  708. language := getLanguage(r)
  709. ret, err := meta.GetConnectionMeta(pluginName, language)
  710. if err != nil {
  711. handleError(w, err, "", logger)
  712. return
  713. }
  714. if nil != ret {
  715. jsonResponse(ret, w, logger)
  716. return
  717. }
  718. }
  719. //Get source yaml
  720. func sourceConfHandler(w http.ResponseWriter, r *http.Request) {
  721. defer r.Body.Close()
  722. vars := mux.Vars(r)
  723. pluginName := vars["name"]
  724. language := getLanguage(r)
  725. configOperatorKey := fmt.Sprintf(meta.SourceCfgOperatorKeyTemplate, pluginName)
  726. ret, err := meta.GetYamlConf(configOperatorKey, language)
  727. if err != nil {
  728. handleError(w, err, "", logger)
  729. return
  730. } else {
  731. w.Write(ret)
  732. }
  733. }
  734. //Get share yaml
  735. func connectionConfHandler(w http.ResponseWriter, r *http.Request) {
  736. defer r.Body.Close()
  737. vars := mux.Vars(r)
  738. pluginName := vars["name"]
  739. language := getLanguage(r)
  740. configOperatorKey := fmt.Sprintf(meta.ConnectionCfgOperatorKeyTemplate, pluginName)
  741. ret, err := meta.GetYamlConf(configOperatorKey, language)
  742. if err != nil {
  743. handleError(w, err, "", logger)
  744. return
  745. } else {
  746. w.Write(ret)
  747. }
  748. }
  749. //Add del confkey
  750. func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  751. defer r.Body.Close()
  752. var err error
  753. vars := mux.Vars(r)
  754. pluginName := vars["name"]
  755. confKey := vars["confKey"]
  756. language := getLanguage(r)
  757. switch r.Method {
  758. case http.MethodDelete:
  759. err = meta.DelSourceConfKey(pluginName, confKey, language)
  760. case http.MethodPut:
  761. v, err1 := ioutil.ReadAll(r.Body)
  762. if err1 != nil {
  763. handleError(w, err, "Invalid body", logger)
  764. return
  765. }
  766. err = meta.AddSourceConfKey(pluginName, confKey, language, v)
  767. }
  768. if err != nil {
  769. handleError(w, err, "", logger)
  770. return
  771. }
  772. }
  773. //Add del confkey
  774. func connectionConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  775. defer r.Body.Close()
  776. var err error
  777. vars := mux.Vars(r)
  778. pluginName := vars["name"]
  779. confKey := vars["confKey"]
  780. language := getLanguage(r)
  781. switch r.Method {
  782. case http.MethodDelete:
  783. err = meta.DelConnectionConfKey(pluginName, confKey, language)
  784. case http.MethodPut:
  785. v, err1 := ioutil.ReadAll(r.Body)
  786. if err1 != nil {
  787. handleError(w, err1, "Invalid body", logger)
  788. return
  789. }
  790. err = meta.AddConnectionConfKey(pluginName, confKey, language, v)
  791. }
  792. if err != nil {
  793. handleError(w, err, "", logger)
  794. return
  795. }
  796. }
  797. func getLanguage(r *http.Request) string {
  798. language := r.Header.Get("Content-Language")
  799. if 0 == len(language) {
  800. language = "en_US"
  801. }
  802. return language
  803. }
  804. func servicesHandler(w http.ResponseWriter, r *http.Request) {
  805. defer r.Body.Close()
  806. serviceManager := service.GetManager()
  807. switch r.Method {
  808. case http.MethodGet:
  809. content, err := serviceManager.List()
  810. if err != nil {
  811. handleError(w, err, "service list command error", logger)
  812. return
  813. }
  814. jsonResponse(content, w, logger)
  815. case http.MethodPost:
  816. sd := &service.ServiceCreationRequest{}
  817. err := json.NewDecoder(r.Body).Decode(sd)
  818. // Problems decoding
  819. if err != nil {
  820. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  821. return
  822. }
  823. err = serviceManager.Create(sd)
  824. if err != nil {
  825. handleError(w, err, "service create command error", logger)
  826. return
  827. }
  828. w.WriteHeader(http.StatusCreated)
  829. w.Write([]byte(fmt.Sprintf("service %s is created", sd.Name)))
  830. }
  831. }
  832. func serviceHandler(w http.ResponseWriter, r *http.Request) {
  833. defer r.Body.Close()
  834. vars := mux.Vars(r)
  835. name := vars["name"]
  836. serviceManager := service.GetManager()
  837. switch r.Method {
  838. case http.MethodDelete:
  839. err := serviceManager.Delete(name)
  840. if err != nil {
  841. handleError(w, err, fmt.Sprintf("delete service %s error", name), logger)
  842. return
  843. }
  844. w.WriteHeader(http.StatusOK)
  845. result := fmt.Sprintf("service %s is deleted", name)
  846. w.Write([]byte(result))
  847. case http.MethodGet:
  848. j, err := serviceManager.Get(name)
  849. if err != nil {
  850. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe service %s error", name), logger)
  851. return
  852. }
  853. jsonResponse(j, w, logger)
  854. case http.MethodPut:
  855. sd := &service.ServiceCreationRequest{}
  856. err := json.NewDecoder(r.Body).Decode(sd)
  857. // Problems decoding
  858. if err != nil {
  859. handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
  860. return
  861. }
  862. sd.Name = name
  863. err = serviceManager.Update(sd)
  864. if err != nil {
  865. handleError(w, err, "service update command error", logger)
  866. return
  867. }
  868. w.WriteHeader(http.StatusOK)
  869. w.Write([]byte(fmt.Sprintf("service %s is updated", sd.Name)))
  870. }
  871. }
  872. func serviceFunctionsHandler(w http.ResponseWriter, _ *http.Request) {
  873. serviceManager := service.GetManager()
  874. content, err := serviceManager.ListFunctions()
  875. if err != nil {
  876. handleError(w, err, "service list command error", logger)
  877. return
  878. }
  879. jsonResponse(content, w, logger)
  880. }
  881. func serviceFunctionHandler(w http.ResponseWriter, r *http.Request) {
  882. vars := mux.Vars(r)
  883. name := vars["name"]
  884. serviceManager := service.GetManager()
  885. j, err := serviceManager.GetFunction(name)
  886. if err != nil {
  887. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
  888. return
  889. }
  890. jsonResponse(j, w, logger)
  891. }