edgex.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build edgex
  15. // +build edgex
  16. package edgex
  17. import (
  18. "fmt"
  19. "github.com/edgexfoundry/go-mod-messaging/v2/messaging"
  20. "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
  21. "github.com/lf-edge/ekuiper/internal/conf"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. "strings"
  24. )
  25. type EdgexClient struct {
  26. mbconf types.MessageBusConfig
  27. client messaging.MessageClient
  28. }
  // EdgexConf is the user-facing configuration that CfgValidate decodes from
  // the property map before building the message bus configuration.
  type EdgexConf struct {
  	// Protocol is copied to both the publish and subscribe host settings.
  	// CfgValidate defaults it to "redis".
  	Protocol string `json:"protocol"`
  	// Server is the message bus address used when Host is empty.
  	Server string `json:"server"`
  	// Host is the message bus address; it takes precedence over Server.
  	// When both are empty CfgValidate falls back to "localhost".
  	Host string `json:"host"`
  	// Port is the message bus port. CfgValidate defaults it to 6379 and
  	// rejects negative values.
  	Port int `json:"port"`
  	// Type selects the message bus implementation; CfgValidate checks it
  	// against the supported messaging type constants.
  	Type string `json:"type"`
  	// Optional holds extra key/value settings passed through unchanged to
  	// the message bus configuration.
  	Optional map[string]string `json:"optional"`
  }
  37. // Modify the copied conf to print no password.
  38. func printConf(mbconf types.MessageBusConfig) {
  39. var printableOptional = make(map[string]string)
  40. for k, v := range mbconf.Optional {
  41. if strings.EqualFold(k, "password") {
  42. printableOptional[k] = "*"
  43. } else {
  44. printableOptional[k] = v
  45. }
  46. }
  47. mbconf.Optional = printableOptional
  48. conf.Log.Infof("Use configuration for edgex messagebus %v", mbconf)
  49. }
  50. func (es *EdgexClient) CfgValidate(props map[string]interface{}) error {
  51. edgeAddr := "localhost"
  52. c := &EdgexConf{
  53. Protocol: "redis",
  54. Port: 6379,
  55. Type: messaging.Redis,
  56. Optional: nil,
  57. }
  58. if o, ok := props["optional"]; ok {
  59. switch ot := o.(type) {
  60. case map[string]string:
  61. c.Optional = ot
  62. case map[string]interface{}:
  63. c.Optional = make(map[string]string)
  64. for k, v := range ot {
  65. c.Optional[k] = fmt.Sprintf("%v", v)
  66. }
  67. default:
  68. return fmt.Errorf("invalid optional config %v, must be a map", o)
  69. }
  70. delete(props, "optional")
  71. }
  72. err := cast.MapToStruct(props, c)
  73. if err != nil {
  74. return fmt.Errorf("map config map to struct fail with error: %v", err)
  75. }
  76. if c.Host != "" {
  77. edgeAddr = c.Host
  78. } else if c.Server != "" {
  79. edgeAddr = c.Server
  80. }
  81. if c.Type != messaging.ZeroMQ && c.Type != messaging.MQTT &&
  82. c.Type != messaging.Redis && c.Type != messaging.NatsCore &&
  83. c.Type != messaging.NatsJetStream {
  84. return fmt.Errorf("specified wrong type value %s", c.Type)
  85. }
  86. if c.Port < 0 {
  87. return fmt.Errorf("specified wrong port value, expect positive integer but got %d", c.Port)
  88. }
  89. mbconf := types.MessageBusConfig{
  90. SubscribeHost: types.HostInfo{
  91. Host: edgeAddr,
  92. Port: c.Port,
  93. Protocol: c.Protocol,
  94. },
  95. PublishHost: types.HostInfo{
  96. Host: edgeAddr,
  97. Port: c.Port,
  98. Protocol: c.Protocol,
  99. },
  100. Type: c.Type}
  101. mbconf.Optional = c.Optional
  102. es.mbconf = mbconf
  103. printConf(mbconf)
  104. return nil
  105. }
  106. func (es *EdgexClient) Connect() error {
  107. client, err := messaging.NewMessageClient(es.mbconf)
  108. if err != nil {
  109. return err
  110. }
  111. if err := client.Connect(); err != nil {
  112. conf.Log.Errorf("The connection to edgex messagebus failed.")
  113. return fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
  114. }
  115. es.client = client
  116. return nil
  117. }
  118. func (es *EdgexClient) Publish(env types.MessageEnvelope, topic string) error {
  119. if err := es.client.Publish(env, topic); err != nil {
  120. conf.Log.Errorf("Publish to topic %s has error : %s.", topic, err.Error())
  121. return fmt.Errorf("Failed to publish to edgex message bus: " + err.Error())
  122. }
  123. return nil
  124. }
  125. func (es *EdgexClient) Subscribe(msg chan types.MessageEnvelope, topic string, err chan error) error {
  126. topics := []types.TopicChannel{{Topic: topic, Messages: msg}}
  127. if err := es.client.Subscribe(topics, err); err != nil {
  128. conf.Log.Errorf("Failed to subscribe to edgex messagebus with topic %s has error : %s.", topic, err.Error())
  129. return err
  130. }
  131. return nil
  132. }
  133. func (es *EdgexClient) GetClient() (interface{}, error) {
  134. client, err := messaging.NewMessageClient(es.mbconf)
  135. if err != nil {
  136. return nil, err
  137. }
  138. if err := client.Connect(); err != nil {
  139. conf.Log.Errorf("The connection to edgex messagebus failed.")
  140. return nil, fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
  141. }
  142. conf.Log.Infof("The connection to edgex messagebus is established successfully.")
  143. es.client = client
  144. return client, nil
  145. }
  146. func (es *EdgexClient) Disconnect() error {
  147. conf.Log.Infof("Closing the connection to edgex messagebus.")
  148. if e := es.client.Disconnect(); e != nil {
  149. return e
  150. }
  151. return nil
  152. }