// zmq_pub.go (1.9 KB)

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. zmq "github.com/pebbe/zmq4"
  6. "os"
  7. "time"
  8. )
  9. type zmqPub struct {
  10. publisher *zmq.Socket
  11. srv string
  12. topic string
  13. }
  14. func (m *zmqPub) Open() (err error) {
  15. m.publisher, err = zmq.NewSocket(zmq.PUB)
  16. if err != nil {
  17. return fmt.Errorf("zmq sink fails to create socket: %v", err)
  18. }
  19. err = m.publisher.Bind(m.srv)
  20. if err != nil {
  21. return fmt.Errorf("zmq sink fails to bind to %s: %v", m.srv, err)
  22. }
  23. fmt.Println("zmq sink open")
  24. return nil
  25. }
  26. func (m *zmqPub) Send(item interface{}) (err error) {
  27. if v, ok := item.([]byte); ok {
  28. fmt.Printf("To pub: %s \n", item)
  29. if m.topic == "" {
  30. _, err = m.publisher.Send(string(v), 0)
  31. } else {
  32. msgs := []string{
  33. m.topic,
  34. string(v),
  35. }
  36. _, err = m.publisher.SendMessage(msgs)
  37. }
  38. } else {
  39. fmt.Printf("zmq sink receive non byte data %v \n", item)
  40. }
  41. if err != nil {
  42. fmt.Printf("send to zmq error %v \n", err)
  43. }
  44. return
  45. }
  46. func (m *zmqPub) Close() error {
  47. if m.publisher != nil {
  48. return m.publisher.Close()
  49. }
  50. return nil
  51. }
  // data is one sample published by this program. It is encoded as a JSON
  // object with the keys "temperature" and "humidity".
  type data struct {
  	// Temperature is written under the JSON key "temperature".
  	Temperature int `json:"temperature"`
  	// Humidity is written under the JSON key "humidity".
  	Humidity int `json:"humidity"`
  }
  56. var mockup = [10]data{
  57. {Temperature: 10, Humidity: 15},
  58. {Temperature: 15, Humidity: 20},
  59. {Temperature: 20, Humidity: 25},
  60. {Temperature: 25, Humidity: 30},
  61. {Temperature: 30, Humidity: 35},
  62. {Temperature: 35, Humidity: 40},
  63. {Temperature: 40, Humidity: 45},
  64. {Temperature: 45, Humidity: 50},
  65. {Temperature: 50, Humidity: 55},
  66. {Temperature: 55, Humidity: 60},
  67. }
  68. func main() {
  69. zmq := zmqPub{srv: "tcp://127.0.0.1:5563", topic: "events"}
  70. if e := zmq.Open(); e != nil {
  71. return
  72. } else {
  73. if len(os.Args) == 2 {
  74. v := os.Args[1]
  75. if v != "" {
  76. zmq.topic = v
  77. fmt.Printf("Use the topic %s\n", v)
  78. } else {
  79. fmt.Printf("Use the default zeromq topic %s\n", "events")
  80. }
  81. }
  82. for i := 0; i < 20; i++ {
  83. index := i % 10
  84. b, _ := json.Marshal(mockup[index])
  85. time.Sleep(1000 * time.Millisecond)
  86. zmq.Send(b)
  87. }
  88. }
  89. }