123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- // Copyright 2022-2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- //go:build edgex
- // +build edgex
- package edgex
- import (
- "fmt"
- "github.com/edgexfoundry/go-mod-messaging/v3/messaging"
- "github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
- "github.com/lf-edge/ekuiper/internal/conf"
- "github.com/lf-edge/ekuiper/pkg/cast"
- "strings"
- )
- type EdgexClient struct {
- mbconf types.MessageBusConfig
- client messaging.MessageClient
- }
- type EdgexConf struct {
- Protocol string `json:"protocol"`
- Server string `json:"server"`
- Host string `json:"host"`
- Port int `json:"port"`
- Type string `json:"type"`
- Optional map[string]string `json:"optional"`
- }
- // Modify the copied conf to print no password.
- func printConf(mbconf types.MessageBusConfig) {
- var printableOptional = make(map[string]string)
- for k, v := range mbconf.Optional {
- if strings.EqualFold(k, "password") {
- printableOptional[k] = "*"
- } else {
- printableOptional[k] = v
- }
- }
- mbconf.Optional = printableOptional
- conf.Log.Infof("Use configuration for edgex messagebus %v", mbconf)
- }
- func (es *EdgexClient) CfgValidate(props map[string]interface{}) error {
- edgeAddr := "localhost"
- c := &EdgexConf{
- Protocol: "redis",
- Port: 6379,
- Type: messaging.Redis,
- Optional: nil,
- }
- if o, ok := props["optional"]; ok {
- switch ot := o.(type) {
- case map[string]string:
- c.Optional = ot
- case map[string]interface{}:
- c.Optional = make(map[string]string)
- for k, v := range ot {
- c.Optional[k] = fmt.Sprintf("%v", v)
- }
- default:
- return fmt.Errorf("invalid optional config %v, must be a map", o)
- }
- delete(props, "optional")
- }
- err := cast.MapToStruct(props, c)
- if err != nil {
- return fmt.Errorf("map config map to struct fail with error: %v", err)
- }
- if c.Host != "" {
- edgeAddr = c.Host
- } else if c.Server != "" {
- edgeAddr = c.Server
- }
- if c.Type != messaging.MQTT && c.Type != messaging.Redis &&
- c.Type != messaging.NatsCore && c.Type != messaging.NatsJetStream {
- return fmt.Errorf("specified wrong type value %s", c.Type)
- }
- if c.Port < 0 {
- return fmt.Errorf("specified wrong port value, expect positive integer but got %d", c.Port)
- }
- mbconf := types.MessageBusConfig{
- Broker: types.HostInfo{
- Host: edgeAddr,
- Port: c.Port,
- Protocol: c.Protocol,
- },
- Type: c.Type}
- mbconf.Optional = c.Optional
- es.mbconf = mbconf
- printConf(mbconf)
- return nil
- }
- func (es *EdgexClient) Connect() error {
- client, err := messaging.NewMessageClient(es.mbconf)
- if err != nil {
- return err
- }
- if err := client.Connect(); err != nil {
- conf.Log.Errorf("The connection to edgex messagebus failed.")
- return fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
- }
- es.client = client
- return nil
- }
- func (es *EdgexClient) Publish(env types.MessageEnvelope, topic string) error {
- if err := es.client.Publish(env, topic); err != nil {
- conf.Log.Errorf("Publish to topic %s has error : %s.", topic, err.Error())
- return fmt.Errorf("Failed to publish to edgex message bus: " + err.Error())
- }
- return nil
- }
- func (es *EdgexClient) Subscribe(msg chan types.MessageEnvelope, topic string, err chan error) error {
- topics := []types.TopicChannel{{Topic: topic, Messages: msg}}
- if err := es.client.Subscribe(topics, err); err != nil {
- conf.Log.Errorf("Failed to subscribe to edgex messagebus with topic %s has error : %s.", topic, err.Error())
- return err
- }
- return nil
- }
- func (es *EdgexClient) GetClient() (interface{}, error) {
- client, err := messaging.NewMessageClient(es.mbconf)
- if err != nil {
- return nil, err
- }
- if err := client.Connect(); err != nil {
- conf.Log.Errorf("The connection to edgex messagebus failed.")
- return nil, fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
- }
- conf.Log.Infof("The connection to edgex messagebus is established successfully.")
- es.client = client
- return client, nil
- }
- func (es *EdgexClient) Disconnect() error {
- conf.Log.Infof("Closing the connection to edgex messagebus.")
- if e := es.client.Disconnect(); e != nil {
- return e
- }
- return nil
- }
|