pub.go 6.5 KB


  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
  6. "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
  7. "github.com/edgexfoundry/go-mod-core-contracts/models"
  8. "github.com/edgexfoundry/go-mod-messaging/messaging"
  9. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  10. "log"
  11. "os"
  12. "time"
  13. )
  14. var msgConfig1 = types.MessageBusConfig{
  15. PublishHost: types.HostInfo{
  16. Host: "*",
  17. Port: 5563,
  18. Protocol: "tcp",
  19. },
  20. Type: messaging.ZeroMQ,
  21. }
  22. func pubEventClientZeroMq() {
  23. if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
  24. log.Fatal(err)
  25. } else {
  26. if ec := msgClient.Connect(); ec != nil {
  27. log.Fatal(ec)
  28. } else {
  29. client := coredata.NewEventClient(local.New("test"))
  30. //r := rand.New(rand.NewSource(time.Now().UnixNano()))
  31. for i := 0; i < 10; i++ {
  32. //temp := r.Intn(100)
  33. //humd := r.Intn(100)
  34. var testEvent = models.Event{Device: "demo", Created: 123, Modified: 123, Origin: 123}
  35. var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: fmt.Sprintf("%d", i*8)}
  36. var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: fmt.Sprintf("%d", i*9)}
  37. var r3 = models.Reading{Name: "b1"}
  38. if i%2 == 0 {
  39. r3.Value = "true"
  40. } else {
  41. r3.Value = "false"
  42. }
  43. r4 := models.Reading{Name: "i1", Value: fmt.Sprintf("%d", i)}
  44. r5 := models.Reading{Name: "f1", Value: fmt.Sprintf("%.2f", float64(i)/2.0)}
  45. r6 := models.Reading{Name: "ui64", Value: "10796529505058023104"}
  46. testEvent.Readings = append(testEvent.Readings, r1, r2, r3, r4, r5, r6)
  47. data, err := client.MarshalEvent(testEvent)
  48. if err != nil {
  49. fmt.Errorf("unexpected error MarshalEvent %v", err)
  50. } else {
  51. fmt.Println(string(data))
  52. }
  53. env := types.NewMessageEnvelope([]byte(data), context.Background())
  54. env.ContentType = "application/json"
  55. if e := msgClient.Publish(env, "events"); e != nil {
  56. log.Fatal(e)
  57. } else {
  58. fmt.Printf("Pub successful: %s\n", data)
  59. }
  60. time.Sleep(1500 * time.Millisecond)
  61. }
  62. }
  63. }
  64. }
  65. func pubToAnother() {
  66. var msgConfig2 = types.MessageBusConfig{
  67. PublishHost: types.HostInfo{
  68. Host: "*",
  69. Port: 5571,
  70. Protocol: "tcp",
  71. },
  72. Type: messaging.ZeroMQ,
  73. }
  74. if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
  75. log.Fatal(err)
  76. } else {
  77. if ec := msgClient.Connect(); ec != nil {
  78. log.Fatal(ec)
  79. }
  80. client := coredata.NewEventClient(local.New("test1"))
  81. var testEvent = models.Event{Device: "demo1", Created: 123, Modified: 123, Origin: 123}
  82. var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: "20"}
  83. var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: "30"}
  84. testEvent.Readings = append(testEvent.Readings, r1, r2)
  85. data, err := client.MarshalEvent(testEvent)
  86. if err != nil {
  87. fmt.Errorf("unexpected error MarshalEvent %v", err)
  88. } else {
  89. fmt.Println(string(data))
  90. }
  91. env := types.NewMessageEnvelope([]byte(data), context.Background())
  92. env.ContentType = "application/json"
  93. if e := msgClient.Publish(env, "application"); e != nil {
  94. log.Fatal(e)
  95. } else {
  96. fmt.Printf("pubToAnother successful: %s\n", data)
  97. }
  98. time.Sleep(1500 * time.Millisecond)
  99. }
  100. }
  101. func pubToMQTT(host string) {
  102. var msgConfig2 = types.MessageBusConfig{
  103. PublishHost: types.HostInfo{
  104. Host: host,
  105. Port: 1883,
  106. Protocol: "tcp",
  107. },
  108. Optional: map[string]string{
  109. "ClientId": "0001_client_id",
  110. },
  111. Type: messaging.MQTT,
  112. }
  113. if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
  114. log.Fatal(err)
  115. } else {
  116. if ec := msgClient.Connect(); ec != nil {
  117. log.Fatal(ec)
  118. }
  119. client := coredata.NewEventClient(local.New("test1"))
  120. var testEvent = models.Event{Device: "demo1", Created: 123, Modified: 123, Origin: 123}
  121. var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: "20"}
  122. var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: "30"}
  123. testEvent.Readings = append(testEvent.Readings, r1, r2)
  124. data, err := client.MarshalEvent(testEvent)
  125. if err != nil {
  126. fmt.Errorf("unexpected error MarshalEvent %v", err)
  127. } else {
  128. fmt.Println(string(data))
  129. }
  130. env := types.NewMessageEnvelope([]byte(data), context.Background())
  131. env.ContentType = "application/json"
  132. if e := msgClient.Publish(env, "events"); e != nil {
  133. log.Fatal(e)
  134. } else {
  135. fmt.Printf("pubToAnother successful: %s\n", data)
  136. }
  137. time.Sleep(1500 * time.Millisecond)
  138. }
  139. }
  140. func pubMetaSource() {
  141. if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
  142. log.Fatal(err)
  143. } else {
  144. if ec := msgClient.Connect(); ec != nil {
  145. log.Fatal(ec)
  146. } else {
  147. client := coredata.NewEventClient(local.New("test"))
  148. evtDevice := []string{"demo1", "demo2"}
  149. for i, device := range evtDevice {
  150. j := int64(i) + 1
  151. testEvent := models.Event{Device: device, Created: 11 * j, Modified: 12 * j, Origin: 13 * j}
  152. r1 := models.Reading{Pushed: 22 * j, Created: 23 * j, Origin: 24 * j, Modified: 25 * j, Device: "Temperature sensor", Name: "Temperature", Value: fmt.Sprintf("%d", j*8)}
  153. r2 := models.Reading{Pushed: 32 * j, Created: 33 * j, Origin: 34 * j, Modified: 35 * j, Device: "Humidity sensor", Name: "Humidity", Value: fmt.Sprintf("%d", j*8)}
  154. testEvent.Readings = append(testEvent.Readings, r1, r2)
  155. data, err := client.MarshalEvent(testEvent)
  156. if err != nil {
  157. fmt.Errorf("unexpected error MarshalEvent %v", err)
  158. } else {
  159. fmt.Println(string(data))
  160. }
  161. env := types.NewMessageEnvelope([]byte(data), context.Background())
  162. env.ContentType = "application/json"
  163. if e := msgClient.Publish(env, "events"); e != nil {
  164. log.Fatal(e)
  165. } else {
  166. fmt.Printf("Pub successful: %s\n", data)
  167. }
  168. time.Sleep(1500 * time.Millisecond)
  169. }
  170. }
  171. }
  172. }
  173. func main() {
  174. if len(os.Args) == 1 {
  175. pubEventClientZeroMq()
  176. } else if len(os.Args) == 2 {
  177. if v := os.Args[1]; v == "another" {
  178. pubToAnother()
  179. } else if v == "meta" {
  180. pubMetaSource()
  181. }
  182. } else if len(os.Args) == 3 {
  183. if v := os.Args[1]; v == "mqtt" {
  184. //The 2nd parameter is MQTT broker server address
  185. pubToMQTT(os.Args[2])
  186. }
  187. }
  188. }