connect_selector.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package connection
  2. import (
  3. "fmt"
  4. "github.com/lf-edge/ekuiper/internal/conf"
  5. "strings"
  6. )
  7. var SUPPORTE_CONTYPE = []string{"mqtt", "edgex"}
  8. const CONNECTION_CONF = "connections/connection.yaml"
  9. type ConSelector struct {
  10. ConnSelectorCfg string
  11. Type string // mqtt edgex
  12. CfgKey string // config key
  13. SupportedType []string
  14. }
  15. func (c *ConSelector) Init() error {
  16. c.SupportedType = SUPPORTE_CONTYPE
  17. conTypeSel := strings.SplitN(c.ConnSelectorCfg, ".", 2)
  18. if len(conTypeSel) != 2 {
  19. return fmt.Errorf("not a valid connection selector : %s", c.ConnSelectorCfg)
  20. }
  21. c.Type = strings.ToLower(conTypeSel[0])
  22. c.CfgKey = strings.ToLower(conTypeSel[1])
  23. return nil
  24. }
  25. func (c *ConSelector) ReadCfgFromYaml() (props map[string]interface{}, err error) {
  26. var (
  27. found = false
  28. )
  29. cfg := make(map[string]interface{})
  30. err = conf.LoadConfigByName(CONNECTION_CONF, &cfg)
  31. if err != nil {
  32. return nil, err
  33. }
  34. if cons, ok := cfg[c.Type]; ok {
  35. if connItems, ok1 := cons.(map[string]interface{}); ok1 {
  36. if conItem, ok := connItems[c.CfgKey]; ok {
  37. if item, ok1 := conItem.(map[string]interface{}); ok1 {
  38. props = item
  39. found = true
  40. }
  41. }
  42. }
  43. }
  44. if !found {
  45. return nil, fmt.Errorf("not found connection Type and Selector: %s.%s", c.Type, c.CfgKey)
  46. }
  47. return
  48. }