123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409 |
- package plugins
- import (
- "encoding/json"
- "fmt"
- "github.com/emqx/kuiper/common"
- "io/ioutil"
- "path"
- "reflect"
- "strings"
- )
- type (
- fileSource struct {
- About *fileAbout `json:"about"`
- Libs []string `json:"libs"`
- ConfKeys map[string][]*fileField `json:"properties"`
- }
- uiSource struct {
- About *about `json:"about"`
- Libs []string `json:"libs"`
- ConfKeys map[string][]field `json:"properties"`
- }
- sourceProperty struct {
- cf map[string]map[string]interface{}
- meta *uiSource
- }
- )
- func isInternalSource(fiName string) bool {
- internal := []string{`edgex.json`, `httppull.json`, `mqtt.json`}
- for _, v := range internal {
- if v == fiName {
- return true
- }
- }
- return false
- }
- func newUiSource(fi *fileSource) (*uiSource, error) {
- if nil == fi {
- return nil, nil
- }
- var err error
- ui := new(uiSource)
- ui.Libs = fi.Libs
- ui.About = newAbout(fi.About)
- ui.ConfKeys = make(map[string][]field)
- for k, fields := range fi.ConfKeys {
- if ui.ConfKeys[k], err = newField(fields); nil != err {
- return nil, err
- }
- }
- return ui, nil
- }
- var g_sourceProperty map[string]*sourceProperty
- func (m *Manager) uninstalSource(name string) {
- if v, ok := g_sourceProperty[name+".json"]; ok {
- if ui := v.meta; nil != ui {
- if nil != ui.About {
- ui.About.Installed = false
- }
- }
- }
- }
- func (m *Manager) readSourceMetaFile(filePath string) error {
- fileName := path.Base(filePath)
- if "mqtt_source.json" == fileName {
- fileName = "mqtt.json"
- }
- ptrMeta := new(fileSource)
- err := common.ReadJsonUnmarshal(filePath, ptrMeta)
- if nil != err || 0 == len(ptrMeta.ConfKeys) {
- return fmt.Errorf("file:%s err:%v", filePath, err)
- }
- if 0 == len(ptrMeta.ConfKeys["default"]) {
- return fmt.Errorf("not found default confKey %s", filePath)
- }
- if nil == ptrMeta.About {
- return fmt.Errorf("not found about of %s", filePath)
- } else if isInternalSource(fileName) {
- ptrMeta.About.Installed = true
- } else {
- _, ptrMeta.About.Installed = m.registry.Get(SOURCE, strings.TrimSuffix(fileName, `.json`))
- }
- yamlData := make(map[string]map[string]interface{})
- filePath = strings.TrimSuffix(filePath, `.json`) + `.yaml`
- err = common.ReadYamlUnmarshal(filePath, &yamlData)
- if nil != err {
- return fmt.Errorf("file:%s err:%v", filePath, err)
- }
- if 0 == len(yamlData["default"]) {
- return fmt.Errorf("not found default confKey from %s", filePath)
- }
- property := new(sourceProperty)
- property.cf = yamlData
- property.meta, err = newUiSource(ptrMeta)
- if nil != err {
- return err
- }
- g_sourceProperty[fileName] = property
- return err
- }
- func (m *Manager) readSourceMetaDir() error {
- g_sourceProperty = make(map[string]*sourceProperty)
- confDir, err := common.GetConfLoc()
- if nil != err {
- return err
- }
- dir := path.Join(confDir, "sources")
- infos, err := ioutil.ReadDir(dir)
- if nil != err {
- return err
- }
- if err = m.readSourceMetaFile(path.Join(confDir, "mqtt_source.json")); nil != err {
- return err
- }
- for _, info := range infos {
- fileName := info.Name()
- if strings.HasSuffix(fileName, ".json") {
- filePath := path.Join(dir, fileName)
- if err = m.readSourceMetaFile(filePath); nil != err {
- return err
- }
- common.Log.Infof("sourceMeta file : %s", fileName)
- }
- }
- return nil
- }
- func GetSourceConf(pluginName, language string) (b []byte, err error) {
- property, ok := g_sourceProperty[pluginName+".json"]
- if ok {
- cf := make(map[string]map[string]interface{})
- for key, kvs := range property.cf {
- aux := make(map[interface{}]interface{})
- for k, v := range kvs {
- aux[k] = v
- }
- cf[key] = common.ConvertMap(aux)
- }
- if b, err = json.Marshal(cf); nil != err {
- return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
- } else {
- return b, err
- }
- }
- return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
- }
- func GetSourceMeta(pluginName, language string) (ptrSourceProperty *uiSource, err error) {
- property, ok := g_sourceProperty[pluginName+".json"]
- if ok {
- return property.cfToMeta(language)
- }
- return nil, fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
- }
- func GetSources() (sources []*pluginfo) {
- for fileName, v := range g_sourceProperty {
- node := new(pluginfo)
- node.Name = strings.TrimSuffix(fileName, `.json`)
- if nil == v.meta {
- continue
- }
- if nil == v.meta.About {
- continue
- }
- node.About = v.meta.About
- i := 0
- for ; i < len(sources); i++ {
- if node.Name <= sources[i].Name {
- sources = append(sources, node)
- copy(sources[i+1:], sources[i:])
- sources[i] = node
- break
- }
- }
- if len(sources) == i {
- sources = append(sources, node)
- }
- }
- return sources
- }
- func GetSourceConfKeys(pluginName string) (keys []string) {
- property := g_sourceProperty[pluginName+".json"]
- if nil == property {
- return keys
- }
- for k := range property.cf {
- keys = append(keys, k)
- }
- return keys
- }
- func DelSourceConfKey(pluginName, confKey, language string) error {
- property := g_sourceProperty[pluginName+".json"]
- if nil == property {
- return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
- }
- if nil == property.cf {
- return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
- }
- delete(property.cf, confKey)
- return property.saveCf(pluginName, language)
- }
- func AddSourceConfKey(pluginName, confKey, language string, content []byte) error {
- reqField := make(map[string]interface{})
- err := json.Unmarshal(content, &reqField)
- if nil != err {
- return err
- }
- property := g_sourceProperty[pluginName+".json"]
- if nil == property {
- return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
- }
- if nil == property.cf {
- property.cf = make(map[string]map[string]interface{})
- }
- if 0 != len(property.cf[confKey]) {
- return fmt.Errorf(`%s%s`, getMsg(language, source, "confkey_already_exist"), pluginName)
- }
- property.cf[confKey] = reqField
- g_sourceProperty[pluginName+".json"] = property
- return property.saveCf(pluginName, language)
- }
- func AddSourceConfKeyField(pluginName, confKey, language string, content []byte) error {
- reqField := make(map[string]interface{})
- err := json.Unmarshal(content, &reqField)
- if nil != err {
- return err
- }
- property := g_sourceProperty[pluginName+".json"]
- if nil == property {
- return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
- }
- if nil == property.cf {
- return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
- }
- if nil == property.cf[confKey] {
- return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
- }
- for k, v := range reqField {
- property.cf[confKey][k] = v
- }
- return property.saveCf(pluginName, language)
- }
- func recursionDelMap(cf, fields map[string]interface{}, language string) error {
- for k, v := range fields {
- if nil == v {
- delete(cf, k)
- continue
- }
- if delKey, ok := v.(string); ok {
- if 0 == len(delKey) {
- delete(cf, k)
- continue
- }
- var auxCf map[string]interface{}
- if err := common.MapToStruct(cf[k], &auxCf); nil != err {
- return fmt.Errorf(`%s%s.%s`, getMsg(language, source, "type_conversion_fail"), k, delKey)
- }
- cf[k] = auxCf
- delete(auxCf, delKey)
- continue
- }
- if reflect.Map == reflect.TypeOf(v).Kind() {
- var auxCf, auxFields map[string]interface{}
- if err := common.MapToStruct(cf[k], &auxCf); nil != err {
- return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), k, v)
- }
- cf[k] = auxCf
- if err := common.MapToStruct(v, &auxFields); nil != err {
- return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), k, v)
- }
- if err := recursionDelMap(auxCf, auxFields, language); nil != err {
- return err
- }
- }
- }
- return nil
- }
- func DelSourceConfKeyField(pluginName, confKey, language string, content []byte) error {
- reqField := make(map[string]interface{})
- err := json.Unmarshal(content, &reqField)
- if nil != err {
- return err
- }
- property := g_sourceProperty[pluginName+".json"]
- if nil == property {
- return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_plugin"), pluginName)
- }
- if nil == property.cf {
- return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
- }
- if nil == property.cf[confKey] {
- return fmt.Errorf(`%s%s`, getMsg(language, source, "not_found_confkey"), confKey)
- }
- err = recursionDelMap(property.cf[confKey], reqField, language)
- if nil != err {
- return err
- }
- return property.saveCf(pluginName, language)
- }
- func recursionNewFields(template []field, conf map[string]interface{}, ret *[]field) error {
- for i := 0; i < len(template); i++ {
- p := template[i]
- v, ok := conf[template[i].Name]
- if ok {
- p.Exist = true
- } else {
- p.Exist = false
- continue
- }
- var auxRet, auxTemplate []field
- p.Default = &auxRet
- if nil == v {
- p.Default = v
- } else {
- if reflect.Map == reflect.TypeOf(v).Kind() {
- var nextCf map[string]interface{}
- if tmp, ok := v.(map[interface{}]interface{}); ok {
- nextCf = common.ConvertMap(tmp)
- } else {
- if err := common.MapToStruct(v, &nextCf); nil != err {
- return err
- }
- }
- if err := common.MapToStruct(template[i].Default, &auxTemplate); nil != err {
- return err
- }
- if err := recursionNewFields(auxTemplate, nextCf, &auxRet); nil != err {
- return err
- }
- } else {
- p.Default = v
- }
- }
- *ret = append(*ret, p)
- }
- return nil
- }
- func (this *sourceProperty) cfToMeta(language string) (*uiSource, error) {
- fields := this.meta.ConfKeys["default"]
- ret := make(map[string][]field)
- for k, kvs := range this.cf {
- var sli []field
- err := recursionNewFields(fields, kvs, &sli)
- if nil != err {
- return nil, fmt.Errorf(`%s%v`, getMsg(language, "source", "type_conversion_fail"), err)
- }
- ret[k] = sli
- }
- meta := new(uiSource)
- *meta = *(this.meta)
- meta.ConfKeys = ret
- return meta, nil
- }
- func (this *sourceProperty) saveCf(pluginName, language string) error {
- confDir, err := common.GetConfLoc()
- if nil != err {
- return fmt.Errorf(`%s%v`, getMsg(language, source, "not_found_file"), err)
- }
- dir := path.Join(confDir, "sources")
- if "mqtt" == pluginName {
- pluginName = "mqtt_source"
- dir = confDir
- }
- filePath := path.Join(dir, pluginName+".yaml")
- err = common.WriteYamlMarshal(filePath, this.cf)
- if nil != err {
- return fmt.Errorf(`%s%v`, getMsg(language, "source", "write_data_fail"), err)
- }
- return nil
- }
|