zmq_pub.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "os"
  19. "time"
  20. zmq "github.com/pebbe/zmq4"
  21. )
  22. type zmqPub struct {
  23. publisher *zmq.Socket
  24. srv string
  25. topic string
  26. }
  27. func (m *zmqPub) Open() (err error) {
  28. m.publisher, err = zmq.NewSocket(zmq.PUB)
  29. if err != nil {
  30. return fmt.Errorf("zmq sink fails to create socket: %v", err)
  31. }
  32. err = m.publisher.Bind(m.srv)
  33. if err != nil {
  34. return fmt.Errorf("zmq sink fails to bind to %s: %v", m.srv, err)
  35. }
  36. fmt.Println("zmq sink open")
  37. return nil
  38. }
  39. func (m *zmqPub) Send(item interface{}) (err error) {
  40. if v, ok := item.([]byte); ok {
  41. fmt.Printf("To pub: %s \n", item)
  42. if m.topic == "" {
  43. _, err = m.publisher.Send(string(v), 0)
  44. } else {
  45. msgs := []string{
  46. m.topic,
  47. string(v),
  48. }
  49. _, err = m.publisher.SendMessage(msgs)
  50. }
  51. } else {
  52. fmt.Printf("zmq sink receive non byte data %v \n", item)
  53. }
  54. if err != nil {
  55. fmt.Printf("send to zmq error %v \n", err)
  56. }
  57. return
  58. }
  59. func (m *zmqPub) Close() error {
  60. if m.publisher != nil {
  61. return m.publisher.Close()
  62. }
  63. return nil
  64. }
  65. type data struct {
  66. Temperature int `json:"temperature"`
  67. Humidity int `json:"humidity"`
  68. }
  69. var mockup = [10]data{
  70. {Temperature: 10, Humidity: 15},
  71. {Temperature: 15, Humidity: 20},
  72. {Temperature: 20, Humidity: 25},
  73. {Temperature: 25, Humidity: 30},
  74. {Temperature: 30, Humidity: 35},
  75. {Temperature: 35, Humidity: 40},
  76. {Temperature: 40, Humidity: 45},
  77. {Temperature: 45, Humidity: 50},
  78. {Temperature: 50, Humidity: 55},
  79. {Temperature: 55, Humidity: 60},
  80. }
  81. func main() {
  82. zmq := zmqPub{srv: "tcp://127.0.0.1:5563", topic: "events"}
  83. if e := zmq.Open(); e != nil {
  84. return
  85. } else {
  86. if len(os.Args) == 2 {
  87. v := os.Args[1]
  88. if v != "" {
  89. zmq.topic = v
  90. fmt.Printf("Use the topic %s\n", v)
  91. } else {
  92. fmt.Printf("Use the default zeromq topic %s\n", "events")
  93. }
  94. }
  95. for i := 0; i < 20; i++ {
  96. index := i % 10
  97. b, _ := json.Marshal(mockup[index])
  98. time.Sleep(1000 * time.Millisecond)
  99. zmq.Send(b)
  100. }
  101. }
  102. }