rest.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net/http"
  22. "os"
  23. "path/filepath"
  24. "reflect"
  25. "runtime"
  26. "strconv"
  27. "time"
  28. "github.com/gorilla/handlers"
  29. "github.com/gorilla/mux"
  30. "golang.org/x/text/cases"
  31. "golang.org/x/text/language"
  32. "github.com/lf-edge/ekuiper/internal/conf"
  33. "github.com/lf-edge/ekuiper/internal/meta"
  34. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  35. "github.com/lf-edge/ekuiper/internal/pkg/store"
  36. "github.com/lf-edge/ekuiper/internal/processor"
  37. "github.com/lf-edge/ekuiper/internal/server/middleware"
  38. "github.com/lf-edge/ekuiper/pkg/api"
  39. "github.com/lf-edge/ekuiper/pkg/ast"
  40. "github.com/lf-edge/ekuiper/pkg/cast"
  41. "github.com/lf-edge/ekuiper/pkg/errorx"
  42. "github.com/lf-edge/ekuiper/pkg/infra"
  43. "github.com/lf-edge/ekuiper/pkg/kv"
  44. )
// HTTP header name and media type used by the handlers in this file when
// replying with a JSON payload.
const (
	ContentType     = "Content-Type"
	ContentTypeJSON = "application/json"
)
// Package-level state backing the file upload endpoints. All three values are
// assigned by createRestServer before the router is built.
var (
	// uploadDir is the "uploads" directory under the configured data location.
	uploadDir string
	// uploadsDb is the KV store named "uploads"; upload() saves each file's install script in it.
	uploadsDb kv.KeyValue
	// uploadsStatusDb is the KV store named "uploadsStatusDb"; upload() records the error text of failed uploads in it.
	uploadsStatusDb kv.KeyValue
)
  54. type statementDescriptor struct {
  55. Sql string `json:"sql,omitempty"`
  56. }
  57. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  58. sd := statementDescriptor{}
  59. err := json.NewDecoder(reader).Decode(&sd)
  60. // Problems decoding
  61. if err != nil {
  62. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  63. }
  64. return sd, nil
  65. }
  66. // Handle applies the specified error and error concept to the HTTP response writer
  67. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  68. message := prefix
  69. if message != "" {
  70. message += ": "
  71. }
  72. message += err.Error()
  73. logger.Error(message)
  74. var ec int
  75. switch e := err.(type) {
  76. case *errorx.Error:
  77. switch e.Code() {
  78. case errorx.NOT_FOUND:
  79. ec = http.StatusNotFound
  80. default:
  81. ec = http.StatusBadRequest
  82. }
  83. default:
  84. ec = http.StatusBadRequest
  85. }
  86. http.Error(w, message, ec)
  87. }
  88. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  89. w.Header().Add(ContentType, ContentTypeJSON)
  90. jsonByte, err := json.Marshal(i)
  91. if err != nil {
  92. handleError(w, err, "", logger)
  93. }
  94. w.Header().Add("Content-Length", strconv.Itoa(len(jsonByte)))
  95. _, err = w.Write(jsonByte)
  96. // Problems encoding
  97. if err != nil {
  98. handleError(w, err, "", logger)
  99. }
  100. }
  101. func jsonByteResponse(buffer bytes.Buffer, w http.ResponseWriter, logger api.Logger) {
  102. w.Header().Add(ContentType, ContentTypeJSON)
  103. w.Header().Add("Content-Length", strconv.Itoa(buffer.Len()))
  104. _, err := w.Write(buffer.Bytes())
  105. // Problems encoding
  106. if err != nil {
  107. handleError(w, err, "", logger)
  108. }
  109. }
  110. func createRestServer(ip string, port int, needToken bool) *http.Server {
  111. dataDir, err := conf.GetDataLoc()
  112. if err != nil {
  113. panic(err)
  114. }
  115. uploadDir = filepath.Join(dataDir, "uploads")
  116. uploadsDb, err = store.GetKV("uploads")
  117. if err != nil {
  118. panic(err)
  119. }
  120. uploadsStatusDb, err = store.GetKV("uploadsStatusDb")
  121. if err != nil {
  122. panic(err)
  123. }
  124. r := mux.NewRouter()
  125. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  126. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  127. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  128. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  129. r.HandleFunc("/streams/{name}/schema", streamSchemaHandler).Methods(http.MethodGet)
  130. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  131. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  132. r.HandleFunc("/tables/{name}/schema", tableSchemaHandler).Methods(http.MethodGet)
  133. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  134. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  135. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  136. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  137. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  138. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  139. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  140. r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
  141. r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
  142. r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
  143. r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
  144. r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
  145. r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
  146. r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
  147. // Register extended routes
  148. for k, v := range components {
  149. logger.Infof("register rest endpoint for component %s", k)
  150. v.rest(r)
  151. }
  152. if needToken {
  153. r.Use(middleware.Auth)
  154. }
  155. server := &http.Server{
  156. Addr: cast.JoinHostPortInt(ip, port),
  157. // Good practice to set timeouts to avoid Slowloris attacks.
  158. WriteTimeout: time.Second * 60 * 5,
  159. ReadTimeout: time.Second * 60 * 5,
  160. IdleTimeout: time.Second * 60,
  161. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin", "Authorization"}), handlers.AllowedMethods([]string{"POST", "GET", "PUT", "DELETE", "HEAD"}))(r),
  162. }
  163. server.SetKeepAlivesEnabled(false)
  164. return server
  165. }
  166. type fileContent struct {
  167. Name string `json:"name"`
  168. Content string `json:"content"`
  169. FilePath string `json:"file"`
  170. }
  171. func (f *fileContent) InstallScript() string {
  172. marshal, err := json.Marshal(f)
  173. if err != nil {
  174. return ""
  175. }
  176. return string(marshal)
  177. }
  178. func (f *fileContent) Validate() error {
  179. if f.Content == "" && f.FilePath == "" {
  180. return fmt.Errorf("invalid body: content or FilePath is required")
  181. }
  182. if f.Name == "" {
  183. return fmt.Errorf("invalid body: name is required")
  184. }
  185. return nil
  186. }
  187. func upload(file *fileContent) error {
  188. err := getFile(file)
  189. if err != nil {
  190. _ = uploadsStatusDb.Set(file.Name, err.Error())
  191. return err
  192. }
  193. return uploadsDb.Set(file.Name, file.InstallScript())
  194. }
  195. func getFile(file *fileContent) error {
  196. filePath := filepath.Join(uploadDir, file.Name)
  197. dst, err := os.Create(filePath)
  198. if err != nil {
  199. return err
  200. }
  201. defer dst.Close()
  202. if file.FilePath != "" {
  203. err := httpx.DownloadFile(filePath, file.FilePath)
  204. if err != nil {
  205. return err
  206. }
  207. } else {
  208. _, err := dst.Write([]byte(file.Content))
  209. if err != nil {
  210. return err
  211. }
  212. }
  213. return nil
  214. }
  215. func fileUploadHandler(w http.ResponseWriter, r *http.Request) {
  216. switch r.Method {
  217. // Upload or overwrite a file
  218. case http.MethodPost:
  219. switch r.Header.Get("Content-Type") {
  220. case "application/json":
  221. fc := &fileContent{}
  222. defer r.Body.Close()
  223. err := json.NewDecoder(r.Body).Decode(fc)
  224. if err != nil {
  225. handleError(w, err, "Invalid body: Error decoding file json", logger)
  226. return
  227. }
  228. err = fc.Validate()
  229. if err != nil {
  230. handleError(w, err, "Invalid body: missing necessary field", logger)
  231. return
  232. }
  233. filePath := filepath.Join(uploadDir, fc.Name)
  234. err = upload(fc)
  235. if err != nil {
  236. handleError(w, err, "Upload error: getFile has error", logger)
  237. return
  238. }
  239. w.WriteHeader(http.StatusCreated)
  240. w.Write([]byte(filePath))
  241. default:
  242. // Maximum upload of 1 GB files
  243. err := r.ParseMultipartForm(1024 << 20)
  244. if err != nil {
  245. handleError(w, err, "Error parse the multi part form", logger)
  246. return
  247. }
  248. // Get handler for filename, size and headers
  249. file, handler, err := r.FormFile("uploadFile")
  250. if err != nil {
  251. handleError(w, err, "Error Retrieving the File", logger)
  252. return
  253. }
  254. defer file.Close()
  255. // Create file
  256. filePath := filepath.Join(uploadDir, handler.Filename)
  257. dst, err := os.Create(filePath)
  258. defer dst.Close()
  259. if err != nil {
  260. handleError(w, err, "Error creating the file", logger)
  261. return
  262. }
  263. // Copy the uploaded file to the created file on the filesystem
  264. if _, err := io.Copy(dst, file); err != nil {
  265. handleError(w, err, "Error writing the file", logger)
  266. return
  267. }
  268. w.WriteHeader(http.StatusCreated)
  269. w.Write([]byte(filePath))
  270. }
  271. case http.MethodGet:
  272. // Get the list of files in the upload directory
  273. files, err := os.ReadDir(uploadDir)
  274. if err != nil {
  275. handleError(w, err, "Error reading the file upload dir", logger)
  276. return
  277. }
  278. fileNames := make([]string, len(files))
  279. for i, f := range files {
  280. fileNames[i] = filepath.Join(uploadDir, f.Name())
  281. }
  282. jsonResponse(fileNames, w, logger)
  283. }
  284. }
  285. func fileDeleteHandler(w http.ResponseWriter, r *http.Request) {
  286. vars := mux.Vars(r)
  287. name := vars["name"]
  288. filePath := filepath.Join(uploadDir, name)
  289. e := os.Remove(filePath)
  290. if e != nil {
  291. handleError(w, e, "Error deleting the file", logger)
  292. return
  293. }
  294. _ = uploadsDb.Delete(name)
  295. _ = uploadsStatusDb.Delete(name)
  296. w.WriteHeader(http.StatusOK)
  297. w.Write([]byte("ok"))
  298. }
// information is the JSON payload returned by rootHandler.
type information struct {
	// Version is filled from the package-level version value.
	Version string `json:"version"`
	// Os is runtime.GOOS of the running process.
	Os string `json:"os"`
	// Arch is runtime.GOARCH of the running process.
	Arch string `json:"arch"`
	// UpTimeSeconds is the current Unix time minus startTimeStamp.
	UpTimeSeconds int64 `json:"upTimeSeconds"`
}
  305. // The handler for root
  306. func rootHandler(w http.ResponseWriter, r *http.Request) {
  307. defer r.Body.Close()
  308. switch r.Method {
  309. case http.MethodGet, http.MethodPost:
  310. w.WriteHeader(http.StatusOK)
  311. info := new(information)
  312. info.Version = version
  313. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  314. info.Os = runtime.GOOS
  315. info.Arch = runtime.GOARCH
  316. byteInfo, _ := json.Marshal(info)
  317. w.Write(byteInfo)
  318. }
  319. }
  320. func pingHandler(w http.ResponseWriter, _ *http.Request) {
  321. w.WriteHeader(http.StatusOK)
  322. }
  323. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  324. defer r.Body.Close()
  325. switch r.Method {
  326. case http.MethodGet:
  327. var (
  328. content []string
  329. err error
  330. kind string
  331. )
  332. if st == ast.TypeTable {
  333. kind = r.URL.Query().Get("kind")
  334. if kind == "scan" {
  335. kind = ast.StreamKindScan
  336. } else if kind == "lookup" {
  337. kind = ast.StreamKindLookup
  338. } else {
  339. kind = ""
  340. }
  341. }
  342. if kind != "" {
  343. content, err = streamProcessor.ShowTable(kind)
  344. } else {
  345. content, err = streamProcessor.ShowStream(st)
  346. }
  347. if err != nil {
  348. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  349. return
  350. }
  351. jsonResponse(content, w, logger)
  352. case http.MethodPost:
  353. v, err := decodeStatementDescriptor(r.Body)
  354. if err != nil {
  355. handleError(w, err, "Invalid body", logger)
  356. return
  357. }
  358. content, err := streamProcessor.ExecStreamSql(v.Sql)
  359. if err != nil {
  360. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  361. return
  362. }
  363. w.WriteHeader(http.StatusCreated)
  364. w.Write([]byte(content))
  365. }
  366. }
  367. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  368. defer r.Body.Close()
  369. vars := mux.Vars(r)
  370. name := vars["name"]
  371. switch r.Method {
  372. case http.MethodGet:
  373. content, err := streamProcessor.DescStream(name, st)
  374. if err != nil {
  375. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  376. return
  377. }
  378. jsonResponse(content, w, logger)
  379. case http.MethodDelete:
  380. content, err := streamProcessor.DropStream(name, st)
  381. if err != nil {
  382. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  383. return
  384. }
  385. w.WriteHeader(http.StatusOK)
  386. w.Write([]byte(content))
  387. case http.MethodPut:
  388. v, err := decodeStatementDescriptor(r.Body)
  389. if err != nil {
  390. handleError(w, err, "Invalid body", logger)
  391. return
  392. }
  393. content, err := streamProcessor.ExecReplaceStream(name, v.Sql, st)
  394. if err != nil {
  395. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  396. return
  397. }
  398. w.WriteHeader(http.StatusOK)
  399. w.Write([]byte(content))
  400. }
  401. }
  402. // list or create streams
  403. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  404. sourcesManageHandler(w, r, ast.TypeStream)
  405. }
  406. // describe or delete a stream
  407. func streamHandler(w http.ResponseWriter, r *http.Request) {
  408. sourceManageHandler(w, r, ast.TypeStream)
  409. }
  410. // list or create tables
  411. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  412. sourcesManageHandler(w, r, ast.TypeTable)
  413. }
  414. func tableHandler(w http.ResponseWriter, r *http.Request) {
  415. sourceManageHandler(w, r, ast.TypeTable)
  416. }
  417. func streamSchemaHandler(w http.ResponseWriter, r *http.Request) {
  418. sourceSchemaHandler(w, r, ast.TypeStream)
  419. }
  420. func tableSchemaHandler(w http.ResponseWriter, r *http.Request) {
  421. sourceSchemaHandler(w, r, ast.TypeTable)
  422. }
  423. func sourceSchemaHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  424. vars := mux.Vars(r)
  425. name := vars["name"]
  426. content, err := streamProcessor.GetInferredJsonSchema(name, st)
  427. if err != nil {
  428. handleError(w, err, fmt.Sprintf("get schema of %s error", ast.StreamTypeMap[st]), logger)
  429. return
  430. }
  431. jsonResponse(content, w, logger)
  432. }
  433. // list or create rules
  434. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  435. defer r.Body.Close()
  436. switch r.Method {
  437. case http.MethodPost:
  438. body, err := io.ReadAll(r.Body)
  439. if err != nil {
  440. handleError(w, err, "Invalid body", logger)
  441. return
  442. }
  443. id, err := createRule("", string(body))
  444. if err != nil {
  445. handleError(w, err, "", logger)
  446. return
  447. }
  448. result := fmt.Sprintf("Rule %s was created successfully.", id)
  449. w.WriteHeader(http.StatusCreated)
  450. w.Write([]byte(result))
  451. case http.MethodGet:
  452. content, err := getAllRulesWithStatus()
  453. if err != nil {
  454. handleError(w, err, "Show rules error", logger)
  455. return
  456. }
  457. jsonResponse(content, w, logger)
  458. }
  459. }
  460. // describe or delete a rule
  461. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  462. defer r.Body.Close()
  463. vars := mux.Vars(r)
  464. name := vars["name"]
  465. switch r.Method {
  466. case http.MethodGet:
  467. rule, err := ruleProcessor.GetRuleJson(name)
  468. if err != nil {
  469. handleError(w, err, "Describe rule error", logger)
  470. return
  471. }
  472. w.Header().Add(ContentType, ContentTypeJSON)
  473. w.Write([]byte(rule))
  474. case http.MethodDelete:
  475. deleteRule(name)
  476. content, err := ruleProcessor.ExecDrop(name)
  477. if err != nil {
  478. handleError(w, err, "Delete rule error", logger)
  479. return
  480. }
  481. w.WriteHeader(http.StatusOK)
  482. w.Write([]byte(content))
  483. case http.MethodPut:
  484. _, err := ruleProcessor.GetRuleById(name)
  485. if err != nil {
  486. handleError(w, err, "Rule not found", logger)
  487. return
  488. }
  489. body, err := io.ReadAll(r.Body)
  490. if err != nil {
  491. handleError(w, err, "Invalid body", logger)
  492. return
  493. }
  494. err = updateRule(name, string(body))
  495. if err != nil {
  496. handleError(w, err, "Update rule error", logger)
  497. return
  498. }
  499. // Update to db after validation
  500. _, err = ruleProcessor.ExecUpdate(name, string(body))
  501. var result string
  502. if err != nil {
  503. handleError(w, err, "Update rule error, suggest to delete it and recreate", logger)
  504. return
  505. } else {
  506. result = fmt.Sprintf("Rule %s was updated successfully.", name)
  507. }
  508. w.WriteHeader(http.StatusOK)
  509. w.Write([]byte(result))
  510. }
  511. }
  512. // get status of a rule
  513. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  514. defer r.Body.Close()
  515. vars := mux.Vars(r)
  516. name := vars["name"]
  517. content, err := getRuleStatus(name)
  518. if err != nil {
  519. handleError(w, err, "get rule status error", logger)
  520. return
  521. }
  522. w.Header().Set(ContentType, ContentTypeJSON)
  523. w.WriteHeader(http.StatusOK)
  524. w.Write([]byte(content))
  525. }
  526. // start a rule
  527. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  528. defer r.Body.Close()
  529. vars := mux.Vars(r)
  530. name := vars["name"]
  531. err := startRule(name)
  532. if err != nil {
  533. handleError(w, err, "start rule error", logger)
  534. return
  535. }
  536. w.WriteHeader(http.StatusOK)
  537. w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
  538. }
  539. // stop a rule
  540. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  541. defer r.Body.Close()
  542. vars := mux.Vars(r)
  543. name := vars["name"]
  544. result := stopRule(name)
  545. w.WriteHeader(http.StatusOK)
  546. w.Write([]byte(result))
  547. }
  548. // restart a rule
  549. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  550. defer r.Body.Close()
  551. vars := mux.Vars(r)
  552. name := vars["name"]
  553. err := restartRule(name)
  554. if err != nil {
  555. handleError(w, err, "restart rule error", logger)
  556. return
  557. }
  558. w.WriteHeader(http.StatusOK)
  559. w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
  560. }
  561. // get topo of a rule
  562. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  563. defer r.Body.Close()
  564. vars := mux.Vars(r)
  565. name := vars["name"]
  566. content, err := getRuleTopo(name)
  567. if err != nil {
  568. handleError(w, err, "get rule topo error", logger)
  569. return
  570. }
  571. w.Header().Set(ContentType, ContentTypeJSON)
  572. w.Write([]byte(content))
  573. }
// rulesetInfo is the JSON body accepted by importHandler. Exactly one of the
// two fields must be set; importHandler rejects a body with both or neither.
type rulesetInfo struct {
	// Content is the ruleset passed inline.
	Content string `json:"content"`
	// FilePath is a location the ruleset is read from via httpx.ReadFile.
	FilePath string `json:"file"`
}
  578. func importHandler(w http.ResponseWriter, r *http.Request) {
  579. rsi := &rulesetInfo{}
  580. err := json.NewDecoder(r.Body).Decode(rsi)
  581. if err != nil {
  582. handleError(w, err, "Invalid body: Error decoding json", logger)
  583. return
  584. }
  585. if rsi.Content != "" && rsi.FilePath != "" {
  586. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  587. return
  588. } else if rsi.Content == "" && rsi.FilePath == "" {
  589. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  590. return
  591. }
  592. content := []byte(rsi.Content)
  593. if rsi.FilePath != "" {
  594. reader, err := httpx.ReadFile(rsi.FilePath)
  595. if err != nil {
  596. handleError(w, err, "Fail to read file", logger)
  597. return
  598. }
  599. defer reader.Close()
  600. buf := new(bytes.Buffer)
  601. _, err = io.Copy(buf, reader)
  602. if err != nil {
  603. handleError(w, err, "fail to convert file", logger)
  604. return
  605. }
  606. content = buf.Bytes()
  607. }
  608. rules, counts, err := rulesetProcessor.Import(content)
  609. if err != nil {
  610. handleError(w, nil, "Import ruleset error", logger)
  611. return
  612. }
  613. infra.SafeRun(func() error {
  614. for _, name := range rules {
  615. rul, ee := ruleProcessor.GetRuleById(name)
  616. if ee != nil {
  617. logger.Error(ee)
  618. continue
  619. }
  620. reply := recoverRule(rul)
  621. if reply != "" {
  622. logger.Error(reply)
  623. }
  624. }
  625. return nil
  626. })
  627. w.Write([]byte(fmt.Sprintf("imported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])))
  628. }
  629. func exportHandler(w http.ResponseWriter, r *http.Request) {
  630. const name = "ekuiper_export.json"
  631. exported, _, err := rulesetProcessor.Export()
  632. if err != nil {
  633. handleError(w, err, "export error", logger)
  634. return
  635. }
  636. w.Header().Set("Content-Type", "application/octet-stream")
  637. w.Header().Add("Content-Disposition", "Attachment")
  638. http.ServeContent(w, r, name, time.Now(), exported)
  639. }
// Configuration is the JSON document used by the /data/export and
// /data/import endpoints. Each field is a string-to-string map for one
// category of server state; the same shape is also used to report per-category
// import results (see configurationImport).
// NOTE(review): keys are presumably item names and values their serialized
// definitions — confirm against the individual export helpers.
type Configuration struct {
	Streams          map[string]string `json:"streams"`
	Tables           map[string]string `json:"tables"`
	Rules            map[string]string `json:"rules"`
	NativePlugins    map[string]string `json:"nativePlugins"`
	PortablePlugins  map[string]string `json:"portablePlugins"`
	SourceConfig     map[string]string `json:"sourceConfig"`
	SinkConfig       map[string]string `json:"sinkConfig"`
	ConnectionConfig map[string]string `json:"connectionConfig"`
	// Service and Schema use capitalized JSON keys, unlike the other fields.
	Service map[string]string `json:"Service"`
	Schema  map[string]string `json:"Schema"`
	Uploads map[string]string `json:"uploads"`
}
  653. func configurationExport() ([]byte, error) {
  654. conf := &Configuration{
  655. Streams: make(map[string]string),
  656. Tables: make(map[string]string),
  657. Rules: make(map[string]string),
  658. NativePlugins: make(map[string]string),
  659. PortablePlugins: make(map[string]string),
  660. SourceConfig: make(map[string]string),
  661. SinkConfig: make(map[string]string),
  662. ConnectionConfig: make(map[string]string),
  663. Service: make(map[string]string),
  664. Schema: make(map[string]string),
  665. Uploads: make(map[string]string),
  666. }
  667. ruleSet := rulesetProcessor.ExportRuleSet()
  668. if ruleSet != nil {
  669. conf.Streams = ruleSet.Streams
  670. conf.Tables = ruleSet.Tables
  671. conf.Rules = ruleSet.Rules
  672. }
  673. conf.NativePlugins = pluginExport()
  674. conf.PortablePlugins = portablePluginExport()
  675. conf.Service = serviceExport()
  676. conf.Schema = schemaExport()
  677. conf.Uploads = uploadsExport()
  678. yamlCfg := meta.GetConfigurations()
  679. conf.SourceConfig = yamlCfg.Sources
  680. conf.SinkConfig = yamlCfg.Sinks
  681. conf.ConnectionConfig = yamlCfg.Connections
  682. return json.Marshal(conf)
  683. }
  684. func configurationExportHandler(w http.ResponseWriter, r *http.Request) {
  685. var jsonBytes []byte
  686. const name = "ekuiper_export.json"
  687. switch r.Method {
  688. case http.MethodGet:
  689. jsonBytes, _ = configurationExport()
  690. case http.MethodPost:
  691. var rules []string
  692. _ = json.NewDecoder(r.Body).Decode(&rules)
  693. jsonBytes, _ = ruleMigrationProcessor.ConfigurationPartialExport(rules)
  694. }
  695. w.Header().Set("Content-Type", "application/octet-stream")
  696. w.Header().Add("Content-Disposition", "Attachment")
  697. http.ServeContent(w, r, name, time.Now(), bytes.NewReader(jsonBytes))
  698. }
  699. func configurationReset() {
  700. _ = resetAllRules()
  701. _ = resetAllStreams()
  702. pluginReset()
  703. portablePluginsReset()
  704. serviceReset()
  705. schemaReset()
  706. meta.ResetConfigs()
  707. uploadsReset()
  708. }
// ImportConfigurationStatus is the outcome of a configuration import.
type ImportConfigurationStatus struct {
	// ErrorMsg is empty on full success; otherwise it holds a description of
	// the failure ("process error" when only individual items failed).
	ErrorMsg string
	// ConfigResponse carries the per-category results returned by the import
	// helpers; all maps are empty when nothing was reported.
	// NOTE(review): entries presumably describe the items that failed — confirm
	// against the import helpers.
	ConfigResponse Configuration
}
  713. func configurationImport(data []byte, reboot bool) ImportConfigurationStatus {
  714. conf := &Configuration{
  715. Streams: make(map[string]string),
  716. Tables: make(map[string]string),
  717. Rules: make(map[string]string),
  718. NativePlugins: make(map[string]string),
  719. PortablePlugins: make(map[string]string),
  720. SourceConfig: make(map[string]string),
  721. SinkConfig: make(map[string]string),
  722. ConnectionConfig: make(map[string]string),
  723. Service: make(map[string]string),
  724. Schema: make(map[string]string),
  725. Uploads: make(map[string]string),
  726. }
  727. importStatus := ImportConfigurationStatus{}
  728. configResponse := Configuration{
  729. Streams: make(map[string]string),
  730. Tables: make(map[string]string),
  731. Rules: make(map[string]string),
  732. NativePlugins: make(map[string]string),
  733. PortablePlugins: make(map[string]string),
  734. SourceConfig: make(map[string]string),
  735. SinkConfig: make(map[string]string),
  736. ConnectionConfig: make(map[string]string),
  737. Service: make(map[string]string),
  738. Schema: make(map[string]string),
  739. Uploads: make(map[string]string),
  740. }
  741. ResponseNil := Configuration{
  742. Streams: make(map[string]string),
  743. Tables: make(map[string]string),
  744. Rules: make(map[string]string),
  745. NativePlugins: make(map[string]string),
  746. PortablePlugins: make(map[string]string),
  747. SourceConfig: make(map[string]string),
  748. SinkConfig: make(map[string]string),
  749. ConnectionConfig: make(map[string]string),
  750. Service: make(map[string]string),
  751. Schema: make(map[string]string),
  752. Uploads: make(map[string]string),
  753. }
  754. err := json.Unmarshal(data, conf)
  755. if err != nil {
  756. importStatus.ErrorMsg = fmt.Errorf("configuration unmarshal with error %v", err).Error()
  757. return importStatus
  758. }
  759. configResponse.Uploads = uploadsImport(conf.Uploads)
  760. if reboot {
  761. err = pluginImport(conf.NativePlugins)
  762. if err != nil {
  763. importStatus.ErrorMsg = fmt.Errorf("pluginImport NativePlugins import error %v", err).Error()
  764. return importStatus
  765. }
  766. err = schemaImport(conf.Schema)
  767. if err != nil {
  768. importStatus.ErrorMsg = fmt.Errorf("schemaImport Schema import error %v", err).Error()
  769. return importStatus
  770. }
  771. }
  772. configResponse.PortablePlugins = portablePluginImport(conf.PortablePlugins)
  773. configResponse.Service = serviceImport(conf.Service)
  774. yamlCfgSet := meta.YamlConfigurationSet{
  775. Sources: conf.SourceConfig,
  776. Sinks: conf.SinkConfig,
  777. Connections: conf.ConnectionConfig,
  778. }
  779. confRsp := meta.LoadConfigurations(yamlCfgSet)
  780. configResponse.SourceConfig = confRsp.Sources
  781. configResponse.SinkConfig = confRsp.Sinks
  782. configResponse.ConnectionConfig = confRsp.Connections
  783. ruleSet := processor.Ruleset{
  784. Streams: conf.Streams,
  785. Tables: conf.Tables,
  786. Rules: conf.Rules,
  787. }
  788. result := rulesetProcessor.ImportRuleSet(ruleSet)
  789. configResponse.Streams = result.Streams
  790. configResponse.Tables = result.Tables
  791. configResponse.Rules = result.Rules
  792. if !reboot {
  793. infra.SafeRun(func() error {
  794. for name := range ruleSet.Rules {
  795. rul, ee := ruleProcessor.GetRuleById(name)
  796. if ee != nil {
  797. logger.Error(ee)
  798. continue
  799. }
  800. reply := recoverRule(rul)
  801. if reply != "" {
  802. logger.Error(reply)
  803. }
  804. }
  805. return nil
  806. })
  807. }
  808. if reflect.DeepEqual(ResponseNil, configResponse) {
  809. importStatus.ConfigResponse = ResponseNil
  810. } else {
  811. importStatus.ErrorMsg = "process error"
  812. importStatus.ConfigResponse = configResponse
  813. }
  814. return importStatus
  815. }
  816. func configurationPartialImport(data []byte) ImportConfigurationStatus {
  817. conf := &Configuration{
  818. Streams: make(map[string]string),
  819. Tables: make(map[string]string),
  820. Rules: make(map[string]string),
  821. NativePlugins: make(map[string]string),
  822. PortablePlugins: make(map[string]string),
  823. SourceConfig: make(map[string]string),
  824. SinkConfig: make(map[string]string),
  825. ConnectionConfig: make(map[string]string),
  826. Service: make(map[string]string),
  827. Schema: make(map[string]string),
  828. Uploads: make(map[string]string),
  829. }
  830. importStatus := ImportConfigurationStatus{}
  831. configResponse := Configuration{
  832. Streams: make(map[string]string),
  833. Tables: make(map[string]string),
  834. Rules: make(map[string]string),
  835. NativePlugins: make(map[string]string),
  836. PortablePlugins: make(map[string]string),
  837. SourceConfig: make(map[string]string),
  838. SinkConfig: make(map[string]string),
  839. ConnectionConfig: make(map[string]string),
  840. Service: make(map[string]string),
  841. Schema: make(map[string]string),
  842. Uploads: make(map[string]string),
  843. }
  844. ResponseNil := Configuration{
  845. Streams: make(map[string]string),
  846. Tables: make(map[string]string),
  847. Rules: make(map[string]string),
  848. NativePlugins: make(map[string]string),
  849. PortablePlugins: make(map[string]string),
  850. SourceConfig: make(map[string]string),
  851. SinkConfig: make(map[string]string),
  852. ConnectionConfig: make(map[string]string),
  853. Service: make(map[string]string),
  854. Schema: make(map[string]string),
  855. Uploads: make(map[string]string),
  856. }
  857. err := json.Unmarshal(data, conf)
  858. if err != nil {
  859. importStatus.ErrorMsg = fmt.Errorf("configuration unmarshal with error %v", err).Error()
  860. return importStatus
  861. }
  862. yamlCfgSet := meta.YamlConfigurationSet{
  863. Sources: conf.SourceConfig,
  864. Sinks: conf.SinkConfig,
  865. Connections: conf.ConnectionConfig,
  866. }
  867. confRsp := meta.LoadConfigurationsPartial(yamlCfgSet)
  868. configResponse.Uploads = uploadsImport(conf.Uploads)
  869. configResponse.NativePlugins = pluginPartialImport(conf.NativePlugins)
  870. configResponse.Schema = schemaPartialImport(conf.Schema)
  871. configResponse.PortablePlugins = portablePluginPartialImport(conf.PortablePlugins)
  872. configResponse.Service = servicePartialImport(conf.Service)
  873. configResponse.SourceConfig = confRsp.Sources
  874. configResponse.SinkConfig = confRsp.Sinks
  875. configResponse.ConnectionConfig = confRsp.Connections
  876. ruleSet := processor.Ruleset{
  877. Streams: conf.Streams,
  878. Tables: conf.Tables,
  879. Rules: conf.Rules,
  880. }
  881. result := importRuleSetPartial(ruleSet)
  882. configResponse.Streams = result.Streams
  883. configResponse.Tables = result.Tables
  884. configResponse.Rules = result.Rules
  885. if reflect.DeepEqual(ResponseNil, configResponse) {
  886. importStatus.ConfigResponse = ResponseNil
  887. } else {
  888. importStatus.ErrorMsg = "process error"
  889. importStatus.ConfigResponse = configResponse
  890. }
  891. return importStatus
  892. }
  893. type configurationInfo struct {
  894. Content string `json:"content"`
  895. FilePath string `json:"file"`
  896. }
  897. func configurationImportHandler(w http.ResponseWriter, r *http.Request) {
  898. cb := r.URL.Query().Get("stop")
  899. stop := cb == "1"
  900. par := r.URL.Query().Get("partial")
  901. partial := par == "1"
  902. rsi := &configurationInfo{}
  903. err := json.NewDecoder(r.Body).Decode(rsi)
  904. if err != nil {
  905. handleError(w, err, "Invalid body: Error decoding json", logger)
  906. return
  907. }
  908. if rsi.Content != "" && rsi.FilePath != "" {
  909. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  910. return
  911. } else if rsi.Content == "" && rsi.FilePath == "" {
  912. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  913. return
  914. }
  915. content := []byte(rsi.Content)
  916. if rsi.FilePath != "" {
  917. reader, err := httpx.ReadFile(rsi.FilePath)
  918. if err != nil {
  919. handleError(w, err, "Fail to read file", logger)
  920. return
  921. }
  922. defer reader.Close()
  923. buf := new(bytes.Buffer)
  924. _, err = io.Copy(buf, reader)
  925. if err != nil {
  926. handleError(w, err, "fail to convert file", logger)
  927. return
  928. }
  929. content = buf.Bytes()
  930. }
  931. if !partial {
  932. configurationReset()
  933. result := configurationImport(content, stop)
  934. if result.ErrorMsg != "" {
  935. w.WriteHeader(http.StatusBadRequest)
  936. jsonResponse(result, w, logger)
  937. } else {
  938. w.WriteHeader(http.StatusOK)
  939. jsonResponse(result, w, logger)
  940. }
  941. if stop {
  942. go func() {
  943. time.Sleep(1 * time.Second)
  944. os.Exit(100)
  945. }()
  946. }
  947. } else {
  948. result := configurationPartialImport(content)
  949. if result.ErrorMsg != "" {
  950. w.WriteHeader(http.StatusBadRequest)
  951. jsonResponse(result, w, logger)
  952. } else {
  953. w.WriteHeader(http.StatusOK)
  954. jsonResponse(result, w, logger)
  955. }
  956. }
  957. }
  958. func configurationStatusExport() Configuration {
  959. conf := Configuration{
  960. Streams: make(map[string]string),
  961. Tables: make(map[string]string),
  962. Rules: make(map[string]string),
  963. NativePlugins: make(map[string]string),
  964. PortablePlugins: make(map[string]string),
  965. SourceConfig: make(map[string]string),
  966. SinkConfig: make(map[string]string),
  967. ConnectionConfig: make(map[string]string),
  968. Service: make(map[string]string),
  969. Schema: make(map[string]string),
  970. Uploads: make(map[string]string),
  971. }
  972. ruleSet := rulesetProcessor.ExportRuleSetStatus()
  973. if ruleSet != nil {
  974. conf.Streams = ruleSet.Streams
  975. conf.Tables = ruleSet.Tables
  976. conf.Rules = ruleSet.Rules
  977. }
  978. conf.NativePlugins = pluginStatusExport()
  979. conf.PortablePlugins = portablePluginStatusExport()
  980. conf.Service = serviceStatusExport()
  981. conf.Schema = schemaStatusExport()
  982. conf.Uploads = uploadsStatusExport()
  983. yamlCfgStatus := meta.GetConfigurationStatus()
  984. conf.SourceConfig = yamlCfgStatus.Sources
  985. conf.SinkConfig = yamlCfgStatus.Sinks
  986. conf.ConnectionConfig = yamlCfgStatus.Connections
  987. return conf
  988. }
  989. func configurationStatusHandler(w http.ResponseWriter, r *http.Request) {
  990. defer r.Body.Close()
  991. content := configurationStatusExport()
  992. jsonResponse(content, w, logger)
  993. }
  994. func importRuleSetPartial(all processor.Ruleset) processor.Ruleset {
  995. ruleSetRsp := processor.Ruleset{
  996. Rules: map[string]string{},
  997. Streams: map[string]string{},
  998. Tables: map[string]string{},
  999. }
  1000. // replace streams
  1001. for k, v := range all.Streams {
  1002. _, e := streamProcessor.ExecReplaceStream(k, v, ast.TypeStream)
  1003. if e != nil {
  1004. ruleSetRsp.Streams[k] = e.Error()
  1005. continue
  1006. }
  1007. }
  1008. // replace tables
  1009. for k, v := range all.Tables {
  1010. _, e := streamProcessor.ExecReplaceStream(k, v, ast.TypeTable)
  1011. if e != nil {
  1012. ruleSetRsp.Tables[k] = e.Error()
  1013. continue
  1014. }
  1015. }
  1016. for k, v := range all.Rules {
  1017. _, err := ruleProcessor.GetRuleJson(k)
  1018. if err == nil {
  1019. // the rule already exist, update
  1020. err = updateRule(k, v)
  1021. if err != nil {
  1022. ruleSetRsp.Rules[k] = err.Error()
  1023. continue
  1024. }
  1025. // Update to db after validation
  1026. _, err = ruleProcessor.ExecUpdate(k, v)
  1027. if err != nil {
  1028. ruleSetRsp.Rules[k] = err.Error()
  1029. continue
  1030. }
  1031. } else {
  1032. // not found, create
  1033. _, err2 := createRule(k, v)
  1034. if err2 != nil {
  1035. ruleSetRsp.Rules[k] = err2.Error()
  1036. continue
  1037. }
  1038. }
  1039. }
  1040. return ruleSetRsp
  1041. }
  1042. func uploadsReset() {
  1043. _ = uploadsDb.Clean()
  1044. _ = uploadsStatusDb.Clean()
  1045. }
  1046. func uploadsExport() map[string]string {
  1047. conf, _ := uploadsDb.All()
  1048. return conf
  1049. }
  1050. func uploadsStatusExport() map[string]string {
  1051. status, _ := uploadsDb.All()
  1052. return status
  1053. }
  1054. func uploadsImport(s map[string]string) map[string]string {
  1055. errMap := map[string]string{}
  1056. _ = uploadsStatusDb.Clean()
  1057. for k, v := range s {
  1058. fc := &fileContent{}
  1059. err := json.Unmarshal([]byte(v), fc)
  1060. if err != nil {
  1061. errMsg := fmt.Sprintf("invalid body: Error decoding file json: %s", err.Error())
  1062. errMap[k] = errMsg
  1063. _ = uploadsStatusDb.Set(k, errMsg)
  1064. continue
  1065. }
  1066. err = fc.Validate()
  1067. if err != nil {
  1068. errMap[k] = err.Error()
  1069. _ = uploadsStatusDb.Set(k, err.Error())
  1070. continue
  1071. }
  1072. err = upload(fc)
  1073. if err != nil {
  1074. errMap[k] = err.Error()
  1075. continue
  1076. }
  1077. }
  1078. return errMap
  1079. }