sub.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/edgexfoundry/go-mod-messaging/messaging"
  5. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  6. "github.com/emqx/kuiper/common"
  7. )
  8. func main() {
  9. var msgConfig1 = types.MessageBusConfig{
  10. SubscribeHost: types.HostInfo{
  11. Host: "localhost",
  12. Port: 5571,
  13. Protocol: "tcp",
  14. },
  15. Type:messaging.ZeroMQ,
  16. }
  17. if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
  18. common.Log.Fatal(err)
  19. } else {
  20. if ec := msgClient.Connect(); ec != nil {
  21. common.Log.Fatal(ec)
  22. } else {
  23. if err := msgClient.Connect(); err != nil {
  24. common.Log.Fatal(err)
  25. }
  26. //log.Infof("The connection to edgex messagebus is established successfully.")
  27. messages := make(chan types.MessageEnvelope)
  28. topics := []types.TopicChannel{{Topic: "", Messages: messages}}
  29. err := make(chan error)
  30. if e := msgClient.Subscribe(topics, err); e != nil {
  31. //log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
  32. common.Log.Fatal(e)
  33. } else {
  34. var count int = 0
  35. for {
  36. select {
  37. case e1 := <-err:
  38. common.Log.Errorf("%s\n", e1)
  39. return
  40. case env := <-messages:
  41. count ++
  42. fmt.Printf("%s\n", env.Payload)
  43. if count == 1 {
  44. return
  45. }
  46. }
  47. }
  48. }
  49. }
  50. }
  51. }