registry.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package schema
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/pkg/store"
  19. "github.com/lf-edge/ekuiper/pkg/kv"
  20. "os"
  21. "path/filepath"
  22. "strings"
  23. "sync"
  24. "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/pkg/def"
  26. "github.com/lf-edge/ekuiper/internal/pkg/httpx"
  27. )
  28. // Initialize in the server startup
  29. var registry *Registry
  30. var schemaDb kv.KeyValue
  31. var schemaStatusDb kv.KeyValue
  32. type Files struct {
  33. SchemaFile string
  34. SoFile string
  35. }
  36. // Registry is a global registry for schemas
  37. // It stores the schema ids and the ref to its file content in memory
  38. // The schema definition is stored in the file system and will only be loaded once used
  39. type Registry struct {
  40. sync.RWMutex
  41. // The map of schema files for all types
  42. schemas map[def.SchemaType]map[string]*Files
  43. }
  44. // Registry provide the method to add, update, get and parse and delete schemas
  45. // InitRegistry initialize the registry, only called once by the server
  46. func InitRegistry() error {
  47. registry = &Registry{
  48. schemas: make(map[def.SchemaType]map[string]*Files, len(def.SchemaTypes)),
  49. }
  50. dataDir, err := conf.GetDataLoc()
  51. if err != nil {
  52. return fmt.Errorf("cannot find etc folder: %s", err)
  53. }
  54. err, schemaDb = store.GetKV("schema")
  55. if err != nil {
  56. return fmt.Errorf("cannot open schema db: %s", err)
  57. }
  58. err, schemaStatusDb = store.GetKV("schemaStatus")
  59. if err != nil {
  60. return fmt.Errorf("cannot open schemaStatus db: %s", err)
  61. }
  62. for _, schemaType := range def.SchemaTypes {
  63. schemaDir := filepath.Join(dataDir, "schemas", string(schemaType))
  64. var newSchemas map[string]*Files
  65. files, err := os.ReadDir(schemaDir)
  66. if err != nil {
  67. conf.Log.Warnf("cannot read schema directory: %s", err)
  68. newSchemas = make(map[string]*Files)
  69. } else {
  70. newSchemas = make(map[string]*Files, len(files))
  71. for _, file := range files {
  72. fileName := filepath.Base(file.Name())
  73. ext := filepath.Ext(fileName)
  74. schemaId := strings.TrimSuffix(fileName, filepath.Ext(fileName))
  75. ffs, ok := newSchemas[schemaId]
  76. if !ok {
  77. ffs = &Files{}
  78. newSchemas[schemaId] = ffs
  79. }
  80. switch ext {
  81. case ".so":
  82. ffs.SoFile = filepath.Join(schemaDir, file.Name())
  83. default:
  84. ffs.SchemaFile = filepath.Join(schemaDir, file.Name())
  85. }
  86. conf.Log.Infof("schema file %s.%s loaded", schemaType, schemaId)
  87. }
  88. }
  89. registry.schemas[schemaType] = newSchemas
  90. }
  91. if hasInstallFlag() {
  92. schemaInstallWhenReboot()
  93. clearInstallFlag()
  94. }
  95. return nil
  96. }
  97. func GetAllForType(schemaType def.SchemaType) ([]string, error) {
  98. registry.RLock()
  99. defer registry.RUnlock()
  100. if _, ok := registry.schemas[schemaType]; !ok {
  101. return nil, fmt.Errorf("schema type %s not found", schemaType)
  102. }
  103. result := make([]string, 0, len(registry.schemas[schemaType]))
  104. for k := range registry.schemas[schemaType] {
  105. result = append(result, k)
  106. }
  107. return result, nil
  108. }
  109. func Register(info *Info) error {
  110. if _, ok := registry.schemas[info.Type]; !ok {
  111. return fmt.Errorf("schema type %s not found", info.Type)
  112. }
  113. if _, ok := registry.schemas[info.Type][info.Name]; ok {
  114. return fmt.Errorf("schema %s.%s already registered", info.Type, info.Name)
  115. }
  116. err := CreateOrUpdateSchema(info)
  117. if err != nil {
  118. return err
  119. }
  120. storeSchemaInstallScript(info)
  121. return nil
  122. }
  123. func CreateOrUpdateSchema(info *Info) error {
  124. if _, ok := registry.schemas[info.Type]; !ok {
  125. return fmt.Errorf("schema type %s not found", info.Type)
  126. }
  127. dataDir, _ := conf.GetDataLoc()
  128. etcDir := filepath.Join(dataDir, "schemas", string(info.Type))
  129. if err := os.MkdirAll(etcDir, os.ModePerm); err != nil {
  130. return err
  131. }
  132. ffs := &Files{}
  133. if info.Content != "" || info.FilePath != "" {
  134. schemaFile := filepath.Join(etcDir, info.Name+schemaExt[info.Type])
  135. if _, err := os.Stat(schemaFile); os.IsNotExist(err) {
  136. file, err := os.Create(schemaFile)
  137. if err != nil {
  138. return err
  139. }
  140. defer file.Close()
  141. }
  142. if info.Content != "" {
  143. err := os.WriteFile(schemaFile, []byte(info.Content), 0666)
  144. if err != nil {
  145. return err
  146. }
  147. } else {
  148. err := httpx.DownloadFile(schemaFile, info.FilePath)
  149. if err != nil {
  150. return err
  151. }
  152. }
  153. ffs.SchemaFile = schemaFile
  154. }
  155. if info.SoPath != "" {
  156. soFile := filepath.Join(etcDir, info.Name+".so")
  157. err := httpx.DownloadFile(soFile, info.SoPath)
  158. if err != nil {
  159. return err
  160. }
  161. ffs.SoFile = soFile
  162. }
  163. registry.schemas[info.Type][info.Name] = ffs
  164. return nil
  165. }
  166. func GetSchema(schemaType def.SchemaType, name string) (*Info, error) {
  167. schemaFile, err := GetSchemaFile(schemaType, name)
  168. if err != nil {
  169. return nil, err
  170. }
  171. if schemaFile.SchemaFile != "" {
  172. content, err := os.ReadFile(schemaFile.SchemaFile)
  173. if err != nil {
  174. return nil, fmt.Errorf("cannot read schema file %s: %s", schemaFile, err)
  175. }
  176. return &Info{
  177. Type: schemaType,
  178. Name: name,
  179. Content: string(content),
  180. FilePath: schemaFile.SchemaFile,
  181. SoPath: schemaFile.SoFile,
  182. }, nil
  183. } else {
  184. return &Info{
  185. Type: schemaType,
  186. Name: name,
  187. SoPath: schemaFile.SoFile,
  188. }, nil
  189. }
  190. }
  191. func GetSchemaFile(schemaType def.SchemaType, name string) (*Files, error) {
  192. registry.RLock()
  193. defer registry.RUnlock()
  194. if _, ok := registry.schemas[schemaType]; !ok {
  195. return nil, fmt.Errorf("schema type %s not found in registry", schemaType)
  196. }
  197. if _, ok := registry.schemas[schemaType][name]; !ok {
  198. return nil, fmt.Errorf("schema type %s, file %s not found", schemaType, name)
  199. }
  200. schemaFile := registry.schemas[schemaType][name]
  201. return schemaFile, nil
  202. }
  203. func DeleteSchema(schemaType def.SchemaType, name string) error {
  204. registry.Lock()
  205. defer registry.Unlock()
  206. if _, ok := registry.schemas[schemaType]; !ok {
  207. return fmt.Errorf("schema type %s not found", schemaType)
  208. }
  209. if _, ok := registry.schemas[schemaType][name]; !ok {
  210. return fmt.Errorf("schema %s.%s not found", schemaType, name)
  211. }
  212. schemaFile := registry.schemas[schemaType][name]
  213. if schemaFile.SchemaFile != "" {
  214. err := os.Remove(schemaFile.SchemaFile)
  215. if err != nil {
  216. conf.Log.Errorf("cannot delete schema file %s: %s", schemaFile.SchemaFile, err)
  217. }
  218. }
  219. if schemaFile.SoFile != "" {
  220. err := os.Remove(schemaFile.SoFile)
  221. if err != nil {
  222. conf.Log.Errorf("cannot delete schema so file %s: %s", schemaFile.SoFile, err)
  223. }
  224. }
  225. delete(registry.schemas[schemaType], name)
  226. removeSchemaInstallScript(schemaType, name)
  227. return nil
  228. }
  229. const BOOT_INSTALL = "$boot_install"
  230. func GetAllSchema() map[string]string {
  231. all, err := schemaDb.All()
  232. if err != nil {
  233. return nil
  234. }
  235. delete(all, BOOT_INSTALL)
  236. return all
  237. }
  238. func GetAllSchemaStatus() map[string]string {
  239. all, err := schemaStatusDb.All()
  240. if err != nil {
  241. return nil
  242. }
  243. return all
  244. }
  245. func UninstallAllSchema() {
  246. schemaMaps, err := schemaDb.All()
  247. if err != nil {
  248. return
  249. }
  250. for key, value := range schemaMaps {
  251. info := &Info{}
  252. _ = json.Unmarshal([]byte(value), info)
  253. _ = DeleteSchema(info.Type, key)
  254. }
  255. }
  256. func hasInstallFlag() bool {
  257. var val = ""
  258. found, _ := schemaDb.Get(BOOT_INSTALL, &val)
  259. return found
  260. }
  261. func clearInstallFlag() {
  262. _ = schemaDb.Delete(BOOT_INSTALL)
  263. }
  264. func ImportSchema(schema map[string]string) error {
  265. if len(schema) == 0 {
  266. return nil
  267. }
  268. for k, v := range schema {
  269. err := schemaDb.Set(k, v)
  270. if err != nil {
  271. return err
  272. }
  273. }
  274. //set the flag to install the plugins when eKuiper reboot
  275. return schemaDb.Set(BOOT_INSTALL, BOOT_INSTALL)
  276. }
  277. func schemaInstallWhenReboot() {
  278. allPlgs, err := schemaDb.All()
  279. if err != nil {
  280. return
  281. }
  282. delete(allPlgs, BOOT_INSTALL)
  283. _ = schemaStatusDb.Clean()
  284. for k, v := range allPlgs {
  285. info := &Info{}
  286. err := json.Unmarshal([]byte(v), info)
  287. if err != nil {
  288. _ = schemaStatusDb.Set(k, err.Error())
  289. continue
  290. }
  291. err = CreateOrUpdateSchema(info)
  292. if err != nil {
  293. _ = schemaStatusDb.Set(k, err.Error())
  294. continue
  295. }
  296. }
  297. }
  298. func storeSchemaInstallScript(info *Info) {
  299. key := string(info.Type) + "_" + info.Name
  300. val := info.InstallScript()
  301. _ = schemaDb.Set(key, val)
  302. }
  303. func removeSchemaInstallScript(schemaType def.SchemaType, name string) {
  304. key := string(schemaType) + "_" + name
  305. _ = schemaDb.Delete(key)
  306. }
  307. func GetSchemaInstallScript(schemaId string) (string, string) {
  308. key := strings.ReplaceAll(schemaId, ".", "_")
  309. var script string
  310. schemaDb.Get(key, &script)
  311. return key, script
  312. }