meta_init.go 9.2 KB


  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build ui || !core
  15. // +build ui !core
  16. package server
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. "io"
  21. "net/http"
  22. "strings"
  23. "github.com/gorilla/mux"
  24. "github.com/lf-edge/ekuiper/internal/meta"
  25. "github.com/lf-edge/ekuiper/internal/topo/node"
  26. "github.com/lf-edge/ekuiper/pkg/ast"
  27. )
  28. func init() {
  29. components["meta"] = metaComp{}
  30. }
// metaEndpoints holds extra route registrars. metaComp.rest calls each of
// them with the router after it has registered the built-in /metadata routes.
// NOTE(review): nothing in this file appends to the slice; presumably other
// build-tagged files in the package do - confirm.
var metaEndpoints []restEndpoint

// metaComp is the component that init stores under the "meta" key of
// components. It carries no state.
type metaComp struct{}
  33. func (m metaComp) register() {
  34. // do nothing
  35. }
  36. func (m metaComp) rest(r *mux.Router) {
  37. r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
  38. r.HandleFunc("/metadata/operators", operatorsMetaHandler).Methods(http.MethodGet)
  39. r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
  40. r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
  41. r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
  42. r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
  43. r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
  44. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
  45. r.HandleFunc("/metadata/sinks/yaml/{name}", sinkConfHandler).Methods(http.MethodGet)
  46. r.HandleFunc("/metadata/sinks/{name}/confKeys/{confKey}", sinkConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
  47. r.HandleFunc("/metadata/connections", connectionsMetaHandler).Methods(http.MethodGet)
  48. r.HandleFunc("/metadata/connections/{name}", connectionMetaHandler).Methods(http.MethodGet)
  49. r.HandleFunc("/metadata/connections/yaml/{name}", connectionConfHandler).Methods(http.MethodGet)
  50. r.HandleFunc("/metadata/connections/{name}/confKeys/{confKey}", connectionConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
  51. r.HandleFunc("/metadata/resources", resourcesHandler).Methods(http.MethodGet)
  52. r.HandleFunc("/metadata/sources/connection/{name}", sourceConnectionHandler).Methods(http.MethodPost)
  53. r.HandleFunc("/metadata/sinks/connection/{name}", sinkConnectionHandler).Methods(http.MethodPost)
  54. for _, endpoint := range metaEndpoints {
  55. endpoint(r)
  56. }
  57. }
  58. // list sink plugin
  59. func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
  60. defer r.Body.Close()
  61. sinks := meta.GetSinks()
  62. jsonResponse(sinks, w, logger)
  63. return
  64. }
  65. // Get sink metadata when creating rules
  66. func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
  67. defer r.Body.Close()
  68. vars := mux.Vars(r)
  69. pluginName := vars["name"]
  70. language := getLanguage(r)
  71. ptrMetadata, err := meta.GetSinkMeta(pluginName, language)
  72. if err != nil {
  73. handleError(w, err, "", logger)
  74. return
  75. }
  76. jsonResponse(ptrMetadata, w, logger)
  77. }
  78. // list functions
  79. func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  80. defer r.Body.Close()
  81. funcs := meta.GetFunctions()
  82. jsonByteResponse(funcs, w, logger)
  83. return
  84. }
  85. // list operators
  86. func operatorsMetaHandler(w http.ResponseWriter, r *http.Request) {
  87. defer r.Body.Close()
  88. ops := meta.GetOperators()
  89. jsonByteResponse(ops, w, logger)
  90. return
  91. }
  92. // list source plugin
  93. func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
  94. defer r.Body.Close()
  95. kind := r.URL.Query().Get("kind")
  96. switch strings.ToLower(kind) {
  97. case "lookup":
  98. kind = ast.StreamKindLookup
  99. default:
  100. kind = ast.StreamKindScan
  101. }
  102. ret := meta.GetSourcesPlugins(kind)
  103. if nil != ret {
  104. jsonResponse(ret, w, logger)
  105. return
  106. }
  107. }
  108. // list shareMeta
  109. func connectionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  110. defer r.Body.Close()
  111. ret := meta.GetConnectionPlugins()
  112. if nil != ret {
  113. jsonResponse(ret, w, logger)
  114. return
  115. }
  116. }
  117. // Get source metadata when creating stream
  118. func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
  119. defer r.Body.Close()
  120. vars := mux.Vars(r)
  121. pluginName := vars["name"]
  122. language := getLanguage(r)
  123. ret, err := meta.GetSourceMeta(pluginName, language)
  124. if err != nil {
  125. handleError(w, err, "", logger)
  126. return
  127. }
  128. if nil != ret {
  129. jsonResponse(ret, w, logger)
  130. return
  131. }
  132. }
  133. // Get source metadata when creating stream
  134. func connectionMetaHandler(w http.ResponseWriter, r *http.Request) {
  135. defer r.Body.Close()
  136. vars := mux.Vars(r)
  137. pluginName := vars["name"]
  138. language := getLanguage(r)
  139. ret, err := meta.GetConnectionMeta(pluginName, language)
  140. if err != nil {
  141. handleError(w, err, "", logger)
  142. return
  143. }
  144. if nil != ret {
  145. jsonResponse(ret, w, logger)
  146. return
  147. }
  148. }
  149. // Get source yaml
  150. func sourceConfHandler(w http.ResponseWriter, r *http.Request) {
  151. defer r.Body.Close()
  152. vars := mux.Vars(r)
  153. pluginName := vars["name"]
  154. language := getLanguage(r)
  155. configOperatorKey := fmt.Sprintf(meta.SourceCfgOperatorKeyTemplate, pluginName)
  156. ret, err := meta.GetYamlConf(configOperatorKey, language)
  157. if err != nil {
  158. handleError(w, err, "", logger)
  159. return
  160. } else {
  161. w.Write(ret)
  162. }
  163. }
  164. // Get share yaml
  165. func connectionConfHandler(w http.ResponseWriter, r *http.Request) {
  166. defer r.Body.Close()
  167. vars := mux.Vars(r)
  168. pluginName := vars["name"]
  169. language := getLanguage(r)
  170. configOperatorKey := fmt.Sprintf(meta.ConnectionCfgOperatorKeyTemplate, pluginName)
  171. ret, err := meta.GetYamlConf(configOperatorKey, language)
  172. if err != nil {
  173. handleError(w, err, "", logger)
  174. return
  175. } else {
  176. _, _ = w.Write(ret)
  177. }
  178. }
  179. // Get sink yaml
  180. func sinkConfHandler(w http.ResponseWriter, r *http.Request) {
  181. defer r.Body.Close()
  182. vars := mux.Vars(r)
  183. pluginName := vars["name"]
  184. language := getLanguage(r)
  185. configOperatorKey := fmt.Sprintf(meta.SinkCfgOperatorKeyTemplate, pluginName)
  186. ret, err := meta.GetYamlConf(configOperatorKey, language)
  187. if err != nil {
  188. handleError(w, err, "", logger)
  189. return
  190. } else {
  191. _, _ = w.Write(ret)
  192. }
  193. }
  194. // Add del confkey
  195. func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  196. defer r.Body.Close()
  197. var err error
  198. vars := mux.Vars(r)
  199. pluginName := vars["name"]
  200. confKey := vars["confKey"]
  201. language := getLanguage(r)
  202. switch r.Method {
  203. case http.MethodDelete:
  204. err = meta.DelSourceConfKey(pluginName, confKey, language)
  205. case http.MethodPut:
  206. v, err1 := io.ReadAll(r.Body)
  207. if err1 != nil {
  208. handleError(w, err, "Invalid body", logger)
  209. return
  210. }
  211. err = meta.AddSourceConfKey(pluginName, confKey, language, v)
  212. }
  213. if err != nil {
  214. handleError(w, err, "", logger)
  215. return
  216. }
  217. }
  218. // Add del confkey
  219. func sinkConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  220. defer r.Body.Close()
  221. var err error
  222. vars := mux.Vars(r)
  223. pluginName := vars["name"]
  224. confKey := vars["confKey"]
  225. language := getLanguage(r)
  226. switch r.Method {
  227. case http.MethodDelete:
  228. err = meta.DelSinkConfKey(pluginName, confKey, language)
  229. case http.MethodPut:
  230. v, err1 := io.ReadAll(r.Body)
  231. if err1 != nil {
  232. handleError(w, err, "Invalid body", logger)
  233. return
  234. }
  235. err = meta.AddSinkConfKey(pluginName, confKey, language, v)
  236. }
  237. if err != nil {
  238. handleError(w, err, "", logger)
  239. return
  240. }
  241. }
  242. // Add del confkey
  243. func connectionConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  244. defer r.Body.Close()
  245. var err error
  246. vars := mux.Vars(r)
  247. pluginName := vars["name"]
  248. confKey := vars["confKey"]
  249. language := getLanguage(r)
  250. switch r.Method {
  251. case http.MethodDelete:
  252. err = meta.DelConnectionConfKey(pluginName, confKey, language)
  253. case http.MethodPut:
  254. v, err1 := io.ReadAll(r.Body)
  255. if err1 != nil {
  256. handleError(w, err1, "Invalid body", logger)
  257. return
  258. }
  259. err = meta.AddConnectionConfKey(pluginName, confKey, language, v)
  260. }
  261. if err != nil {
  262. handleError(w, err, "", logger)
  263. return
  264. }
  265. }
  266. // get updatable resources
  267. func resourcesHandler(w http.ResponseWriter, r *http.Request) {
  268. defer r.Body.Close()
  269. language := getLanguage(r)
  270. ret, err := meta.GetResources(language)
  271. if err != nil {
  272. handleError(w, err, "", logger)
  273. return
  274. } else {
  275. _, _ = w.Write(ret)
  276. }
  277. }
  278. func getLanguage(r *http.Request) string {
  279. language := r.Header.Get("Content-Language")
  280. if 0 == len(language) {
  281. language = "en_US"
  282. } else {
  283. language = strings.ReplaceAll(language, "-", "_")
  284. }
  285. return language
  286. }
  287. func sinkConnectionHandler(w http.ResponseWriter, r *http.Request) {
  288. defer r.Body.Close()
  289. vars := mux.Vars(r)
  290. sinkNm := vars["name"]
  291. config := map[string]interface{}{}
  292. v, _ := io.ReadAll(r.Body)
  293. err := json.Unmarshal(v, &config)
  294. if err != nil {
  295. handleError(w, err, "", logger)
  296. return
  297. }
  298. err = node.SinkOpen(sinkNm, config)
  299. if err != nil {
  300. handleError(w, err, "", logger)
  301. return
  302. }
  303. w.WriteHeader(http.StatusOK)
  304. }
  305. func sourceConnectionHandler(w http.ResponseWriter, r *http.Request) {
  306. defer r.Body.Close()
  307. vars := mux.Vars(r)
  308. sourceNm := vars["name"]
  309. config := map[string]interface{}{}
  310. v, _ := io.ReadAll(r.Body)
  311. err := json.Unmarshal(v, &config)
  312. if err != nil {
  313. handleError(w, err, "", logger)
  314. return
  315. }
  316. err = node.SourceOpen(sourceNm, config)
  317. if err != nil {
  318. handleError(w, err, "", logger)
  319. return
  320. }
  321. w.WriteHeader(http.StatusOK)
  322. }