pub.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. // +build benchmark
  2. // This file does not need to be built except for the EdgeX benchmark test.
  3. package main
  4. import (
  5. "context"
  6. "fmt"
  7. "github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
  8. "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
  9. "github.com/edgexfoundry/go-mod-core-contracts/models"
  10. "github.com/edgexfoundry/go-mod-messaging/messaging"
  11. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  12. "log"
  13. "os"
  14. "strconv"
  15. "sync"
  16. "time"
  17. )
  18. var msgConfig1 = types.MessageBusConfig{
  19. PublishHost: types.HostInfo{
  20. Host: "172.31.1.144",
  21. Port: 5563,
  22. Protocol: "tcp",
  23. },
  24. Type: messaging.ZeroMQ,
  25. }
  // data is one mock sensor sample, pairing a temperature value with a
  // humidity value.
  type data struct {
  	temperature, humidity int
  }
  30. var mockup = []data{
  31. {temperature: 10, humidity: 15},
  32. {temperature: 15, humidity: 20},
  33. {temperature: 20, humidity: 25},
  34. {temperature: 25, humidity: 30},
  35. {temperature: 30, humidity: 35},
  36. {temperature: 35, humidity: 40},
  37. {temperature: 40, humidity: 45},
  38. {temperature: 45, humidity: 50},
  39. {temperature: 50, humidity: 55},
  40. {temperature: 55, humidity: 60},
  41. }
  42. func pubEventClientZeroMq(count int, wg *sync.WaitGroup) {
  43. defer wg.Done()
  44. if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
  45. log.Fatal(err)
  46. } else {
  47. if ec := msgClient.Connect(); ec != nil {
  48. log.Fatal(ec)
  49. } else {
  50. client := coredata.NewEventClient(local.New("test"))
  51. index := 0
  52. for i := 0; i < count; i++ {
  53. if i%10 == 0 {
  54. index = 0
  55. }
  56. var testEvent = models.Event{Device: "demo"}
  57. var r1 = models.Reading{Device: "Temperature device", Name: "Temperature", Value: fmt.Sprintf("%d", mockup[index].temperature)}
  58. var r2 = models.Reading{Device: "Humidity device", Name: "Humidity", Value: fmt.Sprintf("%d", mockup[index].humidity)}
  59. index++
  60. testEvent.Readings = append(testEvent.Readings, r1, r2)
  61. data, err := client.MarshalEvent(testEvent)
  62. if err != nil {
  63. fmt.Errorf("unexpected error MarshalEvent %v", err)
  64. }
  65. env := types.NewMessageEnvelope([]byte(data), context.Background())
  66. env.ContentType = "application/json"
  67. if e := msgClient.Publish(env, "events"); e != nil {
  68. log.Fatal(e)
  69. } else {
  70. //fmt.Printf("%d - %s\n", index, string(data))
  71. }
  72. time.Sleep(100 * time.Nanosecond)
  73. }
  74. }
  75. }
  76. }
  77. func main() {
  78. start := time.Now()
  79. count := 1000
  80. if len(os.Args) == 2 {
  81. v := os.Args[1]
  82. if c, err := strconv.Atoi(v); err != nil {
  83. fmt.Errorf("%s\n", err)
  84. } else {
  85. count = c
  86. }
  87. }
  88. var wg sync.WaitGroup
  89. for i := 0; i < 1; i++ {
  90. wg.Add(1)
  91. go pubEventClientZeroMq(count, &wg)
  92. }
  93. wg.Wait()
  94. t := time.Now()
  95. elapsed := t.Sub(start)
  96. fmt.Printf("elapsed %2fs\n", elapsed.Seconds())
  97. }