schema_init.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build schema || !core
  15. // +build schema !core
  16. package server
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. "net/http"
  21. "github.com/gorilla/mux"
  22. "github.com/lf-edge/ekuiper/internal/pkg/def"
  23. "github.com/lf-edge/ekuiper/internal/schema"
  24. "github.com/lf-edge/ekuiper/pkg/errorx"
  25. )
  26. func init() {
  27. components["schema"] = schemaComp{}
  28. }
  29. type schemaComp struct{}
  30. func (sc schemaComp) register() {
  31. err := schema.InitRegistry()
  32. if err != nil {
  33. panic(err)
  34. }
  35. }
  36. func (sc schemaComp) rest(r *mux.Router) {
  37. r.HandleFunc("/schemas/{type}", schemasHandler).Methods(http.MethodGet, http.MethodPost)
  38. r.HandleFunc("/schemas/{type}/{name}", schemaHandler).Methods(http.MethodPut, http.MethodDelete, http.MethodGet)
  39. }
  40. func schemasHandler(w http.ResponseWriter, r *http.Request) {
  41. vars := mux.Vars(r)
  42. st := vars["type"]
  43. defer r.Body.Close()
  44. switch r.Method {
  45. case http.MethodGet:
  46. l, err := schema.GetAllForType(def.SchemaType(st))
  47. if err != nil {
  48. handleError(w, err, "", logger)
  49. return
  50. }
  51. jsonResponse(l, w, logger)
  52. case http.MethodPost:
  53. sch := &schema.Info{Type: def.SchemaType(st)}
  54. err := json.NewDecoder(r.Body).Decode(sch)
  55. if err != nil {
  56. handleError(w, err, "Invalid body: Error decoding schema json", logger)
  57. return
  58. }
  59. if err = sch.Validate(); err != nil {
  60. handleError(w, nil, "Invalid body", logger)
  61. return
  62. }
  63. err = schema.Register(sch)
  64. if err != nil {
  65. handleError(w, err, "schema create command error", logger)
  66. return
  67. }
  68. w.WriteHeader(http.StatusCreated)
  69. fmt.Fprintf(w, "%s schema %s is created", sch.Type, sch.Name)
  70. }
  71. }
  72. func schemaHandler(w http.ResponseWriter, r *http.Request) {
  73. defer r.Body.Close()
  74. vars := mux.Vars(r)
  75. st := vars["type"]
  76. name := vars["name"]
  77. switch r.Method {
  78. case http.MethodGet:
  79. j, err := schema.GetSchema(def.SchemaType(st), name)
  80. if err != nil {
  81. handleError(w, err, "", logger)
  82. return
  83. } else if j == nil {
  84. handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), "", logger)
  85. return
  86. }
  87. jsonResponse(j, w, logger)
  88. case http.MethodDelete:
  89. err := schema.DeleteSchema(def.SchemaType(st), name)
  90. if err != nil {
  91. handleError(w, err, fmt.Sprintf("delete %s schema %s error", st, name), logger)
  92. return
  93. }
  94. w.WriteHeader(http.StatusOK)
  95. fmt.Fprintf(w, "%s schema %s is deleted", st, name)
  96. case http.MethodPut:
  97. sch := &schema.Info{Type: def.SchemaType(st), Name: name}
  98. err := json.NewDecoder(r.Body).Decode(sch)
  99. if err != nil {
  100. handleError(w, err, "Invalid body: Error decoding schema json", logger)
  101. return
  102. }
  103. if sch.Type != def.SchemaType(st) || sch.Name != name {
  104. handleError(w, nil, "Invalid body: Type or name does not match", logger)
  105. return
  106. }
  107. if err = sch.Validate(); err != nil {
  108. handleError(w, nil, "Invalid body", logger)
  109. return
  110. }
  111. err = schema.CreateOrUpdateSchema(sch)
  112. if err != nil {
  113. handleError(w, err, "schema update command error", logger)
  114. return
  115. }
  116. w.WriteHeader(http.StatusOK)
  117. fmt.Fprintf(w, "%s schema %s is updated", sch.Type, sch.Name)
  118. }
  119. }
  120. func schemaReset() {
  121. schema.UninstallAllSchema()
  122. }
  123. func schemaExport() map[string]string {
  124. return schema.GetAllSchema()
  125. }
  126. func schemaStatusExport() map[string]string {
  127. return schema.GetAllSchemaStatus()
  128. }
  129. func schemaImport(s map[string]string) error {
  130. return schema.ImportSchema(s)
  131. }
  132. func schemaPartialImport(s map[string]string) map[string]string {
  133. return schema.SchemaPartialImport(s)
  134. }
  135. func getSchemaInstallScript(s string) (string, string) {
  136. return schema.GetSchemaInstallScript(s)
  137. }