rest.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net/http"
  22. "os"
  23. "path/filepath"
  24. "reflect"
  25. "runtime"
  26. "strconv"
  27. "time"
  28. "github.com/gorilla/handlers"
  29. "github.com/gorilla/mux"
  30. "golang.org/x/text/cases"
  31. "golang.org/x/text/language"
  32. "github.com/lf-edge/ekuiper/internal/conf"
  33. "github.com/lf-edge/ekuiper/internal/meta"
  34. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  35. "github.com/lf-edge/ekuiper/internal/processor"
  36. "github.com/lf-edge/ekuiper/internal/server/middleware"
  37. "github.com/lf-edge/ekuiper/pkg/api"
  38. "github.com/lf-edge/ekuiper/pkg/ast"
  39. "github.com/lf-edge/ekuiper/pkg/cast"
  40. "github.com/lf-edge/ekuiper/pkg/errorx"
  41. "github.com/lf-edge/ekuiper/pkg/infra"
  42. )
  43. const (
  44. ContentType = "Content-Type"
  45. ContentTypeJSON = "application/json"
  46. )
  47. var uploadDir string
  48. type statementDescriptor struct {
  49. Sql string `json:"sql,omitempty"`
  50. }
  51. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  52. sd := statementDescriptor{}
  53. err := json.NewDecoder(reader).Decode(&sd)
  54. // Problems decoding
  55. if err != nil {
  56. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  57. }
  58. return sd, nil
  59. }
  60. // Handle applies the specified error and error concept to the HTTP response writer
  61. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  62. message := prefix
  63. if message != "" {
  64. message += ": "
  65. }
  66. message += err.Error()
  67. logger.Error(message)
  68. var ec int
  69. switch e := err.(type) {
  70. case *errorx.Error:
  71. switch e.Code() {
  72. case errorx.NOT_FOUND:
  73. ec = http.StatusNotFound
  74. default:
  75. ec = http.StatusBadRequest
  76. }
  77. default:
  78. ec = http.StatusBadRequest
  79. }
  80. http.Error(w, message, ec)
  81. }
  82. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  83. w.Header().Add(ContentType, ContentTypeJSON)
  84. jsonByte, err := json.Marshal(i)
  85. if err != nil {
  86. handleError(w, err, "", logger)
  87. }
  88. w.Header().Add("Content-Length", strconv.Itoa(len(jsonByte)))
  89. _, err = w.Write(jsonByte)
  90. // Problems encoding
  91. if err != nil {
  92. handleError(w, err, "", logger)
  93. }
  94. }
  95. func jsonByteResponse(buffer bytes.Buffer, w http.ResponseWriter, logger api.Logger) {
  96. w.Header().Add(ContentType, ContentTypeJSON)
  97. w.Header().Add("Content-Length", strconv.Itoa(buffer.Len()))
  98. _, err := w.Write(buffer.Bytes())
  99. // Problems encoding
  100. if err != nil {
  101. handleError(w, err, "", logger)
  102. }
  103. }
  104. func createRestServer(ip string, port int, needToken bool) *http.Server {
  105. dataDir, err := conf.GetDataLoc()
  106. if err != nil {
  107. panic(err)
  108. }
  109. uploadDir = filepath.Join(dataDir, "uploads")
  110. r := mux.NewRouter()
  111. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  112. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  113. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  114. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  115. r.HandleFunc("/streams/{name}/schema", streamSchemaHandler).Methods(http.MethodGet)
  116. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  117. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  118. r.HandleFunc("/tables/{name}/schema", tableSchemaHandler).Methods(http.MethodGet)
  119. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  120. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  121. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  122. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  123. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  124. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  125. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  126. r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
  127. r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
  128. r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
  129. r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
  130. r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
  131. r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
  132. r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
  133. // Register extended routes
  134. for k, v := range components {
  135. logger.Infof("register rest endpoint for component %s", k)
  136. v.rest(r)
  137. }
  138. if needToken {
  139. r.Use(middleware.Auth)
  140. }
  141. server := &http.Server{
  142. Addr: cast.JoinHostPortInt(ip, port),
  143. // Good practice to set timeouts to avoid Slowloris attacks.
  144. WriteTimeout: time.Second * 60 * 5,
  145. ReadTimeout: time.Second * 60 * 5,
  146. IdleTimeout: time.Second * 60,
  147. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin", "Authorization"}), handlers.AllowedMethods([]string{"POST", "GET", "PUT", "DELETE", "HEAD"}))(r),
  148. }
  149. server.SetKeepAlivesEnabled(false)
  150. return server
  151. }
  152. type fileContent struct {
  153. Name string `json:"name"`
  154. Content string `json:"content"`
  155. }
  156. func fileUploadHandler(w http.ResponseWriter, r *http.Request) {
  157. switch r.Method {
  158. // Upload or overwrite a file
  159. case http.MethodPost:
  160. switch r.Header.Get("Content-Type") {
  161. case "application/json":
  162. fc := &fileContent{}
  163. defer r.Body.Close()
  164. err := json.NewDecoder(r.Body).Decode(fc)
  165. if err != nil {
  166. handleError(w, err, "Invalid body: Error decoding file json", logger)
  167. return
  168. }
  169. if fc.Content == "" || fc.Name == "" {
  170. handleError(w, nil, "Invalid body: name and content are required", logger)
  171. return
  172. }
  173. filePath := filepath.Join(uploadDir, fc.Name)
  174. dst, err := os.Create(filePath)
  175. defer dst.Close()
  176. if err != nil {
  177. handleError(w, err, "Error creating the file", logger)
  178. return
  179. }
  180. _, err = dst.Write([]byte(fc.Content))
  181. if err != nil {
  182. handleError(w, err, "Error writing the file", logger)
  183. return
  184. }
  185. w.WriteHeader(http.StatusCreated)
  186. w.Write([]byte(filePath))
  187. default:
  188. // Maximum upload of 1 GB files
  189. err := r.ParseMultipartForm(1024 << 20)
  190. if err != nil {
  191. handleError(w, err, "Error parse the multi part form", logger)
  192. return
  193. }
  194. // Get handler for filename, size and headers
  195. file, handler, err := r.FormFile("uploadFile")
  196. if err != nil {
  197. handleError(w, err, "Error Retrieving the File", logger)
  198. return
  199. }
  200. defer file.Close()
  201. // Create file
  202. filePath := filepath.Join(uploadDir, handler.Filename)
  203. dst, err := os.Create(filePath)
  204. defer dst.Close()
  205. if err != nil {
  206. handleError(w, err, "Error creating the file", logger)
  207. return
  208. }
  209. // Copy the uploaded file to the created file on the filesystem
  210. if _, err := io.Copy(dst, file); err != nil {
  211. handleError(w, err, "Error writing the file", logger)
  212. return
  213. }
  214. w.WriteHeader(http.StatusCreated)
  215. w.Write([]byte(filePath))
  216. }
  217. case http.MethodGet:
  218. // Get the list of files in the upload directory
  219. files, err := os.ReadDir(uploadDir)
  220. if err != nil {
  221. handleError(w, err, "Error reading the file upload dir", logger)
  222. return
  223. }
  224. fileNames := make([]string, len(files))
  225. for i, f := range files {
  226. fileNames[i] = filepath.Join(uploadDir, f.Name())
  227. }
  228. jsonResponse(fileNames, w, logger)
  229. }
  230. }
  231. func fileDeleteHandler(w http.ResponseWriter, r *http.Request) {
  232. vars := mux.Vars(r)
  233. name := vars["name"]
  234. filePath := filepath.Join(uploadDir, name)
  235. e := os.Remove(filePath)
  236. if e != nil {
  237. handleError(w, e, "Error deleting the file", logger)
  238. return
  239. }
  240. w.WriteHeader(http.StatusOK)
  241. w.Write([]byte("ok"))
  242. }
  243. type information struct {
  244. Version string `json:"version"`
  245. Os string `json:"os"`
  246. Arch string `json:"arch"`
  247. UpTimeSeconds int64 `json:"upTimeSeconds"`
  248. }
  249. // The handler for root
  250. func rootHandler(w http.ResponseWriter, r *http.Request) {
  251. defer r.Body.Close()
  252. switch r.Method {
  253. case http.MethodGet, http.MethodPost:
  254. w.WriteHeader(http.StatusOK)
  255. info := new(information)
  256. info.Version = version
  257. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  258. info.Os = runtime.GOOS
  259. info.Arch = runtime.GOARCH
  260. byteInfo, _ := json.Marshal(info)
  261. w.Write(byteInfo)
  262. }
  263. }
  264. func pingHandler(w http.ResponseWriter, _ *http.Request) {
  265. w.WriteHeader(http.StatusOK)
  266. }
  267. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  268. defer r.Body.Close()
  269. switch r.Method {
  270. case http.MethodGet:
  271. var (
  272. content []string
  273. err error
  274. kind string
  275. )
  276. if st == ast.TypeTable {
  277. kind = r.URL.Query().Get("kind")
  278. if kind == "scan" {
  279. kind = ast.StreamKindScan
  280. } else if kind == "lookup" {
  281. kind = ast.StreamKindLookup
  282. } else {
  283. kind = ""
  284. }
  285. }
  286. if kind != "" {
  287. content, err = streamProcessor.ShowTable(kind)
  288. } else {
  289. content, err = streamProcessor.ShowStream(st)
  290. }
  291. if err != nil {
  292. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  293. return
  294. }
  295. jsonResponse(content, w, logger)
  296. case http.MethodPost:
  297. v, err := decodeStatementDescriptor(r.Body)
  298. if err != nil {
  299. handleError(w, err, "Invalid body", logger)
  300. return
  301. }
  302. content, err := streamProcessor.ExecStreamSql(v.Sql)
  303. if err != nil {
  304. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  305. return
  306. }
  307. w.WriteHeader(http.StatusCreated)
  308. w.Write([]byte(content))
  309. }
  310. }
  311. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  312. defer r.Body.Close()
  313. vars := mux.Vars(r)
  314. name := vars["name"]
  315. switch r.Method {
  316. case http.MethodGet:
  317. content, err := streamProcessor.DescStream(name, st)
  318. if err != nil {
  319. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  320. return
  321. }
  322. jsonResponse(content, w, logger)
  323. case http.MethodDelete:
  324. content, err := streamProcessor.DropStream(name, st)
  325. if err != nil {
  326. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  327. return
  328. }
  329. w.WriteHeader(http.StatusOK)
  330. w.Write([]byte(content))
  331. case http.MethodPut:
  332. v, err := decodeStatementDescriptor(r.Body)
  333. if err != nil {
  334. handleError(w, err, "Invalid body", logger)
  335. return
  336. }
  337. content, err := streamProcessor.ExecReplaceStream(name, v.Sql, st)
  338. if err != nil {
  339. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  340. return
  341. }
  342. w.WriteHeader(http.StatusOK)
  343. w.Write([]byte(content))
  344. }
  345. }
  346. // list or create streams
  347. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  348. sourcesManageHandler(w, r, ast.TypeStream)
  349. }
  350. // describe or delete a stream
  351. func streamHandler(w http.ResponseWriter, r *http.Request) {
  352. sourceManageHandler(w, r, ast.TypeStream)
  353. }
  354. // list or create tables
  355. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  356. sourcesManageHandler(w, r, ast.TypeTable)
  357. }
  358. func tableHandler(w http.ResponseWriter, r *http.Request) {
  359. sourceManageHandler(w, r, ast.TypeTable)
  360. }
  361. func streamSchemaHandler(w http.ResponseWriter, r *http.Request) {
  362. sourceSchemaHandler(w, r, ast.TypeStream)
  363. }
  364. func tableSchemaHandler(w http.ResponseWriter, r *http.Request) {
  365. sourceSchemaHandler(w, r, ast.TypeTable)
  366. }
  367. func sourceSchemaHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  368. vars := mux.Vars(r)
  369. name := vars["name"]
  370. content, err := streamProcessor.GetInferredJsonSchema(name, st)
  371. if err != nil {
  372. handleError(w, err, fmt.Sprintf("get schema of %s error", ast.StreamTypeMap[st]), logger)
  373. return
  374. }
  375. jsonResponse(content, w, logger)
  376. }
  377. // list or create rules
  378. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  379. defer r.Body.Close()
  380. switch r.Method {
  381. case http.MethodPost:
  382. body, err := io.ReadAll(r.Body)
  383. if err != nil {
  384. handleError(w, err, "Invalid body", logger)
  385. return
  386. }
  387. id, err := createRule("", string(body))
  388. if err != nil {
  389. handleError(w, err, "", logger)
  390. return
  391. }
  392. result := fmt.Sprintf("Rule %s was created successfully.", id)
  393. w.WriteHeader(http.StatusCreated)
  394. w.Write([]byte(result))
  395. case http.MethodGet:
  396. content, err := getAllRulesWithStatus()
  397. if err != nil {
  398. handleError(w, err, "Show rules error", logger)
  399. return
  400. }
  401. jsonResponse(content, w, logger)
  402. }
  403. }
  404. // describe or delete a rule
  405. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  406. defer r.Body.Close()
  407. vars := mux.Vars(r)
  408. name := vars["name"]
  409. switch r.Method {
  410. case http.MethodGet:
  411. rule, err := ruleProcessor.GetRuleJson(name)
  412. if err != nil {
  413. handleError(w, err, "Describe rule error", logger)
  414. return
  415. }
  416. w.Header().Add(ContentType, ContentTypeJSON)
  417. w.Write([]byte(rule))
  418. case http.MethodDelete:
  419. deleteRule(name)
  420. content, err := ruleProcessor.ExecDrop(name)
  421. if err != nil {
  422. handleError(w, err, "Delete rule error", logger)
  423. return
  424. }
  425. w.WriteHeader(http.StatusOK)
  426. w.Write([]byte(content))
  427. case http.MethodPut:
  428. _, err := ruleProcessor.GetRuleById(name)
  429. if err != nil {
  430. handleError(w, err, "Rule not found", logger)
  431. return
  432. }
  433. body, err := io.ReadAll(r.Body)
  434. if err != nil {
  435. handleError(w, err, "Invalid body", logger)
  436. return
  437. }
  438. err = updateRule(name, string(body))
  439. if err != nil {
  440. handleError(w, err, "Update rule error", logger)
  441. return
  442. }
  443. // Update to db after validation
  444. _, err = ruleProcessor.ExecUpdate(name, string(body))
  445. var result string
  446. if err != nil {
  447. handleError(w, err, "Update rule error, suggest to delete it and recreate", logger)
  448. return
  449. } else {
  450. result = fmt.Sprintf("Rule %s was updated successfully.", name)
  451. }
  452. w.WriteHeader(http.StatusOK)
  453. w.Write([]byte(result))
  454. }
  455. }
  456. // get status of a rule
  457. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  458. defer r.Body.Close()
  459. vars := mux.Vars(r)
  460. name := vars["name"]
  461. content, err := getRuleStatus(name)
  462. if err != nil {
  463. handleError(w, err, "get rule status error", logger)
  464. return
  465. }
  466. w.Header().Set(ContentType, ContentTypeJSON)
  467. w.WriteHeader(http.StatusOK)
  468. w.Write([]byte(content))
  469. }
  470. // start a rule
  471. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  472. defer r.Body.Close()
  473. vars := mux.Vars(r)
  474. name := vars["name"]
  475. err := startRule(name)
  476. if err != nil {
  477. handleError(w, err, "start rule error", logger)
  478. return
  479. }
  480. w.WriteHeader(http.StatusOK)
  481. w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
  482. }
  483. // stop a rule
  484. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  485. defer r.Body.Close()
  486. vars := mux.Vars(r)
  487. name := vars["name"]
  488. result := stopRule(name)
  489. w.WriteHeader(http.StatusOK)
  490. w.Write([]byte(result))
  491. }
  492. // restart a rule
  493. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  494. defer r.Body.Close()
  495. vars := mux.Vars(r)
  496. name := vars["name"]
  497. err := restartRule(name)
  498. if err != nil {
  499. handleError(w, err, "restart rule error", logger)
  500. return
  501. }
  502. w.WriteHeader(http.StatusOK)
  503. w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
  504. }
  505. // get topo of a rule
  506. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  507. defer r.Body.Close()
  508. vars := mux.Vars(r)
  509. name := vars["name"]
  510. content, err := getRuleTopo(name)
  511. if err != nil {
  512. handleError(w, err, "get rule topo error", logger)
  513. return
  514. }
  515. w.Header().Set(ContentType, ContentTypeJSON)
  516. w.Write([]byte(content))
  517. }
  518. type rulesetInfo struct {
  519. Content string `json:"content"`
  520. FilePath string `json:"file"`
  521. }
  522. func importHandler(w http.ResponseWriter, r *http.Request) {
  523. rsi := &rulesetInfo{}
  524. err := json.NewDecoder(r.Body).Decode(rsi)
  525. if err != nil {
  526. handleError(w, err, "Invalid body: Error decoding json", logger)
  527. return
  528. }
  529. if rsi.Content != "" && rsi.FilePath != "" {
  530. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  531. return
  532. } else if rsi.Content == "" && rsi.FilePath == "" {
  533. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  534. return
  535. }
  536. content := []byte(rsi.Content)
  537. if rsi.FilePath != "" {
  538. reader, err := httpx.ReadFile(rsi.FilePath)
  539. if err != nil {
  540. handleError(w, err, "Fail to read file", logger)
  541. return
  542. }
  543. defer reader.Close()
  544. buf := new(bytes.Buffer)
  545. _, err = io.Copy(buf, reader)
  546. if err != nil {
  547. handleError(w, err, "fail to convert file", logger)
  548. return
  549. }
  550. content = buf.Bytes()
  551. }
  552. rules, counts, err := rulesetProcessor.Import(content)
  553. if err != nil {
  554. handleError(w, nil, "Import ruleset error", logger)
  555. return
  556. }
  557. infra.SafeRun(func() error {
  558. for _, name := range rules {
  559. rul, ee := ruleProcessor.GetRuleById(name)
  560. if ee != nil {
  561. logger.Error(ee)
  562. continue
  563. }
  564. reply := recoverRule(rul)
  565. if reply != "" {
  566. logger.Error(reply)
  567. }
  568. }
  569. return nil
  570. })
  571. w.Write([]byte(fmt.Sprintf("imported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])))
  572. }
  573. func exportHandler(w http.ResponseWriter, r *http.Request) {
  574. const name = "ekuiper_export.json"
  575. exported, _, err := rulesetProcessor.Export()
  576. if err != nil {
  577. handleError(w, err, "export error", logger)
  578. return
  579. }
  580. w.Header().Set("Content-Type", "application/octet-stream")
  581. w.Header().Add("Content-Disposition", "Attachment")
  582. http.ServeContent(w, r, name, time.Now(), exported)
  583. }
  584. type Configuration struct {
  585. Streams map[string]string `json:"streams"`
  586. Tables map[string]string `json:"tables"`
  587. Rules map[string]string `json:"rules"`
  588. NativePlugins map[string]string `json:"nativePlugins"`
  589. PortablePlugins map[string]string `json:"portablePlugins"`
  590. SourceConfig map[string]string `json:"sourceConfig"`
  591. SinkConfig map[string]string `json:"sinkConfig"`
  592. ConnectionConfig map[string]string `json:"connectionConfig"`
  593. Service map[string]string `json:"Service"`
  594. Schema map[string]string `json:"Schema"`
  595. }
  596. func configurationExport() ([]byte, error) {
  597. conf := &Configuration{
  598. Streams: make(map[string]string),
  599. Tables: make(map[string]string),
  600. Rules: make(map[string]string),
  601. NativePlugins: make(map[string]string),
  602. PortablePlugins: make(map[string]string),
  603. SourceConfig: make(map[string]string),
  604. SinkConfig: make(map[string]string),
  605. ConnectionConfig: make(map[string]string),
  606. Service: make(map[string]string),
  607. Schema: make(map[string]string),
  608. }
  609. ruleSet := rulesetProcessor.ExportRuleSet()
  610. if ruleSet != nil {
  611. conf.Streams = ruleSet.Streams
  612. conf.Tables = ruleSet.Tables
  613. conf.Rules = ruleSet.Rules
  614. }
  615. conf.NativePlugins = pluginExport()
  616. conf.PortablePlugins = portablePluginExport()
  617. conf.Service = serviceExport()
  618. conf.Schema = schemaExport()
  619. yamlCfg := meta.GetConfigurations()
  620. conf.SourceConfig = yamlCfg.Sources
  621. conf.SinkConfig = yamlCfg.Sinks
  622. conf.ConnectionConfig = yamlCfg.Connections
  623. return json.Marshal(conf)
  624. }
  625. func configurationExportHandler(w http.ResponseWriter, r *http.Request) {
  626. var jsonBytes []byte
  627. const name = "ekuiper_export.json"
  628. switch r.Method {
  629. case http.MethodGet:
  630. jsonBytes, _ = configurationExport()
  631. case http.MethodPost:
  632. var rules []string
  633. _ = json.NewDecoder(r.Body).Decode(&rules)
  634. jsonBytes, _ = ruleMigrationProcessor.ConfigurationPartialExport(rules)
  635. }
  636. w.Header().Set("Content-Type", "application/octet-stream")
  637. w.Header().Add("Content-Disposition", "Attachment")
  638. http.ServeContent(w, r, name, time.Now(), bytes.NewReader(jsonBytes))
  639. }
  640. func configurationReset() {
  641. _ = resetAllRules()
  642. _ = resetAllStreams()
  643. pluginReset()
  644. portablePluginsReset()
  645. serviceReset()
  646. schemaReset()
  647. meta.ResetConfigs()
  648. }
  649. type ImportConfigurationStatus struct {
  650. ErrorMsg string
  651. ConfigResponse Configuration
  652. }
  653. func configurationImport(data []byte, reboot bool) ImportConfigurationStatus {
  654. conf := &Configuration{
  655. Streams: make(map[string]string),
  656. Tables: make(map[string]string),
  657. Rules: make(map[string]string),
  658. NativePlugins: make(map[string]string),
  659. PortablePlugins: make(map[string]string),
  660. SourceConfig: make(map[string]string),
  661. SinkConfig: make(map[string]string),
  662. ConnectionConfig: make(map[string]string),
  663. Service: make(map[string]string),
  664. Schema: make(map[string]string),
  665. }
  666. importStatus := ImportConfigurationStatus{}
  667. configResponse := Configuration{
  668. Streams: make(map[string]string),
  669. Tables: make(map[string]string),
  670. Rules: make(map[string]string),
  671. NativePlugins: make(map[string]string),
  672. PortablePlugins: make(map[string]string),
  673. SourceConfig: make(map[string]string),
  674. SinkConfig: make(map[string]string),
  675. ConnectionConfig: make(map[string]string),
  676. Service: make(map[string]string),
  677. Schema: make(map[string]string),
  678. }
  679. ResponseNil := Configuration{
  680. Streams: make(map[string]string),
  681. Tables: make(map[string]string),
  682. Rules: make(map[string]string),
  683. NativePlugins: make(map[string]string),
  684. PortablePlugins: make(map[string]string),
  685. SourceConfig: make(map[string]string),
  686. SinkConfig: make(map[string]string),
  687. ConnectionConfig: make(map[string]string),
  688. Service: make(map[string]string),
  689. Schema: make(map[string]string),
  690. }
  691. err := json.Unmarshal(data, conf)
  692. if err != nil {
  693. importStatus.ErrorMsg = fmt.Errorf("configuration unmarshal with error %v", err).Error()
  694. return importStatus
  695. }
  696. if reboot {
  697. err = pluginImport(conf.NativePlugins)
  698. if err != nil {
  699. importStatus.ErrorMsg = fmt.Errorf("pluginImport NativePlugins import error %v", err).Error()
  700. return importStatus
  701. }
  702. err = schemaImport(conf.Schema)
  703. if err != nil {
  704. importStatus.ErrorMsg = fmt.Errorf("schemaImport Schema import error %v", err).Error()
  705. return importStatus
  706. }
  707. }
  708. configResponse.PortablePlugins = portablePluginImport(conf.PortablePlugins)
  709. configResponse.Service = serviceImport(conf.Service)
  710. yamlCfgSet := meta.YamlConfigurationSet{
  711. Sources: conf.SourceConfig,
  712. Sinks: conf.SinkConfig,
  713. Connections: conf.ConnectionConfig,
  714. }
  715. confRsp := meta.LoadConfigurations(yamlCfgSet)
  716. configResponse.SourceConfig = confRsp.Sources
  717. configResponse.SinkConfig = confRsp.Sinks
  718. configResponse.ConnectionConfig = confRsp.Connections
  719. ruleSet := processor.Ruleset{
  720. Streams: conf.Streams,
  721. Tables: conf.Tables,
  722. Rules: conf.Rules,
  723. }
  724. result := rulesetProcessor.ImportRuleSet(ruleSet)
  725. configResponse.Streams = result.Streams
  726. configResponse.Tables = result.Tables
  727. configResponse.Rules = result.Rules
  728. if !reboot {
  729. infra.SafeRun(func() error {
  730. for name := range ruleSet.Rules {
  731. rul, ee := ruleProcessor.GetRuleById(name)
  732. if ee != nil {
  733. logger.Error(ee)
  734. continue
  735. }
  736. reply := recoverRule(rul)
  737. if reply != "" {
  738. logger.Error(reply)
  739. }
  740. }
  741. return nil
  742. })
  743. }
  744. if reflect.DeepEqual(ResponseNil, configResponse) {
  745. importStatus.ConfigResponse = ResponseNil
  746. } else {
  747. importStatus.ErrorMsg = "process error"
  748. importStatus.ConfigResponse = configResponse
  749. }
  750. return importStatus
  751. }
  752. func configurationPartialImport(data []byte) ImportConfigurationStatus {
  753. conf := &Configuration{
  754. Streams: make(map[string]string),
  755. Tables: make(map[string]string),
  756. Rules: make(map[string]string),
  757. NativePlugins: make(map[string]string),
  758. PortablePlugins: make(map[string]string),
  759. SourceConfig: make(map[string]string),
  760. SinkConfig: make(map[string]string),
  761. ConnectionConfig: make(map[string]string),
  762. Service: make(map[string]string),
  763. Schema: make(map[string]string),
  764. }
  765. importStatus := ImportConfigurationStatus{}
  766. configResponse := Configuration{
  767. Streams: make(map[string]string),
  768. Tables: make(map[string]string),
  769. Rules: make(map[string]string),
  770. NativePlugins: make(map[string]string),
  771. PortablePlugins: make(map[string]string),
  772. SourceConfig: make(map[string]string),
  773. SinkConfig: make(map[string]string),
  774. ConnectionConfig: make(map[string]string),
  775. Service: make(map[string]string),
  776. Schema: make(map[string]string),
  777. }
  778. ResponseNil := Configuration{
  779. Streams: make(map[string]string),
  780. Tables: make(map[string]string),
  781. Rules: make(map[string]string),
  782. NativePlugins: make(map[string]string),
  783. PortablePlugins: make(map[string]string),
  784. SourceConfig: make(map[string]string),
  785. SinkConfig: make(map[string]string),
  786. ConnectionConfig: make(map[string]string),
  787. Service: make(map[string]string),
  788. Schema: make(map[string]string),
  789. }
  790. err := json.Unmarshal(data, conf)
  791. if err != nil {
  792. importStatus.ErrorMsg = fmt.Errorf("configuration unmarshal with error %v", err).Error()
  793. return importStatus
  794. }
  795. yamlCfgSet := meta.YamlConfigurationSet{
  796. Sources: conf.SourceConfig,
  797. Sinks: conf.SinkConfig,
  798. Connections: conf.ConnectionConfig,
  799. }
  800. confRsp := meta.LoadConfigurationsPartial(yamlCfgSet)
  801. configResponse.NativePlugins = pluginPartialImport(conf.NativePlugins)
  802. configResponse.Schema = schemaPartialImport(conf.Schema)
  803. configResponse.PortablePlugins = portablePluginPartialImport(conf.PortablePlugins)
  804. configResponse.Service = servicePartialImport(conf.Service)
  805. configResponse.SourceConfig = confRsp.Sources
  806. configResponse.SinkConfig = confRsp.Sinks
  807. configResponse.ConnectionConfig = confRsp.Connections
  808. ruleSet := processor.Ruleset{
  809. Streams: conf.Streams,
  810. Tables: conf.Tables,
  811. Rules: conf.Rules,
  812. }
  813. result := importRuleSetPartial(ruleSet)
  814. configResponse.Streams = result.Streams
  815. configResponse.Tables = result.Tables
  816. configResponse.Rules = result.Rules
  817. if reflect.DeepEqual(ResponseNil, configResponse) {
  818. importStatus.ConfigResponse = ResponseNil
  819. } else {
  820. importStatus.ErrorMsg = "process error"
  821. importStatus.ConfigResponse = configResponse
  822. }
  823. return importStatus
  824. }
  825. type configurationInfo struct {
  826. Content string `json:"content"`
  827. FilePath string `json:"file"`
  828. }
  829. func configurationImportHandler(w http.ResponseWriter, r *http.Request) {
  830. cb := r.URL.Query().Get("stop")
  831. stop := cb == "1"
  832. par := r.URL.Query().Get("partial")
  833. partial := par == "1"
  834. rsi := &configurationInfo{}
  835. err := json.NewDecoder(r.Body).Decode(rsi)
  836. if err != nil {
  837. handleError(w, err, "Invalid body: Error decoding json", logger)
  838. return
  839. }
  840. if rsi.Content != "" && rsi.FilePath != "" {
  841. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  842. return
  843. } else if rsi.Content == "" && rsi.FilePath == "" {
  844. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  845. return
  846. }
  847. content := []byte(rsi.Content)
  848. if rsi.FilePath != "" {
  849. reader, err := httpx.ReadFile(rsi.FilePath)
  850. if err != nil {
  851. handleError(w, err, "Fail to read file", logger)
  852. return
  853. }
  854. defer reader.Close()
  855. buf := new(bytes.Buffer)
  856. _, err = io.Copy(buf, reader)
  857. if err != nil {
  858. handleError(w, err, "fail to convert file", logger)
  859. return
  860. }
  861. content = buf.Bytes()
  862. }
  863. if !partial {
  864. configurationReset()
  865. result := configurationImport(content, stop)
  866. if result.ErrorMsg != "" {
  867. w.WriteHeader(http.StatusBadRequest)
  868. jsonResponse(result, w, logger)
  869. } else {
  870. w.WriteHeader(http.StatusOK)
  871. jsonResponse(result, w, logger)
  872. }
  873. if stop {
  874. go func() {
  875. time.Sleep(1 * time.Second)
  876. os.Exit(100)
  877. }()
  878. }
  879. } else {
  880. result := configurationPartialImport(content)
  881. if result.ErrorMsg != "" {
  882. w.WriteHeader(http.StatusBadRequest)
  883. jsonResponse(result, w, logger)
  884. } else {
  885. w.WriteHeader(http.StatusOK)
  886. jsonResponse(result, w, logger)
  887. }
  888. }
  889. }
  890. func configurationStatusExport() Configuration {
  891. conf := Configuration{
  892. Streams: make(map[string]string),
  893. Tables: make(map[string]string),
  894. Rules: make(map[string]string),
  895. NativePlugins: make(map[string]string),
  896. PortablePlugins: make(map[string]string),
  897. SourceConfig: make(map[string]string),
  898. SinkConfig: make(map[string]string),
  899. ConnectionConfig: make(map[string]string),
  900. Service: make(map[string]string),
  901. Schema: make(map[string]string),
  902. }
  903. ruleSet := rulesetProcessor.ExportRuleSetStatus()
  904. if ruleSet != nil {
  905. conf.Streams = ruleSet.Streams
  906. conf.Tables = ruleSet.Tables
  907. conf.Rules = ruleSet.Rules
  908. }
  909. conf.NativePlugins = pluginStatusExport()
  910. conf.PortablePlugins = portablePluginStatusExport()
  911. conf.Service = serviceStatusExport()
  912. conf.Schema = schemaStatusExport()
  913. yamlCfgStatus := meta.GetConfigurationStatus()
  914. conf.SourceConfig = yamlCfgStatus.Sources
  915. conf.SinkConfig = yamlCfgStatus.Sinks
  916. conf.ConnectionConfig = yamlCfgStatus.Connections
  917. return conf
  918. }
  919. func configurationStatusHandler(w http.ResponseWriter, r *http.Request) {
  920. defer r.Body.Close()
  921. content := configurationStatusExport()
  922. jsonResponse(content, w, logger)
  923. }
  924. func importRuleSetPartial(all processor.Ruleset) processor.Ruleset {
  925. ruleSetRsp := processor.Ruleset{
  926. Rules: map[string]string{},
  927. Streams: map[string]string{},
  928. Tables: map[string]string{},
  929. }
  930. // replace streams
  931. for k, v := range all.Streams {
  932. _, e := streamProcessor.ExecReplaceStream(k, v, ast.TypeStream)
  933. if e != nil {
  934. ruleSetRsp.Streams[k] = e.Error()
  935. continue
  936. }
  937. }
  938. // replace tables
  939. for k, v := range all.Tables {
  940. _, e := streamProcessor.ExecReplaceStream(k, v, ast.TypeTable)
  941. if e != nil {
  942. ruleSetRsp.Tables[k] = e.Error()
  943. continue
  944. }
  945. }
  946. for k, v := range all.Rules {
  947. _, err := ruleProcessor.GetRuleJson(k)
  948. if err == nil {
  949. // the rule already exist, update
  950. err = updateRule(k, v)
  951. if err != nil {
  952. ruleSetRsp.Rules[k] = err.Error()
  953. continue
  954. }
  955. // Update to db after validation
  956. _, err = ruleProcessor.ExecUpdate(k, v)
  957. if err != nil {
  958. ruleSetRsp.Rules[k] = err.Error()
  959. continue
  960. }
  961. } else {
  962. // not found, create
  963. _, err2 := createRule(k, v)
  964. if err2 != nil {
  965. ruleSetRsp.Rules[k] = err2.Error()
  966. continue
  967. }
  968. }
  969. }
  970. return ruleSetRsp
  971. }