sub.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. // +build edgex
  2. package main
  3. import (
  4. "fmt"
  5. "github.com/edgexfoundry/go-mod-messaging/v2/messaging"
  6. "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
  7. "github.com/emqx/kuiper/internal/conf"
  8. "os"
  9. )
  10. func subEventsFromZMQ() {
  11. var msgConfig1 = types.MessageBusConfig{
  12. SubscribeHost: types.HostInfo{
  13. Host: "localhost",
  14. Port: 5571,
  15. Protocol: "tcp",
  16. },
  17. Type: messaging.ZeroMQ,
  18. }
  19. if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
  20. conf.Log.Fatal(err)
  21. } else {
  22. if ec := msgClient.Connect(); ec != nil {
  23. conf.Log.Fatal(ec)
  24. } else {
  25. //log.Infof("The connection to edgex messagebus is established successfully.")
  26. messages := make(chan types.MessageEnvelope)
  27. topics := []types.TopicChannel{{Topic: "", Messages: messages}}
  28. err := make(chan error)
  29. if e := msgClient.Subscribe(topics, err); e != nil {
  30. //log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
  31. conf.Log.Fatal(e)
  32. } else {
  33. var count = 0
  34. for {
  35. select {
  36. case e1 := <-err:
  37. conf.Log.Errorf("%s\n", e1)
  38. return
  39. case env := <-messages:
  40. count++
  41. fmt.Printf("%s\n", env.Payload)
  42. if count == 1 {
  43. return
  44. }
  45. }
  46. }
  47. }
  48. }
  49. }
  50. }
  51. func subEventsFromMQTT(host string) {
  52. var msgConfig1 = types.MessageBusConfig{
  53. SubscribeHost: types.HostInfo{
  54. Host: host,
  55. Port: 1883,
  56. Protocol: "tcp",
  57. },
  58. Type: messaging.MQTT,
  59. }
  60. if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
  61. conf.Log.Fatal(err)
  62. } else {
  63. if ec := msgClient.Connect(); ec != nil {
  64. conf.Log.Fatal(ec)
  65. } else {
  66. //log.Infof("The connection to edgex messagebus is established successfully.")
  67. messages := make(chan types.MessageEnvelope)
  68. topics := []types.TopicChannel{{Topic: "result", Messages: messages}}
  69. err := make(chan error)
  70. if e := msgClient.Subscribe(topics, err); e != nil {
  71. //log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
  72. conf.Log.Fatal(e)
  73. } else {
  74. var count int = 0
  75. for {
  76. select {
  77. case e1 := <-err:
  78. conf.Log.Errorf("%s\n", e1)
  79. return
  80. case env := <-messages:
  81. count++
  82. fmt.Printf("%s\n", env.Payload)
  83. if count == 1 {
  84. return
  85. }
  86. }
  87. }
  88. }
  89. }
  90. }
  91. }
  92. func main() {
  93. if len(os.Args) == 1 {
  94. subEventsFromZMQ()
  95. } else if len(os.Args) == 3 {
  96. if v := os.Args[1]; v == "mqtt" {
  97. subEventsFromMQTT(os.Args[2])
  98. }
  99. }
  100. }