rest.go 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net/http"
  22. "os"
  23. "path/filepath"
  24. "reflect"
  25. "runtime"
  26. "strconv"
  27. "time"
  28. "github.com/gorilla/handlers"
  29. "github.com/gorilla/mux"
  30. "golang.org/x/text/cases"
  31. "golang.org/x/text/language"
  32. "github.com/lf-edge/ekuiper/internal/conf"
  33. "github.com/lf-edge/ekuiper/internal/meta"
  34. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  35. "github.com/lf-edge/ekuiper/internal/pkg/store"
  36. "github.com/lf-edge/ekuiper/internal/processor"
  37. "github.com/lf-edge/ekuiper/internal/server/middleware"
  38. "github.com/lf-edge/ekuiper/pkg/api"
  39. "github.com/lf-edge/ekuiper/pkg/ast"
  40. "github.com/lf-edge/ekuiper/pkg/cast"
  41. "github.com/lf-edge/ekuiper/pkg/errorx"
  42. "github.com/lf-edge/ekuiper/pkg/infra"
  43. "github.com/lf-edge/ekuiper/pkg/kv"
  44. )
  45. const (
  46. ContentType = "Content-Type"
  47. ContentTypeJSON = "application/json"
  48. )
  49. var (
  50. uploadDir string
  51. uploadsDb kv.KeyValue
  52. uploadsStatusDb kv.KeyValue
  53. )
  54. type statementDescriptor struct {
  55. Sql string `json:"sql,omitempty"`
  56. }
  57. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  58. sd := statementDescriptor{}
  59. err := json.NewDecoder(reader).Decode(&sd)
  60. // Problems decoding
  61. if err != nil {
  62. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  63. }
  64. return sd, nil
  65. }
  66. // Handle applies the specified error and error concept to the HTTP response writer
  67. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  68. message := prefix
  69. if message != "" {
  70. message += ": "
  71. }
  72. message += err.Error()
  73. logger.Error(message)
  74. var ec int
  75. switch e := err.(type) {
  76. case *errorx.Error:
  77. switch e.Code() {
  78. case errorx.NOT_FOUND:
  79. ec = http.StatusNotFound
  80. default:
  81. ec = http.StatusBadRequest
  82. }
  83. default:
  84. ec = http.StatusBadRequest
  85. }
  86. http.Error(w, message, ec)
  87. }
  88. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  89. w.Header().Add(ContentType, ContentTypeJSON)
  90. jsonByte, err := json.Marshal(i)
  91. if err != nil {
  92. handleError(w, err, "", logger)
  93. }
  94. w.Header().Add("Content-Length", strconv.Itoa(len(jsonByte)))
  95. _, err = w.Write(jsonByte)
  96. // Problems encoding
  97. if err != nil {
  98. handleError(w, err, "", logger)
  99. }
  100. }
  101. func jsonByteResponse(buffer bytes.Buffer, w http.ResponseWriter, logger api.Logger) {
  102. w.Header().Add(ContentType, ContentTypeJSON)
  103. w.Header().Add("Content-Length", strconv.Itoa(buffer.Len()))
  104. _, err := w.Write(buffer.Bytes())
  105. // Problems encoding
  106. if err != nil {
  107. handleError(w, err, "", logger)
  108. }
  109. }
  110. func createRestServer(ip string, port int, needToken bool) *http.Server {
  111. dataDir, err := conf.GetDataLoc()
  112. if err != nil {
  113. panic(err)
  114. }
  115. uploadDir = filepath.Join(dataDir, "uploads")
  116. uploadsDb, err = store.GetKV("uploads")
  117. if err != nil {
  118. panic(err)
  119. }
  120. uploadsStatusDb, err = store.GetKV("uploadsStatusDb")
  121. if err != nil {
  122. panic(err)
  123. }
  124. r := mux.NewRouter()
  125. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  126. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  127. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  128. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  129. r.HandleFunc("/streams/{name}/schema", streamSchemaHandler).Methods(http.MethodGet)
  130. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  131. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  132. r.HandleFunc("/tables/{name}/schema", tableSchemaHandler).Methods(http.MethodGet)
  133. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  134. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  135. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  136. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  137. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  138. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  139. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  140. r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
  141. r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
  142. r.HandleFunc("/configs", configurationUpdateHandler).Methods(http.MethodPatch)
  143. r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
  144. r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
  145. r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
  146. r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
  147. r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
  148. // Register extended routes
  149. for k, v := range components {
  150. logger.Infof("register rest endpoint for component %s", k)
  151. v.rest(r)
  152. }
  153. if needToken {
  154. r.Use(middleware.Auth)
  155. }
  156. server := &http.Server{
  157. Addr: cast.JoinHostPortInt(ip, port),
  158. // Good practice to set timeouts to avoid Slowloris attacks.
  159. WriteTimeout: time.Second * 60 * 5,
  160. ReadTimeout: time.Second * 60 * 5,
  161. IdleTimeout: time.Second * 60,
  162. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin", "Authorization"}), handlers.AllowedMethods([]string{"POST", "GET", "PUT", "DELETE", "HEAD"}))(r),
  163. }
  164. server.SetKeepAlivesEnabled(false)
  165. return server
  166. }
  167. type fileContent struct {
  168. Name string `json:"name"`
  169. Content string `json:"content"`
  170. FilePath string `json:"file"`
  171. }
  172. func (f *fileContent) InstallScript() string {
  173. marshal, err := json.Marshal(f)
  174. if err != nil {
  175. return ""
  176. }
  177. return string(marshal)
  178. }
  179. func (f *fileContent) Validate() error {
  180. if f.Content == "" && f.FilePath == "" {
  181. return fmt.Errorf("invalid body: content or FilePath is required")
  182. }
  183. if f.Name == "" {
  184. return fmt.Errorf("invalid body: name is required")
  185. }
  186. return nil
  187. }
  188. func upload(file *fileContent) error {
  189. err := getFile(file)
  190. if err != nil {
  191. _ = uploadsStatusDb.Set(file.Name, err.Error())
  192. return err
  193. }
  194. return uploadsDb.Set(file.Name, file.InstallScript())
  195. }
  196. func getFile(file *fileContent) error {
  197. filePath := filepath.Join(uploadDir, file.Name)
  198. dst, err := os.Create(filePath)
  199. if err != nil {
  200. return err
  201. }
  202. defer dst.Close()
  203. if file.FilePath != "" {
  204. err := httpx.DownloadFile(filePath, file.FilePath)
  205. if err != nil {
  206. return err
  207. }
  208. } else {
  209. _, err := dst.Write([]byte(file.Content))
  210. if err != nil {
  211. return err
  212. }
  213. }
  214. return nil
  215. }
  216. func fileUploadHandler(w http.ResponseWriter, r *http.Request) {
  217. switch r.Method {
  218. // Upload or overwrite a file
  219. case http.MethodPost:
  220. switch r.Header.Get("Content-Type") {
  221. case "application/json":
  222. fc := &fileContent{}
  223. defer r.Body.Close()
  224. err := json.NewDecoder(r.Body).Decode(fc)
  225. if err != nil {
  226. handleError(w, err, "Invalid body: Error decoding file json", logger)
  227. return
  228. }
  229. err = fc.Validate()
  230. if err != nil {
  231. handleError(w, err, "Invalid body: missing necessary field", logger)
  232. return
  233. }
  234. filePath := filepath.Join(uploadDir, fc.Name)
  235. err = upload(fc)
  236. if err != nil {
  237. handleError(w, err, "Upload error: getFile has error", logger)
  238. return
  239. }
  240. w.WriteHeader(http.StatusCreated)
  241. w.Write([]byte(filePath))
  242. default:
  243. // Maximum upload of 1 GB files
  244. err := r.ParseMultipartForm(1024 << 20)
  245. if err != nil {
  246. handleError(w, err, "Error parse the multi part form", logger)
  247. return
  248. }
  249. // Get handler for filename, size and headers
  250. file, handler, err := r.FormFile("uploadFile")
  251. if err != nil {
  252. handleError(w, err, "Error Retrieving the File", logger)
  253. return
  254. }
  255. defer file.Close()
  256. // Create file
  257. filePath := filepath.Join(uploadDir, handler.Filename)
  258. dst, err := os.Create(filePath)
  259. defer dst.Close()
  260. if err != nil {
  261. handleError(w, err, "Error creating the file", logger)
  262. return
  263. }
  264. // Copy the uploaded file to the created file on the filesystem
  265. if _, err := io.Copy(dst, file); err != nil {
  266. handleError(w, err, "Error writing the file", logger)
  267. return
  268. }
  269. w.WriteHeader(http.StatusCreated)
  270. w.Write([]byte(filePath))
  271. }
  272. case http.MethodGet:
  273. // Get the list of files in the upload directory
  274. files, err := os.ReadDir(uploadDir)
  275. if err != nil {
  276. handleError(w, err, "Error reading the file upload dir", logger)
  277. return
  278. }
  279. fileNames := make([]string, len(files))
  280. for i, f := range files {
  281. fileNames[i] = filepath.Join(uploadDir, f.Name())
  282. }
  283. jsonResponse(fileNames, w, logger)
  284. }
  285. }
  286. func fileDeleteHandler(w http.ResponseWriter, r *http.Request) {
  287. vars := mux.Vars(r)
  288. name := vars["name"]
  289. filePath := filepath.Join(uploadDir, name)
  290. e := os.Remove(filePath)
  291. if e != nil {
  292. handleError(w, e, "Error deleting the file", logger)
  293. return
  294. }
  295. _ = uploadsDb.Delete(name)
  296. _ = uploadsStatusDb.Delete(name)
  297. w.WriteHeader(http.StatusOK)
  298. w.Write([]byte("ok"))
  299. }
  300. type information struct {
  301. Version string `json:"version"`
  302. Os string `json:"os"`
  303. Arch string `json:"arch"`
  304. UpTimeSeconds int64 `json:"upTimeSeconds"`
  305. }
  306. // The handler for root
  307. func rootHandler(w http.ResponseWriter, r *http.Request) {
  308. defer r.Body.Close()
  309. switch r.Method {
  310. case http.MethodGet, http.MethodPost:
  311. w.WriteHeader(http.StatusOK)
  312. info := new(information)
  313. info.Version = version
  314. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  315. info.Os = runtime.GOOS
  316. info.Arch = runtime.GOARCH
  317. byteInfo, _ := json.Marshal(info)
  318. w.Write(byteInfo)
  319. }
  320. }
  321. func pingHandler(w http.ResponseWriter, _ *http.Request) {
  322. w.WriteHeader(http.StatusOK)
  323. }
  324. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  325. defer r.Body.Close()
  326. switch r.Method {
  327. case http.MethodGet:
  328. var (
  329. content []string
  330. err error
  331. kind string
  332. )
  333. if st == ast.TypeTable {
  334. kind = r.URL.Query().Get("kind")
  335. if kind == "scan" {
  336. kind = ast.StreamKindScan
  337. } else if kind == "lookup" {
  338. kind = ast.StreamKindLookup
  339. } else {
  340. kind = ""
  341. }
  342. }
  343. if kind != "" {
  344. content, err = streamProcessor.ShowTable(kind)
  345. } else {
  346. content, err = streamProcessor.ShowStream(st)
  347. }
  348. if err != nil {
  349. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  350. return
  351. }
  352. jsonResponse(content, w, logger)
  353. case http.MethodPost:
  354. v, err := decodeStatementDescriptor(r.Body)
  355. if err != nil {
  356. handleError(w, err, "Invalid body", logger)
  357. return
  358. }
  359. content, err := streamProcessor.ExecStreamSql(v.Sql)
  360. if err != nil {
  361. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  362. return
  363. }
  364. w.WriteHeader(http.StatusCreated)
  365. w.Write([]byte(content))
  366. }
  367. }
  368. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  369. defer r.Body.Close()
  370. vars := mux.Vars(r)
  371. name := vars["name"]
  372. switch r.Method {
  373. case http.MethodGet:
  374. content, err := streamProcessor.DescStream(name, st)
  375. if err != nil {
  376. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  377. return
  378. }
  379. jsonResponse(content, w, logger)
  380. case http.MethodDelete:
  381. content, err := streamProcessor.DropStream(name, st)
  382. if err != nil {
  383. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  384. return
  385. }
  386. w.WriteHeader(http.StatusOK)
  387. w.Write([]byte(content))
  388. case http.MethodPut:
  389. v, err := decodeStatementDescriptor(r.Body)
  390. if err != nil {
  391. handleError(w, err, "Invalid body", logger)
  392. return
  393. }
  394. content, err := streamProcessor.ExecReplaceStream(name, v.Sql, st)
  395. if err != nil {
  396. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  397. return
  398. }
  399. w.WriteHeader(http.StatusOK)
  400. w.Write([]byte(content))
  401. }
  402. }
  403. // list or create streams
  404. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  405. sourcesManageHandler(w, r, ast.TypeStream)
  406. }
  407. // describe or delete a stream
  408. func streamHandler(w http.ResponseWriter, r *http.Request) {
  409. sourceManageHandler(w, r, ast.TypeStream)
  410. }
  411. // list or create tables
  412. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  413. sourcesManageHandler(w, r, ast.TypeTable)
  414. }
  415. func tableHandler(w http.ResponseWriter, r *http.Request) {
  416. sourceManageHandler(w, r, ast.TypeTable)
  417. }
  418. func streamSchemaHandler(w http.ResponseWriter, r *http.Request) {
  419. sourceSchemaHandler(w, r, ast.TypeStream)
  420. }
  421. func tableSchemaHandler(w http.ResponseWriter, r *http.Request) {
  422. sourceSchemaHandler(w, r, ast.TypeTable)
  423. }
  424. func sourceSchemaHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  425. vars := mux.Vars(r)
  426. name := vars["name"]
  427. content, err := streamProcessor.GetInferredJsonSchema(name, st)
  428. if err != nil {
  429. handleError(w, err, fmt.Sprintf("get schema of %s error", ast.StreamTypeMap[st]), logger)
  430. return
  431. }
  432. jsonResponse(content, w, logger)
  433. }
  434. // list or create rules
  435. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  436. defer r.Body.Close()
  437. switch r.Method {
  438. case http.MethodPost:
  439. body, err := io.ReadAll(r.Body)
  440. if err != nil {
  441. handleError(w, err, "Invalid body", logger)
  442. return
  443. }
  444. id, err := createRule("", string(body))
  445. if err != nil {
  446. handleError(w, err, "", logger)
  447. return
  448. }
  449. w.WriteHeader(http.StatusCreated)
  450. fmt.Fprintf(w, "Rule %s was created successfully.", id)
  451. case http.MethodGet:
  452. content, err := getAllRulesWithStatus()
  453. if err != nil {
  454. handleError(w, err, "Show rules error", logger)
  455. return
  456. }
  457. jsonResponse(content, w, logger)
  458. }
  459. }
  460. // describe or delete a rule
  461. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  462. defer r.Body.Close()
  463. vars := mux.Vars(r)
  464. name := vars["name"]
  465. switch r.Method {
  466. case http.MethodGet:
  467. rule, err := ruleProcessor.GetRuleJson(name)
  468. if err != nil {
  469. handleError(w, err, "Describe rule error", logger)
  470. return
  471. }
  472. w.Header().Add(ContentType, ContentTypeJSON)
  473. w.Write([]byte(rule))
  474. case http.MethodDelete:
  475. deleteRule(name)
  476. content, err := ruleProcessor.ExecDrop(name)
  477. if err != nil {
  478. handleError(w, err, "Delete rule error", logger)
  479. return
  480. }
  481. w.WriteHeader(http.StatusOK)
  482. w.Write([]byte(content))
  483. case http.MethodPut:
  484. _, err := ruleProcessor.GetRuleById(name)
  485. if err != nil {
  486. handleError(w, err, "Rule not found", logger)
  487. return
  488. }
  489. body, err := io.ReadAll(r.Body)
  490. if err != nil {
  491. handleError(w, err, "Invalid body", logger)
  492. return
  493. }
  494. err = updateRule(name, string(body))
  495. if err != nil {
  496. handleError(w, err, "Update rule error", logger)
  497. return
  498. }
  499. // Update to db after validation
  500. _, err = ruleProcessor.ExecUpdate(name, string(body))
  501. if err != nil {
  502. handleError(w, err, "Update rule error, suggest to delete it and recreate", logger)
  503. return
  504. }
  505. w.WriteHeader(http.StatusOK)
  506. fmt.Fprintf(w, "Rule %s was updated successfully.", name)
  507. }
  508. }
  509. // get status of a rule
  510. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  511. defer r.Body.Close()
  512. vars := mux.Vars(r)
  513. name := vars["name"]
  514. content, err := getRuleStatus(name)
  515. if err != nil {
  516. handleError(w, err, "get rule status error", logger)
  517. return
  518. }
  519. w.Header().Set(ContentType, ContentTypeJSON)
  520. w.WriteHeader(http.StatusOK)
  521. w.Write([]byte(content))
  522. }
  523. // start a rule
  524. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  525. defer r.Body.Close()
  526. vars := mux.Vars(r)
  527. name := vars["name"]
  528. err := startRule(name)
  529. if err != nil {
  530. handleError(w, err, "start rule error", logger)
  531. return
  532. }
  533. w.WriteHeader(http.StatusOK)
  534. fmt.Fprintf(w, "Rule %s was started", name)
  535. }
  536. // stop a rule
  537. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  538. defer r.Body.Close()
  539. vars := mux.Vars(r)
  540. name := vars["name"]
  541. result := stopRule(name)
  542. w.WriteHeader(http.StatusOK)
  543. w.Write([]byte(result))
  544. }
  545. // restart a rule
  546. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  547. defer r.Body.Close()
  548. vars := mux.Vars(r)
  549. name := vars["name"]
  550. err := restartRule(name)
  551. if err != nil {
  552. handleError(w, err, "restart rule error", logger)
  553. return
  554. }
  555. w.WriteHeader(http.StatusOK)
  556. fmt.Fprintf(w, "Rule %s was restarted", name)
  557. }
  558. // get topo of a rule
  559. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  560. defer r.Body.Close()
  561. vars := mux.Vars(r)
  562. name := vars["name"]
  563. content, err := getRuleTopo(name)
  564. if err != nil {
  565. handleError(w, err, "get rule topo error", logger)
  566. return
  567. }
  568. w.Header().Set(ContentType, ContentTypeJSON)
  569. w.Write([]byte(content))
  570. }
  571. type rulesetInfo struct {
  572. Content string `json:"content"`
  573. FilePath string `json:"file"`
  574. }
  575. func importHandler(w http.ResponseWriter, r *http.Request) {
  576. rsi := &rulesetInfo{}
  577. err := json.NewDecoder(r.Body).Decode(rsi)
  578. if err != nil {
  579. handleError(w, err, "Invalid body: Error decoding json", logger)
  580. return
  581. }
  582. if rsi.Content != "" && rsi.FilePath != "" {
  583. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  584. return
  585. } else if rsi.Content == "" && rsi.FilePath == "" {
  586. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  587. return
  588. }
  589. content := []byte(rsi.Content)
  590. if rsi.FilePath != "" {
  591. reader, err := httpx.ReadFile(rsi.FilePath)
  592. if err != nil {
  593. handleError(w, err, "Fail to read file", logger)
  594. return
  595. }
  596. defer reader.Close()
  597. buf := new(bytes.Buffer)
  598. _, err = io.Copy(buf, reader)
  599. if err != nil {
  600. handleError(w, err, "fail to convert file", logger)
  601. return
  602. }
  603. content = buf.Bytes()
  604. }
  605. rules, counts, err := rulesetProcessor.Import(content)
  606. if err != nil {
  607. handleError(w, nil, "Import ruleset error", logger)
  608. return
  609. }
  610. infra.SafeRun(func() error {
  611. for _, name := range rules {
  612. rul, ee := ruleProcessor.GetRuleById(name)
  613. if ee != nil {
  614. logger.Error(ee)
  615. continue
  616. }
  617. reply := recoverRule(rul)
  618. if reply != "" {
  619. logger.Error(reply)
  620. }
  621. }
  622. return nil
  623. })
  624. fmt.Fprintf(w, "imported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])
  625. }
  626. func exportHandler(w http.ResponseWriter, r *http.Request) {
  627. const name = "ekuiper_export.json"
  628. exported, _, err := rulesetProcessor.Export()
  629. if err != nil {
  630. handleError(w, err, "export error", logger)
  631. return
  632. }
  633. w.Header().Set("Content-Type", "application/octet-stream")
  634. w.Header().Add("Content-Disposition", "Attachment")
  635. http.ServeContent(w, r, name, time.Now(), exported)
  636. }
  637. type Configuration struct {
  638. Streams map[string]string `json:"streams"`
  639. Tables map[string]string `json:"tables"`
  640. Rules map[string]string `json:"rules"`
  641. NativePlugins map[string]string `json:"nativePlugins"`
  642. PortablePlugins map[string]string `json:"portablePlugins"`
  643. SourceConfig map[string]string `json:"sourceConfig"`
  644. SinkConfig map[string]string `json:"sinkConfig"`
  645. ConnectionConfig map[string]string `json:"connectionConfig"`
  646. Service map[string]string `json:"Service"`
  647. Schema map[string]string `json:"Schema"`
  648. Uploads map[string]string `json:"uploads"`
  649. }
  650. func configurationExport() ([]byte, error) {
  651. conf := &Configuration{
  652. Streams: make(map[string]string),
  653. Tables: make(map[string]string),
  654. Rules: make(map[string]string),
  655. NativePlugins: make(map[string]string),
  656. PortablePlugins: make(map[string]string),
  657. SourceConfig: make(map[string]string),
  658. SinkConfig: make(map[string]string),
  659. ConnectionConfig: make(map[string]string),
  660. Service: make(map[string]string),
  661. Schema: make(map[string]string),
  662. Uploads: make(map[string]string),
  663. }
  664. ruleSet := rulesetProcessor.ExportRuleSet()
  665. if ruleSet != nil {
  666. conf.Streams = ruleSet.Streams
  667. conf.Tables = ruleSet.Tables
  668. conf.Rules = ruleSet.Rules
  669. }
  670. conf.NativePlugins = pluginExport()
  671. conf.PortablePlugins = portablePluginExport()
  672. conf.Service = serviceExport()
  673. conf.Schema = schemaExport()
  674. conf.Uploads = uploadsExport()
  675. yamlCfg := meta.GetConfigurations()
  676. conf.SourceConfig = yamlCfg.Sources
  677. conf.SinkConfig = yamlCfg.Sinks
  678. conf.ConnectionConfig = yamlCfg.Connections
  679. return json.Marshal(conf)
  680. }
  681. func configurationExportHandler(w http.ResponseWriter, r *http.Request) {
  682. var jsonBytes []byte
  683. const name = "ekuiper_export.json"
  684. switch r.Method {
  685. case http.MethodGet:
  686. jsonBytes, _ = configurationExport()
  687. case http.MethodPost:
  688. var rules []string
  689. _ = json.NewDecoder(r.Body).Decode(&rules)
  690. jsonBytes, _ = ruleMigrationProcessor.ConfigurationPartialExport(rules)
  691. }
  692. w.Header().Set("Content-Type", "application/octet-stream")
  693. w.Header().Add("Content-Disposition", "Attachment")
  694. http.ServeContent(w, r, name, time.Now(), bytes.NewReader(jsonBytes))
  695. }
  696. func configurationReset() {
  697. _ = resetAllRules()
  698. _ = resetAllStreams()
  699. pluginReset()
  700. portablePluginsReset()
  701. serviceReset()
  702. schemaReset()
  703. meta.ResetConfigs()
  704. uploadsReset()
  705. }
  706. type ImportConfigurationStatus struct {
  707. ErrorMsg string
  708. ConfigResponse Configuration
  709. }
  710. func configurationImport(data []byte, reboot bool) ImportConfigurationStatus {
  711. conf := &Configuration{
  712. Streams: make(map[string]string),
  713. Tables: make(map[string]string),
  714. Rules: make(map[string]string),
  715. NativePlugins: make(map[string]string),
  716. PortablePlugins: make(map[string]string),
  717. SourceConfig: make(map[string]string),
  718. SinkConfig: make(map[string]string),
  719. ConnectionConfig: make(map[string]string),
  720. Service: make(map[string]string),
  721. Schema: make(map[string]string),
  722. Uploads: make(map[string]string),
  723. }
  724. importStatus := ImportConfigurationStatus{}
  725. configResponse := Configuration{
  726. Streams: make(map[string]string),
  727. Tables: make(map[string]string),
  728. Rules: make(map[string]string),
  729. NativePlugins: make(map[string]string),
  730. PortablePlugins: make(map[string]string),
  731. SourceConfig: make(map[string]string),
  732. SinkConfig: make(map[string]string),
  733. ConnectionConfig: make(map[string]string),
  734. Service: make(map[string]string),
  735. Schema: make(map[string]string),
  736. Uploads: make(map[string]string),
  737. }
  738. ResponseNil := Configuration{
  739. Streams: make(map[string]string),
  740. Tables: make(map[string]string),
  741. Rules: make(map[string]string),
  742. NativePlugins: make(map[string]string),
  743. PortablePlugins: make(map[string]string),
  744. SourceConfig: make(map[string]string),
  745. SinkConfig: make(map[string]string),
  746. ConnectionConfig: make(map[string]string),
  747. Service: make(map[string]string),
  748. Schema: make(map[string]string),
  749. Uploads: make(map[string]string),
  750. }
  751. err := json.Unmarshal(data, conf)
  752. if err != nil {
  753. importStatus.ErrorMsg = fmt.Errorf("configuration unmarshal with error %v", err).Error()
  754. return importStatus
  755. }
  756. configResponse.Uploads = uploadsImport(conf.Uploads)
  757. if reboot {
  758. err = pluginImport(conf.NativePlugins)
  759. if err != nil {
  760. importStatus.ErrorMsg = fmt.Errorf("pluginImport NativePlugins import error %v", err).Error()
  761. return importStatus
  762. }
  763. err = schemaImport(conf.Schema)
  764. if err != nil {
  765. importStatus.ErrorMsg = fmt.Errorf("schemaImport Schema import error %v", err).Error()
  766. return importStatus
  767. }
  768. }
  769. configResponse.PortablePlugins = portablePluginImport(conf.PortablePlugins)
  770. configResponse.Service = serviceImport(conf.Service)
  771. yamlCfgSet := meta.YamlConfigurationSet{
  772. Sources: conf.SourceConfig,
  773. Sinks: conf.SinkConfig,
  774. Connections: conf.ConnectionConfig,
  775. }
  776. confRsp := meta.LoadConfigurations(yamlCfgSet)
  777. configResponse.SourceConfig = confRsp.Sources
  778. configResponse.SinkConfig = confRsp.Sinks
  779. configResponse.ConnectionConfig = confRsp.Connections
  780. ruleSet := processor.Ruleset{
  781. Streams: conf.Streams,
  782. Tables: conf.Tables,
  783. Rules: conf.Rules,
  784. }
  785. result := rulesetProcessor.ImportRuleSet(ruleSet)
  786. configResponse.Streams = result.Streams
  787. configResponse.Tables = result.Tables
  788. configResponse.Rules = result.Rules
  789. if !reboot {
  790. infra.SafeRun(func() error {
  791. for name := range ruleSet.Rules {
  792. rul, ee := ruleProcessor.GetRuleById(name)
  793. if ee != nil {
  794. logger.Error(ee)
  795. continue
  796. }
  797. reply := recoverRule(rul)
  798. if reply != "" {
  799. logger.Error(reply)
  800. }
  801. }
  802. return nil
  803. })
  804. }
  805. if reflect.DeepEqual(ResponseNil, configResponse) {
  806. importStatus.ConfigResponse = ResponseNil
  807. } else {
  808. importStatus.ErrorMsg = "process error"
  809. importStatus.ConfigResponse = configResponse
  810. }
  811. return importStatus
  812. }
  813. func configurationPartialImport(data []byte) ImportConfigurationStatus {
  814. conf := &Configuration{
  815. Streams: make(map[string]string),
  816. Tables: make(map[string]string),
  817. Rules: make(map[string]string),
  818. NativePlugins: make(map[string]string),
  819. PortablePlugins: make(map[string]string),
  820. SourceConfig: make(map[string]string),
  821. SinkConfig: make(map[string]string),
  822. ConnectionConfig: make(map[string]string),
  823. Service: make(map[string]string),
  824. Schema: make(map[string]string),
  825. Uploads: make(map[string]string),
  826. }
  827. importStatus := ImportConfigurationStatus{}
  828. configResponse := Configuration{
  829. Streams: make(map[string]string),
  830. Tables: make(map[string]string),
  831. Rules: make(map[string]string),
  832. NativePlugins: make(map[string]string),
  833. PortablePlugins: make(map[string]string),
  834. SourceConfig: make(map[string]string),
  835. SinkConfig: make(map[string]string),
  836. ConnectionConfig: make(map[string]string),
  837. Service: make(map[string]string),
  838. Schema: make(map[string]string),
  839. Uploads: make(map[string]string),
  840. }
  841. ResponseNil := Configuration{
  842. Streams: make(map[string]string),
  843. Tables: make(map[string]string),
  844. Rules: make(map[string]string),
  845. NativePlugins: make(map[string]string),
  846. PortablePlugins: make(map[string]string),
  847. SourceConfig: make(map[string]string),
  848. SinkConfig: make(map[string]string),
  849. ConnectionConfig: make(map[string]string),
  850. Service: make(map[string]string),
  851. Schema: make(map[string]string),
  852. Uploads: make(map[string]string),
  853. }
  854. err := json.Unmarshal(data, conf)
  855. if err != nil {
  856. importStatus.ErrorMsg = fmt.Errorf("configuration unmarshal with error %v", err).Error()
  857. return importStatus
  858. }
  859. yamlCfgSet := meta.YamlConfigurationSet{
  860. Sources: conf.SourceConfig,
  861. Sinks: conf.SinkConfig,
  862. Connections: conf.ConnectionConfig,
  863. }
  864. confRsp := meta.LoadConfigurationsPartial(yamlCfgSet)
  865. configResponse.Uploads = uploadsImport(conf.Uploads)
  866. configResponse.NativePlugins = pluginPartialImport(conf.NativePlugins)
  867. configResponse.Schema = schemaPartialImport(conf.Schema)
  868. configResponse.PortablePlugins = portablePluginPartialImport(conf.PortablePlugins)
  869. configResponse.Service = servicePartialImport(conf.Service)
  870. configResponse.SourceConfig = confRsp.Sources
  871. configResponse.SinkConfig = confRsp.Sinks
  872. configResponse.ConnectionConfig = confRsp.Connections
  873. ruleSet := processor.Ruleset{
  874. Streams: conf.Streams,
  875. Tables: conf.Tables,
  876. Rules: conf.Rules,
  877. }
  878. result := importRuleSetPartial(ruleSet)
  879. configResponse.Streams = result.Streams
  880. configResponse.Tables = result.Tables
  881. configResponse.Rules = result.Rules
  882. if reflect.DeepEqual(ResponseNil, configResponse) {
  883. importStatus.ConfigResponse = ResponseNil
  884. } else {
  885. importStatus.ErrorMsg = "process error"
  886. importStatus.ConfigResponse = configResponse
  887. }
  888. return importStatus
  889. }
  890. type configurationInfo struct {
  891. Content string `json:"content"`
  892. FilePath string `json:"file"`
  893. }
  894. func configurationImportHandler(w http.ResponseWriter, r *http.Request) {
  895. cb := r.URL.Query().Get("stop")
  896. stop := cb == "1"
  897. par := r.URL.Query().Get("partial")
  898. partial := par == "1"
  899. rsi := &configurationInfo{}
  900. err := json.NewDecoder(r.Body).Decode(rsi)
  901. if err != nil {
  902. handleError(w, err, "Invalid body: Error decoding json", logger)
  903. return
  904. }
  905. if rsi.Content != "" && rsi.FilePath != "" {
  906. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  907. return
  908. } else if rsi.Content == "" && rsi.FilePath == "" {
  909. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  910. return
  911. }
  912. content := []byte(rsi.Content)
  913. if rsi.FilePath != "" {
  914. reader, err := httpx.ReadFile(rsi.FilePath)
  915. if err != nil {
  916. handleError(w, err, "Fail to read file", logger)
  917. return
  918. }
  919. defer reader.Close()
  920. buf := new(bytes.Buffer)
  921. _, err = io.Copy(buf, reader)
  922. if err != nil {
  923. handleError(w, err, "fail to convert file", logger)
  924. return
  925. }
  926. content = buf.Bytes()
  927. }
  928. if !partial {
  929. configurationReset()
  930. result := configurationImport(content, stop)
  931. if result.ErrorMsg != "" {
  932. w.WriteHeader(http.StatusBadRequest)
  933. jsonResponse(result, w, logger)
  934. } else {
  935. w.WriteHeader(http.StatusOK)
  936. jsonResponse(result, w, logger)
  937. }
  938. if stop {
  939. go func() {
  940. time.Sleep(1 * time.Second)
  941. os.Exit(100)
  942. }()
  943. }
  944. } else {
  945. result := configurationPartialImport(content)
  946. if result.ErrorMsg != "" {
  947. w.WriteHeader(http.StatusBadRequest)
  948. jsonResponse(result, w, logger)
  949. } else {
  950. w.WriteHeader(http.StatusOK)
  951. jsonResponse(result, w, logger)
  952. }
  953. }
  954. }
  955. func configurationStatusExport() Configuration {
  956. conf := Configuration{
  957. Streams: make(map[string]string),
  958. Tables: make(map[string]string),
  959. Rules: make(map[string]string),
  960. NativePlugins: make(map[string]string),
  961. PortablePlugins: make(map[string]string),
  962. SourceConfig: make(map[string]string),
  963. SinkConfig: make(map[string]string),
  964. ConnectionConfig: make(map[string]string),
  965. Service: make(map[string]string),
  966. Schema: make(map[string]string),
  967. Uploads: make(map[string]string),
  968. }
  969. ruleSet := rulesetProcessor.ExportRuleSetStatus()
  970. if ruleSet != nil {
  971. conf.Streams = ruleSet.Streams
  972. conf.Tables = ruleSet.Tables
  973. conf.Rules = ruleSet.Rules
  974. }
  975. conf.NativePlugins = pluginStatusExport()
  976. conf.PortablePlugins = portablePluginStatusExport()
  977. conf.Service = serviceStatusExport()
  978. conf.Schema = schemaStatusExport()
  979. conf.Uploads = uploadsStatusExport()
  980. yamlCfgStatus := meta.GetConfigurationStatus()
  981. conf.SourceConfig = yamlCfgStatus.Sources
  982. conf.SinkConfig = yamlCfgStatus.Sinks
  983. conf.ConnectionConfig = yamlCfgStatus.Connections
  984. return conf
  985. }
  986. func configurationUpdateHandler(w http.ResponseWriter, r *http.Request) {
  987. basic := struct {
  988. Debug *bool `json:"debug"`
  989. ConsoleLog *bool `json:"consoleLog"`
  990. FileLog *bool `json:"fileLog"`
  991. TimeZone *string `json:"timezone"`
  992. }{}
  993. if err := json.NewDecoder(r.Body).Decode(&basic); err != nil {
  994. w.WriteHeader(http.StatusBadRequest)
  995. handleError(w, err, "Invalid JSON", logger)
  996. return
  997. }
  998. if basic.Debug != nil {
  999. conf.SetDebugLevel(*basic.Debug)
  1000. conf.Config.Basic.Debug = *basic.Debug
  1001. }
  1002. if basic.TimeZone != nil {
  1003. if err := cast.SetTimeZone(*basic.TimeZone); err != nil {
  1004. w.WriteHeader(http.StatusBadRequest)
  1005. handleError(w, err, "Invalid TZ", logger)
  1006. return
  1007. }
  1008. conf.Config.Basic.TimeZone = *basic.TimeZone
  1009. }
  1010. if basic.ConsoleLog != nil || basic.FileLog != nil {
  1011. consoleLog := conf.Config.Basic.ConsoleLog
  1012. if basic.ConsoleLog != nil {
  1013. consoleLog = *basic.ConsoleLog
  1014. }
  1015. fileLog := conf.Config.Basic.FileLog
  1016. if basic.FileLog != nil {
  1017. fileLog = *basic.FileLog
  1018. }
  1019. if err := conf.SetConsoleAndFileLog(consoleLog, fileLog); err != nil {
  1020. w.WriteHeader(http.StatusBadRequest)
  1021. handleError(w, err, "", logger)
  1022. return
  1023. }
  1024. conf.Config.Basic.ConsoleLog = consoleLog
  1025. conf.Config.Basic.FileLog = fileLog
  1026. }
  1027. w.WriteHeader(http.StatusNoContent)
  1028. }
  1029. func configurationStatusHandler(w http.ResponseWriter, r *http.Request) {
  1030. defer r.Body.Close()
  1031. content := configurationStatusExport()
  1032. jsonResponse(content, w, logger)
  1033. }
  1034. func importRuleSetPartial(all processor.Ruleset) processor.Ruleset {
  1035. ruleSetRsp := processor.Ruleset{
  1036. Rules: map[string]string{},
  1037. Streams: map[string]string{},
  1038. Tables: map[string]string{},
  1039. }
  1040. // replace streams
  1041. for k, v := range all.Streams {
  1042. _, e := streamProcessor.ExecReplaceStream(k, v, ast.TypeStream)
  1043. if e != nil {
  1044. ruleSetRsp.Streams[k] = e.Error()
  1045. continue
  1046. }
  1047. }
  1048. // replace tables
  1049. for k, v := range all.Tables {
  1050. _, e := streamProcessor.ExecReplaceStream(k, v, ast.TypeTable)
  1051. if e != nil {
  1052. ruleSetRsp.Tables[k] = e.Error()
  1053. continue
  1054. }
  1055. }
  1056. for k, v := range all.Rules {
  1057. _, err := ruleProcessor.GetRuleJson(k)
  1058. if err == nil {
  1059. // the rule already exist, update
  1060. err = updateRule(k, v)
  1061. if err != nil {
  1062. ruleSetRsp.Rules[k] = err.Error()
  1063. continue
  1064. }
  1065. // Update to db after validation
  1066. _, err = ruleProcessor.ExecUpdate(k, v)
  1067. if err != nil {
  1068. ruleSetRsp.Rules[k] = err.Error()
  1069. continue
  1070. }
  1071. } else {
  1072. // not found, create
  1073. _, err2 := createRule(k, v)
  1074. if err2 != nil {
  1075. ruleSetRsp.Rules[k] = err2.Error()
  1076. continue
  1077. }
  1078. }
  1079. }
  1080. return ruleSetRsp
  1081. }
  1082. func uploadsReset() {
  1083. _ = uploadsDb.Clean()
  1084. _ = uploadsStatusDb.Clean()
  1085. }
  1086. func uploadsExport() map[string]string {
  1087. conf, _ := uploadsDb.All()
  1088. return conf
  1089. }
  1090. func uploadsStatusExport() map[string]string {
  1091. status, _ := uploadsDb.All()
  1092. return status
  1093. }
  1094. func uploadsImport(s map[string]string) map[string]string {
  1095. errMap := map[string]string{}
  1096. _ = uploadsStatusDb.Clean()
  1097. for k, v := range s {
  1098. fc := &fileContent{}
  1099. err := json.Unmarshal([]byte(v), fc)
  1100. if err != nil {
  1101. errMsg := fmt.Sprintf("invalid body: Error decoding file json: %s", err.Error())
  1102. errMap[k] = errMsg
  1103. _ = uploadsStatusDb.Set(k, errMsg)
  1104. continue
  1105. }
  1106. err = fc.Validate()
  1107. if err != nil {
  1108. errMap[k] = err.Error()
  1109. _ = uploadsStatusDb.Set(k, err.Error())
  1110. continue
  1111. }
  1112. err = upload(fc)
  1113. if err != nil {
  1114. errMap[k] = err.Error()
  1115. continue
  1116. }
  1117. }
  1118. return errMap
  1119. }