connect_selector.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package connection
  2. import (
  3. "fmt"
  4. "github.com/lf-edge/ekuiper/internal/conf"
  5. "strings"
  6. )
  7. var SUPPORTE_CONTYPE = []string{"mqtt", "edgex"}
  8. type ConSelector struct {
  9. ConnSelectorCfg string
  10. Type string // mqtt edgex
  11. CfgKey string // config key
  12. SupportedType []string
  13. }
  14. func (c *ConSelector) Init() error {
  15. c.SupportedType = SUPPORTE_CONTYPE
  16. conTypeSel := strings.SplitN(c.ConnSelectorCfg, ".", 2)
  17. if len(conTypeSel) != 2 {
  18. return fmt.Errorf("not a valid connection selector : %s", c.ConnSelectorCfg)
  19. }
  20. c.Type = conTypeSel[0]
  21. c.CfgKey = conTypeSel[1]
  22. return nil
  23. }
  24. func (c *ConSelector) ReadCfgFromYaml() (props map[string]interface{}, err error) {
  25. var (
  26. found = false
  27. )
  28. cfg := make(map[string]interface{})
  29. err = conf.LoadConfigByName("connection.yaml", &cfg)
  30. if err != nil {
  31. return nil, err
  32. }
  33. if cons, ok := cfg[c.Type]; ok {
  34. if connItems, ok1 := cons.(map[string]interface{}); ok1 {
  35. if conItem, ok := connItems[c.CfgKey]; ok {
  36. if item, ok1 := conItem.(map[string]interface{}); ok1 {
  37. props = item
  38. found = true
  39. }
  40. }
  41. }
  42. }
  43. if !found {
  44. return nil, fmt.Errorf("not found connection Type and Selector: %s.%s", c.Type, c.CfgKey)
  45. }
  46. return
  47. }