meta_init.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build ui || !core
  15. // +build ui !core
  16. package server
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. "github.com/lf-edge/ekuiper/internal/topo/node"
  21. "github.com/lf-edge/ekuiper/pkg/ast"
  22. "io"
  23. "net/http"
  24. "strings"
  25. "github.com/gorilla/mux"
  26. "github.com/lf-edge/ekuiper/internal/meta"
  27. )
  28. func init() {
  29. components["meta"] = metaComp{}
  30. }
// metaEndpoints holds extra endpoint registrars; metaComp.rest calls each
// one with the router after the built-in /metadata routes are registered.
var metaEndpoints []restEndpoint
  32. type metaComp struct {
  33. }
  34. func (m metaComp) register() {
  35. // do nothing
  36. }
  37. func (m metaComp) rest(r *mux.Router) {
  38. r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
  39. r.HandleFunc("/metadata/operators", operatorsMetaHandler).Methods(http.MethodGet)
  40. r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
  41. r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
  42. r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
  43. r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
  44. r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
  45. r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
  46. r.HandleFunc("/metadata/sinks/yaml/{name}", sinkConfHandler).Methods(http.MethodGet)
  47. r.HandleFunc("/metadata/sinks/{name}/confKeys/{confKey}", sinkConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
  48. r.HandleFunc("/metadata/connections", connectionsMetaHandler).Methods(http.MethodGet)
  49. r.HandleFunc("/metadata/connections/{name}", connectionMetaHandler).Methods(http.MethodGet)
  50. r.HandleFunc("/metadata/connections/yaml/{name}", connectionConfHandler).Methods(http.MethodGet)
  51. r.HandleFunc("/metadata/connections/{name}/confKeys/{confKey}", connectionConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
  52. r.HandleFunc("/metadata/resources", resourcesHandler).Methods(http.MethodGet)
  53. r.HandleFunc("/metadata/sources/connection/{name}", sourceConnectionHandler).Methods(http.MethodPost)
  54. r.HandleFunc("/metadata/sinks/connection/{name}", sinkConnectionHandler).Methods(http.MethodPost)
  55. for _, endpoint := range metaEndpoints {
  56. endpoint(r)
  57. }
  58. }
  59. // list sink plugin
  60. func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
  61. defer r.Body.Close()
  62. sinks := meta.GetSinks()
  63. jsonResponse(sinks, w, logger)
  64. return
  65. }
  66. // Get sink metadata when creating rules
  67. func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
  68. defer r.Body.Close()
  69. vars := mux.Vars(r)
  70. pluginName := vars["name"]
  71. language := getLanguage(r)
  72. ptrMetadata, err := meta.GetSinkMeta(pluginName, language)
  73. if err != nil {
  74. handleError(w, err, "", logger)
  75. return
  76. }
  77. jsonResponse(ptrMetadata, w, logger)
  78. }
  79. // list functions
  80. func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  81. defer r.Body.Close()
  82. funcs := meta.GetFunctions()
  83. jsonByteResponse(funcs, w, logger)
  84. return
  85. }
  86. // list operators
  87. func operatorsMetaHandler(w http.ResponseWriter, r *http.Request) {
  88. defer r.Body.Close()
  89. ops := meta.GetOperators()
  90. jsonByteResponse(ops, w, logger)
  91. return
  92. }
  93. // list source plugin
  94. func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
  95. defer r.Body.Close()
  96. kind := r.URL.Query().Get("kind")
  97. switch strings.ToLower(kind) {
  98. case "lookup":
  99. kind = ast.StreamKindLookup
  100. default:
  101. kind = ast.StreamKindScan
  102. }
  103. ret := meta.GetSourcesPlugins(kind)
  104. if nil != ret {
  105. jsonResponse(ret, w, logger)
  106. return
  107. }
  108. }
  109. // list shareMeta
  110. func connectionsMetaHandler(w http.ResponseWriter, r *http.Request) {
  111. defer r.Body.Close()
  112. ret := meta.GetConnectionPlugins()
  113. if nil != ret {
  114. jsonResponse(ret, w, logger)
  115. return
  116. }
  117. }
  118. // Get source metadata when creating stream
  119. func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
  120. defer r.Body.Close()
  121. vars := mux.Vars(r)
  122. pluginName := vars["name"]
  123. language := getLanguage(r)
  124. ret, err := meta.GetSourceMeta(pluginName, language)
  125. if err != nil {
  126. handleError(w, err, "", logger)
  127. return
  128. }
  129. if nil != ret {
  130. jsonResponse(ret, w, logger)
  131. return
  132. }
  133. }
  134. // Get source metadata when creating stream
  135. func connectionMetaHandler(w http.ResponseWriter, r *http.Request) {
  136. defer r.Body.Close()
  137. vars := mux.Vars(r)
  138. pluginName := vars["name"]
  139. language := getLanguage(r)
  140. ret, err := meta.GetConnectionMeta(pluginName, language)
  141. if err != nil {
  142. handleError(w, err, "", logger)
  143. return
  144. }
  145. if nil != ret {
  146. jsonResponse(ret, w, logger)
  147. return
  148. }
  149. }
  150. // Get source yaml
  151. func sourceConfHandler(w http.ResponseWriter, r *http.Request) {
  152. defer r.Body.Close()
  153. vars := mux.Vars(r)
  154. pluginName := vars["name"]
  155. language := getLanguage(r)
  156. configOperatorKey := fmt.Sprintf(meta.SourceCfgOperatorKeyTemplate, pluginName)
  157. ret, err := meta.GetYamlConf(configOperatorKey, language)
  158. if err != nil {
  159. handleError(w, err, "", logger)
  160. return
  161. } else {
  162. w.Write(ret)
  163. }
  164. }
  165. // Get share yaml
  166. func connectionConfHandler(w http.ResponseWriter, r *http.Request) {
  167. defer r.Body.Close()
  168. vars := mux.Vars(r)
  169. pluginName := vars["name"]
  170. language := getLanguage(r)
  171. configOperatorKey := fmt.Sprintf(meta.ConnectionCfgOperatorKeyTemplate, pluginName)
  172. ret, err := meta.GetYamlConf(configOperatorKey, language)
  173. if err != nil {
  174. handleError(w, err, "", logger)
  175. return
  176. } else {
  177. _, _ = w.Write(ret)
  178. }
  179. }
  180. // Get sink yaml
  181. func sinkConfHandler(w http.ResponseWriter, r *http.Request) {
  182. defer r.Body.Close()
  183. vars := mux.Vars(r)
  184. pluginName := vars["name"]
  185. language := getLanguage(r)
  186. configOperatorKey := fmt.Sprintf(meta.SinkCfgOperatorKeyTemplate, pluginName)
  187. ret, err := meta.GetYamlConf(configOperatorKey, language)
  188. if err != nil {
  189. handleError(w, err, "", logger)
  190. return
  191. } else {
  192. _, _ = w.Write(ret)
  193. }
  194. }
  195. // Add del confkey
  196. func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  197. defer r.Body.Close()
  198. var err error
  199. vars := mux.Vars(r)
  200. pluginName := vars["name"]
  201. confKey := vars["confKey"]
  202. language := getLanguage(r)
  203. switch r.Method {
  204. case http.MethodDelete:
  205. err = meta.DelSourceConfKey(pluginName, confKey, language)
  206. case http.MethodPut:
  207. v, err1 := io.ReadAll(r.Body)
  208. if err1 != nil {
  209. handleError(w, err, "Invalid body", logger)
  210. return
  211. }
  212. err = meta.AddSourceConfKey(pluginName, confKey, language, v)
  213. }
  214. if err != nil {
  215. handleError(w, err, "", logger)
  216. return
  217. }
  218. }
  219. // Add del confkey
  220. func sinkConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  221. defer r.Body.Close()
  222. var err error
  223. vars := mux.Vars(r)
  224. pluginName := vars["name"]
  225. confKey := vars["confKey"]
  226. language := getLanguage(r)
  227. switch r.Method {
  228. case http.MethodDelete:
  229. err = meta.DelSinkConfKey(pluginName, confKey, language)
  230. case http.MethodPut:
  231. v, err1 := io.ReadAll(r.Body)
  232. if err1 != nil {
  233. handleError(w, err, "Invalid body", logger)
  234. return
  235. }
  236. err = meta.AddSinkConfKey(pluginName, confKey, language, v)
  237. }
  238. if err != nil {
  239. handleError(w, err, "", logger)
  240. return
  241. }
  242. }
  243. // Add del confkey
  244. func connectionConfKeyHandler(w http.ResponseWriter, r *http.Request) {
  245. defer r.Body.Close()
  246. var err error
  247. vars := mux.Vars(r)
  248. pluginName := vars["name"]
  249. confKey := vars["confKey"]
  250. language := getLanguage(r)
  251. switch r.Method {
  252. case http.MethodDelete:
  253. err = meta.DelConnectionConfKey(pluginName, confKey, language)
  254. case http.MethodPut:
  255. v, err1 := io.ReadAll(r.Body)
  256. if err1 != nil {
  257. handleError(w, err1, "Invalid body", logger)
  258. return
  259. }
  260. err = meta.AddConnectionConfKey(pluginName, confKey, language, v)
  261. }
  262. if err != nil {
  263. handleError(w, err, "", logger)
  264. return
  265. }
  266. }
  267. // get updatable resources
  268. func resourcesHandler(w http.ResponseWriter, r *http.Request) {
  269. defer r.Body.Close()
  270. language := getLanguage(r)
  271. ret, err := meta.GetResources(language)
  272. if err != nil {
  273. handleError(w, err, "", logger)
  274. return
  275. } else {
  276. _, _ = w.Write(ret)
  277. }
  278. }
  279. func getLanguage(r *http.Request) string {
  280. language := r.Header.Get("Content-Language")
  281. if 0 == len(language) {
  282. language = "en_US"
  283. } else {
  284. language = strings.ReplaceAll(language, "-", "_")
  285. }
  286. return language
  287. }
  288. func sinkConnectionHandler(w http.ResponseWriter, r *http.Request) {
  289. defer r.Body.Close()
  290. vars := mux.Vars(r)
  291. sinkNm := vars["name"]
  292. config := map[string]interface{}{}
  293. v, _ := io.ReadAll(r.Body)
  294. err := json.Unmarshal(v, &config)
  295. if err != nil {
  296. handleError(w, err, "", logger)
  297. return
  298. }
  299. err = node.SinkOpen(sinkNm, config)
  300. if err != nil {
  301. handleError(w, err, "", logger)
  302. return
  303. }
  304. w.WriteHeader(http.StatusOK)
  305. }
  306. func sourceConnectionHandler(w http.ResponseWriter, r *http.Request) {
  307. defer r.Body.Close()
  308. vars := mux.Vars(r)
  309. sourceNm := vars["name"]
  310. config := map[string]interface{}{}
  311. v, _ := io.ReadAll(r.Body)
  312. err := json.Unmarshal(v, &config)
  313. if err != nil {
  314. handleError(w, err, "", logger)
  315. return
  316. }
  317. err = node.SourceOpen(sourceNm, config)
  318. if err != nil {
  319. handleError(w, err, "", logger)
  320. return
  321. }
  322. w.WriteHeader(http.StatusOK)
  323. }