meta_init.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build ui || !core
  15. // +build ui !core
  16. package server
  17. import (
  18. "fmt"
  19. "io"
  20. "net/http"
  21. "strings"
  22. "github.com/gorilla/mux"
  23. "github.com/lf-edge/ekuiper/internal/meta"
  24. )
  25. func init() {
  26. components["meta"] = metaComp{}
  27. }
  28. var metaEndpoints []restEndpoint
  29. type metaComp struct {
  30. }
  31. func (m metaComp) register() {
  32. // do nothing
  33. }
  34. func (m metaComp) rest(r *mux.Router) {
  35. r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
  36. r.HandleFunc("/metadata/operators", operatorsMetaHandler).Methods(http.MethodGet)
  37. r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
  38. r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
  39. r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
  40. r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
  41. r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
  42. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
  43. r.HandleFunc("/metadata/connections", connectionsMetaHandler).Methods(http.MethodGet)
  44. r.HandleFunc("/metadata/connections/{name}", connectionMetaHandler).Methods(http.MethodGet)
  45. r.HandleFunc("/metadata/connections/yaml/{name}", connectionConfHandler).Methods(http.MethodGet)
  46. r.HandleFunc("/metadata/connections/{name}/confKeys/{confKey}", connectionConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
  47. for _, endpoint := range metaEndpoints {
  48. endpoint(r)
  49. }
  50. }
  51. // list sink plugin
  52. func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
  53. defer r.Body.Close()
  54. sinks := meta.GetSinks()
  55. jsonResponse(sinks, w, logger)
  56. return
  57. }
  58. // Get sink metadata when creating rules
  59. func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
  60. defer r.Body.Close()
  61. vars := mux.Vars(r)
  62. pluginName := vars["name"]
  63. language := getLanguage(r)
  64. ptrMetadata, err := meta.GetSinkMeta(pluginName, language)
  65. if err != nil {
  66. handleError(w, err, "", logger)
  67. return
  68. }
  69. jsonResponse(ptrMetadata, w, logger)
  70. }
  71. // list functions
  72. func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  73. defer r.Body.Close()
  74. funcs := meta.GetFunctions()
  75. jsonByteResponse(funcs, w, logger)
  76. return
  77. }
  78. // list operators
  79. func operatorsMetaHandler(w http.ResponseWriter, r *http.Request) {
  80. defer r.Body.Close()
  81. ops := meta.GetOperators()
  82. jsonByteResponse(ops, w, logger)
  83. return
  84. }
  85. // list source plugin
  86. func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
  87. defer r.Body.Close()
  88. ret := meta.GetSourcesPlugins()
  89. if nil != ret {
  90. jsonResponse(ret, w, logger)
  91. return
  92. }
  93. }
  94. // list shareMeta
  95. func connectionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  96. defer r.Body.Close()
  97. ret := meta.GetConnectionPlugins()
  98. if nil != ret {
  99. jsonResponse(ret, w, logger)
  100. return
  101. }
  102. }
  103. // Get source metadata when creating stream
  104. func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
  105. defer r.Body.Close()
  106. vars := mux.Vars(r)
  107. pluginName := vars["name"]
  108. language := getLanguage(r)
  109. ret, err := meta.GetSourceMeta(pluginName, language)
  110. if err != nil {
  111. handleError(w, err, "", logger)
  112. return
  113. }
  114. if nil != ret {
  115. jsonResponse(ret, w, logger)
  116. return
  117. }
  118. }
  119. // Get source metadata when creating stream
  120. func connectionMetaHandler(w http.ResponseWriter, r *http.Request) {
  121. defer r.Body.Close()
  122. vars := mux.Vars(r)
  123. pluginName := vars["name"]
  124. language := getLanguage(r)
  125. ret, err := meta.GetConnectionMeta(pluginName, language)
  126. if err != nil {
  127. handleError(w, err, "", logger)
  128. return
  129. }
  130. if nil != ret {
  131. jsonResponse(ret, w, logger)
  132. return
  133. }
  134. }
  135. // Get source yaml
  136. func sourceConfHandler(w http.ResponseWriter, r *http.Request) {
  137. defer r.Body.Close()
  138. vars := mux.Vars(r)
  139. pluginName := vars["name"]
  140. language := getLanguage(r)
  141. configOperatorKey := fmt.Sprintf(meta.SourceCfgOperatorKeyTemplate, pluginName)
  142. ret, err := meta.GetYamlConf(configOperatorKey, language)
  143. if err != nil {
  144. handleError(w, err, "", logger)
  145. return
  146. } else {
  147. w.Write(ret)
  148. }
  149. }
  150. // Get share yaml
  151. func connectionConfHandler(w http.ResponseWriter, r *http.Request) {
  152. defer r.Body.Close()
  153. vars := mux.Vars(r)
  154. pluginName := vars["name"]
  155. language := getLanguage(r)
  156. configOperatorKey := fmt.Sprintf(meta.ConnectionCfgOperatorKeyTemplate, pluginName)
  157. ret, err := meta.GetYamlConf(configOperatorKey, language)
  158. if err != nil {
  159. handleError(w, err, "", logger)
  160. return
  161. } else {
  162. w.Write(ret)
  163. }
  164. }
  165. // Add del confkey
  166. func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  167. defer r.Body.Close()
  168. var err error
  169. vars := mux.Vars(r)
  170. pluginName := vars["name"]
  171. confKey := vars["confKey"]
  172. language := getLanguage(r)
  173. switch r.Method {
  174. case http.MethodDelete:
  175. err = meta.DelSourceConfKey(pluginName, confKey, language)
  176. case http.MethodPut:
  177. v, err1 := io.ReadAll(r.Body)
  178. if err1 != nil {
  179. handleError(w, err, "Invalid body", logger)
  180. return
  181. }
  182. err = meta.AddSourceConfKey(pluginName, confKey, language, v)
  183. }
  184. if err != nil {
  185. handleError(w, err, "", logger)
  186. return
  187. }
  188. }
  189. // Add del confkey
  190. func connectionConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  191. defer r.Body.Close()
  192. var err error
  193. vars := mux.Vars(r)
  194. pluginName := vars["name"]
  195. confKey := vars["confKey"]
  196. language := getLanguage(r)
  197. switch r.Method {
  198. case http.MethodDelete:
  199. err = meta.DelConnectionConfKey(pluginName, confKey, language)
  200. case http.MethodPut:
  201. v, err1 := io.ReadAll(r.Body)
  202. if err1 != nil {
  203. handleError(w, err1, "Invalid body", logger)
  204. return
  205. }
  206. err = meta.AddConnectionConfKey(pluginName, confKey, language, v)
  207. }
  208. if err != nil {
  209. handleError(w, err, "", logger)
  210. return
  211. }
  212. }
  // getLanguage returns the language requested through the Content-Language
  // header of r. It falls back to "en_US" when the header is absent or empty.
  // Otherwise every "-" in the header value is replaced with "_", so "zh-CN"
  // becomes "zh_CN".
  func getLanguage(r *http.Request) string {
  	language := r.Header.Get("Content-Language")
  	if language == "" {
  		return "en_US"
  	}
  	// strings.ReplaceAll returns a new string; the result must be used because
  	// Go strings are immutable.
  	return strings.ReplaceAll(language, "-", "_")
  }