rest.go 35 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net/http"
  22. "os"
  23. "path/filepath"
  24. "reflect"
  25. "runtime"
  26. "strconv"
  27. "time"
  28. "github.com/gorilla/handlers"
  29. "github.com/gorilla/mux"
  30. "golang.org/x/text/cases"
  31. "golang.org/x/text/language"
  32. "github.com/lf-edge/ekuiper/internal/conf"
  33. "github.com/lf-edge/ekuiper/internal/meta"
  34. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  35. "github.com/lf-edge/ekuiper/internal/pkg/store"
  36. "github.com/lf-edge/ekuiper/internal/processor"
  37. "github.com/lf-edge/ekuiper/internal/server/middleware"
  38. "github.com/lf-edge/ekuiper/pkg/api"
  39. "github.com/lf-edge/ekuiper/pkg/ast"
  40. "github.com/lf-edge/ekuiper/pkg/cast"
  41. "github.com/lf-edge/ekuiper/pkg/errorx"
  42. "github.com/lf-edge/ekuiper/pkg/infra"
  43. "github.com/lf-edge/ekuiper/pkg/kv"
  44. )
  45. const (
  46. ContentType = "Content-Type"
  47. ContentTypeJSON = "application/json"
  48. )
  49. var (
  50. uploadDir string
  51. uploadsDb kv.KeyValue
  52. uploadsStatusDb kv.KeyValue
  53. )
  54. type statementDescriptor struct {
  55. Sql string `json:"sql,omitempty"`
  56. }
  57. func decodeStatementDescriptor(reader io.ReadCloser) (statementDescriptor, error) {
  58. sd := statementDescriptor{}
  59. err := json.NewDecoder(reader).Decode(&sd)
  60. // Problems decoding
  61. if err != nil {
  62. return sd, fmt.Errorf("Error decoding the statement descriptor: %v", err)
  63. }
  64. return sd, nil
  65. }
  66. // Handle applies the specified error and error concept to the HTTP response writer
  67. func handleError(w http.ResponseWriter, err error, prefix string, logger api.Logger) {
  68. message := prefix
  69. if message != "" {
  70. message += ": "
  71. }
  72. message += err.Error()
  73. logger.Error(message)
  74. var ec int
  75. switch e := err.(type) {
  76. case *errorx.Error:
  77. switch e.Code() {
  78. case errorx.NOT_FOUND:
  79. ec = http.StatusNotFound
  80. default:
  81. ec = http.StatusBadRequest
  82. }
  83. default:
  84. ec = http.StatusBadRequest
  85. }
  86. http.Error(w, message, ec)
  87. }
  88. func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
  89. w.Header().Add(ContentType, ContentTypeJSON)
  90. jsonByte, err := json.Marshal(i)
  91. if err != nil {
  92. handleError(w, err, "", logger)
  93. }
  94. w.Header().Add("Content-Length", strconv.Itoa(len(jsonByte)))
  95. _, err = w.Write(jsonByte)
  96. // Problems encoding
  97. if err != nil {
  98. handleError(w, err, "", logger)
  99. }
  100. }
  101. func jsonByteResponse(buffer bytes.Buffer, w http.ResponseWriter, logger api.Logger) {
  102. w.Header().Add(ContentType, ContentTypeJSON)
  103. w.Header().Add("Content-Length", strconv.Itoa(buffer.Len()))
  104. _, err := w.Write(buffer.Bytes())
  105. // Problems encoding
  106. if err != nil {
  107. handleError(w, err, "", logger)
  108. }
  109. }
  110. func createRestServer(ip string, port int, needToken bool) *http.Server {
  111. dataDir, err := conf.GetDataLoc()
  112. if err != nil {
  113. panic(err)
  114. }
  115. uploadDir = filepath.Join(dataDir, "uploads")
  116. uploadsDb, err = store.GetKV("uploads")
  117. if err != nil {
  118. panic(err)
  119. }
  120. uploadsStatusDb, err = store.GetKV("uploadsStatusDb")
  121. if err != nil {
  122. panic(err)
  123. }
  124. r := mux.NewRouter()
  125. r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
  126. r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
  127. r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
  128. r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  129. r.HandleFunc("/streams/{name}/schema", streamSchemaHandler).Methods(http.MethodGet)
  130. r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
  131. r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
  132. r.HandleFunc("/tables/{name}/schema", tableSchemaHandler).Methods(http.MethodGet)
  133. r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
  134. r.HandleFunc("/rules/{name}", ruleHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
  135. r.HandleFunc("/rules/{name}/status", getStatusRuleHandler).Methods(http.MethodGet)
  136. r.HandleFunc("/rules/{name}/start", startRuleHandler).Methods(http.MethodPost)
  137. r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
  138. r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
  139. r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
  140. r.HandleFunc("/rules/validate", validateRuleHandler).Methods(http.MethodPost)
  141. r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
  142. r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
  143. r.HandleFunc("/configs", configurationUpdateHandler).Methods(http.MethodPatch)
  144. r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
  145. r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
  146. r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
  147. r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
  148. r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
  149. // Register extended routes
  150. for k, v := range components {
  151. logger.Infof("register rest endpoint for component %s", k)
  152. v.rest(r)
  153. }
  154. if needToken {
  155. r.Use(middleware.Auth)
  156. }
  157. server := &http.Server{
  158. Addr: cast.JoinHostPortInt(ip, port),
  159. // Good practice to set timeouts to avoid Slowloris attacks.
  160. WriteTimeout: time.Second * 60 * 5,
  161. ReadTimeout: time.Second * 60 * 5,
  162. IdleTimeout: time.Second * 60,
  163. Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin", "Authorization"}), handlers.AllowedMethods([]string{"POST", "GET", "PUT", "DELETE", "HEAD"}))(r),
  164. }
  165. server.SetKeepAlivesEnabled(false)
  166. return server
  167. }
  168. type fileContent struct {
  169. Name string `json:"name"`
  170. Content string `json:"content"`
  171. FilePath string `json:"file"`
  172. }
  173. func (f *fileContent) InstallScript() string {
  174. marshal, err := json.Marshal(f)
  175. if err != nil {
  176. return ""
  177. }
  178. return string(marshal)
  179. }
  180. func (f *fileContent) Validate() error {
  181. if f.Content == "" && f.FilePath == "" {
  182. return fmt.Errorf("invalid body: content or FilePath is required")
  183. }
  184. if f.Name == "" {
  185. return fmt.Errorf("invalid body: name is required")
  186. }
  187. return nil
  188. }
  189. func upload(file *fileContent) error {
  190. err := getFile(file)
  191. if err != nil {
  192. _ = uploadsStatusDb.Set(file.Name, err.Error())
  193. return err
  194. }
  195. return uploadsDb.Set(file.Name, file.InstallScript())
  196. }
  197. func getFile(file *fileContent) error {
  198. filePath := filepath.Join(uploadDir, file.Name)
  199. dst, err := os.Create(filePath)
  200. if err != nil {
  201. return err
  202. }
  203. defer dst.Close()
  204. if file.FilePath != "" {
  205. err := httpx.DownloadFile(filePath, file.FilePath)
  206. if err != nil {
  207. return err
  208. }
  209. } else {
  210. _, err := dst.Write([]byte(file.Content))
  211. if err != nil {
  212. return err
  213. }
  214. }
  215. return nil
  216. }
  217. func fileUploadHandler(w http.ResponseWriter, r *http.Request) {
  218. switch r.Method {
  219. // Upload or overwrite a file
  220. case http.MethodPost:
  221. switch r.Header.Get("Content-Type") {
  222. case "application/json":
  223. fc := &fileContent{}
  224. defer r.Body.Close()
  225. err := json.NewDecoder(r.Body).Decode(fc)
  226. if err != nil {
  227. handleError(w, err, "Invalid body: Error decoding file json", logger)
  228. return
  229. }
  230. err = fc.Validate()
  231. if err != nil {
  232. handleError(w, err, "Invalid body: missing necessary field", logger)
  233. return
  234. }
  235. filePath := filepath.Join(uploadDir, fc.Name)
  236. err = upload(fc)
  237. if err != nil {
  238. handleError(w, err, "Upload error: getFile has error", logger)
  239. return
  240. }
  241. w.WriteHeader(http.StatusCreated)
  242. w.Write([]byte(filePath))
  243. default:
  244. // Maximum upload of 1 GB files
  245. err := r.ParseMultipartForm(1024 << 20)
  246. if err != nil {
  247. handleError(w, err, "Error parse the multi part form", logger)
  248. return
  249. }
  250. // Get handler for filename, size and headers
  251. file, handler, err := r.FormFile("uploadFile")
  252. if err != nil {
  253. handleError(w, err, "Error Retrieving the File", logger)
  254. return
  255. }
  256. defer file.Close()
  257. // Create file
  258. filePath := filepath.Join(uploadDir, handler.Filename)
  259. dst, err := os.Create(filePath)
  260. defer dst.Close()
  261. if err != nil {
  262. handleError(w, err, "Error creating the file", logger)
  263. return
  264. }
  265. // Copy the uploaded file to the created file on the filesystem
  266. if _, err := io.Copy(dst, file); err != nil {
  267. handleError(w, err, "Error writing the file", logger)
  268. return
  269. }
  270. w.WriteHeader(http.StatusCreated)
  271. w.Write([]byte(filePath))
  272. }
  273. case http.MethodGet:
  274. // Get the list of files in the upload directory
  275. files, err := os.ReadDir(uploadDir)
  276. if err != nil {
  277. handleError(w, err, "Error reading the file upload dir", logger)
  278. return
  279. }
  280. fileNames := make([]string, len(files))
  281. for i, f := range files {
  282. fileNames[i] = filepath.Join(uploadDir, f.Name())
  283. }
  284. jsonResponse(fileNames, w, logger)
  285. }
  286. }
  287. func fileDeleteHandler(w http.ResponseWriter, r *http.Request) {
  288. vars := mux.Vars(r)
  289. name := vars["name"]
  290. filePath := filepath.Join(uploadDir, name)
  291. e := os.Remove(filePath)
  292. if e != nil {
  293. handleError(w, e, "Error deleting the file", logger)
  294. return
  295. }
  296. _ = uploadsDb.Delete(name)
  297. _ = uploadsStatusDb.Delete(name)
  298. w.WriteHeader(http.StatusOK)
  299. w.Write([]byte("ok"))
  300. }
  301. type information struct {
  302. Version string `json:"version"`
  303. Os string `json:"os"`
  304. Arch string `json:"arch"`
  305. UpTimeSeconds int64 `json:"upTimeSeconds"`
  306. }
  307. // The handler for root
  308. func rootHandler(w http.ResponseWriter, r *http.Request) {
  309. defer r.Body.Close()
  310. switch r.Method {
  311. case http.MethodGet, http.MethodPost:
  312. w.WriteHeader(http.StatusOK)
  313. info := new(information)
  314. info.Version = version
  315. info.UpTimeSeconds = time.Now().Unix() - startTimeStamp
  316. info.Os = runtime.GOOS
  317. info.Arch = runtime.GOARCH
  318. byteInfo, _ := json.Marshal(info)
  319. w.Write(byteInfo)
  320. }
  321. }
  322. func pingHandler(w http.ResponseWriter, _ *http.Request) {
  323. w.WriteHeader(http.StatusOK)
  324. }
  325. func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  326. defer r.Body.Close()
  327. switch r.Method {
  328. case http.MethodGet:
  329. var (
  330. content []string
  331. err error
  332. kind string
  333. )
  334. if st == ast.TypeTable {
  335. kind = r.URL.Query().Get("kind")
  336. if kind == "scan" {
  337. kind = ast.StreamKindScan
  338. } else if kind == "lookup" {
  339. kind = ast.StreamKindLookup
  340. } else {
  341. kind = ""
  342. }
  343. }
  344. if kind != "" {
  345. content, err = streamProcessor.ShowTable(kind)
  346. } else {
  347. content, err = streamProcessor.ShowStream(st)
  348. }
  349. if err != nil {
  350. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  351. return
  352. }
  353. jsonResponse(content, w, logger)
  354. case http.MethodPost:
  355. v, err := decodeStatementDescriptor(r.Body)
  356. if err != nil {
  357. handleError(w, err, "Invalid body", logger)
  358. return
  359. }
  360. content, err := streamProcessor.ExecStreamSql(v.Sql)
  361. if err != nil {
  362. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  363. return
  364. }
  365. w.WriteHeader(http.StatusCreated)
  366. w.Write([]byte(content))
  367. }
  368. }
  369. func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  370. defer r.Body.Close()
  371. vars := mux.Vars(r)
  372. name := vars["name"]
  373. switch r.Method {
  374. case http.MethodGet:
  375. content, err := streamProcessor.DescStream(name, st)
  376. if err != nil {
  377. handleError(w, err, fmt.Sprintf("describe %s error", ast.StreamTypeMap[st]), logger)
  378. return
  379. }
  380. jsonResponse(content, w, logger)
  381. case http.MethodDelete:
  382. content, err := streamProcessor.DropStream(name, st)
  383. if err != nil {
  384. handleError(w, err, fmt.Sprintf("delete %s error", ast.StreamTypeMap[st]), logger)
  385. return
  386. }
  387. w.WriteHeader(http.StatusOK)
  388. w.Write([]byte(content))
  389. case http.MethodPut:
  390. v, err := decodeStatementDescriptor(r.Body)
  391. if err != nil {
  392. handleError(w, err, "Invalid body", logger)
  393. return
  394. }
  395. content, err := streamProcessor.ExecReplaceStream(name, v.Sql, st)
  396. if err != nil {
  397. handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
  398. return
  399. }
  400. w.WriteHeader(http.StatusOK)
  401. w.Write([]byte(content))
  402. }
  403. }
  404. // list or create streams
  405. func streamsHandler(w http.ResponseWriter, r *http.Request) {
  406. sourcesManageHandler(w, r, ast.TypeStream)
  407. }
  408. // describe or delete a stream
  409. func streamHandler(w http.ResponseWriter, r *http.Request) {
  410. sourceManageHandler(w, r, ast.TypeStream)
  411. }
  412. // list or create tables
  413. func tablesHandler(w http.ResponseWriter, r *http.Request) {
  414. sourcesManageHandler(w, r, ast.TypeTable)
  415. }
  416. func tableHandler(w http.ResponseWriter, r *http.Request) {
  417. sourceManageHandler(w, r, ast.TypeTable)
  418. }
  419. func streamSchemaHandler(w http.ResponseWriter, r *http.Request) {
  420. sourceSchemaHandler(w, r, ast.TypeStream)
  421. }
  422. func tableSchemaHandler(w http.ResponseWriter, r *http.Request) {
  423. sourceSchemaHandler(w, r, ast.TypeTable)
  424. }
  425. func sourceSchemaHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
  426. vars := mux.Vars(r)
  427. name := vars["name"]
  428. content, err := streamProcessor.GetInferredJsonSchema(name, st)
  429. if err != nil {
  430. handleError(w, err, fmt.Sprintf("get schema of %s error", ast.StreamTypeMap[st]), logger)
  431. return
  432. }
  433. jsonResponse(content, w, logger)
  434. }
  435. // list or create rules
  436. func rulesHandler(w http.ResponseWriter, r *http.Request) {
  437. defer r.Body.Close()
  438. switch r.Method {
  439. case http.MethodPost:
  440. body, err := io.ReadAll(r.Body)
  441. if err != nil {
  442. handleError(w, err, "Invalid body", logger)
  443. return
  444. }
  445. id, err := createRule("", string(body))
  446. if err != nil {
  447. handleError(w, err, "", logger)
  448. return
  449. }
  450. w.WriteHeader(http.StatusCreated)
  451. fmt.Fprintf(w, "Rule %s was created successfully.", id)
  452. case http.MethodGet:
  453. content, err := getAllRulesWithStatus()
  454. if err != nil {
  455. handleError(w, err, "Show rules error", logger)
  456. return
  457. }
  458. jsonResponse(content, w, logger)
  459. }
  460. }
  461. // describe or delete a rule
  462. func ruleHandler(w http.ResponseWriter, r *http.Request) {
  463. defer r.Body.Close()
  464. vars := mux.Vars(r)
  465. name := vars["name"]
  466. switch r.Method {
  467. case http.MethodGet:
  468. rule, err := ruleProcessor.GetRuleJson(name)
  469. if err != nil {
  470. handleError(w, err, "Describe rule error", logger)
  471. return
  472. }
  473. w.Header().Add(ContentType, ContentTypeJSON)
  474. w.Write([]byte(rule))
  475. case http.MethodDelete:
  476. deleteRule(name)
  477. content, err := ruleProcessor.ExecDrop(name)
  478. if err != nil {
  479. handleError(w, err, "Delete rule error", logger)
  480. return
  481. }
  482. w.WriteHeader(http.StatusOK)
  483. w.Write([]byte(content))
  484. case http.MethodPut:
  485. _, err := ruleProcessor.GetRuleById(name)
  486. if err != nil {
  487. handleError(w, err, "Rule not found", logger)
  488. return
  489. }
  490. body, err := io.ReadAll(r.Body)
  491. if err != nil {
  492. handleError(w, err, "Invalid body", logger)
  493. return
  494. }
  495. err = updateRule(name, string(body))
  496. if err != nil {
  497. handleError(w, err, "Update rule error", logger)
  498. return
  499. }
  500. // Update to db after validation
  501. _, err = ruleProcessor.ExecUpdate(name, string(body))
  502. if err != nil {
  503. handleError(w, err, "Update rule error, suggest to delete it and recreate", logger)
  504. return
  505. }
  506. w.WriteHeader(http.StatusOK)
  507. fmt.Fprintf(w, "Rule %s was updated successfully.", name)
  508. }
  509. }
  510. // get status of a rule
  511. func getStatusRuleHandler(w http.ResponseWriter, r *http.Request) {
  512. defer r.Body.Close()
  513. vars := mux.Vars(r)
  514. name := vars["name"]
  515. content, err := getRuleStatus(name)
  516. if err != nil {
  517. handleError(w, err, "get rule status error", logger)
  518. return
  519. }
  520. w.Header().Set(ContentType, ContentTypeJSON)
  521. w.WriteHeader(http.StatusOK)
  522. w.Write([]byte(content))
  523. }
  524. // start a rule
  525. func startRuleHandler(w http.ResponseWriter, r *http.Request) {
  526. defer r.Body.Close()
  527. vars := mux.Vars(r)
  528. name := vars["name"]
  529. err := startRule(name)
  530. if err != nil {
  531. handleError(w, err, "start rule error", logger)
  532. return
  533. }
  534. w.WriteHeader(http.StatusOK)
  535. fmt.Fprintf(w, "Rule %s was started", name)
  536. }
  537. // stop a rule
  538. func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
  539. defer r.Body.Close()
  540. vars := mux.Vars(r)
  541. name := vars["name"]
  542. result := stopRule(name)
  543. w.WriteHeader(http.StatusOK)
  544. w.Write([]byte(result))
  545. }
  546. // restart a rule
  547. func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
  548. defer r.Body.Close()
  549. vars := mux.Vars(r)
  550. name := vars["name"]
  551. err := restartRule(name)
  552. if err != nil {
  553. handleError(w, err, "restart rule error", logger)
  554. return
  555. }
  556. w.WriteHeader(http.StatusOK)
  557. fmt.Fprintf(w, "Rule %s was restarted", name)
  558. }
  559. // get topo of a rule
  560. func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
  561. defer r.Body.Close()
  562. vars := mux.Vars(r)
  563. name := vars["name"]
  564. content, err := getRuleTopo(name)
  565. if err != nil {
  566. handleError(w, err, "get rule topo error", logger)
  567. return
  568. }
  569. w.Header().Set(ContentType, ContentTypeJSON)
  570. w.Write([]byte(content))
  571. }
  572. // validate a rule
  573. func validateRuleHandler(w http.ResponseWriter, r *http.Request) {
  574. defer r.Body.Close()
  575. body, err := io.ReadAll(r.Body)
  576. if err != nil {
  577. handleError(w, err, "Invalid body", logger)
  578. return
  579. }
  580. validate, err := validateRule("", string(body))
  581. if !validate {
  582. w.WriteHeader(http.StatusUnprocessableEntity)
  583. w.Write([]byte(err.Error()))
  584. return
  585. }
  586. w.WriteHeader(http.StatusOK)
  587. w.Write([]byte("The rule has been successfully validated and is confirmed to be correct."))
  588. }
  589. type rulesetInfo struct {
  590. Content string `json:"content"`
  591. FilePath string `json:"file"`
  592. }
  593. func importHandler(w http.ResponseWriter, r *http.Request) {
  594. rsi := &rulesetInfo{}
  595. err := json.NewDecoder(r.Body).Decode(rsi)
  596. if err != nil {
  597. handleError(w, err, "Invalid body: Error decoding json", logger)
  598. return
  599. }
  600. if rsi.Content != "" && rsi.FilePath != "" {
  601. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  602. return
  603. } else if rsi.Content == "" && rsi.FilePath == "" {
  604. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  605. return
  606. }
  607. content := []byte(rsi.Content)
  608. if rsi.FilePath != "" {
  609. reader, err := httpx.ReadFile(rsi.FilePath)
  610. if err != nil {
  611. handleError(w, err, "Fail to read file", logger)
  612. return
  613. }
  614. defer reader.Close()
  615. buf := new(bytes.Buffer)
  616. _, err = io.Copy(buf, reader)
  617. if err != nil {
  618. handleError(w, err, "fail to convert file", logger)
  619. return
  620. }
  621. content = buf.Bytes()
  622. }
  623. rules, counts, err := rulesetProcessor.Import(content)
  624. if err != nil {
  625. handleError(w, nil, "Import ruleset error", logger)
  626. return
  627. }
  628. infra.SafeRun(func() error {
  629. for _, name := range rules {
  630. rul, ee := ruleProcessor.GetRuleById(name)
  631. if ee != nil {
  632. logger.Error(ee)
  633. continue
  634. }
  635. reply := recoverRule(rul)
  636. if reply != "" {
  637. logger.Error(reply)
  638. }
  639. }
  640. return nil
  641. })
  642. fmt.Fprintf(w, "imported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])
  643. }
  644. func exportHandler(w http.ResponseWriter, r *http.Request) {
  645. const name = "ekuiper_export.json"
  646. exported, _, err := rulesetProcessor.Export()
  647. if err != nil {
  648. handleError(w, err, "export error", logger)
  649. return
  650. }
  651. w.Header().Set("Content-Type", "application/octet-stream")
  652. w.Header().Add("Content-Disposition", "Attachment")
  653. http.ServeContent(w, r, name, time.Now(), exported)
  654. }
  655. type Configuration struct {
  656. Streams map[string]string `json:"streams"`
  657. Tables map[string]string `json:"tables"`
  658. Rules map[string]string `json:"rules"`
  659. NativePlugins map[string]string `json:"nativePlugins"`
  660. PortablePlugins map[string]string `json:"portablePlugins"`
  661. SourceConfig map[string]string `json:"sourceConfig"`
  662. SinkConfig map[string]string `json:"sinkConfig"`
  663. ConnectionConfig map[string]string `json:"connectionConfig"`
  664. Service map[string]string `json:"Service"`
  665. Schema map[string]string `json:"Schema"`
  666. Uploads map[string]string `json:"uploads"`
  667. }
  668. func configurationExport() ([]byte, error) {
  669. conf := &Configuration{
  670. Streams: make(map[string]string),
  671. Tables: make(map[string]string),
  672. Rules: make(map[string]string),
  673. NativePlugins: make(map[string]string),
  674. PortablePlugins: make(map[string]string),
  675. SourceConfig: make(map[string]string),
  676. SinkConfig: make(map[string]string),
  677. ConnectionConfig: make(map[string]string),
  678. Service: make(map[string]string),
  679. Schema: make(map[string]string),
  680. Uploads: make(map[string]string),
  681. }
  682. ruleSet := rulesetProcessor.ExportRuleSet()
  683. if ruleSet != nil {
  684. conf.Streams = ruleSet.Streams
  685. conf.Tables = ruleSet.Tables
  686. conf.Rules = ruleSet.Rules
  687. }
  688. conf.NativePlugins = pluginExport()
  689. conf.PortablePlugins = portablePluginExport()
  690. conf.Service = serviceExport()
  691. conf.Schema = schemaExport()
  692. conf.Uploads = uploadsExport()
  693. yamlCfg := meta.GetConfigurations()
  694. conf.SourceConfig = yamlCfg.Sources
  695. conf.SinkConfig = yamlCfg.Sinks
  696. conf.ConnectionConfig = yamlCfg.Connections
  697. return json.Marshal(conf)
  698. }
  699. func configurationExportHandler(w http.ResponseWriter, r *http.Request) {
  700. var jsonBytes []byte
  701. const name = "ekuiper_export.json"
  702. switch r.Method {
  703. case http.MethodGet:
  704. jsonBytes, _ = configurationExport()
  705. case http.MethodPost:
  706. var rules []string
  707. _ = json.NewDecoder(r.Body).Decode(&rules)
  708. jsonBytes, _ = ruleMigrationProcessor.ConfigurationPartialExport(rules)
  709. }
  710. w.Header().Set("Content-Type", "application/octet-stream")
  711. w.Header().Add("Content-Disposition", "Attachment")
  712. http.ServeContent(w, r, name, time.Now(), bytes.NewReader(jsonBytes))
  713. }
  714. func configurationReset() {
  715. _ = resetAllRules()
  716. _ = resetAllStreams()
  717. pluginReset()
  718. portablePluginsReset()
  719. serviceReset()
  720. schemaReset()
  721. meta.ResetConfigs()
  722. uploadsReset()
  723. }
  724. type ImportConfigurationStatus struct {
  725. ErrorMsg string
  726. ConfigResponse Configuration
  727. }
  728. func configurationImport(data []byte, reboot bool) ImportConfigurationStatus {
  729. conf := &Configuration{
  730. Streams: make(map[string]string),
  731. Tables: make(map[string]string),
  732. Rules: make(map[string]string),
  733. NativePlugins: make(map[string]string),
  734. PortablePlugins: make(map[string]string),
  735. SourceConfig: make(map[string]string),
  736. SinkConfig: make(map[string]string),
  737. ConnectionConfig: make(map[string]string),
  738. Service: make(map[string]string),
  739. Schema: make(map[string]string),
  740. Uploads: make(map[string]string),
  741. }
  742. importStatus := ImportConfigurationStatus{}
  743. configResponse := Configuration{
  744. Streams: make(map[string]string),
  745. Tables: make(map[string]string),
  746. Rules: make(map[string]string),
  747. NativePlugins: make(map[string]string),
  748. PortablePlugins: make(map[string]string),
  749. SourceConfig: make(map[string]string),
  750. SinkConfig: make(map[string]string),
  751. ConnectionConfig: make(map[string]string),
  752. Service: make(map[string]string),
  753. Schema: make(map[string]string),
  754. Uploads: make(map[string]string),
  755. }
  756. ResponseNil := Configuration{
  757. Streams: make(map[string]string),
  758. Tables: make(map[string]string),
  759. Rules: make(map[string]string),
  760. NativePlugins: make(map[string]string),
  761. PortablePlugins: make(map[string]string),
  762. SourceConfig: make(map[string]string),
  763. SinkConfig: make(map[string]string),
  764. ConnectionConfig: make(map[string]string),
  765. Service: make(map[string]string),
  766. Schema: make(map[string]string),
  767. Uploads: make(map[string]string),
  768. }
  769. err := json.Unmarshal(data, conf)
  770. if err != nil {
  771. importStatus.ErrorMsg = fmt.Errorf("configuration unmarshal with error %v", err).Error()
  772. return importStatus
  773. }
  774. configResponse.Uploads = uploadsImport(conf.Uploads)
  775. if reboot {
  776. err = pluginImport(conf.NativePlugins)
  777. if err != nil {
  778. importStatus.ErrorMsg = fmt.Errorf("pluginImport NativePlugins import error %v", err).Error()
  779. return importStatus
  780. }
  781. err = schemaImport(conf.Schema)
  782. if err != nil {
  783. importStatus.ErrorMsg = fmt.Errorf("schemaImport Schema import error %v", err).Error()
  784. return importStatus
  785. }
  786. }
  787. configResponse.PortablePlugins = portablePluginImport(conf.PortablePlugins)
  788. configResponse.Service = serviceImport(conf.Service)
  789. yamlCfgSet := meta.YamlConfigurationSet{
  790. Sources: conf.SourceConfig,
  791. Sinks: conf.SinkConfig,
  792. Connections: conf.ConnectionConfig,
  793. }
  794. confRsp := meta.LoadConfigurations(yamlCfgSet)
  795. configResponse.SourceConfig = confRsp.Sources
  796. configResponse.SinkConfig = confRsp.Sinks
  797. configResponse.ConnectionConfig = confRsp.Connections
  798. ruleSet := processor.Ruleset{
  799. Streams: conf.Streams,
  800. Tables: conf.Tables,
  801. Rules: conf.Rules,
  802. }
  803. result := rulesetProcessor.ImportRuleSet(ruleSet)
  804. configResponse.Streams = result.Streams
  805. configResponse.Tables = result.Tables
  806. configResponse.Rules = result.Rules
  807. if !reboot {
  808. infra.SafeRun(func() error {
  809. for name := range ruleSet.Rules {
  810. rul, ee := ruleProcessor.GetRuleById(name)
  811. if ee != nil {
  812. logger.Error(ee)
  813. continue
  814. }
  815. reply := recoverRule(rul)
  816. if reply != "" {
  817. logger.Error(reply)
  818. }
  819. }
  820. return nil
  821. })
  822. }
  823. if reflect.DeepEqual(ResponseNil, configResponse) {
  824. importStatus.ConfigResponse = ResponseNil
  825. } else {
  826. importStatus.ErrorMsg = "process error"
  827. importStatus.ConfigResponse = configResponse
  828. }
  829. return importStatus
  830. }
  831. func configurationPartialImport(data []byte) ImportConfigurationStatus {
  832. conf := &Configuration{
  833. Streams: make(map[string]string),
  834. Tables: make(map[string]string),
  835. Rules: make(map[string]string),
  836. NativePlugins: make(map[string]string),
  837. PortablePlugins: make(map[string]string),
  838. SourceConfig: make(map[string]string),
  839. SinkConfig: make(map[string]string),
  840. ConnectionConfig: make(map[string]string),
  841. Service: make(map[string]string),
  842. Schema: make(map[string]string),
  843. Uploads: make(map[string]string),
  844. }
  845. importStatus := ImportConfigurationStatus{}
  846. configResponse := Configuration{
  847. Streams: make(map[string]string),
  848. Tables: make(map[string]string),
  849. Rules: make(map[string]string),
  850. NativePlugins: make(map[string]string),
  851. PortablePlugins: make(map[string]string),
  852. SourceConfig: make(map[string]string),
  853. SinkConfig: make(map[string]string),
  854. ConnectionConfig: make(map[string]string),
  855. Service: make(map[string]string),
  856. Schema: make(map[string]string),
  857. Uploads: make(map[string]string),
  858. }
  859. ResponseNil := Configuration{
  860. Streams: make(map[string]string),
  861. Tables: make(map[string]string),
  862. Rules: make(map[string]string),
  863. NativePlugins: make(map[string]string),
  864. PortablePlugins: make(map[string]string),
  865. SourceConfig: make(map[string]string),
  866. SinkConfig: make(map[string]string),
  867. ConnectionConfig: make(map[string]string),
  868. Service: make(map[string]string),
  869. Schema: make(map[string]string),
  870. Uploads: make(map[string]string),
  871. }
  872. err := json.Unmarshal(data, conf)
  873. if err != nil {
  874. importStatus.ErrorMsg = fmt.Errorf("configuration unmarshal with error %v", err).Error()
  875. return importStatus
  876. }
  877. yamlCfgSet := meta.YamlConfigurationSet{
  878. Sources: conf.SourceConfig,
  879. Sinks: conf.SinkConfig,
  880. Connections: conf.ConnectionConfig,
  881. }
  882. confRsp := meta.LoadConfigurationsPartial(yamlCfgSet)
  883. configResponse.Uploads = uploadsImport(conf.Uploads)
  884. configResponse.NativePlugins = pluginPartialImport(conf.NativePlugins)
  885. configResponse.Schema = schemaPartialImport(conf.Schema)
  886. configResponse.PortablePlugins = portablePluginPartialImport(conf.PortablePlugins)
  887. configResponse.Service = servicePartialImport(conf.Service)
  888. configResponse.SourceConfig = confRsp.Sources
  889. configResponse.SinkConfig = confRsp.Sinks
  890. configResponse.ConnectionConfig = confRsp.Connections
  891. ruleSet := processor.Ruleset{
  892. Streams: conf.Streams,
  893. Tables: conf.Tables,
  894. Rules: conf.Rules,
  895. }
  896. result := importRuleSetPartial(ruleSet)
  897. configResponse.Streams = result.Streams
  898. configResponse.Tables = result.Tables
  899. configResponse.Rules = result.Rules
  900. if reflect.DeepEqual(ResponseNil, configResponse) {
  901. importStatus.ConfigResponse = ResponseNil
  902. } else {
  903. importStatus.ErrorMsg = "process error"
  904. importStatus.ConfigResponse = configResponse
  905. }
  906. return importStatus
  907. }
  908. type configurationInfo struct {
  909. Content string `json:"content"`
  910. FilePath string `json:"file"`
  911. }
  912. func configurationImportHandler(w http.ResponseWriter, r *http.Request) {
  913. cb := r.URL.Query().Get("stop")
  914. stop := cb == "1"
  915. par := r.URL.Query().Get("partial")
  916. partial := par == "1"
  917. rsi := &configurationInfo{}
  918. err := json.NewDecoder(r.Body).Decode(rsi)
  919. if err != nil {
  920. handleError(w, err, "Invalid body: Error decoding json", logger)
  921. return
  922. }
  923. if rsi.Content != "" && rsi.FilePath != "" {
  924. handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
  925. return
  926. } else if rsi.Content == "" && rsi.FilePath == "" {
  927. handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
  928. return
  929. }
  930. content := []byte(rsi.Content)
  931. if rsi.FilePath != "" {
  932. reader, err := httpx.ReadFile(rsi.FilePath)
  933. if err != nil {
  934. handleError(w, err, "Fail to read file", logger)
  935. return
  936. }
  937. defer reader.Close()
  938. buf := new(bytes.Buffer)
  939. _, err = io.Copy(buf, reader)
  940. if err != nil {
  941. handleError(w, err, "fail to convert file", logger)
  942. return
  943. }
  944. content = buf.Bytes()
  945. }
  946. if !partial {
  947. configurationReset()
  948. result := configurationImport(content, stop)
  949. if result.ErrorMsg != "" {
  950. w.WriteHeader(http.StatusBadRequest)
  951. jsonResponse(result, w, logger)
  952. } else {
  953. w.WriteHeader(http.StatusOK)
  954. jsonResponse(result, w, logger)
  955. }
  956. if stop {
  957. go func() {
  958. time.Sleep(1 * time.Second)
  959. os.Exit(100)
  960. }()
  961. }
  962. } else {
  963. result := configurationPartialImport(content)
  964. if result.ErrorMsg != "" {
  965. w.WriteHeader(http.StatusBadRequest)
  966. jsonResponse(result, w, logger)
  967. } else {
  968. w.WriteHeader(http.StatusOK)
  969. jsonResponse(result, w, logger)
  970. }
  971. }
  972. }
  973. func configurationStatusExport() Configuration {
  974. conf := Configuration{
  975. Streams: make(map[string]string),
  976. Tables: make(map[string]string),
  977. Rules: make(map[string]string),
  978. NativePlugins: make(map[string]string),
  979. PortablePlugins: make(map[string]string),
  980. SourceConfig: make(map[string]string),
  981. SinkConfig: make(map[string]string),
  982. ConnectionConfig: make(map[string]string),
  983. Service: make(map[string]string),
  984. Schema: make(map[string]string),
  985. Uploads: make(map[string]string),
  986. }
  987. ruleSet := rulesetProcessor.ExportRuleSetStatus()
  988. if ruleSet != nil {
  989. conf.Streams = ruleSet.Streams
  990. conf.Tables = ruleSet.Tables
  991. conf.Rules = ruleSet.Rules
  992. }
  993. conf.NativePlugins = pluginStatusExport()
  994. conf.PortablePlugins = portablePluginStatusExport()
  995. conf.Service = serviceStatusExport()
  996. conf.Schema = schemaStatusExport()
  997. conf.Uploads = uploadsStatusExport()
  998. yamlCfgStatus := meta.GetConfigurationStatus()
  999. conf.SourceConfig = yamlCfgStatus.Sources
  1000. conf.SinkConfig = yamlCfgStatus.Sinks
  1001. conf.ConnectionConfig = yamlCfgStatus.Connections
  1002. return conf
  1003. }
  1004. func configurationUpdateHandler(w http.ResponseWriter, r *http.Request) {
  1005. basic := struct {
  1006. Debug *bool `json:"debug"`
  1007. ConsoleLog *bool `json:"consoleLog"`
  1008. FileLog *bool `json:"fileLog"`
  1009. TimeZone *string `json:"timezone"`
  1010. }{}
  1011. if err := json.NewDecoder(r.Body).Decode(&basic); err != nil {
  1012. w.WriteHeader(http.StatusBadRequest)
  1013. handleError(w, err, "Invalid JSON", logger)
  1014. return
  1015. }
  1016. if basic.Debug != nil {
  1017. conf.SetDebugLevel(*basic.Debug)
  1018. conf.Config.Basic.Debug = *basic.Debug
  1019. }
  1020. if basic.TimeZone != nil {
  1021. if err := cast.SetTimeZone(*basic.TimeZone); err != nil {
  1022. w.WriteHeader(http.StatusBadRequest)
  1023. handleError(w, err, "Invalid TZ", logger)
  1024. return
  1025. }
  1026. conf.Config.Basic.TimeZone = *basic.TimeZone
  1027. }
  1028. if basic.ConsoleLog != nil || basic.FileLog != nil {
  1029. consoleLog := conf.Config.Basic.ConsoleLog
  1030. if basic.ConsoleLog != nil {
  1031. consoleLog = *basic.ConsoleLog
  1032. }
  1033. fileLog := conf.Config.Basic.FileLog
  1034. if basic.FileLog != nil {
  1035. fileLog = *basic.FileLog
  1036. }
  1037. if err := conf.SetConsoleAndFileLog(consoleLog, fileLog); err != nil {
  1038. w.WriteHeader(http.StatusBadRequest)
  1039. handleError(w, err, "", logger)
  1040. return
  1041. }
  1042. conf.Config.Basic.ConsoleLog = consoleLog
  1043. conf.Config.Basic.FileLog = fileLog
  1044. }
  1045. w.WriteHeader(http.StatusNoContent)
  1046. }
  1047. func configurationStatusHandler(w http.ResponseWriter, r *http.Request) {
  1048. defer r.Body.Close()
  1049. content := configurationStatusExport()
  1050. jsonResponse(content, w, logger)
  1051. }
  1052. func importRuleSetPartial(all processor.Ruleset) processor.Ruleset {
  1053. ruleSetRsp := processor.Ruleset{
  1054. Rules: map[string]string{},
  1055. Streams: map[string]string{},
  1056. Tables: map[string]string{},
  1057. }
  1058. // replace streams
  1059. for k, v := range all.Streams {
  1060. _, e := streamProcessor.ExecReplaceStream(k, v, ast.TypeStream)
  1061. if e != nil {
  1062. ruleSetRsp.Streams[k] = e.Error()
  1063. continue
  1064. }
  1065. }
  1066. // replace tables
  1067. for k, v := range all.Tables {
  1068. _, e := streamProcessor.ExecReplaceStream(k, v, ast.TypeTable)
  1069. if e != nil {
  1070. ruleSetRsp.Tables[k] = e.Error()
  1071. continue
  1072. }
  1073. }
  1074. for k, v := range all.Rules {
  1075. _, err := ruleProcessor.GetRuleJson(k)
  1076. if err == nil {
  1077. // the rule already exist, update
  1078. err = updateRule(k, v)
  1079. if err != nil {
  1080. ruleSetRsp.Rules[k] = err.Error()
  1081. continue
  1082. }
  1083. // Update to db after validation
  1084. _, err = ruleProcessor.ExecUpdate(k, v)
  1085. if err != nil {
  1086. ruleSetRsp.Rules[k] = err.Error()
  1087. continue
  1088. }
  1089. } else {
  1090. // not found, create
  1091. _, err2 := createRule(k, v)
  1092. if err2 != nil {
  1093. ruleSetRsp.Rules[k] = err2.Error()
  1094. continue
  1095. }
  1096. }
  1097. }
  1098. return ruleSetRsp
  1099. }
  1100. func uploadsReset() {
  1101. _ = uploadsDb.Clean()
  1102. _ = uploadsStatusDb.Clean()
  1103. }
  1104. func uploadsExport() map[string]string {
  1105. conf, _ := uploadsDb.All()
  1106. return conf
  1107. }
  1108. func uploadsStatusExport() map[string]string {
  1109. status, _ := uploadsDb.All()
  1110. return status
  1111. }
  1112. func uploadsImport(s map[string]string) map[string]string {
  1113. errMap := map[string]string{}
  1114. _ = uploadsStatusDb.Clean()
  1115. for k, v := range s {
  1116. fc := &fileContent{}
  1117. err := json.Unmarshal([]byte(v), fc)
  1118. if err != nil {
  1119. errMsg := fmt.Sprintf("invalid body: Error decoding file json: %s", err.Error())
  1120. errMap[k] = errMsg
  1121. _ = uploadsStatusDb.Set(k, errMsg)
  1122. continue
  1123. }
  1124. err = fc.Validate()
  1125. if err != nil {
  1126. errMap[k] = err.Error()
  1127. _ = uploadsStatusDb.Set(k, err.Error())
  1128. continue
  1129. }
  1130. err = upload(fc)
  1131. if err != nil {
  1132. errMap[k] = err.Error()
  1133. continue
  1134. }
  1135. }
  1136. return errMap
  1137. }