sub.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/edgexfoundry/go-mod-messaging/messaging"
  5. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  6. "github.com/emqx/kuiper/common"
  7. "os"
  8. )
  9. func subEventsFromZMQ() {
  10. var msgConfig1 = types.MessageBusConfig{
  11. SubscribeHost: types.HostInfo{
  12. Host: "localhost",
  13. Port: 5571,
  14. Protocol: "tcp",
  15. },
  16. Type:messaging.ZeroMQ,
  17. }
  18. if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
  19. common.Log.Fatal(err)
  20. } else {
  21. if ec := msgClient.Connect(); ec != nil {
  22. common.Log.Fatal(ec)
  23. } else {
  24. //log.Infof("The connection to edgex messagebus is established successfully.")
  25. messages := make(chan types.MessageEnvelope)
  26. topics := []types.TopicChannel{{Topic: "", Messages: messages}}
  27. err := make(chan error)
  28. if e := msgClient.Subscribe(topics, err); e != nil {
  29. //log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
  30. common.Log.Fatal(e)
  31. } else {
  32. var count int = 0
  33. for {
  34. select {
  35. case e1 := <-err:
  36. common.Log.Errorf("%s\n", e1)
  37. return
  38. case env := <-messages:
  39. count ++
  40. fmt.Printf("%s\n", env.Payload)
  41. if count == 1 {
  42. return
  43. }
  44. }
  45. }
  46. }
  47. }
  48. }
  49. }
  50. func subEventsFromMQTT(host string) {
  51. var msgConfig1 = types.MessageBusConfig{
  52. SubscribeHost: types.HostInfo{
  53. Host: host,
  54. Port: 1883,
  55. Protocol: "tcp",
  56. },
  57. Type: messaging.MQTT,
  58. }
  59. if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
  60. common.Log.Fatal(err)
  61. } else {
  62. if ec := msgClient.Connect(); ec != nil {
  63. common.Log.Fatal(ec)
  64. } else {
  65. //log.Infof("The connection to edgex messagebus is established successfully.")
  66. messages := make(chan types.MessageEnvelope)
  67. topics := []types.TopicChannel{{Topic: "result", Messages: messages}}
  68. err := make(chan error)
  69. if e := msgClient.Subscribe(topics, err); e != nil {
  70. //log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
  71. common.Log.Fatal(e)
  72. } else {
  73. var count int = 0
  74. for {
  75. select {
  76. case e1 := <-err:
  77. common.Log.Errorf("%s\n", e1)
  78. return
  79. case env := <-messages:
  80. count++
  81. fmt.Printf("%s\n", env.Payload)
  82. if count == 1 {
  83. return
  84. }
  85. }
  86. }
  87. }
  88. }
  89. }
  90. }
  91. func main() {
  92. if len(os.Args) == 1 {
  93. subEventsFromZMQ()
  94. } else if len(os.Args) == 3 {
  95. if v := os.Args[1]; v == "mqtt" {
  96. subEventsFromMQTT(os.Args[2])
  97. }
  98. }
  99. }