rest.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net/http"
  22. "os"
  23. "path/filepath"
  24. "reflect"
  25. "runtime"
  26. "strconv"
  27. "time"
  28. "github.com/gorilla/handlers"
  29. "github.com/gorilla/mux"
  30. "golang.org/x/text/cases"
  31. "golang.org/x/text/language"
  32. "github.com/lf-edge/ekuiper/internal/conf"
  33. "github.com/lf-edge/ekuiper/internal/meta"
  34. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  35. "github.com/lf-edge/ekuiper/internal/pkg/store"
  36. "github.com/lf-edge/ekuiper/internal/processor"
  37. "github.com/lf-edge/ekuiper/internal/server/middleware"
  38. "github.com/lf-edge/ekuiper/pkg/api"
  39. "github.com/lf-edge/ekuiper/pkg/ast"
  40. "github.com/lf-edge/ekuiper/pkg/cast"
  41. "github.com/lf-edge/ekuiper/pkg/errorx"
  42. "github.com/lf-edge/ekuiper/pkg/infra"
  43. "github.com/lf-edge/ekuiper/pkg/kv"
  44. )
  45. const (
  46. ContentType = "Content-Type"
  47. ContentTypeJSON = "application/json"
  48. )
  49. var (
  50. uploadDir string
  51. uploadsDb kv.KeyValue
  52. uploadsStatusDb kv.KeyValue
  53. )
  54. type statementDescriptor struct {
  55. Sql string `json:"sql,omitempty"`
  56. }
  57. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  58. sd := statementDescriptor{}
  59. err := json.NewDecoder(reader).Decode(&sd)
  60. // Problems decoding
  61. if err != nil {
  62. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  63. }
  64. return sd, nil
  65. }
  66. // Handle applies the specified error and error concept to the HTTP response writer
  67. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  68. message := prefix
  69. if message != "" {
  70. message += ": "
  71. }
  72. message += err.Error()
  73. logger.Error(message)
  74. var ec int
  75. switch e := err.(type) {
  76. case *errorx.Error:
  77. switch e.Code() {
  78. case errorx.NOT_FOUND:
  79. ec = http.StatusNotFound
  80. default:
  81. ec = http.StatusBadRequest
  82. }
  83. default:
  84. ec = http.StatusBadRequest
  85. }
  86. http.Error(w, message, ec)
  87. }
  88. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  89. w.Header().Add(ContentType, ContentTypeJSON)
  90. jsonByte, err := json.Marshal(i)
  91. if err != nil {
  92. handleError(w, err, "", logger)
  93. }
  94. w.Header().Add("Content-Length", strconv.Itoa(len(jsonByte)))
  95. _, err = w.Write(jsonByte)
  96. // Problems encoding
  97. if err != nil {
  98. handleError(w, err, "", logger)
  99. }
  100. }
  101. func jsonByteResponse(buffer bytes.Buffer, w http.ResponseWriter, logger api.Logger) {
  102. w.Header().Add(ContentType, ContentTypeJSON)
  103. w.Header().Add("Content-Length", strconv.Itoa(buffer.Len()))
  104. _, err := w.Write(buffer.Bytes())
  105. // Problems encoding
  106. if err != nil {
  107. handleError(w, err, "", logger)
  108. }
  109. }
  110. func createRestServer(ip string, port int, needToken bool) *http.Server {
  111. dataDir, err := conf.GetDataLoc()
  112. if err != nil {
  113. panic(err)
  114. }
  115. uploadDir = filepath.Join(dataDir, "uploads")
  116. uploadsDb, err = store.GetKV("uploads")
  117. if err != nil {
  118. panic(err)
  119. }
  120. uploadsStatusDb, err = store.GetKV("uploadsStatusDb")
  121. if err != nil {
  122. panic(err)
  123. }
  124. r := mux.NewRouter()
  125. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  126. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  127. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  128. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  129. r.HandleFunc("/streams/{name}/schema", streamSchemaHandler).Methods(http.MethodGet)
  130. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  131. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  132. r.HandleFunc("/tables/{name}/schema", tableSchemaHandler).Methods(http.MethodGet)
  133. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  134. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  135. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  136. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  137. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  138. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  139. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  140. r.HandleFunc("/rules/validate", validateRuleHandler).Methods(http.MethodPost)
  141. r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
  142. r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
  143. r.HandleFunc("/configs", configurationUpdateHandler).Methods(http.MethodPatch)
  144. r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
  145. r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
  146. r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
  147. r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
  148. r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
  149. // Register extended routes
  150. for k, v := range components {
  151. logger.Infof("register rest endpoint for component %s", k)
  152. v.rest(r)
  153. }
  154. if needToken {
  155. r.Use(middleware.Auth)
  156. }
  157. server := &http.Server{
  158. Addr: cast.JoinHostPortInt(ip, port),
  159. // Good practice to set timeouts to avoid Slowloris attacks.
  160. WriteTimeout: time.Second * 60 * 5,
  161. ReadTimeout: time.Second * 60 * 5,
  162. IdleTimeout: time.Second * 60,
  163. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin", "Authorization"}), handlers.AllowedMethods([]string{"POST", "GET", "PUT", "DELETE", "HEAD"}))(r),
  164. }
  165. server.SetKeepAlivesEnabled(false)
  166. return server
  167. }
  168. type fileContent struct {
  169. Name string `json:"name"`
  170. Content string `json:"content"`
  171. FilePath string `json:"file"`
  172. }
  173. func (f *fileContent) InstallScript() string {
  174. marshal, err := json.Marshal(f)
  175. if err != nil {
  176. return ""
  177. }
  178. return string(marshal)
  179. }
  180. func (f *fileContent) Validate() error {
  181. if f.Content == "" && f.FilePath == "" {
  182. return fmt.Errorf("invalid body: content or FilePath is required")
  183. }
  184. if f.Name == "" {
  185. return fmt.Errorf("invalid body: name is required")
  186. }
  187. return nil
  188. }
  189. func upload(file *fileContent) error {
  190. err := getFile(file)
  191. if err != nil {
  192. _ = uploadsStatusDb.Set(file.Name, err.Error())
  193. return err
  194. }
  195. return uploadsDb.Set(file.Name, file.InstallScript())
  196. }
  197. func getFile(file *fileContent) error {
  198. filePath := filepath.Join(uploadDir, file.Name)
  199. dst, err := os.Create(filePath)
  200. if err != nil {
  201. return err
  202. }
  203. defer dst.Close()
  204. if file.FilePath != "" {
  205. err := httpx.DownloadFile(filePath, file.FilePath)
  206. if err != nil {
  207. return err
  208. }
  209. } else {
  210. _, err := dst.Write([]byte(file.Content))
  211. if err != nil {
  212. return err
  213. }
  214. }
  215. return nil
  216. }
  217. func fileUploadHandler(w http.ResponseWriter, r *http.Request) {
  218. switch r.Method {
  219. // Upload or overwrite a file
  220. case http.MethodPost:
  221. switch r.Header.Get("Content-Type") {
  222. case "application/json":
  223. fc := &fileContent{}
  224. defer r.Body.Close()
  225. err := json.NewDecoder(r.Body).Decode(fc)
  226. if err != nil {
  227. handleError(w, err, "Invalid body: Error decoding file json", logger)
  228. return
  229. }
  230. err = fc.Validate()
  231. if err != nil {
  232. handleError(w, err, "Invalid body: missing necessary field", logger)
  233. return
  234. }
  235. filePath := filepath.Join(uploadDir, fc.Name)
  236. err = upload(fc)
  237. if err != nil {
  238. handleError(w, err, "Upload error: getFile has error", logger)
  239. return
  240. }
  241. w.WriteHeader(http.StatusCreated)
  242. w.Write([]byte(filePath))
  243. default:
  244. // Maximum upload of 1 GB files
  245. err := r.ParseMultipartForm(1024 << 20)
  246. if err != nil {
  247. handleError(w, err, "Error parse the multi part form", logger)
  248. return
  249. }
  250. // Get handler for filename, size and headers
  251. file, handler, err := r.FormFile("uploadFile")
  252. if err != nil {
  253. handleError(w, err, "Error Retrieving the File", logger)
  254. return
  255. }
  256. defer file.Close()
  257. // Create file
  258. filePath := filepath.Join(uploadDir, handler.Filename)
  259. dst, err := os.Create(filePath)
  260. defer dst.Close()
  261. if err != nil {
  262. handleError(w, err, "Error creating the file", logger)
  263. return
  264. }
  265. // Copy the uploaded file to the created file on the filesystem
  266. if _, err := io.Copy(dst, file); err != nil {
  267. handleError(w, err, "Error writing the file", logger)
  268. return
  269. }
  270. w.WriteHeader(http.StatusCreated)
  271. w.Write([]byte(filePath))
  272. }
  273. case http.MethodGet:
  274. // Get the list of files in the upload directory
  275. files, err := os.ReadDir(uploadDir)
  276. if err != nil {
  277. handleError(w, err, "Error reading the file upload dir", logger)
  278. return
  279. }
  280. fileNames := make([]string, len(files))
  281. for i, f := range files {
  282. fileNames[i] = filepath.Join(uploadDir, f.Name())
  283. }
  284. jsonResponse(fileNames, w, logger)
  285. }
  286. }
  287. func fileDeleteHandler(w http.ResponseWriter, r *http.Request) {
  288. vars := mux.Vars(r)
  289. name := vars["name"]
  290. filePath := filepath.Join(uploadDir, name)
  291. e := os.Remove(filePath)
  292. if e != nil {
  293. handleError(w, e, "Error deleting the file", logger)
  294. return
  295. }
  296. _ = uploadsDb.Delete(name)
  297. _ = uploadsStatusDb.Delete(name)
  298. w.WriteHeader(http.StatusOK)
  299. w.Write([]byte("ok"))
  300. }
  301. type information struct {
  302. Version string `json:"version"`
  303. Os string `json:"os"`
  304. Arch string `json:"arch"`
  305. UpTimeSeconds int64 `json:"upTimeSeconds"`
  306. }
  307. // The handler for root
  308. func rootHandler(w http.ResponseWriter, r *http.Request) {
  309. defer r.Body.Close()
  310. switch r.Method {
  311. case http.MethodGet, http.MethodPost:
  312. w.WriteHeader(http.StatusOK)
  313. info := new(information)
  314. info.Version = version
  315. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  316. info.Os = runtime.GOOS
  317. info.Arch = runtime.GOARCH
  318. byteInfo, _ := json.Marshal(info)
  319. w.Write(byteInfo)
  320. }
  321. }
  322. func pingHandler(w http.ResponseWriter, _ *http.Request) {
  323. w.WriteHeader(http.StatusOK)
  324. }
  325. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  326. defer r.Body.Close()
  327. switch r.Method {
  328. case http.MethodGet:
  329. var (
  330. content []string
  331. err error
  332. kind string
  333. )
  334. if st == ast.TypeTable {
  335. kind = r.URL.Query().Get("kind")
  336. if kind == "scan" {
  337. kind = ast.StreamKindScan
  338. } else if kind == "lookup" {
  339. kind = ast.StreamKindLookup
  340. } else {
  341. kind = ""
  342. }
  343. }
  344. if kind != "" {
  345. content, err = streamProcessor.ShowTable(kind)
  346. } else {
  347. content, err = streamProcessor.ShowStream(st)
  348. }
  349. if err != nil {
  350. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  351. return
  352. }
  353. jsonResponse(content, w, logger)
  354. case http.MethodPost:
  355. v, err := decodeStatementDescriptor(r.Body)
  356. if err != nil {
  357. handleError(w, err, "Invalid body", logger)
  358. return
  359. }
  360. content, err := streamProcessor.ExecStreamSql(v.Sql)
  361. if err != nil {
  362. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  363. return
  364. }
  365. w.WriteHeader(http.StatusCreated)
  366. w.Write([]byte(content))
  367. }
  368. }
  369. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  370. defer r.Body.Close()
  371. vars := mux.Vars(r)
  372. name := vars["name"]
  373. switch r.Method {
  374. case http.MethodGet:
  375. content, err := streamProcessor.DescStream(name, st)
  376. if err != nil {
  377. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  378. return
  379. }
  380. jsonResponse(content, w, logger)
  381. case http.MethodDelete:
  382. content, err := streamProcessor.DropStream(name, st)
  383. if err != nil {
  384. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  385. return
  386. }
  387. w.WriteHeader(http.StatusOK)
  388. w.Write([]byte(content))
  389. case http.MethodPut:
  390. v, err := decodeStatementDescriptor(r.Body)
  391. if err != nil {
  392. handleError(w, err, "Invalid body", logger)
  393. return
  394. }
  395. content, err := streamProcessor.ExecReplaceStream(name, v.Sql, st)
  396. if err != nil {
  397. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  398. return
  399. }
  400. w.WriteHeader(http.StatusOK)
  401. w.Write([]byte(content))
  402. }
  403. }
  404. // list or create streams
  405. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  406. sourcesManageHandler(w, r, ast.TypeStream)
  407. }
  408. // describe or delete a stream
  409. func streamHandler(w http.ResponseWriter, r *http.Request) {
  410. sourceManageHandler(w, r, ast.TypeStream)
  411. }
  412. // list or create tables
  413. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  414. sourcesManageHandler(w, r, ast.TypeTable)
  415. }
  416. func tableHandler(w http.ResponseWriter, r *http.Request) {
  417. sourceManageHandler(w, r, ast.TypeTable)
  418. }
  419. func streamSchemaHandler(w http.ResponseWriter, r *http.Request) {
  420. sourceSchemaHandler(w, r, ast.TypeStream)
  421. }
  422. func tableSchemaHandler(w http.ResponseWriter, r *http.Request) {
  423. sourceSchemaHandler(w, r, ast.TypeTable)
  424. }
  425. func sourceSchemaHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  426. vars := mux.Vars(r)
  427. name := vars["name"]
  428. content, err := streamProcessor.GetInferredJsonSchema(name, st)
  429. if err != nil {
  430. handleError(w, err, fmt.Sprintf("get schema of %s error", ast.StreamTypeMap[st]), logger)
  431. return
  432. }
  433. jsonResponse(content, w, logger)
  434. }
  435. // list or create rules
  436. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  437. defer r.Body.Close()
  438. switch r.Method {
  439. case http.MethodPost:
  440. body, err := io.ReadAll(r.Body)
  441. if err != nil {
  442. handleError(w, err, "Invalid body", logger)
  443. return
  444. }
  445. id, err := createRule("", string(body))
  446. if err != nil {
  447. handleError(w, err, "", logger)
  448. return
  449. }
  450. w.WriteHeader(http.StatusCreated)
  451. fmt.Fprintf(w, "Rule %s was created successfully.", id)
  452. case http.MethodGet:
  453. content, err := getAllRulesWithStatus()
  454. if err != nil {
  455. handleError(w, err, "Show rules error", logger)
  456. return
  457. }
  458. jsonResponse(content, w, logger)
  459. }
  460. }
  461. // describe or delete a rule
  462. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  463. defer r.Body.Close()
  464. vars := mux.Vars(r)
  465. name := vars["name"]
  466. switch r.Method {
  467. case http.MethodGet:
  468. rule, err := ruleProcessor.GetRuleJson(name)
  469. if err != nil {
  470. handleError(w, err, "Describe rule error", logger)
  471. return
  472. }
  473. w.Header().Add(ContentType, ContentTypeJSON)
  474. w.Write([]byte(rule))
  475. case http.MethodDelete:
  476. deleteRule(name)
  477. content, err := ruleProcessor.ExecDrop(name)
  478. if err != nil {
  479. handleError(w, err, "Delete rule error", logger)
  480. return
  481. }
  482. w.WriteHeader(http.StatusOK)
  483. w.Write([]byte(content))
  484. case http.MethodPut:
  485. _, err := ruleProcessor.GetRuleById(name)
  486. if err != nil {
  487. handleError(w, err, "Rule not found", logger)
  488. return
  489. }
  490. body, err := io.ReadAll(r.Body)
  491. if err != nil {
  492. handleError(w, err, "Invalid body", logger)
  493. return
  494. }
  495. err = updateRule(name, string(body))
  496. if err != nil {
  497. handleError(w, err, "Update rule error", logger)
  498. return
  499. }
  500. // Update to db after validation
  501. _, err = ruleProcessor.ExecUpdate(name, string(body))
  502. if err != nil {
  503. handleError(w, err, "Update rule error, suggest to delete it and recreate", logger)
  504. return
  505. }
  506. w.WriteHeader(http.StatusOK)
  507. fmt.Fprintf(w, "Rule %s was updated successfully.", name)
  508. }
  509. }
  510. // get status of a rule
  511. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  512. defer r.Body.Close()
  513. vars := mux.Vars(r)
  514. name := vars["name"]
  515. content, err := getRuleStatus(name)
  516. if err != nil {
  517. handleError(w, err, "get rule status error", logger)
  518. return
  519. }
  520. w.Header().Set(ContentType, ContentTypeJSON)
  521. w.WriteHeader(http.StatusOK)
  522. w.Write([]byte(content))
  523. }
  524. // start a rule
  525. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  526. defer r.Body.Close()
  527. vars := mux.Vars(r)
  528. name := vars["name"]
  529. err := startRule(name)
  530. if err != nil {
  531. handleError(w, err, "start rule error", logger)
  532. return
  533. }
  534. w.WriteHeader(http.StatusOK)
  535. fmt.Fprintf(w, "Rule %s was started", name)
  536. }
  537. // stop a rule
  538. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  539. defer r.Body.Close()
  540. vars := mux.Vars(r)
  541. name := vars["name"]
  542. result := stopRule(name)
  543. w.WriteHeader(http.StatusOK)
  544. w.Write([]byte(result))
  545. }
  546. // restart a rule
  547. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  548. defer r.Body.Close()
  549. vars := mux.Vars(r)
  550. name := vars["name"]
  551. err := restartRule(name)
  552. if err != nil {
  553. handleError(w, err, "restart rule error", logger)
  554. return
  555. }
  556. w.WriteHeader(http.StatusOK)
  557. fmt.Fprintf(w, "Rule %s was restarted", name)
  558. }
  559. // get topo of a rule
  560. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  561. defer r.Body.Close()
  562. vars := mux.Vars(r)
  563. name := vars["name"]
  564. content, err := getRuleTopo(name)
  565. if err != nil {
  566. handleError(w, err, "get rule topo error", logger)
  567. return
  568. }
  569. w.Header().Set(ContentType, ContentTypeJSON)
  570. w.Write([]byte(content))
  571. }
  572. // validate a rule
  573. func validateRuleHandler(w http.ResponseWriter, r *http.Request) {
  574. defer r.Body.Close()
  575. body, err := io.ReadAll(r.Body)
  576. if err != nil {
  577. handleError(w, err, "Invalid body", logger)
  578. return
  579. }
  580. validate, err := validateRule("", string(body))
  581. w.WriteHeader(http.StatusOK)
  582. if !validate {
  583. w.Write([]byte(err.Error()))
  584. return
  585. }
  586. w.Write([]byte("The rule has been successfully validated and is confirmed to be correct."))
  587. }
  588. type rulesetInfo struct {
  589. Content string `json:"content"`
  590. FilePath string `json:"file"`
  591. }
  592. func importHandler(w http.ResponseWriter, r *http.Request) {
  593. rsi := &rulesetInfo{}
  594. err := json.NewDecoder(r.Body).Decode(rsi)
  595. if err != nil {
  596. handleError(w, err, "Invalid body: Error decoding json", logger)
  597. return
  598. }
  599. if rsi.Content != "" && rsi.FilePath != "" {
  600. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  601. return
  602. } else if rsi.Content == "" && rsi.FilePath == "" {
  603. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  604. return
  605. }
  606. content := []byte(rsi.Content)
  607. if rsi.FilePath != "" {
  608. reader, err := httpx.ReadFile(rsi.FilePath)
  609. if err != nil {
  610. handleError(w, err, "Fail to read file", logger)
  611. return
  612. }
  613. defer reader.Close()
  614. buf := new(bytes.Buffer)
  615. _, err = io.Copy(buf, reader)
  616. if err != nil {
  617. handleError(w, err, "fail to convert file", logger)
  618. return
  619. }
  620. content = buf.Bytes()
  621. }
  622. rules, counts, err := rulesetProcessor.Import(content)
  623. if err != nil {
  624. handleError(w, nil, "Import ruleset error", logger)
  625. return
  626. }
  627. infra.SafeRun(func() error {
  628. for _, name := range rules {
  629. rul, ee := ruleProcessor.GetRuleById(name)
  630. if ee != nil {
  631. logger.Error(ee)
  632. continue
  633. }
  634. reply := recoverRule(rul)
  635. if reply != "" {
  636. logger.Error(reply)
  637. }
  638. }
  639. return nil
  640. })
  641. fmt.Fprintf(w, "imported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])
  642. }
  643. func exportHandler(w http.ResponseWriter, r *http.Request) {
  644. const name = "ekuiper_export.json"
  645. exported, _, err := rulesetProcessor.Export()
  646. if err != nil {
  647. handleError(w, err, "export error", logger)
  648. return
  649. }
  650. w.Header().Set("Content-Type", "application/octet-stream")
  651. w.Header().Add("Content-Disposition", "Attachment")
  652. http.ServeContent(w, r, name, time.Now(), exported)
  653. }
  654. type Configuration struct {
  655. Streams map[string]string `json:"streams"`
  656. Tables map[string]string `json:"tables"`
  657. Rules map[string]string `json:"rules"`
  658. NativePlugins map[string]string `json:"nativePlugins"`
  659. PortablePlugins map[string]string `json:"portablePlugins"`
  660. SourceConfig map[string]string `json:"sourceConfig"`
  661. SinkConfig map[string]string `json:"sinkConfig"`
  662. ConnectionConfig map[string]string `json:"connectionConfig"`
  663. Service map[string]string `json:"Service"`
  664. Schema map[string]string `json:"Schema"`
  665. Uploads map[string]string `json:"uploads"`
  666. }
  667. func configurationExport() ([]byte, error) {
  668. conf := &Configuration{
  669. Streams: make(map[string]string),
  670. Tables: make(map[string]string),
  671. Rules: make(map[string]string),
  672. NativePlugins: make(map[string]string),
  673. PortablePlugins: make(map[string]string),
  674. SourceConfig: make(map[string]string),
  675. SinkConfig: make(map[string]string),
  676. ConnectionConfig: make(map[string]string),
  677. Service: make(map[string]string),
  678. Schema: make(map[string]string),
  679. Uploads: make(map[string]string),
  680. }
  681. ruleSet := rulesetProcessor.ExportRuleSet()
  682. if ruleSet != nil {
  683. conf.Streams = ruleSet.Streams
  684. conf.Tables = ruleSet.Tables
  685. conf.Rules = ruleSet.Rules
  686. }
  687. conf.NativePlugins = pluginExport()
  688. conf.PortablePlugins = portablePluginExport()
  689. conf.Service = serviceExport()
  690. conf.Schema = schemaExport()
  691. conf.Uploads = uploadsExport()
  692. yamlCfg := meta.GetConfigurations()
  693. conf.SourceConfig = yamlCfg.Sources
  694. conf.SinkConfig = yamlCfg.Sinks
  695. conf.ConnectionConfig = yamlCfg.Connections
  696. return json.Marshal(conf)
  697. }
  698. func configurationExportHandler(w http.ResponseWriter, r *http.Request) {
  699. var jsonBytes []byte
  700. const name = "ekuiper_export.json"
  701. switch r.Method {
  702. case http.MethodGet:
  703. jsonBytes, _ = configurationExport()
  704. case http.MethodPost:
  705. var rules []string
  706. _ = json.NewDecoder(r.Body).Decode(&rules)
  707. jsonBytes, _ = ruleMigrationProcessor.ConfigurationPartialExport(rules)
  708. }
  709. w.Header().Set("Content-Type", "application/octet-stream")
  710. w.Header().Add("Content-Disposition", "Attachment")
  711. http.ServeContent(w, r, name, time.Now(), bytes.NewReader(jsonBytes))
  712. }
  713. func configurationReset() {
  714. _ = resetAllRules()
  715. _ = resetAllStreams()
  716. pluginReset()
  717. portablePluginsReset()
  718. serviceReset()
  719. schemaReset()
  720. meta.ResetConfigs()
  721. uploadsReset()
  722. }
  723. type ImportConfigurationStatus struct {
  724. ErrorMsg string
  725. ConfigResponse Configuration
  726. }
  727. func configurationImport(data []byte, reboot bool) ImportConfigurationStatus {
  728. conf := &Configuration{
  729. Streams: make(map[string]string),
  730. Tables: make(map[string]string),
  731. Rules: make(map[string]string),
  732. NativePlugins: make(map[string]string),
  733. PortablePlugins: make(map[string]string),
  734. SourceConfig: make(map[string]string),
  735. SinkConfig: make(map[string]string),
  736. ConnectionConfig: make(map[string]string),
  737. Service: make(map[string]string),
  738. Schema: make(map[string]string),
  739. Uploads: make(map[string]string),
  740. }
  741. importStatus := ImportConfigurationStatus{}
  742. configResponse := Configuration{
  743. Streams: make(map[string]string),
  744. Tables: make(map[string]string),
  745. Rules: make(map[string]string),
  746. NativePlugins: make(map[string]string),
  747. PortablePlugins: make(map[string]string),
  748. SourceConfig: make(map[string]string),
  749. SinkConfig: make(map[string]string),
  750. ConnectionConfig: make(map[string]string),
  751. Service: make(map[string]string),
  752. Schema: make(map[string]string),
  753. Uploads: make(map[string]string),
  754. }
  755. ResponseNil := Configuration{
  756. Streams: make(map[string]string),
  757. Tables: make(map[string]string),
  758. Rules: make(map[string]string),
  759. NativePlugins: make(map[string]string),
  760. PortablePlugins: make(map[string]string),
  761. SourceConfig: make(map[string]string),
  762. SinkConfig: make(map[string]string),
  763. ConnectionConfig: make(map[string]string),
  764. Service: make(map[string]string),
  765. Schema: make(map[string]string),
  766. Uploads: make(map[string]string),
  767. }
  768. err := json.Unmarshal(data, conf)
  769. if err != nil {
  770. importStatus.ErrorMsg = fmt.Errorf("configuration unmarshal with error %v", err).Error()
  771. return importStatus
  772. }
  773. configResponse.Uploads = uploadsImport(conf.Uploads)
  774. if reboot {
  775. err = pluginImport(conf.NativePlugins)
  776. if err != nil {
  777. importStatus.ErrorMsg = fmt.Errorf("pluginImport NativePlugins import error %v", err).Error()
  778. return importStatus
  779. }
  780. err = schemaImport(conf.Schema)
  781. if err != nil {
  782. importStatus.ErrorMsg = fmt.Errorf("schemaImport Schema import error %v", err).Error()
  783. return importStatus
  784. }
  785. }
  786. configResponse.PortablePlugins = portablePluginImport(conf.PortablePlugins)
  787. configResponse.Service = serviceImport(conf.Service)
  788. yamlCfgSet := meta.YamlConfigurationSet{
  789. Sources: conf.SourceConfig,
  790. Sinks: conf.SinkConfig,
  791. Connections: conf.ConnectionConfig,
  792. }
  793. confRsp := meta.LoadConfigurations(yamlCfgSet)
  794. configResponse.SourceConfig = confRsp.Sources
  795. configResponse.SinkConfig = confRsp.Sinks
  796. configResponse.ConnectionConfig = confRsp.Connections
  797. ruleSet := processor.Ruleset{
  798. Streams: conf.Streams,
  799. Tables: conf.Tables,
  800. Rules: conf.Rules,
  801. }
  802. result := rulesetProcessor.ImportRuleSet(ruleSet)
  803. configResponse.Streams = result.Streams
  804. configResponse.Tables = result.Tables
  805. configResponse.Rules = result.Rules
  806. if !reboot {
  807. infra.SafeRun(func() error {
  808. for name := range ruleSet.Rules {
  809. rul, ee := ruleProcessor.GetRuleById(name)
  810. if ee != nil {
  811. logger.Error(ee)
  812. continue
  813. }
  814. reply := recoverRule(rul)
  815. if reply != "" {
  816. logger.Error(reply)
  817. }
  818. }
  819. return nil
  820. })
  821. }
  822. if reflect.DeepEqual(ResponseNil, configResponse) {
  823. importStatus.ConfigResponse = ResponseNil
  824. } else {
  825. importStatus.ErrorMsg = "process error"
  826. importStatus.ConfigResponse = configResponse
  827. }
  828. return importStatus
  829. }
  830. func configurationPartialImport(data []byte) ImportConfigurationStatus {
  831. conf := &Configuration{
  832. Streams: make(map[string]string),
  833. Tables: make(map[string]string),
  834. Rules: make(map[string]string),
  835. NativePlugins: make(map[string]string),
  836. PortablePlugins: make(map[string]string),
  837. SourceConfig: make(map[string]string),
  838. SinkConfig: make(map[string]string),
  839. ConnectionConfig: make(map[string]string),
  840. Service: make(map[string]string),
  841. Schema: make(map[string]string),
  842. Uploads: make(map[string]string),
  843. }
  844. importStatus := ImportConfigurationStatus{}
  845. configResponse := Configuration{
  846. Streams: make(map[string]string),
  847. Tables: make(map[string]string),
  848. Rules: make(map[string]string),
  849. NativePlugins: make(map[string]string),
  850. PortablePlugins: make(map[string]string),
  851. SourceConfig: make(map[string]string),
  852. SinkConfig: make(map[string]string),
  853. ConnectionConfig: make(map[string]string),
  854. Service: make(map[string]string),
  855. Schema: make(map[string]string),
  856. Uploads: make(map[string]string),
  857. }
  858. ResponseNil := Configuration{
  859. Streams: make(map[string]string),
  860. Tables: make(map[string]string),
  861. Rules: make(map[string]string),
  862. NativePlugins: make(map[string]string),
  863. PortablePlugins: make(map[string]string),
  864. SourceConfig: make(map[string]string),
  865. SinkConfig: make(map[string]string),
  866. ConnectionConfig: make(map[string]string),
  867. Service: make(map[string]string),
  868. Schema: make(map[string]string),
  869. Uploads: make(map[string]string),
  870. }
  871. err := json.Unmarshal(data, conf)
  872. if err != nil {
  873. importStatus.ErrorMsg = fmt.Errorf("configuration unmarshal with error %v", err).Error()
  874. return importStatus
  875. }
  876. yamlCfgSet := meta.YamlConfigurationSet{
  877. Sources: conf.SourceConfig,
  878. Sinks: conf.SinkConfig,
  879. Connections: conf.ConnectionConfig,
  880. }
  881. confRsp := meta.LoadConfigurationsPartial(yamlCfgSet)
  882. configResponse.Uploads = uploadsImport(conf.Uploads)
  883. configResponse.NativePlugins = pluginPartialImport(conf.NativePlugins)
  884. configResponse.Schema = schemaPartialImport(conf.Schema)
  885. configResponse.PortablePlugins = portablePluginPartialImport(conf.PortablePlugins)
  886. configResponse.Service = servicePartialImport(conf.Service)
  887. configResponse.SourceConfig = confRsp.Sources
  888. configResponse.SinkConfig = confRsp.Sinks
  889. configResponse.ConnectionConfig = confRsp.Connections
  890. ruleSet := processor.Ruleset{
  891. Streams: conf.Streams,
  892. Tables: conf.Tables,
  893. Rules: conf.Rules,
  894. }
  895. result := importRuleSetPartial(ruleSet)
  896. configResponse.Streams = result.Streams
  897. configResponse.Tables = result.Tables
  898. configResponse.Rules = result.Rules
  899. if reflect.DeepEqual(ResponseNil, configResponse) {
  900. importStatus.ConfigResponse = ResponseNil
  901. } else {
  902. importStatus.ErrorMsg = "process error"
  903. importStatus.ConfigResponse = configResponse
  904. }
  905. return importStatus
  906. }
// configurationInfo is the JSON request body accepted by
// configurationImportHandler. The handler requires exactly one of the two
// fields to be non-empty.
type configurationInfo struct {
	// Content is the configuration payload supplied inline (JSON key "content").
	Content string `json:"content"`
	// FilePath names a location the handler reads the payload from via
	// httpx.ReadFile (JSON key "file").
	FilePath string `json:"file"`
}
  911. func configurationImportHandler(w http.ResponseWriter, r *http.Request) {
  912. cb := r.URL.Query().Get("stop")
  913. stop := cb == "1"
  914. par := r.URL.Query().Get("partial")
  915. partial := par == "1"
  916. rsi := &configurationInfo{}
  917. err := json.NewDecoder(r.Body).Decode(rsi)
  918. if err != nil {
  919. handleError(w, err, "Invalid body: Error decoding json", logger)
  920. return
  921. }
  922. if rsi.Content != "" && rsi.FilePath != "" {
  923. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  924. return
  925. } else if rsi.Content == "" && rsi.FilePath == "" {
  926. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  927. return
  928. }
  929. content := []byte(rsi.Content)
  930. if rsi.FilePath != "" {
  931. reader, err := httpx.ReadFile(rsi.FilePath)
  932. if err != nil {
  933. handleError(w, err, "Fail to read file", logger)
  934. return
  935. }
  936. defer reader.Close()
  937. buf := new(bytes.Buffer)
  938. _, err = io.Copy(buf, reader)
  939. if err != nil {
  940. handleError(w, err, "fail to convert file", logger)
  941. return
  942. }
  943. content = buf.Bytes()
  944. }
  945. if !partial {
  946. configurationReset()
  947. result := configurationImport(content, stop)
  948. if result.ErrorMsg != "" {
  949. w.WriteHeader(http.StatusBadRequest)
  950. jsonResponse(result, w, logger)
  951. } else {
  952. w.WriteHeader(http.StatusOK)
  953. jsonResponse(result, w, logger)
  954. }
  955. if stop {
  956. go func() {
  957. time.Sleep(1 * time.Second)
  958. os.Exit(100)
  959. }()
  960. }
  961. } else {
  962. result := configurationPartialImport(content)
  963. if result.ErrorMsg != "" {
  964. w.WriteHeader(http.StatusBadRequest)
  965. jsonResponse(result, w, logger)
  966. } else {
  967. w.WriteHeader(http.StatusOK)
  968. jsonResponse(result, w, logger)
  969. }
  970. }
  971. }
  972. func configurationStatusExport() Configuration {
  973. conf := Configuration{
  974. Streams: make(map[string]string),
  975. Tables: make(map[string]string),
  976. Rules: make(map[string]string),
  977. NativePlugins: make(map[string]string),
  978. PortablePlugins: make(map[string]string),
  979. SourceConfig: make(map[string]string),
  980. SinkConfig: make(map[string]string),
  981. ConnectionConfig: make(map[string]string),
  982. Service: make(map[string]string),
  983. Schema: make(map[string]string),
  984. Uploads: make(map[string]string),
  985. }
  986. ruleSet := rulesetProcessor.ExportRuleSetStatus()
  987. if ruleSet != nil {
  988. conf.Streams = ruleSet.Streams
  989. conf.Tables = ruleSet.Tables
  990. conf.Rules = ruleSet.Rules
  991. }
  992. conf.NativePlugins = pluginStatusExport()
  993. conf.PortablePlugins = portablePluginStatusExport()
  994. conf.Service = serviceStatusExport()
  995. conf.Schema = schemaStatusExport()
  996. conf.Uploads = uploadsStatusExport()
  997. yamlCfgStatus := meta.GetConfigurationStatus()
  998. conf.SourceConfig = yamlCfgStatus.Sources
  999. conf.SinkConfig = yamlCfgStatus.Sinks
  1000. conf.ConnectionConfig = yamlCfgStatus.Connections
  1001. return conf
  1002. }
  1003. func configurationUpdateHandler(w http.ResponseWriter, r *http.Request) {
  1004. basic := struct {
  1005. Debug *bool `json:"debug"`
  1006. ConsoleLog *bool `json:"consoleLog"`
  1007. FileLog *bool `json:"fileLog"`
  1008. TimeZone *string `json:"timezone"`
  1009. }{}
  1010. if err := json.NewDecoder(r.Body).Decode(&basic); err != nil {
  1011. w.WriteHeader(http.StatusBadRequest)
  1012. handleError(w, err, "Invalid JSON", logger)
  1013. return
  1014. }
  1015. if basic.Debug != nil {
  1016. conf.SetDebugLevel(*basic.Debug)
  1017. conf.Config.Basic.Debug = *basic.Debug
  1018. }
  1019. if basic.TimeZone != nil {
  1020. if err := cast.SetTimeZone(*basic.TimeZone); err != nil {
  1021. w.WriteHeader(http.StatusBadRequest)
  1022. handleError(w, err, "Invalid TZ", logger)
  1023. return
  1024. }
  1025. conf.Config.Basic.TimeZone = *basic.TimeZone
  1026. }
  1027. if basic.ConsoleLog != nil || basic.FileLog != nil {
  1028. consoleLog := conf.Config.Basic.ConsoleLog
  1029. if basic.ConsoleLog != nil {
  1030. consoleLog = *basic.ConsoleLog
  1031. }
  1032. fileLog := conf.Config.Basic.FileLog
  1033. if basic.FileLog != nil {
  1034. fileLog = *basic.FileLog
  1035. }
  1036. if err := conf.SetConsoleAndFileLog(consoleLog, fileLog); err != nil {
  1037. w.WriteHeader(http.StatusBadRequest)
  1038. handleError(w, err, "", logger)
  1039. return
  1040. }
  1041. conf.Config.Basic.ConsoleLog = consoleLog
  1042. conf.Config.Basic.FileLog = fileLog
  1043. }
  1044. w.WriteHeader(http.StatusNoContent)
  1045. }
  1046. func configurationStatusHandler(w http.ResponseWriter, r *http.Request) {
  1047. defer r.Body.Close()
  1048. content := configurationStatusExport()
  1049. jsonResponse(content, w, logger)
  1050. }
  1051. func importRuleSetPartial(all processor.Ruleset) processor.Ruleset {
  1052. ruleSetRsp := processor.Ruleset{
  1053. Rules: map[string]string{},
  1054. Streams: map[string]string{},
  1055. Tables: map[string]string{},
  1056. }
  1057. // replace streams
  1058. for k, v := range all.Streams {
  1059. _, e := streamProcessor.ExecReplaceStream(k, v, ast.TypeStream)
  1060. if e != nil {
  1061. ruleSetRsp.Streams[k] = e.Error()
  1062. continue
  1063. }
  1064. }
  1065. // replace tables
  1066. for k, v := range all.Tables {
  1067. _, e := streamProcessor.ExecReplaceStream(k, v, ast.TypeTable)
  1068. if e != nil {
  1069. ruleSetRsp.Tables[k] = e.Error()
  1070. continue
  1071. }
  1072. }
  1073. for k, v := range all.Rules {
  1074. _, err := ruleProcessor.GetRuleJson(k)
  1075. if err == nil {
  1076. // the rule already exist, update
  1077. err = updateRule(k, v)
  1078. if err != nil {
  1079. ruleSetRsp.Rules[k] = err.Error()
  1080. continue
  1081. }
  1082. // Update to db after validation
  1083. _, err = ruleProcessor.ExecUpdate(k, v)
  1084. if err != nil {
  1085. ruleSetRsp.Rules[k] = err.Error()
  1086. continue
  1087. }
  1088. } else {
  1089. // not found, create
  1090. _, err2 := createRule(k, v)
  1091. if err2 != nil {
  1092. ruleSetRsp.Rules[k] = err2.Error()
  1093. continue
  1094. }
  1095. }
  1096. }
  1097. return ruleSetRsp
  1098. }
// uploadsReset calls Clean on both the uploads store and the uploads status
// store. Errors from either call are deliberately discarded (best effort).
func uploadsReset() {
	_ = uploadsDb.Clean()
	_ = uploadsStatusDb.Clean()
}
  1103. func uploadsExport() map[string]string {
  1104. conf, _ := uploadsDb.All()
  1105. return conf
  1106. }
  1107. func uploadsStatusExport() map[string]string {
  1108. status, _ := uploadsDb.All()
  1109. return status
  1110. }
  1111. func uploadsImport(s map[string]string) map[string]string {
  1112. errMap := map[string]string{}
  1113. _ = uploadsStatusDb.Clean()
  1114. for k, v := range s {
  1115. fc := &fileContent{}
  1116. err := json.Unmarshal([]byte(v), fc)
  1117. if err != nil {
  1118. errMsg := fmt.Sprintf("invalid body: Error decoding file json: %s", err.Error())
  1119. errMap[k] = errMsg
  1120. _ = uploadsStatusDb.Set(k, errMsg)
  1121. continue
  1122. }
  1123. err = fc.Validate()
  1124. if err != nil {
  1125. errMap[k] = err.Error()
  1126. _ = uploadsStatusDb.Set(k, err.Error())
  1127. continue
  1128. }
  1129. err = upload(fc)
  1130. if err != nil {
  1131. errMap[k] = err.Error()
  1132. continue
  1133. }
  1134. }
  1135. return errMap
  1136. }