rest.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/meta"
  22. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  23. "github.com/lf-edge/ekuiper/internal/processor"
  24. "io"
  25. "net/http"
  26. "os"
  27. "path/filepath"
  28. "runtime"
  29. "strconv"
  30. "strings"
  31. "time"
  32. "github.com/gorilla/handlers"
  33. "github.com/gorilla/mux"
  34. "github.com/lf-edge/ekuiper/internal/server/middleware"
  35. "github.com/lf-edge/ekuiper/pkg/api"
  36. "github.com/lf-edge/ekuiper/pkg/ast"
  37. "github.com/lf-edge/ekuiper/pkg/errorx"
  38. "github.com/lf-edge/ekuiper/pkg/infra"
  39. )
  40. const (
  41. ContentType = "Content-Type"
  42. ContentTypeJSON = "application/json"
  43. )
  44. var uploadDir string
  45. type statementDescriptor struct {
  46. Sql string `json:"sql,omitempty"`
  47. }
  48. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  49. sd := statementDescriptor{}
  50. err := json.NewDecoder(reader).Decode(&sd)
  51. // Problems decoding
  52. if err != nil {
  53. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  54. }
  55. return sd, nil
  56. }
  57. // Handle applies the specified error and error concept to the HTTP response writer
  58. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  59. message := prefix
  60. if message != "" {
  61. message += ": "
  62. }
  63. message += err.Error()
  64. logger.Error(message)
  65. var ec int
  66. switch e := err.(type) {
  67. case *errorx.Error:
  68. switch e.Code() {
  69. case errorx.NOT_FOUND:
  70. ec = http.StatusNotFound
  71. default:
  72. ec = http.StatusBadRequest
  73. }
  74. default:
  75. ec = http.StatusBadRequest
  76. }
  77. http.Error(w, message, ec)
  78. }
  79. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  80. w.Header().Add(ContentType, ContentTypeJSON)
  81. jsonByte, err := json.Marshal(i)
  82. if err != nil {
  83. handleError(w, err, "", logger)
  84. }
  85. w.Header().Add("Content-Length", strconv.Itoa(len(jsonByte)))
  86. _, err = w.Write(jsonByte)
  87. // Problems encoding
  88. if err != nil {
  89. handleError(w, err, "", logger)
  90. }
  91. }
  92. func jsonByteResponse(buffer bytes.Buffer, w http.ResponseWriter, logger api.Logger) {
  93. w.Header().Add(ContentType, ContentTypeJSON)
  94. w.Header().Add("Content-Length", strconv.Itoa(buffer.Len()))
  95. _, err := w.Write(buffer.Bytes())
  96. // Problems encoding
  97. if err != nil {
  98. handleError(w, err, "", logger)
  99. }
  100. }
  101. func createRestServer(ip string, port int, needToken bool) *http.Server {
  102. dataDir, err := conf.GetDataLoc()
  103. if err != nil {
  104. panic(err)
  105. }
  106. uploadDir = filepath.Join(dataDir, "uploads")
  107. r := mux.NewRouter()
  108. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  109. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  110. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  111. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  112. r.HandleFunc("/streams/{name}/schema", streamSchemaHandler).Methods(http.MethodGet)
  113. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  114. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  115. r.HandleFunc("/tables/{name}/schema", tableSchemaHandler).Methods(http.MethodGet)
  116. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  117. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  118. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  119. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  120. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  121. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  122. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  123. r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
  124. r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
  125. r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
  126. r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
  127. r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet)
  128. r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
  129. r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
  130. // Register extended routes
  131. for k, v := range components {
  132. logger.Infof("register rest endpoint for component %s", k)
  133. v.rest(r)
  134. }
  135. if needToken {
  136. r.Use(middleware.Auth)
  137. }
  138. server := &http.Server{
  139. Addr: fmt.Sprintf("%s:%d", ip, port),
  140. // Good practice to set timeouts to avoid Slowloris attacks.
  141. WriteTimeout: time.Second * 60 * 5,
  142. ReadTimeout: time.Second * 60 * 5,
  143. IdleTimeout: time.Second * 60,
  144. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin", "Authorization"}), handlers.AllowedMethods([]string{"POST", "GET", "PUT", "DELETE", "HEAD"}))(r),
  145. }
  146. server.SetKeepAlivesEnabled(false)
  147. return server
  148. }
  149. type fileContent struct {
  150. Name string `json:"name"`
  151. Content string `json:"content"`
  152. }
  153. func fileUploadHandler(w http.ResponseWriter, r *http.Request) {
  154. switch r.Method {
  155. // Upload or overwrite a file
  156. case http.MethodPost:
  157. switch r.Header.Get("Content-Type") {
  158. case "application/json":
  159. fc := &fileContent{}
  160. defer r.Body.Close()
  161. err := json.NewDecoder(r.Body).Decode(fc)
  162. if err != nil {
  163. handleError(w, err, "Invalid body: Error decoding file json", logger)
  164. return
  165. }
  166. if fc.Content == "" || fc.Name == "" {
  167. handleError(w, nil, "Invalid body: name and content are required", logger)
  168. return
  169. }
  170. filePath := filepath.Join(uploadDir, fc.Name)
  171. dst, err := os.Create(filePath)
  172. defer dst.Close()
  173. if err != nil {
  174. handleError(w, err, "Error creating the file", logger)
  175. return
  176. }
  177. _, err = dst.Write([]byte(fc.Content))
  178. if err != nil {
  179. handleError(w, err, "Error writing the file", logger)
  180. return
  181. }
  182. w.WriteHeader(http.StatusCreated)
  183. w.Write([]byte(filePath))
  184. default:
  185. // Maximum upload of 1 GB files
  186. err := r.ParseMultipartForm(1024 << 20)
  187. if err != nil {
  188. handleError(w, err, "Error parse the multi part form", logger)
  189. return
  190. }
  191. // Get handler for filename, size and headers
  192. file, handler, err := r.FormFile("uploadFile")
  193. if err != nil {
  194. handleError(w, err, "Error Retrieving the File", logger)
  195. return
  196. }
  197. defer file.Close()
  198. // Create file
  199. filePath := filepath.Join(uploadDir, handler.Filename)
  200. dst, err := os.Create(filePath)
  201. defer dst.Close()
  202. if err != nil {
  203. handleError(w, err, "Error creating the file", logger)
  204. return
  205. }
  206. // Copy the uploaded file to the created file on the filesystem
  207. if _, err := io.Copy(dst, file); err != nil {
  208. handleError(w, err, "Error writing the file", logger)
  209. return
  210. }
  211. w.WriteHeader(http.StatusCreated)
  212. w.Write([]byte(filePath))
  213. }
  214. case http.MethodGet:
  215. // Get the list of files in the upload directory
  216. files, err := os.ReadDir(uploadDir)
  217. if err != nil {
  218. handleError(w, err, "Error reading the file upload dir", logger)
  219. return
  220. }
  221. fileNames := make([]string, len(files))
  222. for i, f := range files {
  223. fileNames[i] = filepath.Join(uploadDir, f.Name())
  224. }
  225. jsonResponse(fileNames, w, logger)
  226. }
  227. }
  228. func fileDeleteHandler(w http.ResponseWriter, r *http.Request) {
  229. vars := mux.Vars(r)
  230. name := vars["name"]
  231. filePath := filepath.Join(uploadDir, name)
  232. e := os.Remove(filePath)
  233. if e != nil {
  234. handleError(w, e, "Error deleting the file", logger)
  235. return
  236. }
  237. w.WriteHeader(http.StatusOK)
  238. w.Write([]byte("ok"))
  239. }
  240. type information struct {
  241. Version string `json:"version"`
  242. Os string `json:"os"`
  243. Arch string `json:"arch"`
  244. UpTimeSeconds int64 `json:"upTimeSeconds"`
  245. }
  246. // The handler for root
  247. func rootHandler(w http.ResponseWriter, r *http.Request) {
  248. defer r.Body.Close()
  249. switch r.Method {
  250. case http.MethodGet, http.MethodPost:
  251. w.WriteHeader(http.StatusOK)
  252. info := new(information)
  253. info.Version = version
  254. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  255. info.Os = runtime.GOOS
  256. info.Arch = runtime.GOARCH
  257. byteInfo, _ := json.Marshal(info)
  258. w.Write(byteInfo)
  259. }
  260. }
  261. func pingHandler(w http.ResponseWriter, _ *http.Request) {
  262. w.WriteHeader(http.StatusOK)
  263. }
  264. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  265. defer r.Body.Close()
  266. switch r.Method {
  267. case http.MethodGet:
  268. var (
  269. content []string
  270. err error
  271. kind string
  272. )
  273. if st == ast.TypeTable {
  274. kind = r.URL.Query().Get("kind")
  275. if kind == "scan" {
  276. kind = ast.StreamKindScan
  277. } else if kind == "lookup" {
  278. kind = ast.StreamKindLookup
  279. } else {
  280. kind = ""
  281. }
  282. }
  283. if kind != "" {
  284. content, err = streamProcessor.ShowTable(kind)
  285. } else {
  286. content, err = streamProcessor.ShowStream(st)
  287. }
  288. if err != nil {
  289. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  290. return
  291. }
  292. jsonResponse(content, w, logger)
  293. case http.MethodPost:
  294. v, err := decodeStatementDescriptor(r.Body)
  295. if err != nil {
  296. handleError(w, err, "Invalid body", logger)
  297. return
  298. }
  299. content, err := streamProcessor.ExecStreamSql(v.Sql)
  300. if err != nil {
  301. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  302. return
  303. }
  304. w.WriteHeader(http.StatusCreated)
  305. w.Write([]byte(content))
  306. }
  307. }
  308. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  309. defer r.Body.Close()
  310. vars := mux.Vars(r)
  311. name := vars["name"]
  312. switch r.Method {
  313. case http.MethodGet:
  314. content, err := streamProcessor.DescStream(name, st)
  315. if err != nil {
  316. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  317. return
  318. }
  319. jsonResponse(content, w, logger)
  320. case http.MethodDelete:
  321. content, err := streamProcessor.DropStream(name, st)
  322. if err != nil {
  323. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  324. return
  325. }
  326. w.WriteHeader(http.StatusOK)
  327. w.Write([]byte(content))
  328. case http.MethodPut:
  329. v, err := decodeStatementDescriptor(r.Body)
  330. if err != nil {
  331. handleError(w, err, "Invalid body", logger)
  332. return
  333. }
  334. content, err := streamProcessor.ExecReplaceStream(name, v.Sql, st)
  335. if err != nil {
  336. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  337. return
  338. }
  339. w.WriteHeader(http.StatusOK)
  340. w.Write([]byte(content))
  341. }
  342. }
  343. // list or create streams
  344. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  345. sourcesManageHandler(w, r, ast.TypeStream)
  346. }
  347. // describe or delete a stream
  348. func streamHandler(w http.ResponseWriter, r *http.Request) {
  349. sourceManageHandler(w, r, ast.TypeStream)
  350. }
  351. // list or create tables
  352. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  353. sourcesManageHandler(w, r, ast.TypeTable)
  354. }
  355. func tableHandler(w http.ResponseWriter, r *http.Request) {
  356. sourceManageHandler(w, r, ast.TypeTable)
  357. }
  358. func streamSchemaHandler(w http.ResponseWriter, r *http.Request) {
  359. sourceSchemaHandler(w, r, ast.TypeStream)
  360. }
  361. func tableSchemaHandler(w http.ResponseWriter, r *http.Request) {
  362. sourceSchemaHandler(w, r, ast.TypeTable)
  363. }
  364. func sourceSchemaHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  365. vars := mux.Vars(r)
  366. name := vars["name"]
  367. content, err := streamProcessor.GetInferredJsonSchema(name, st)
  368. if err != nil {
  369. handleError(w, err, fmt.Sprintf("get schema of %s error", ast.StreamTypeMap[st]), logger)
  370. return
  371. }
  372. jsonResponse(content, w, logger)
  373. }
  374. // list or create rules
  375. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  376. defer r.Body.Close()
  377. switch r.Method {
  378. case http.MethodPost:
  379. body, err := io.ReadAll(r.Body)
  380. if err != nil {
  381. handleError(w, err, "Invalid body", logger)
  382. return
  383. }
  384. id, err := createRule("", string(body))
  385. if err != nil {
  386. handleError(w, err, "", logger)
  387. return
  388. }
  389. result := fmt.Sprintf("Rule %s was created successfully.", id)
  390. w.WriteHeader(http.StatusCreated)
  391. w.Write([]byte(result))
  392. case http.MethodGet:
  393. content, err := getAllRulesWithStatus()
  394. if err != nil {
  395. handleError(w, err, "Show rules error", logger)
  396. return
  397. }
  398. jsonResponse(content, w, logger)
  399. }
  400. }
  401. // describe or delete a rule
  402. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  403. defer r.Body.Close()
  404. vars := mux.Vars(r)
  405. name := vars["name"]
  406. switch r.Method {
  407. case http.MethodGet:
  408. rule, err := ruleProcessor.GetRuleJson(name)
  409. if err != nil {
  410. handleError(w, err, "Describe rule error", logger)
  411. return
  412. }
  413. w.Header().Add(ContentType, ContentTypeJSON)
  414. w.Write([]byte(rule))
  415. case http.MethodDelete:
  416. deleteRule(name)
  417. content, err := ruleProcessor.ExecDrop(name)
  418. if err != nil {
  419. handleError(w, err, "Delete rule error", logger)
  420. return
  421. }
  422. w.WriteHeader(http.StatusOK)
  423. w.Write([]byte(content))
  424. case http.MethodPut:
  425. _, err := ruleProcessor.GetRuleById(name)
  426. if err != nil {
  427. handleError(w, err, "Rule not found", logger)
  428. return
  429. }
  430. body, err := io.ReadAll(r.Body)
  431. if err != nil {
  432. handleError(w, err, "Invalid body", logger)
  433. return
  434. }
  435. err = updateRule(name, string(body))
  436. if err != nil {
  437. handleError(w, err, "Update rule error", logger)
  438. return
  439. }
  440. // Update to db after validation
  441. _, err = ruleProcessor.ExecUpdate(name, string(body))
  442. var result string
  443. if err != nil {
  444. handleError(w, err, "Update rule error, suggest to delete it and recreate", logger)
  445. return
  446. } else {
  447. result = fmt.Sprintf("Rule %s was updated successfully.", name)
  448. }
  449. w.WriteHeader(http.StatusOK)
  450. w.Write([]byte(result))
  451. }
  452. }
  453. // get status of a rule
  454. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  455. defer r.Body.Close()
  456. vars := mux.Vars(r)
  457. name := vars["name"]
  458. content, err := getRuleStatus(name)
  459. if err != nil {
  460. handleError(w, err, "get rule status error", logger)
  461. return
  462. }
  463. w.Header().Set(ContentType, ContentTypeJSON)
  464. w.WriteHeader(http.StatusOK)
  465. w.Write([]byte(content))
  466. }
  467. // start a rule
  468. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  469. defer r.Body.Close()
  470. vars := mux.Vars(r)
  471. name := vars["name"]
  472. err := startRule(name)
  473. if err != nil {
  474. handleError(w, err, "start rule error", logger)
  475. return
  476. }
  477. w.WriteHeader(http.StatusOK)
  478. w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
  479. }
  480. // stop a rule
  481. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  482. defer r.Body.Close()
  483. vars := mux.Vars(r)
  484. name := vars["name"]
  485. result := stopRule(name)
  486. w.WriteHeader(http.StatusOK)
  487. w.Write([]byte(result))
  488. }
  489. // restart a rule
  490. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  491. defer r.Body.Close()
  492. vars := mux.Vars(r)
  493. name := vars["name"]
  494. err := restartRule(name)
  495. if err != nil {
  496. handleError(w, err, "restart rule error", logger)
  497. return
  498. }
  499. w.WriteHeader(http.StatusOK)
  500. w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
  501. }
  502. // get topo of a rule
  503. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  504. defer r.Body.Close()
  505. vars := mux.Vars(r)
  506. name := vars["name"]
  507. content, err := getRuleTopo(name)
  508. if err != nil {
  509. handleError(w, err, "get rule topo error", logger)
  510. return
  511. }
  512. w.Header().Set(ContentType, ContentTypeJSON)
  513. w.Write([]byte(content))
  514. }
  515. type rulesetInfo struct {
  516. Content string `json:"content"`
  517. FilePath string `json:"file"`
  518. }
  519. func importHandler(w http.ResponseWriter, r *http.Request) {
  520. rsi := &rulesetInfo{}
  521. err := json.NewDecoder(r.Body).Decode(rsi)
  522. if err != nil {
  523. handleError(w, err, "Invalid body: Error decoding json", logger)
  524. return
  525. }
  526. if rsi.Content != "" && rsi.FilePath != "" {
  527. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  528. return
  529. } else if rsi.Content == "" && rsi.FilePath == "" {
  530. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  531. return
  532. }
  533. content := []byte(rsi.Content)
  534. if rsi.FilePath != "" {
  535. reader, err := httpx.ReadFile(rsi.FilePath)
  536. if err != nil {
  537. handleError(w, err, "Fail to read file", logger)
  538. return
  539. }
  540. defer reader.Close()
  541. buf := new(bytes.Buffer)
  542. _, err = io.Copy(buf, reader)
  543. if err != nil {
  544. handleError(w, err, "fail to convert file", logger)
  545. return
  546. }
  547. content = buf.Bytes()
  548. }
  549. rules, counts, err := rulesetProcessor.Import(content)
  550. if err != nil {
  551. handleError(w, nil, "Import ruleset error", logger)
  552. return
  553. }
  554. infra.SafeRun(func() error {
  555. for _, name := range rules {
  556. rul, ee := ruleProcessor.GetRuleById(name)
  557. if ee != nil {
  558. logger.Error(ee)
  559. continue
  560. }
  561. reply := recoverRule(rul)
  562. if reply != "" {
  563. logger.Error(reply)
  564. }
  565. }
  566. return nil
  567. })
  568. w.Write([]byte(fmt.Sprintf("imported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])))
  569. }
  570. func exportHandler(w http.ResponseWriter, r *http.Request) {
  571. const name = "ekuiper_export.json"
  572. exported, _, err := rulesetProcessor.Export()
  573. if err != nil {
  574. handleError(w, err, "export error", logger)
  575. return
  576. }
  577. w.Header().Set("Content-Type", "application/octet-stream")
  578. w.Header().Add("Content-Disposition", "Attachment")
  579. http.ServeContent(w, r, name, time.Now(), exported)
  580. }
  581. type Configuration struct {
  582. Streams map[string]string `json:"streams"`
  583. Tables map[string]string `json:"tables"`
  584. Rules map[string]string `json:"rules"`
  585. NativePlugins map[string]string `json:"nativePlugins"`
  586. PortablePlugins map[string]string `json:"portablePlugins"`
  587. SourceConfig map[string]string `json:"sourceConfig"`
  588. SinkConfig map[string]string `json:"sinkConfig"`
  589. ConnectionConfig map[string]string `json:"connectionConfig"`
  590. Service map[string]string `json:"Service"`
  591. Schema map[string]string `json:"Schema"`
  592. }
  593. func configurationExport() ([]byte, error) {
  594. conf := &Configuration{
  595. Streams: make(map[string]string),
  596. Tables: make(map[string]string),
  597. Rules: make(map[string]string),
  598. NativePlugins: make(map[string]string),
  599. PortablePlugins: make(map[string]string),
  600. SourceConfig: make(map[string]string),
  601. SinkConfig: make(map[string]string),
  602. ConnectionConfig: make(map[string]string),
  603. Service: make(map[string]string),
  604. Schema: make(map[string]string),
  605. }
  606. ruleSet := rulesetProcessor.ExportRuleSet()
  607. if ruleSet != nil {
  608. conf.Streams = ruleSet.Streams
  609. conf.Tables = ruleSet.Tables
  610. conf.Rules = ruleSet.Rules
  611. }
  612. conf.NativePlugins = pluginExport()
  613. conf.PortablePlugins = portablePluginExport()
  614. conf.Service = serviceExport()
  615. conf.Schema = schemaExport()
  616. yamlCfg := meta.GetConfigurations()
  617. conf.SourceConfig = yamlCfg.Sources
  618. conf.SinkConfig = yamlCfg.Sinks
  619. conf.ConnectionConfig = yamlCfg.Connections
  620. return json.Marshal(conf)
  621. }
  622. func configurationExportHandler(w http.ResponseWriter, r *http.Request) {
  623. const name = "ekuiper_export.json"
  624. jsonBytes, _ := configurationExport()
  625. w.Header().Set("Content-Type", "application/octet-stream")
  626. w.Header().Add("Content-Disposition", "Attachment")
  627. http.ServeContent(w, r, name, time.Now(), bytes.NewReader(jsonBytes))
  628. }
  629. func configurationReset() {
  630. _ = resetAllRules()
  631. _ = resetAllStreams()
  632. pluginReset()
  633. portablePluginsReset()
  634. serviceReset()
  635. schemaReset()
  636. meta.ResetConfigs()
  637. }
  638. func configurationImport(data []byte, reboot bool) error {
  639. conf := &Configuration{
  640. Streams: make(map[string]string),
  641. Tables: make(map[string]string),
  642. Rules: make(map[string]string),
  643. NativePlugins: make(map[string]string),
  644. PortablePlugins: make(map[string]string),
  645. SourceConfig: make(map[string]string),
  646. SinkConfig: make(map[string]string),
  647. ConnectionConfig: make(map[string]string),
  648. Service: make(map[string]string),
  649. Schema: make(map[string]string),
  650. }
  651. err := json.Unmarshal(data, conf)
  652. if err != nil {
  653. return fmt.Errorf("configuration unmarshal with error %v", err)
  654. }
  655. if reboot {
  656. err = pluginImport(conf.NativePlugins)
  657. if err != nil {
  658. return fmt.Errorf("pluginImportHandler with error %v", err)
  659. }
  660. err = schemaImport(conf.Schema)
  661. if err != nil {
  662. return fmt.Errorf("schemaImport with error %v", err)
  663. }
  664. }
  665. portablePluginImport(conf.PortablePlugins)
  666. serviceImport(conf.Service)
  667. yamlCfgSet := meta.YamlConfigurationSet{
  668. Sources: conf.SourceConfig,
  669. Sinks: conf.SinkConfig,
  670. Connections: conf.ConnectionConfig,
  671. }
  672. meta.LoadConfigurations(yamlCfgSet)
  673. ruleSet := processor.Ruleset{
  674. Streams: conf.Streams,
  675. Tables: conf.Tables,
  676. Rules: conf.Rules,
  677. }
  678. rulesetProcessor.ImportRuleSet(ruleSet)
  679. if !reboot {
  680. infra.SafeRun(func() error {
  681. for name := range ruleSet.Rules {
  682. rul, ee := ruleProcessor.GetRuleById(name)
  683. if ee != nil {
  684. logger.Error(ee)
  685. continue
  686. }
  687. reply := recoverRule(rul)
  688. if reply != "" {
  689. logger.Error(reply)
  690. }
  691. }
  692. return nil
  693. })
  694. }
  695. return nil
  696. }
  697. type configurationInfo struct {
  698. Content string `json:"content"`
  699. FilePath string `json:"file"`
  700. }
  701. func configurationImportHandler(w http.ResponseWriter, r *http.Request) {
  702. cb := r.URL.Query().Get("stop")
  703. stop := cb == "1"
  704. rsi := &configurationInfo{}
  705. err := json.NewDecoder(r.Body).Decode(rsi)
  706. if err != nil {
  707. handleError(w, err, "Invalid body: Error decoding json", logger)
  708. return
  709. }
  710. if rsi.Content != "" && rsi.FilePath != "" {
  711. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  712. return
  713. } else if rsi.Content == "" && rsi.FilePath == "" {
  714. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  715. return
  716. }
  717. content := []byte(rsi.Content)
  718. if rsi.FilePath != "" {
  719. reader, err := httpx.ReadFile(rsi.FilePath)
  720. if err != nil {
  721. handleError(w, err, "Fail to read file", logger)
  722. return
  723. }
  724. defer reader.Close()
  725. buf := new(bytes.Buffer)
  726. _, err = io.Copy(buf, reader)
  727. if err != nil {
  728. handleError(w, err, "fail to convert file", logger)
  729. return
  730. }
  731. content = buf.Bytes()
  732. }
  733. configurationReset()
  734. err = configurationImport(content, stop)
  735. if err != nil {
  736. handleError(w, err, "Import configuration error", logger)
  737. return
  738. }
  739. if stop {
  740. go func() {
  741. time.Sleep(1 * time.Second)
  742. os.Exit(100)
  743. }()
  744. }
  745. w.WriteHeader(http.StatusOK)
  746. }
  747. func configurationStatusExport() Configuration {
  748. conf := Configuration{
  749. Streams: make(map[string]string),
  750. Tables: make(map[string]string),
  751. Rules: make(map[string]string),
  752. NativePlugins: make(map[string]string),
  753. PortablePlugins: make(map[string]string),
  754. SourceConfig: make(map[string]string),
  755. SinkConfig: make(map[string]string),
  756. ConnectionConfig: make(map[string]string),
  757. Service: make(map[string]string),
  758. Schema: make(map[string]string),
  759. }
  760. ruleSet := rulesetProcessor.ExportRuleSetStatus()
  761. if ruleSet != nil {
  762. conf.Streams = ruleSet.Streams
  763. conf.Tables = ruleSet.Tables
  764. conf.Rules = ruleSet.Rules
  765. }
  766. conf.NativePlugins = pluginStatusExport()
  767. conf.PortablePlugins = portablePluginStatusExport()
  768. conf.Service = serviceStatusExport()
  769. conf.Schema = schemaStatusExport()
  770. yamlCfgStatus := meta.GetConfigurationStatus()
  771. conf.SourceConfig = yamlCfgStatus.Sources
  772. conf.SinkConfig = yamlCfgStatus.Sinks
  773. conf.ConnectionConfig = yamlCfgStatus.Connections
  774. return conf
  775. }
  776. func configurationStatusHandler(w http.ResponseWriter, r *http.Request) {
  777. defer r.Body.Close()
  778. content := configurationStatusExport()
  779. jsonResponse(content, w, logger)
  780. }