rest.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/plugins"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/gorilla/mux"
  8. "io"
  9. "io/ioutil"
  10. "log"
  11. "net/http"
  12. "time"
  13. )
  // HTTP header name and media type used when answering with JSON.
  const (
  	// ContentType is the name of the HTTP header that carries the media type of a body.
  	ContentType = "Content-Type"
  	// ContentTypeJSON is the media type placed in that header for JSON responses.
  	ContentTypeJSON = "application/json"
  )
  // statementDescriptor is the JSON request body that carries one SQL statement.
  type statementDescriptor struct {
  	// Sql is the statement text; it maps to the "sql" key and is omitted when empty.
  	Sql string `json:"sql,omitempty"`
  }

  // decodeStatementDescriptor parses one JSON-encoded statementDescriptor from reader.
  //
  // The reader is only read from; it is not closed here. On failure the zero
  // statementDescriptor is returned together with an error that wraps the text of
  // the decoding problem, so callers never receive a half-filled value.
  func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  	var sd statementDescriptor
  	if err := json.NewDecoder(reader).Decode(&sd); err != nil {
  		// encoding/json may already have stored some fields before reporting the
  		// error (e.g. a repeated key whose second value has the wrong type), so
  		// discard whatever was decoded.
  		return statementDescriptor{}, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  	}
  	return sd, nil
  }
  30. // Handle applies the specified error and error concept tot he HTTP response writer
  31. func handleError(w http.ResponseWriter, err error, ec int, logger api.Logger) {
  32. message := err.Error()
  33. logger.Error(message)
  34. http.Error(w, message, ec)
  35. }
  36. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  37. w.Header().Add(ContentType, ContentTypeJSON)
  38. enc := json.NewEncoder(w)
  39. err := enc.Encode(i)
  40. // Problems encoding
  41. if err != nil {
  42. handleError(w, err, http.StatusBadRequest, logger)
  43. return
  44. }
  45. }
  46. func createRestServer(port int) *http.Server {
  47. r := mux.NewRouter()
  48. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  49. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  50. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete)
  51. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  52. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet)
  53. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  54. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  55. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  56. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  57. r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
  58. r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
  59. r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
  60. r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
  61. r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
  62. r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
  63. server := &http.Server{
  64. Addr: fmt.Sprintf("0.0.0.0:%d", port),
  65. // Good practice to set timeouts to avoid Slowloris attacks.
  66. WriteTimeout: time.Second * 15,
  67. ReadTimeout: time.Second * 15,
  68. IdleTimeout: time.Second * 60,
  69. Handler: r, // Pass our instance of gorilla/mux in.
  70. }
  71. server.SetKeepAlivesEnabled(false)
  72. return server
  73. }
  // rootHandler answers GET and POST requests on "/" with 200 OK and the body
  // "OK\n". For any other method it writes nothing.
  func rootHandler(w http.ResponseWriter, r *http.Request) {
  	defer r.Body.Close()

  	if r.Method != http.MethodGet && r.Method != http.MethodPost {
  		return
  	}
  	w.WriteHeader(http.StatusOK)
  	w.Write([]byte("OK\n"))
  }
  83. //list or create streams
  84. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  85. defer r.Body.Close()
  86. switch r.Method {
  87. case http.MethodGet:
  88. content, err := streamProcessor.ShowStream()
  89. if err != nil {
  90. handleError(w, fmt.Errorf("Stream command error: %s", err), http.StatusBadRequest, logger)
  91. return
  92. }
  93. jsonResponse(content, w, logger)
  94. case http.MethodPost:
  95. v, err := decodeStatementDescriptor(r.Body)
  96. if err != nil {
  97. handleError(w, fmt.Errorf("Invalid body: %s", err), http.StatusBadRequest, logger)
  98. return
  99. }
  100. content, err := streamProcessor.ExecStreamSql(v.Sql)
  101. if err != nil {
  102. handleError(w, fmt.Errorf("Stream command error: %s", err), http.StatusBadRequest, logger)
  103. return
  104. }
  105. w.WriteHeader(http.StatusCreated)
  106. w.Write([]byte(content))
  107. }
  108. }
  109. //describe or delete a stream
  110. func streamHandler(w http.ResponseWriter, r *http.Request) {
  111. defer r.Body.Close()
  112. vars := mux.Vars(r)
  113. name := vars["name"]
  114. switch r.Method {
  115. case http.MethodGet:
  116. content, err := streamProcessor.DescStream(name)
  117. if err != nil {
  118. handleError(w, fmt.Errorf("describe stream error: %s", err), http.StatusBadRequest, logger)
  119. return
  120. }
  121. jsonResponse(content, w, logger)
  122. case http.MethodDelete:
  123. content, err := streamProcessor.DropStream(name)
  124. if err != nil {
  125. handleError(w, fmt.Errorf("describe stream error: %s", err), http.StatusBadRequest, logger)
  126. return
  127. }
  128. w.WriteHeader(http.StatusOK)
  129. w.Write([]byte(content))
  130. }
  131. }
  132. //list or create rules
  133. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  134. defer r.Body.Close()
  135. switch r.Method {
  136. case http.MethodPost:
  137. body, err := ioutil.ReadAll(r.Body)
  138. if err != nil {
  139. log.Printf("Error reading body: %v", err)
  140. handleError(w, fmt.Errorf("Invalid body: %s", err), http.StatusBadRequest, logger)
  141. return
  142. }
  143. r, err := ruleProcessor.ExecCreate("", string(body))
  144. var result string
  145. if err != nil {
  146. handleError(w, fmt.Errorf("Create rule error : %s.", err), http.StatusBadRequest, logger)
  147. return
  148. } else {
  149. result = fmt.Sprintf("Rule %s was created, please use 'cli getstatus rule $rule_name' command to get rule status.", r.Id)
  150. }
  151. //Start the rule
  152. rs, err := createRuleState(r)
  153. if err != nil {
  154. result = err.Error()
  155. } else {
  156. err = doStartRule(rs)
  157. if err != nil {
  158. result = err.Error()
  159. }
  160. }
  161. w.WriteHeader(http.StatusCreated)
  162. w.Write([]byte(result))
  163. case http.MethodGet:
  164. content, err := ruleProcessor.GetAllRules()
  165. if err != nil {
  166. handleError(w, fmt.Errorf("Show rules error: %s", err), http.StatusBadRequest, logger)
  167. return
  168. }
  169. jsonResponse(content, w, logger)
  170. }
  171. }
  172. //describe or delete a rule
  173. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  174. defer r.Body.Close()
  175. vars := mux.Vars(r)
  176. name := vars["name"]
  177. switch r.Method {
  178. case http.MethodGet:
  179. rule, err := ruleProcessor.GetRuleByName(name)
  180. if err != nil {
  181. handleError(w, fmt.Errorf("describe stream error: %s", err), http.StatusBadRequest, logger)
  182. return
  183. }
  184. jsonResponse(rule, w, logger)
  185. case http.MethodDelete:
  186. stopRule(name)
  187. content, err := ruleProcessor.ExecDrop(name)
  188. if err != nil {
  189. handleError(w, fmt.Errorf("drop rule error: %s", err), http.StatusBadRequest, logger)
  190. return
  191. }
  192. w.WriteHeader(http.StatusOK)
  193. w.Write([]byte(content))
  194. }
  195. }
  196. //get status of a rule
  197. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  198. defer r.Body.Close()
  199. vars := mux.Vars(r)
  200. name := vars["name"]
  201. content, err := getRuleStatus(name)
  202. if err != nil {
  203. handleError(w, fmt.Errorf("get rule status error: %s", err), http.StatusBadRequest, logger)
  204. return
  205. }
  206. w.WriteHeader(http.StatusOK)
  207. w.Write([]byte(content))
  208. }
  209. //start a rule
  210. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  211. defer r.Body.Close()
  212. vars := mux.Vars(r)
  213. name := vars["name"]
  214. err := startRule(name)
  215. if err != nil {
  216. handleError(w, fmt.Errorf("start rule error: %s", err), http.StatusBadRequest, logger)
  217. return
  218. }
  219. w.WriteHeader(http.StatusOK)
  220. w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
  221. }
  222. //stop a rule
  223. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  224. defer r.Body.Close()
  225. vars := mux.Vars(r)
  226. name := vars["name"]
  227. result := stopRule(name)
  228. w.WriteHeader(http.StatusOK)
  229. w.Write([]byte(result))
  230. }
  231. //restart a rule
  232. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  233. defer r.Body.Close()
  234. vars := mux.Vars(r)
  235. name := vars["name"]
  236. err := restartRule(name)
  237. if err != nil {
  238. handleError(w, fmt.Errorf("restart rule error: %s", err), http.StatusBadRequest, logger)
  239. return
  240. }
  241. w.WriteHeader(http.StatusOK)
  242. w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
  243. }
  244. func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
  245. defer r.Body.Close()
  246. switch r.Method {
  247. case http.MethodGet:
  248. content, err := pluginManager.List(t)
  249. if err != nil {
  250. handleError(w, fmt.Errorf("%s plugins list command error: %s", plugins.PluginTypes[t], err), http.StatusBadRequest, logger)
  251. return
  252. }
  253. jsonResponse(content, w, logger)
  254. case http.MethodPost:
  255. sd := plugins.Plugin{}
  256. err := json.NewDecoder(r.Body).Decode(&sd)
  257. // Problems decoding
  258. if err != nil {
  259. handleError(w, fmt.Errorf("Invalid body: Error decoding the %s plugin json: %v", plugins.PluginTypes[t], err), http.StatusBadRequest, logger)
  260. return
  261. }
  262. err = pluginManager.Register(t, &sd)
  263. if err != nil {
  264. handleError(w, fmt.Errorf("%s plugins create command error: %s", plugins.PluginTypes[t], err), http.StatusBadRequest, logger)
  265. return
  266. }
  267. w.WriteHeader(http.StatusCreated)
  268. w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugins.PluginTypes[t], sd.Name)))
  269. }
  270. }
  271. func pluginHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
  272. defer r.Body.Close()
  273. vars := mux.Vars(r)
  274. name := vars["name"]
  275. cb := r.URL.Query().Get("stop")
  276. switch r.Method {
  277. case http.MethodDelete:
  278. r := cb == "1"
  279. err := pluginManager.Delete(t, name, r)
  280. if err != nil {
  281. handleError(w, fmt.Errorf("delete %s plugin %s error: %s", plugins.PluginTypes[t], name, err), http.StatusBadRequest, logger)
  282. return
  283. }
  284. w.WriteHeader(http.StatusOK)
  285. result := fmt.Sprintf("%s plugin %s is deleted", plugins.PluginTypes[t], name)
  286. if r {
  287. result = fmt.Sprintf("%s and Kuiper will be stopped", result)
  288. }
  289. w.Write([]byte(result))
  290. case http.MethodGet:
  291. j, ok := pluginManager.Get(t, name)
  292. if !ok {
  293. handleError(w, fmt.Errorf("describe %s plugin %s error: not found", plugins.PluginTypes[t], name), http.StatusBadRequest, logger)
  294. return
  295. }
  296. jsonResponse(j, w, logger)
  297. }
  298. }
  299. //list or create source plugin
  300. func sourcesHandler(w http.ResponseWriter, r *http.Request) {
  301. pluginsHandler(w, r, plugins.SOURCE)
  302. }
  303. //delete a source plugin
  304. func sourceHandler(w http.ResponseWriter, r *http.Request) {
  305. pluginHandler(w, r, plugins.SOURCE)
  306. }
  307. //list or create sink plugin
  308. func sinksHandler(w http.ResponseWriter, r *http.Request) {
  309. pluginsHandler(w, r, plugins.SINK)
  310. }
  311. //delete a sink plugin
  312. func sinkHandler(w http.ResponseWriter, r *http.Request) {
  313. pluginHandler(w, r, plugins.SINK)
  314. }
  315. //list or create function plugin
  316. func functionsHandler(w http.ResponseWriter, r *http.Request) {
  317. pluginsHandler(w, r, plugins.FUNCTION)
  318. }
  319. //delete a function plugin
  320. func functionHandler(w http.ResponseWriter, r *http.Request) {
  321. pluginHandler(w, r, plugins.FUNCTION)
  322. }