mqtt_sink.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package sinks
  2. import (
  3. "context"
  4. "engine/common"
  5. "fmt"
  6. MQTT "github.com/eclipse/paho.mqtt.golang"
  7. "github.com/google/uuid"
  8. )
  9. type MQTTSink struct {
  10. srv string
  11. tpc string
  12. clientid string
  13. input chan interface{}
  14. conn MQTT.Client
  15. ruleId string
  16. name string
  17. //ctx context.Context
  18. }
  19. func NewMqttSink(name string, ruleId string, properties interface{}) (*MQTTSink, error) {
  20. ps, ok := properties.(map[string]interface{})
  21. if !ok {
  22. return nil, fmt.Errorf("expect map[string]interface{} type for the mqtt sink properties")
  23. }
  24. srv, ok := ps["server"]
  25. if !ok {
  26. return nil, fmt.Errorf("mqtt sink is missing property server")
  27. }
  28. tpc, ok := ps["topic"]
  29. if !ok {
  30. return nil, fmt.Errorf("mqtt sink is missing property topic")
  31. }
  32. clientid, ok := ps["clientId"]
  33. if !ok{
  34. if uuid, err := uuid.NewUUID(); err != nil {
  35. return nil, fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
  36. }else{
  37. clientid = uuid.String()
  38. }
  39. }
  40. ms := &MQTTSink{name:name, ruleId: ruleId, input: make(chan interface{}), srv: srv.(string), tpc: tpc.(string), clientid: clientid.(string)}
  41. return ms, nil
  42. }
  43. func (ms *MQTTSink) GetName() string {
  44. return ms.name
  45. }
  46. func (ms *MQTTSink) GetInput() (chan<- interface{}, string) {
  47. return ms.input, ms.name
  48. }
  49. func (ms *MQTTSink) Open(ctx context.Context, result chan<- error) {
  50. log := common.GetLogger(ctx)
  51. log.Printf("Opening mqtt sink for rule %s", ms.ruleId)
  52. go func() {
  53. exeCtx, cancel := context.WithCancel(ctx)
  54. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
  55. c := MQTT.NewClient(opts)
  56. if token := c.Connect(); token.Wait() && token.Error() != nil {
  57. result <- fmt.Errorf("Found error: %s", token.Error())
  58. cancel()
  59. }
  60. log.Printf("The connection to server %s was established successfully", ms.srv)
  61. ms.conn = c
  62. for {
  63. select {
  64. case item := <-ms.input:
  65. log.Infof("publish %s", item)
  66. if token := c.Publish(ms.tpc, 0, false, item); token.Wait() && token.Error() != nil {
  67. result <- fmt.Errorf("Publish error: %s", token.Error())
  68. }
  69. case <-exeCtx.Done():
  70. c.Disconnect(5000)
  71. log.Infof("Closing mqtt sink")
  72. cancel()
  73. return
  74. }
  75. }
  76. }()
  77. }