pub.go 6.4 KB


  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
  6. "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
  7. "github.com/edgexfoundry/go-mod-core-contracts/models"
  8. "github.com/edgexfoundry/go-mod-messaging/messaging"
  9. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  10. "log"
  11. "os"
  12. "time"
  13. )
  14. var msgConfig1 = types.MessageBusConfig{
  15. PublishHost: types.HostInfo{
  16. Host: "*",
  17. Port: 5563,
  18. Protocol: "tcp",
  19. },
  20. Type:messaging.ZeroMQ,
  21. }
  22. func pubEventClientZeroMq() {
  23. if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
  24. log.Fatal(err)
  25. } else {
  26. if ec := msgClient.Connect(); ec != nil {
  27. log.Fatal(ec)
  28. } else {
  29. client := coredata.NewEventClient(local.New("test"))
  30. //r := rand.New(rand.NewSource(time.Now().UnixNano()))
  31. for i := 0; i < 10; i++ {
  32. //temp := r.Intn(100)
  33. //humd := r.Intn(100)
  34. var testEvent = models.Event{Device: "demo", Created: 123, Modified: 123, Origin: 123}
  35. var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: fmt.Sprintf("%d", i*8)}
  36. var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: fmt.Sprintf("%d", i*9)}
  37. var r3 = models.Reading{Name:"b1"}
  38. if i % 2 == 0 {
  39. r3.Value = "true"
  40. } else {
  41. r3.Value = "false"
  42. }
  43. r4 := models.Reading{Name:"i1", Value:fmt.Sprintf("%d", i)}
  44. r5 := models.Reading{Name:"f1", Value:fmt.Sprintf("%.2f", float64(i)/2.0)}
  45. testEvent.Readings = append(testEvent.Readings, r1, r2, r3, r4, r5)
  46. data, err := client.MarshalEvent(testEvent)
  47. if err != nil {
  48. fmt.Errorf("unexpected error MarshalEvent %v", err)
  49. } else {
  50. fmt.Println(string(data))
  51. }
  52. env := types.NewMessageEnvelope([]byte(data), context.Background())
  53. env.ContentType = "application/json"
  54. if e := msgClient.Publish(env, "events"); e != nil {
  55. log.Fatal(e)
  56. } else {
  57. fmt.Printf("Pub successful: %s\n", data)
  58. }
  59. time.Sleep(1500 * time.Millisecond)
  60. }
  61. }
  62. }
  63. }
  64. func pubToAnother() {
  65. var msgConfig2 = types.MessageBusConfig{
  66. PublishHost: types.HostInfo{
  67. Host: "*",
  68. Port: 5571,
  69. Protocol: "tcp",
  70. },
  71. Type:messaging.ZeroMQ,
  72. }
  73. if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
  74. log.Fatal(err)
  75. } else {
  76. if ec := msgClient.Connect(); ec != nil {
  77. log.Fatal(ec)
  78. }
  79. client := coredata.NewEventClient(local.New("test1"))
  80. var testEvent = models.Event{Device: "demo1", Created: 123, Modified: 123, Origin: 123}
  81. var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: "20"}
  82. var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: "30"}
  83. testEvent.Readings = append(testEvent.Readings, r1, r2)
  84. data, err := client.MarshalEvent(testEvent)
  85. if err != nil {
  86. fmt.Errorf("unexpected error MarshalEvent %v", err)
  87. } else {
  88. fmt.Println(string(data))
  89. }
  90. env := types.NewMessageEnvelope([]byte(data), context.Background())
  91. env.ContentType = "application/json"
  92. if e := msgClient.Publish(env, "application"); e != nil {
  93. log.Fatal(e)
  94. } else {
  95. fmt.Printf("pubToAnother successful: %s\n", data)
  96. }
  97. time.Sleep(1500 * time.Millisecond)
  98. }
  99. }
  100. func pubToMQTT(host string) {
  101. var msgConfig2 = types.MessageBusConfig{
  102. PublishHost: types.HostInfo{
  103. Host: host,
  104. Port: 1883,
  105. Protocol: "tcp",
  106. },
  107. Optional:map[string]string{
  108. "ClientId": "0001_client_id",
  109. },
  110. Type:messaging.MQTT,
  111. }
  112. if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
  113. log.Fatal(err)
  114. } else {
  115. if ec := msgClient.Connect(); ec != nil {
  116. log.Fatal(ec)
  117. }
  118. client := coredata.NewEventClient(local.New("test1"))
  119. var testEvent = models.Event{Device: "demo1", Created: 123, Modified: 123, Origin: 123}
  120. var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: "20"}
  121. var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: "30"}
  122. testEvent.Readings = append(testEvent.Readings, r1, r2)
  123. data, err := client.MarshalEvent(testEvent)
  124. if err != nil {
  125. fmt.Errorf("unexpected error MarshalEvent %v", err)
  126. } else {
  127. fmt.Println(string(data))
  128. }
  129. env := types.NewMessageEnvelope([]byte(data), context.Background())
  130. env.ContentType = "application/json"
  131. if e := msgClient.Publish(env, "events"); e != nil {
  132. log.Fatal(e)
  133. } else {
  134. fmt.Printf("pubToAnother successful: %s\n", data)
  135. }
  136. time.Sleep(1500 * time.Millisecond)
  137. }
  138. }
  139. func pubMetaSource() {
  140. if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
  141. log.Fatal(err)
  142. } else {
  143. if ec := msgClient.Connect(); ec != nil {
  144. log.Fatal(ec)
  145. } else {
  146. client := coredata.NewEventClient(local.New("test"))
  147. evtDevice := []string{"demo1", "demo2"}
  148. for i, device := range evtDevice {
  149. j := int64(i) + 1
  150. testEvent := models.Event{Device: device, Created: 11*j, Modified: 12*j, Origin: 13*j}
  151. r1 := models.Reading{Pushed: 22*j, Created: 23*j, Origin: 24*j, Modified: 25*j, Device: "Temperature sensor", Name: "Temperature", Value: fmt.Sprintf("%d", j*8)}
  152. r2 := models.Reading{Pushed: 32*j, Created: 33*j, Origin: 34*j, Modified: 35*j, Device: "Humidity sensor", Name: "Humidity", Value: fmt.Sprintf("%d", j*8)}
  153. testEvent.Readings = append(testEvent.Readings, r1, r2)
  154. data, err := client.MarshalEvent(testEvent)
  155. if err != nil {
  156. fmt.Errorf("unexpected error MarshalEvent %v", err)
  157. } else {
  158. fmt.Println(string(data))
  159. }
  160. env := types.NewMessageEnvelope([]byte(data), context.Background())
  161. env.ContentType = "application/json"
  162. if e := msgClient.Publish(env, "events"); e != nil {
  163. log.Fatal(e)
  164. } else {
  165. fmt.Printf("Pub successful: %s\n", data)
  166. }
  167. time.Sleep(1500 * time.Millisecond)
  168. }
  169. }
  170. }
  171. }
  172. func main() {
  173. if len(os.Args) == 1 {
  174. pubEventClientZeroMq()
  175. } else if len(os.Args) == 2 {
  176. if v := os.Args[1]; v == "another" {
  177. pubToAnother()
  178. } else if v == "meta" {
  179. pubMetaSource()
  180. }
  181. } else if len(os.Args) == 3 {
  182. if v := os.Args[1]; v == "mqtt" {
  183. //The 2nd parameter is MQTT broker server address
  184. pubToMQTT(os.Args[2])
  185. }
  186. }
  187. }