edgex.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build edgex
  15. // +build edgex
  16. package edgex
  17. import (
  18. "fmt"
  19. "strings"
  20. "github.com/edgexfoundry/go-mod-messaging/v3/messaging"
  21. "github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. )
  25. type EdgexClient struct {
  26. mbconf types.MessageBusConfig
  27. client messaging.MessageClient
  28. }
  29. type EdgexConf struct {
  30. Protocol string `json:"protocol"`
  31. Server string `json:"server"`
  32. Host string `json:"host"`
  33. Port int `json:"port"`
  34. Type string `json:"type"`
  35. Optional map[string]string `json:"optional"`
  36. }
  37. // Modify the copied conf to print no password.
  38. func printConf(mbconf types.MessageBusConfig) {
  39. printableOptional := make(map[string]string)
  40. for k, v := range mbconf.Optional {
  41. if strings.EqualFold(k, "password") {
  42. printableOptional[k] = "*"
  43. } else {
  44. printableOptional[k] = v
  45. }
  46. }
  47. mbconf.Optional = printableOptional
  48. conf.Log.Infof("Use configuration for edgex messagebus %v", mbconf)
  49. }
  50. func (es *EdgexClient) CfgValidate(props map[string]interface{}) error {
  51. edgeAddr := "localhost"
  52. c := &EdgexConf{
  53. Protocol: "redis",
  54. Port: 6379,
  55. Type: messaging.Redis,
  56. Optional: nil,
  57. }
  58. if o, ok := props["optional"]; ok {
  59. switch ot := o.(type) {
  60. case map[string]string:
  61. c.Optional = ot
  62. case map[string]interface{}:
  63. c.Optional = make(map[string]string)
  64. for k, v := range ot {
  65. c.Optional[k] = fmt.Sprintf("%v", v)
  66. }
  67. default:
  68. return fmt.Errorf("invalid optional config %v, must be a map", o)
  69. }
  70. delete(props, "optional")
  71. }
  72. err := cast.MapToStruct(props, c)
  73. if err != nil {
  74. return fmt.Errorf("map config map to struct fail with error: %v", err)
  75. }
  76. if c.Host != "" {
  77. edgeAddr = c.Host
  78. } else if c.Server != "" {
  79. edgeAddr = c.Server
  80. }
  81. if c.Type != messaging.MQTT && c.Type != messaging.Redis &&
  82. c.Type != messaging.NatsCore && c.Type != messaging.NatsJetStream {
  83. return fmt.Errorf("specified wrong type value %s", c.Type)
  84. }
  85. if c.Port < 0 {
  86. return fmt.Errorf("specified wrong port value, expect positive integer but got %d", c.Port)
  87. }
  88. mbconf := types.MessageBusConfig{
  89. Broker: types.HostInfo{
  90. Host: edgeAddr,
  91. Port: c.Port,
  92. Protocol: c.Protocol,
  93. },
  94. Type: c.Type,
  95. }
  96. mbconf.Optional = c.Optional
  97. es.mbconf = mbconf
  98. printConf(mbconf)
  99. return nil
  100. }
  101. func (es *EdgexClient) Connect() error {
  102. client, err := messaging.NewMessageClient(es.mbconf)
  103. if err != nil {
  104. return err
  105. }
  106. if err := client.Connect(); err != nil {
  107. conf.Log.Errorf("The connection to edgex messagebus failed.")
  108. return fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
  109. }
  110. es.client = client
  111. return nil
  112. }
  113. func (es *EdgexClient) Publish(env types.MessageEnvelope, topic string) error {
  114. if err := es.client.Publish(env, topic); err != nil {
  115. conf.Log.Errorf("Publish to topic %s has error : %s.", topic, err.Error())
  116. return fmt.Errorf("Failed to publish to edgex message bus: " + err.Error())
  117. }
  118. return nil
  119. }
  120. func (es *EdgexClient) Subscribe(msg chan types.MessageEnvelope, topic string, err chan error) error {
  121. topics := []types.TopicChannel{{Topic: topic, Messages: msg}}
  122. if err := es.client.Subscribe(topics, err); err != nil {
  123. conf.Log.Errorf("Failed to subscribe to edgex messagebus with topic %s has error : %s.", topic, err.Error())
  124. return err
  125. }
  126. return nil
  127. }
  128. func (es *EdgexClient) GetClient() (interface{}, error) {
  129. client, err := messaging.NewMessageClient(es.mbconf)
  130. if err != nil {
  131. return nil, err
  132. }
  133. if err := client.Connect(); err != nil {
  134. conf.Log.Errorf("The connection to edgex messagebus failed.")
  135. return nil, fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
  136. }
  137. conf.Log.Infof("The connection to edgex messagebus is established successfully.")
  138. es.client = client
  139. return client, nil
  140. }
  141. func (es *EdgexClient) Disconnect() error {
  142. conf.Log.Infof("Closing the connection to edgex messagebus.")
  143. if e := es.client.Disconnect(); e != nil {
  144. return e
  145. }
  146. return nil
  147. }