rest.go 28 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/meta"
  22. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  23. "github.com/lf-edge/ekuiper/internal/processor"
  24. "io"
  25. "net/http"
  26. "os"
  27. "path/filepath"
  28. "runtime"
  29. "strconv"
  30. "strings"
  31. "time"
  32. "github.com/gorilla/handlers"
  33. "github.com/gorilla/mux"
  34. "github.com/lf-edge/ekuiper/internal/server/middleware"
  35. "github.com/lf-edge/ekuiper/pkg/api"
  36. "github.com/lf-edge/ekuiper/pkg/ast"
  37. "github.com/lf-edge/ekuiper/pkg/errorx"
  38. "github.com/lf-edge/ekuiper/pkg/infra"
  39. )
  40. const (
  41. ContentType = "Content-Type"
  42. ContentTypeJSON = "application/json"
  43. )
  44. var uploadDir string
  45. type statementDescriptor struct {
  46. Sql string `json:"sql,omitempty"`
  47. }
  48. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  49. sd := statementDescriptor{}
  50. err := json.NewDecoder(reader).Decode(&sd)
  51. // Problems decoding
  52. if err != nil {
  53. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  54. }
  55. return sd, nil
  56. }
  57. // Handle applies the specified error and error concept to the HTTP response writer
  58. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  59. message := prefix
  60. if message != "" {
  61. message += ": "
  62. }
  63. message += err.Error()
  64. logger.Error(message)
  65. var ec int
  66. switch e := err.(type) {
  67. case *errorx.Error:
  68. switch e.Code() {
  69. case errorx.NOT_FOUND:
  70. ec = http.StatusNotFound
  71. default:
  72. ec = http.StatusBadRequest
  73. }
  74. default:
  75. ec = http.StatusBadRequest
  76. }
  77. http.Error(w, message, ec)
  78. }
  79. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  80. w.Header().Add(ContentType, ContentTypeJSON)
  81. jsonByte, err := json.Marshal(i)
  82. if err != nil {
  83. handleError(w, err, "", logger)
  84. }
  85. w.Header().Add("Content-Length", strconv.Itoa(len(jsonByte)))
  86. _, err = w.Write(jsonByte)
  87. // Problems encoding
  88. if err != nil {
  89. handleError(w, err, "", logger)
  90. }
  91. }
  92. func jsonByteResponse(buffer bytes.Buffer, w http.ResponseWriter, logger api.Logger) {
  93. w.Header().Add(ContentType, ContentTypeJSON)
  94. w.Header().Add("Content-Length", strconv.Itoa(buffer.Len()))
  95. _, err := w.Write(buffer.Bytes())
  96. // Problems encoding
  97. if err != nil {
  98. handleError(w, err, "", logger)
  99. }
  100. }
  101. func createRestServer(ip string, port int, needToken bool) *http.Server {
  102. dataDir, err := conf.GetDataLoc()
  103. if err != nil {
  104. panic(err)
  105. }
  106. uploadDir = filepath.Join(dataDir, "uploads")
  107. r := mux.NewRouter()
  108. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  109. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  110. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  111. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  112. r.HandleFunc("/streams/{name}/schema", streamSchemaHandler).Methods(http.MethodGet)
  113. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  114. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  115. r.HandleFunc("/tables/{name}/schema", tableSchemaHandler).Methods(http.MethodGet)
  116. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  117. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  118. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  119. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  120. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  121. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  122. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  123. r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
  124. r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
  125. r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
  126. r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
  127. r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
  128. r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
  129. r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
  130. // Register extended routes
  131. for k, v := range components {
  132. logger.Infof("register rest endpoint for component %s", k)
  133. v.rest(r)
  134. }
  135. if needToken {
  136. r.Use(middleware.Auth)
  137. }
  138. server := &http.Server{
  139. Addr: fmt.Sprintf("%s:%d", ip, port),
  140. // Good practice to set timeouts to avoid Slowloris attacks.
  141. WriteTimeout: time.Second * 60 * 5,
  142. ReadTimeout: time.Second * 60 * 5,
  143. IdleTimeout: time.Second * 60,
  144. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin", "Authorization"}), handlers.AllowedMethods([]string{"POST", "GET", "PUT", "DELETE", "HEAD"}))(r),
  145. }
  146. server.SetKeepAlivesEnabled(false)
  147. return server
  148. }
  149. type fileContent struct {
  150. Name string `json:"name"`
  151. Content string `json:"content"`
  152. }
  153. func fileUploadHandler(w http.ResponseWriter, r *http.Request) {
  154. switch r.Method {
  155. // Upload or overwrite a file
  156. case http.MethodPost:
  157. switch r.Header.Get("Content-Type") {
  158. case "application/json":
  159. fc := &fileContent{}
  160. defer r.Body.Close()
  161. err := json.NewDecoder(r.Body).Decode(fc)
  162. if err != nil {
  163. handleError(w, err, "Invalid body: Error decoding file json", logger)
  164. return
  165. }
  166. if fc.Content == "" || fc.Name == "" {
  167. handleError(w, nil, "Invalid body: name and content are required", logger)
  168. return
  169. }
  170. filePath := filepath.Join(uploadDir, fc.Name)
  171. dst, err := os.Create(filePath)
  172. defer dst.Close()
  173. if err != nil {
  174. handleError(w, err, "Error creating the file", logger)
  175. return
  176. }
  177. _, err = dst.Write([]byte(fc.Content))
  178. if err != nil {
  179. handleError(w, err, "Error writing the file", logger)
  180. return
  181. }
  182. w.WriteHeader(http.StatusCreated)
  183. w.Write([]byte(filePath))
  184. default:
  185. // Maximum upload of 1 GB files
  186. err := r.ParseMultipartForm(1024 << 20)
  187. if err != nil {
  188. handleError(w, err, "Error parse the multi part form", logger)
  189. return
  190. }
  191. // Get handler for filename, size and headers
  192. file, handler, err := r.FormFile("uploadFile")
  193. if err != nil {
  194. handleError(w, err, "Error Retrieving the File", logger)
  195. return
  196. }
  197. defer file.Close()
  198. // Create file
  199. filePath := filepath.Join(uploadDir, handler.Filename)
  200. dst, err := os.Create(filePath)
  201. defer dst.Close()
  202. if err != nil {
  203. handleError(w, err, "Error creating the file", logger)
  204. return
  205. }
  206. // Copy the uploaded file to the created file on the filesystem
  207. if _, err := io.Copy(dst, file); err != nil {
  208. handleError(w, err, "Error writing the file", logger)
  209. return
  210. }
  211. w.WriteHeader(http.StatusCreated)
  212. w.Write([]byte(filePath))
  213. }
  214. case http.MethodGet:
  215. // Get the list of files in the upload directory
  216. files, err := os.ReadDir(uploadDir)
  217. if err != nil {
  218. handleError(w, err, "Error reading the file upload dir", logger)
  219. return
  220. }
  221. fileNames := make([]string, len(files))
  222. for i, f := range files {
  223. fileNames[i] = filepath.Join(uploadDir, f.Name())
  224. }
  225. jsonResponse(fileNames, w, logger)
  226. }
  227. }
  228. func fileDeleteHandler(w http.ResponseWriter, r *http.Request) {
  229. vars := mux.Vars(r)
  230. name := vars["name"]
  231. filePath := filepath.Join(uploadDir, name)
  232. e := os.Remove(filePath)
  233. if e != nil {
  234. handleError(w, e, "Error deleting the file", logger)
  235. return
  236. }
  237. w.WriteHeader(http.StatusOK)
  238. w.Write([]byte("ok"))
  239. }
  240. type information struct {
  241. Version string `json:"version"`
  242. Os string `json:"os"`
  243. Arch string `json:"arch"`
  244. UpTimeSeconds int64 `json:"upTimeSeconds"`
  245. }
  246. // The handler for root
  247. func rootHandler(w http.ResponseWriter, r *http.Request) {
  248. defer r.Body.Close()
  249. switch r.Method {
  250. case http.MethodGet, http.MethodPost:
  251. w.WriteHeader(http.StatusOK)
  252. info := new(information)
  253. info.Version = version
  254. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  255. info.Os = runtime.GOOS
  256. info.Arch = runtime.GOARCH
  257. byteInfo, _ := json.Marshal(info)
  258. w.Write(byteInfo)
  259. }
  260. }
  261. func pingHandler(w http.ResponseWriter, _ *http.Request) {
  262. w.WriteHeader(http.StatusOK)
  263. }
  264. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  265. defer r.Body.Close()
  266. switch r.Method {
  267. case http.MethodGet:
  268. var (
  269. content []string
  270. err error
  271. kind string
  272. )
  273. if st == ast.TypeTable {
  274. kind = r.URL.Query().Get("kind")
  275. if kind == "scan" {
  276. kind = ast.StreamKindScan
  277. } else if kind == "lookup" {
  278. kind = ast.StreamKindLookup
  279. } else {
  280. kind = ""
  281. }
  282. }
  283. if kind != "" {
  284. content, err = streamProcessor.ShowTable(kind)
  285. } else {
  286. content, err = streamProcessor.ShowStream(st)
  287. }
  288. if err != nil {
  289. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  290. return
  291. }
  292. jsonResponse(content, w, logger)
  293. case http.MethodPost:
  294. v, err := decodeStatementDescriptor(r.Body)
  295. if err != nil {
  296. handleError(w, err, "Invalid body", logger)
  297. return
  298. }
  299. content, err := streamProcessor.ExecStreamSql(v.Sql)
  300. if err != nil {
  301. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  302. return
  303. }
  304. w.WriteHeader(http.StatusCreated)
  305. w.Write([]byte(content))
  306. }
  307. }
  308. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  309. defer r.Body.Close()
  310. vars := mux.Vars(r)
  311. name := vars["name"]
  312. switch r.Method {
  313. case http.MethodGet:
  314. content, err := streamProcessor.DescStream(name, st)
  315. if err != nil {
  316. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  317. return
  318. }
  319. jsonResponse(content, w, logger)
  320. case http.MethodDelete:
  321. content, err := streamProcessor.DropStream(name, st)
  322. if err != nil {
  323. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  324. return
  325. }
  326. w.WriteHeader(http.StatusOK)
  327. w.Write([]byte(content))
  328. case http.MethodPut:
  329. v, err := decodeStatementDescriptor(r.Body)
  330. if err != nil {
  331. handleError(w, err, "Invalid body", logger)
  332. return
  333. }
  334. content, err := streamProcessor.ExecReplaceStream(name, v.Sql, st)
  335. if err != nil {
  336. handleError(w, err, fmt.Sprintf("%s command error", strings.Title(ast.StreamTypeMap[st])), logger)
  337. return
  338. }
  339. w.WriteHeader(http.StatusOK)
  340. w.Write([]byte(content))
  341. }
  342. }
  343. // list or create streams
  344. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  345. sourcesManageHandler(w, r, ast.TypeStream)
  346. }
  347. // describe or delete a stream
  348. func streamHandler(w http.ResponseWriter, r *http.Request) {
  349. sourceManageHandler(w, r, ast.TypeStream)
  350. }
  351. // list or create tables
  352. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  353. sourcesManageHandler(w, r, ast.TypeTable)
  354. }
  355. func tableHandler(w http.ResponseWriter, r *http.Request) {
  356. sourceManageHandler(w, r, ast.TypeTable)
  357. }
  358. func streamSchemaHandler(w http.ResponseWriter, r *http.Request) {
  359. sourceSchemaHandler(w, r, ast.TypeStream)
  360. }
  361. func tableSchemaHandler(w http.ResponseWriter, r *http.Request) {
  362. sourceSchemaHandler(w, r, ast.TypeTable)
  363. }
  364. func sourceSchemaHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  365. vars := mux.Vars(r)
  366. name := vars["name"]
  367. content, err := streamProcessor.GetInferredJsonSchema(name, st)
  368. if err != nil {
  369. handleError(w, err, fmt.Sprintf("get schema of %s error", ast.StreamTypeMap[st]), logger)
  370. return
  371. }
  372. jsonResponse(content, w, logger)
  373. }
  374. // list or create rules
  375. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  376. defer r.Body.Close()
  377. switch r.Method {
  378. case http.MethodPost:
  379. body, err := io.ReadAll(r.Body)
  380. if err != nil {
  381. handleError(w, err, "Invalid body", logger)
  382. return
  383. }
  384. id, err := createRule("", string(body))
  385. if err != nil {
  386. handleError(w, err, "", logger)
  387. return
  388. }
  389. result := fmt.Sprintf("Rule %s was created successfully.", id)
  390. w.WriteHeader(http.StatusCreated)
  391. w.Write([]byte(result))
  392. case http.MethodGet:
  393. content, err := getAllRulesWithStatus()
  394. if err != nil {
  395. handleError(w, err, "Show rules error", logger)
  396. return
  397. }
  398. jsonResponse(content, w, logger)
  399. }
  400. }
  401. // describe or delete a rule
  402. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  403. defer r.Body.Close()
  404. vars := mux.Vars(r)
  405. name := vars["name"]
  406. switch r.Method {
  407. case http.MethodGet:
  408. rule, err := ruleProcessor.GetRuleJson(name)
  409. if err != nil {
  410. handleError(w, err, "Describe rule error", logger)
  411. return
  412. }
  413. w.Header().Add(ContentType, ContentTypeJSON)
  414. w.Write([]byte(rule))
  415. case http.MethodDelete:
  416. deleteRule(name)
  417. content, err := ruleProcessor.ExecDrop(name)
  418. if err != nil {
  419. handleError(w, err, "Delete rule error", logger)
  420. return
  421. }
  422. w.WriteHeader(http.StatusOK)
  423. w.Write([]byte(content))
  424. case http.MethodPut:
  425. _, err := ruleProcessor.GetRuleById(name)
  426. if err != nil {
  427. handleError(w, err, "Rule not found", logger)
  428. return
  429. }
  430. body, err := io.ReadAll(r.Body)
  431. if err != nil {
  432. handleError(w, err, "Invalid body", logger)
  433. return
  434. }
  435. err = updateRule(name, string(body))
  436. if err != nil {
  437. handleError(w, err, "Update rule error", logger)
  438. return
  439. }
  440. // Update to db after validation
  441. _, err = ruleProcessor.ExecUpdate(name, string(body))
  442. var result string
  443. if err != nil {
  444. handleError(w, err, "Update rule error, suggest to delete it and recreate", logger)
  445. return
  446. } else {
  447. result = fmt.Sprintf("Rule %s was updated successfully.", name)
  448. }
  449. w.WriteHeader(http.StatusOK)
  450. w.Write([]byte(result))
  451. }
  452. }
  453. // get status of a rule
  454. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  455. defer r.Body.Close()
  456. vars := mux.Vars(r)
  457. name := vars["name"]
  458. content, err := getRuleStatus(name)
  459. if err != nil {
  460. handleError(w, err, "get rule status error", logger)
  461. return
  462. }
  463. w.Header().Set(ContentType, ContentTypeJSON)
  464. w.WriteHeader(http.StatusOK)
  465. w.Write([]byte(content))
  466. }
  467. // start a rule
  468. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  469. defer r.Body.Close()
  470. vars := mux.Vars(r)
  471. name := vars["name"]
  472. err := startRule(name)
  473. if err != nil {
  474. handleError(w, err, "start rule error", logger)
  475. return
  476. }
  477. w.WriteHeader(http.StatusOK)
  478. w.Write([]byte(fmt.Sprintf("Rule %s was started", name)))
  479. }
  480. // stop a rule
  481. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  482. defer r.Body.Close()
  483. vars := mux.Vars(r)
  484. name := vars["name"]
  485. result := stopRule(name)
  486. w.WriteHeader(http.StatusOK)
  487. w.Write([]byte(result))
  488. }
  489. // restart a rule
  490. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  491. defer r.Body.Close()
  492. vars := mux.Vars(r)
  493. name := vars["name"]
  494. err := restartRule(name)
  495. if err != nil {
  496. handleError(w, err, "restart rule error", logger)
  497. return
  498. }
  499. w.WriteHeader(http.StatusOK)
  500. w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
  501. }
  502. // get topo of a rule
  503. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  504. defer r.Body.Close()
  505. vars := mux.Vars(r)
  506. name := vars["name"]
  507. content, err := getRuleTopo(name)
  508. if err != nil {
  509. handleError(w, err, "get rule topo error", logger)
  510. return
  511. }
  512. w.Header().Set(ContentType, ContentTypeJSON)
  513. w.Write([]byte(content))
  514. }
  515. type rulesetInfo struct {
  516. Content string `json:"content"`
  517. FilePath string `json:"file"`
  518. }
  519. func importHandler(w http.ResponseWriter, r *http.Request) {
  520. rsi := &rulesetInfo{}
  521. err := json.NewDecoder(r.Body).Decode(rsi)
  522. if err != nil {
  523. handleError(w, err, "Invalid body: Error decoding json", logger)
  524. return
  525. }
  526. if rsi.Content != "" && rsi.FilePath != "" {
  527. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  528. return
  529. } else if rsi.Content == "" && rsi.FilePath == "" {
  530. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  531. return
  532. }
  533. content := []byte(rsi.Content)
  534. if rsi.FilePath != "" {
  535. reader, err := httpx.ReadFile(rsi.FilePath)
  536. if err != nil {
  537. handleError(w, err, "Fail to read file", logger)
  538. return
  539. }
  540. defer reader.Close()
  541. buf := new(bytes.Buffer)
  542. _, err = io.Copy(buf, reader)
  543. if err != nil {
  544. handleError(w, err, "fail to convert file", logger)
  545. return
  546. }
  547. content = buf.Bytes()
  548. }
  549. rules, counts, err := rulesetProcessor.Import(content)
  550. if err != nil {
  551. handleError(w, nil, "Import ruleset error", logger)
  552. return
  553. }
  554. infra.SafeRun(func() error {
  555. for _, name := range rules {
  556. rul, ee := ruleProcessor.GetRuleById(name)
  557. if ee != nil {
  558. logger.Error(ee)
  559. continue
  560. }
  561. reply := recoverRule(rul)
  562. if reply != "" {
  563. logger.Error(reply)
  564. }
  565. }
  566. return nil
  567. })
  568. w.Write([]byte(fmt.Sprintf("imported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])))
  569. }
  570. func exportHandler(w http.ResponseWriter, r *http.Request) {
  571. const name = "ekuiper_export.json"
  572. exported, _, err := rulesetProcessor.Export()
  573. if err != nil {
  574. handleError(w, err, "export error", logger)
  575. return
  576. }
  577. w.Header().Set("Content-Type", "application/octet-stream")
  578. w.Header().Add("Content-Disposition", "Attachment")
  579. http.ServeContent(w, r, name, time.Now(), exported)
  580. }
  581. type Configuration struct {
  582. Streams map[string]string `json:"streams"`
  583. Tables map[string]string `json:"tables"`
  584. Rules map[string]string `json:"rules"`
  585. NativePlugins map[string]string `json:"nativePlugins"`
  586. PortablePlugins map[string]string `json:"portablePlugins"`
  587. SourceConfig map[string]string `json:"sourceConfig"`
  588. SinkConfig map[string]string `json:"sinkConfig"`
  589. ConnectionConfig map[string]string `json:"connectionConfig"`
  590. Service map[string]string `json:"Service"`
  591. Schema map[string]string `json:"Schema"`
  592. }
  593. func configurationExport() ([]byte, error) {
  594. conf := &Configuration{
  595. Streams: make(map[string]string),
  596. Tables: make(map[string]string),
  597. Rules: make(map[string]string),
  598. NativePlugins: make(map[string]string),
  599. PortablePlugins: make(map[string]string),
  600. SourceConfig: make(map[string]string),
  601. SinkConfig: make(map[string]string),
  602. ConnectionConfig: make(map[string]string),
  603. Service: make(map[string]string),
  604. Schema: make(map[string]string),
  605. }
  606. ruleSet := rulesetProcessor.ExportRuleSet()
  607. if ruleSet != nil {
  608. conf.Streams = ruleSet.Streams
  609. conf.Tables = ruleSet.Tables
  610. conf.Rules = ruleSet.Rules
  611. }
  612. conf.NativePlugins = pluginExport()
  613. conf.PortablePlugins = portablePluginExport()
  614. conf.Service = serviceExport()
  615. conf.Schema = schemaExport()
  616. yamlCfg := meta.GetConfigurations()
  617. conf.SourceConfig = yamlCfg.Sources
  618. conf.SinkConfig = yamlCfg.Sinks
  619. conf.ConnectionConfig = yamlCfg.Connections
  620. return json.Marshal(conf)
  621. }
  622. func configurationExportHandler(w http.ResponseWriter, r *http.Request) {
  623. var jsonBytes []byte
  624. const name = "ekuiper_export.json"
  625. switch r.Method {
  626. case http.MethodGet:
  627. jsonBytes, _ = configurationExport()
  628. case http.MethodPost:
  629. var rules []string
  630. _ = json.NewDecoder(r.Body).Decode(&rules)
  631. jsonBytes, _ = ruleMigrationProcessor.ConfigurationPartialExport(rules)
  632. }
  633. w.Header().Set("Content-Type", "application/octet-stream")
  634. w.Header().Add("Content-Disposition", "Attachment")
  635. http.ServeContent(w, r, name, time.Now(), bytes.NewReader(jsonBytes))
  636. }
  637. func configurationReset() {
  638. _ = resetAllRules()
  639. _ = resetAllStreams()
  640. pluginReset()
  641. portablePluginsReset()
  642. serviceReset()
  643. schemaReset()
  644. meta.ResetConfigs()
  645. }
  646. func configurationImport(data []byte, reboot bool) Configuration {
  647. conf := &Configuration{
  648. Streams: make(map[string]string),
  649. Tables: make(map[string]string),
  650. Rules: make(map[string]string),
  651. NativePlugins: make(map[string]string),
  652. PortablePlugins: make(map[string]string),
  653. SourceConfig: make(map[string]string),
  654. SinkConfig: make(map[string]string),
  655. ConnectionConfig: make(map[string]string),
  656. Service: make(map[string]string),
  657. Schema: make(map[string]string),
  658. }
  659. configResponse := Configuration{
  660. Streams: make(map[string]string),
  661. Tables: make(map[string]string),
  662. Rules: make(map[string]string),
  663. NativePlugins: make(map[string]string),
  664. PortablePlugins: make(map[string]string),
  665. SourceConfig: make(map[string]string),
  666. SinkConfig: make(map[string]string),
  667. ConnectionConfig: make(map[string]string),
  668. Service: make(map[string]string),
  669. Schema: make(map[string]string),
  670. }
  671. err := json.Unmarshal(data, conf)
  672. if err != nil {
  673. configResponse.Rules["configuration"] = fmt.Errorf("configuration unmarshal with error %v", err).Error()
  674. return configResponse
  675. }
  676. if reboot {
  677. err = pluginImport(conf.NativePlugins)
  678. if err != nil {
  679. return configResponse
  680. }
  681. err = schemaImport(conf.Schema)
  682. if err != nil {
  683. return configResponse
  684. }
  685. }
  686. configResponse.PortablePlugins = portablePluginImport(conf.PortablePlugins)
  687. configResponse.Service = serviceImport(conf.Service)
  688. yamlCfgSet := meta.YamlConfigurationSet{
  689. Sources: conf.SourceConfig,
  690. Sinks: conf.SinkConfig,
  691. Connections: conf.ConnectionConfig,
  692. }
  693. confRsp := meta.LoadConfigurations(yamlCfgSet)
  694. configResponse.SourceConfig = confRsp.Sources
  695. configResponse.SinkConfig = confRsp.Sinks
  696. configResponse.ConnectionConfig = confRsp.Connections
  697. ruleSet := processor.Ruleset{
  698. Streams: conf.Streams,
  699. Tables: conf.Tables,
  700. Rules: conf.Rules,
  701. }
  702. result := rulesetProcessor.ImportRuleSet(ruleSet)
  703. configResponse.Streams = result.Streams
  704. configResponse.Tables = result.Tables
  705. configResponse.Rules = result.Rules
  706. if !reboot {
  707. infra.SafeRun(func() error {
  708. for name := range ruleSet.Rules {
  709. rul, ee := ruleProcessor.GetRuleById(name)
  710. if ee != nil {
  711. logger.Error(ee)
  712. continue
  713. }
  714. reply := recoverRule(rul)
  715. if reply != "" {
  716. logger.Error(reply)
  717. }
  718. }
  719. return nil
  720. })
  721. }
  722. return configResponse
  723. }
  724. func configurationPartialImport(data []byte) Configuration {
  725. conf := &Configuration{
  726. Streams: make(map[string]string),
  727. Tables: make(map[string]string),
  728. Rules: make(map[string]string),
  729. NativePlugins: make(map[string]string),
  730. PortablePlugins: make(map[string]string),
  731. SourceConfig: make(map[string]string),
  732. SinkConfig: make(map[string]string),
  733. ConnectionConfig: make(map[string]string),
  734. Service: make(map[string]string),
  735. Schema: make(map[string]string),
  736. }
  737. configResponse := Configuration{
  738. Streams: make(map[string]string),
  739. Tables: make(map[string]string),
  740. Rules: make(map[string]string),
  741. NativePlugins: make(map[string]string),
  742. PortablePlugins: make(map[string]string),
  743. SourceConfig: make(map[string]string),
  744. SinkConfig: make(map[string]string),
  745. ConnectionConfig: make(map[string]string),
  746. Service: make(map[string]string),
  747. Schema: make(map[string]string),
  748. }
  749. err := json.Unmarshal(data, conf)
  750. if err != nil {
  751. configResponse.Rules["configuration"] = fmt.Errorf("configuration unmarshal with error %v", err).Error()
  752. return configResponse
  753. }
  754. yamlCfgSet := meta.YamlConfigurationSet{
  755. Sources: conf.SourceConfig,
  756. Sinks: conf.SinkConfig,
  757. Connections: conf.ConnectionConfig,
  758. }
  759. confRsp := meta.LoadConfigurationsPartial(yamlCfgSet)
  760. configResponse.NativePlugins = pluginPartialImport(conf.NativePlugins)
  761. configResponse.Schema = schemaPartialImport(conf.Schema)
  762. configResponse.PortablePlugins = portablePluginPartialImport(conf.PortablePlugins)
  763. configResponse.Service = servicePartialImport(conf.Service)
  764. configResponse.SourceConfig = confRsp.Sources
  765. configResponse.SinkConfig = confRsp.Sinks
  766. configResponse.ConnectionConfig = confRsp.Connections
  767. ruleSet := processor.Ruleset{
  768. Streams: conf.Streams,
  769. Tables: conf.Tables,
  770. Rules: conf.Rules,
  771. }
  772. result := importRuleSetPartial(ruleSet)
  773. configResponse.Streams = result.Streams
  774. configResponse.Tables = result.Tables
  775. configResponse.Rules = result.Rules
  776. return configResponse
  777. }
  778. type configurationInfo struct {
  779. Content string `json:"content"`
  780. FilePath string `json:"file"`
  781. }
  782. func configurationImportHandler(w http.ResponseWriter, r *http.Request) {
  783. cb := r.URL.Query().Get("stop")
  784. stop := cb == "1"
  785. par := r.URL.Query().Get("partial")
  786. partial := par == "1"
  787. rsi := &configurationInfo{}
  788. err := json.NewDecoder(r.Body).Decode(rsi)
  789. if err != nil {
  790. handleError(w, err, "Invalid body: Error decoding json", logger)
  791. return
  792. }
  793. if rsi.Content != "" && rsi.FilePath != "" {
  794. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  795. return
  796. } else if rsi.Content == "" && rsi.FilePath == "" {
  797. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  798. return
  799. }
  800. content := []byte(rsi.Content)
  801. if rsi.FilePath != "" {
  802. reader, err := httpx.ReadFile(rsi.FilePath)
  803. if err != nil {
  804. handleError(w, err, "Fail to read file", logger)
  805. return
  806. }
  807. defer reader.Close()
  808. buf := new(bytes.Buffer)
  809. _, err = io.Copy(buf, reader)
  810. if err != nil {
  811. handleError(w, err, "fail to convert file", logger)
  812. return
  813. }
  814. content = buf.Bytes()
  815. }
  816. if !partial {
  817. configurationReset()
  818. result := configurationImport(content, stop)
  819. jsonResponse(result, w, logger)
  820. if stop {
  821. go func() {
  822. time.Sleep(1 * time.Second)
  823. os.Exit(100)
  824. }()
  825. }
  826. } else {
  827. result := configurationPartialImport(content)
  828. jsonResponse(result, w, logger)
  829. }
  830. w.WriteHeader(http.StatusOK)
  831. }
  832. func configurationStatusExport() Configuration {
  833. conf := Configuration{
  834. Streams: make(map[string]string),
  835. Tables: make(map[string]string),
  836. Rules: make(map[string]string),
  837. NativePlugins: make(map[string]string),
  838. PortablePlugins: make(map[string]string),
  839. SourceConfig: make(map[string]string),
  840. SinkConfig: make(map[string]string),
  841. ConnectionConfig: make(map[string]string),
  842. Service: make(map[string]string),
  843. Schema: make(map[string]string),
  844. }
  845. ruleSet := rulesetProcessor.ExportRuleSetStatus()
  846. if ruleSet != nil {
  847. conf.Streams = ruleSet.Streams
  848. conf.Tables = ruleSet.Tables
  849. conf.Rules = ruleSet.Rules
  850. }
  851. conf.NativePlugins = pluginStatusExport()
  852. conf.PortablePlugins = portablePluginStatusExport()
  853. conf.Service = serviceStatusExport()
  854. conf.Schema = schemaStatusExport()
  855. yamlCfgStatus := meta.GetConfigurationStatus()
  856. conf.SourceConfig = yamlCfgStatus.Sources
  857. conf.SinkConfig = yamlCfgStatus.Sinks
  858. conf.ConnectionConfig = yamlCfgStatus.Connections
  859. return conf
  860. }
  861. func configurationStatusHandler(w http.ResponseWriter, r *http.Request) {
  862. defer r.Body.Close()
  863. content := configurationStatusExport()
  864. jsonResponse(content, w, logger)
  865. }
  866. func importRuleSetPartial(all processor.Ruleset) processor.Ruleset {
  867. ruleSetRsp := processor.Ruleset{
  868. Rules: map[string]string{},
  869. Streams: map[string]string{},
  870. Tables: map[string]string{},
  871. }
  872. //replace streams
  873. for k, v := range all.Streams {
  874. _, e := streamProcessor.ExecReplaceStream(k, v, ast.TypeStream)
  875. if e != nil {
  876. ruleSetRsp.Streams[k] = e.Error()
  877. continue
  878. }
  879. }
  880. // replace tables
  881. for k, v := range all.Tables {
  882. _, e := streamProcessor.ExecReplaceStream(k, v, ast.TypeTable)
  883. if e != nil {
  884. ruleSetRsp.Tables[k] = e.Error()
  885. continue
  886. }
  887. }
  888. for k, v := range all.Rules {
  889. _, err := ruleProcessor.GetRuleJson(k)
  890. if err == nil {
  891. // the rule already exist, update
  892. err = updateRule(k, v)
  893. if err != nil {
  894. ruleSetRsp.Rules[k] = err.Error()
  895. continue
  896. }
  897. // Update to db after validation
  898. _, err = ruleProcessor.ExecUpdate(k, v)
  899. if err != nil {
  900. ruleSetRsp.Rules[k] = err.Error()
  901. continue
  902. }
  903. } else {
  904. // not found, create
  905. _, err2 := createRule(k, v)
  906. if err2 != nil {
  907. ruleSetRsp.Rules[k] = err2.Error()
  908. continue
  909. }
  910. }
  911. }
  912. return ruleSetRsp
  913. }