edgex.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build edgex
  15. // +build edgex
  16. package edgex
  17. import (
  18. "fmt"
  19. "github.com/edgexfoundry/go-mod-messaging/v3/messaging"
  20. "github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. "strings"
  24. )
  25. type EdgexClient struct {
  26. mbconf types.MessageBusConfig
  27. client messaging.MessageClient
  28. }
  29. type EdgexConf struct {
  30. Protocol string `json:"protocol"`
  31. Server string `json:"server"`
  32. Host string `json:"host"`
  33. Port int `json:"port"`
  34. Type string `json:"type"`
  35. Optional map[string]string `json:"optional"`
  36. }
  37. // Modify the copied conf to print no password.
  38. func printConf(mbconf types.MessageBusConfig) {
  39. var printableOptional = make(map[string]string)
  40. for k, v := range mbconf.Optional {
  41. if strings.EqualFold(k, "password") {
  42. printableOptional[k] = "*"
  43. } else {
  44. printableOptional[k] = v
  45. }
  46. }
  47. mbconf.Optional = printableOptional
  48. conf.Log.Infof("Use configuration for edgex messagebus %v", mbconf)
  49. }
  50. func (es *EdgexClient) CfgValidate(props map[string]interface{}) error {
  51. edgeAddr := "localhost"
  52. c := &EdgexConf{
  53. Protocol: "redis",
  54. Port: 6379,
  55. Type: messaging.Redis,
  56. Optional: nil,
  57. }
  58. if o, ok := props["optional"]; ok {
  59. switch ot := o.(type) {
  60. case map[string]string:
  61. c.Optional = ot
  62. case map[string]interface{}:
  63. c.Optional = make(map[string]string)
  64. for k, v := range ot {
  65. c.Optional[k] = fmt.Sprintf("%v", v)
  66. }
  67. default:
  68. return fmt.Errorf("invalid optional config %v, must be a map", o)
  69. }
  70. delete(props, "optional")
  71. }
  72. err := cast.MapToStruct(props, c)
  73. if err != nil {
  74. return fmt.Errorf("map config map to struct fail with error: %v", err)
  75. }
  76. if c.Host != "" {
  77. edgeAddr = c.Host
  78. } else if c.Server != "" {
  79. edgeAddr = c.Server
  80. }
  81. if c.Type != messaging.MQTT && c.Type != messaging.Redis &&
  82. c.Type != messaging.NatsCore && c.Type != messaging.NatsJetStream {
  83. return fmt.Errorf("specified wrong type value %s", c.Type)
  84. }
  85. if c.Port < 0 {
  86. return fmt.Errorf("specified wrong port value, expect positive integer but got %d", c.Port)
  87. }
  88. mbconf := types.MessageBusConfig{
  89. Broker: types.HostInfo{
  90. Host: edgeAddr,
  91. Port: c.Port,
  92. Protocol: c.Protocol,
  93. },
  94. Type: c.Type}
  95. mbconf.Optional = c.Optional
  96. es.mbconf = mbconf
  97. printConf(mbconf)
  98. return nil
  99. }
  100. func (es *EdgexClient) Connect() error {
  101. client, err := messaging.NewMessageClient(es.mbconf)
  102. if err != nil {
  103. return err
  104. }
  105. if err := client.Connect(); err != nil {
  106. conf.Log.Errorf("The connection to edgex messagebus failed.")
  107. return fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
  108. }
  109. es.client = client
  110. return nil
  111. }
  112. func (es *EdgexClient) Publish(env types.MessageEnvelope, topic string) error {
  113. if err := es.client.Publish(env, topic); err != nil {
  114. conf.Log.Errorf("Publish to topic %s has error : %s.", topic, err.Error())
  115. return fmt.Errorf("Failed to publish to edgex message bus: " + err.Error())
  116. }
  117. return nil
  118. }
  119. func (es *EdgexClient) Subscribe(msg chan types.MessageEnvelope, topic string, err chan error) error {
  120. topics := []types.TopicChannel{{Topic: topic, Messages: msg}}
  121. if err := es.client.Subscribe(topics, err); err != nil {
  122. conf.Log.Errorf("Failed to subscribe to edgex messagebus with topic %s has error : %s.", topic, err.Error())
  123. return err
  124. }
  125. return nil
  126. }
  127. func (es *EdgexClient) GetClient() (interface{}, error) {
  128. client, err := messaging.NewMessageClient(es.mbconf)
  129. if err != nil {
  130. return nil, err
  131. }
  132. if err := client.Connect(); err != nil {
  133. conf.Log.Errorf("The connection to edgex messagebus failed.")
  134. return nil, fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
  135. }
  136. conf.Log.Infof("The connection to edgex messagebus is established successfully.")
  137. es.client = client
  138. return client, nil
  139. }
  140. func (es *EdgexClient) Disconnect() error {
  141. conf.Log.Infof("Closing the connection to edgex messagebus.")
  142. if e := es.client.Disconnect(); e != nil {
  143. return e
  144. }
  145. return nil
  146. }