pub.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. // +build benchmark
  2. //Not necessary to build the file, until for the edgex benchmark test
  3. package main
  4. import (
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
  9. "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
  10. "github.com/edgexfoundry/go-mod-messaging/v2/messaging"
  11. "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
  12. "log"
  13. "os"
  14. "strconv"
  15. "sync"
  16. "time"
  17. )
  18. var msgConfig1 = types.MessageBusConfig{
  19. PublishHost: types.HostInfo{
  20. Host: "172.31.1.144",
  21. Port: 5563,
  22. Protocol: "tcp",
  23. },
  24. Type: messaging.ZeroMQ,
  25. }
  26. type data struct {
  27. temperature int
  28. humidity int
  29. }
  30. var mockup = []data{
  31. {temperature: 10, humidity: 15},
  32. {temperature: 15, humidity: 20},
  33. {temperature: 20, humidity: 25},
  34. {temperature: 25, humidity: 30},
  35. {temperature: 30, humidity: 35},
  36. {temperature: 35, humidity: 40},
  37. {temperature: 40, humidity: 45},
  38. {temperature: 45, humidity: 50},
  39. {temperature: 50, humidity: 55},
  40. {temperature: 55, humidity: 60},
  41. }
  42. func pubEventClientZeroMq(count int, wg *sync.WaitGroup) {
  43. defer wg.Done()
  44. if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
  45. log.Fatal(err)
  46. } else {
  47. if ec := msgClient.Connect(); ec != nil {
  48. log.Fatal(ec)
  49. } else {
  50. index := 0
  51. for i := 0; i < count; i++ {
  52. if i%10 == 0 {
  53. index = 0
  54. }
  55. testEvent := dtos.NewEvent("demoProfile", "demo", "demoSource")
  56. err := testEvent.AddSimpleReading("Temperature", v2.ValueTypeInt32, int32(mockup[index].temperature))
  57. if err != nil {
  58. fmt.Errorf("Add reading error for Temperature: %v\n", int32(mockup[index].temperature))
  59. }
  60. testEvent.Readings[0].DeviceName = "Temperature device"
  61. err = testEvent.AddSimpleReading("Humidity", v2.ValueTypeInt32, int32(mockup[index].humidity))
  62. if err != nil {
  63. fmt.Errorf("Add reading error for Humidity: %v\n", int32(mockup[index].temperature))
  64. }
  65. testEvent.Readings[1].DeviceName = "Humidity device"
  66. index++
  67. data, err := json.Marshal(testEvent)
  68. if err != nil {
  69. fmt.Errorf("unexpected error MarshalEvent %v", err)
  70. }
  71. env := types.NewMessageEnvelope([]byte(data), context.Background())
  72. env.ContentType = "application/json"
  73. if e := msgClient.Publish(env, "events"); e != nil {
  74. log.Fatal(e)
  75. } else {
  76. //fmt.Printf("%d - %s\n", index, string(data))
  77. }
  78. time.Sleep(100 * time.Nanosecond)
  79. }
  80. }
  81. }
  82. }
  83. func main() {
  84. start := time.Now()
  85. count := 1000
  86. if len(os.Args) == 2 {
  87. v := os.Args[1]
  88. if c, err := strconv.Atoi(v); err != nil {
  89. fmt.Errorf("%s\n", err)
  90. } else {
  91. count = c
  92. }
  93. }
  94. var wg sync.WaitGroup
  95. for i := 0; i < 1; i++ {
  96. wg.Add(1)
  97. go pubEventClientZeroMq(count, &wg)
  98. }
  99. wg.Wait()
  100. t := time.Now()
  101. elapsed := t.Sub(start)
  102. fmt.Printf("elapsed %2fs\n", elapsed.Seconds())
  103. }