pub.go 7.9 KB


  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
  6. "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
  7. "github.com/edgexfoundry/go-mod-core-contracts/models"
  8. "github.com/edgexfoundry/go-mod-messaging/messaging"
  9. "github.com/edgexfoundry/go-mod-messaging/pkg/types"
  10. "log"
  11. "os"
  12. "time"
  13. )
  14. var msgConfig1 = types.MessageBusConfig{
  15. PublishHost: types.HostInfo{
  16. Host: "*",
  17. Port: 5563,
  18. Protocol: "tcp",
  19. },
  20. Type: messaging.ZeroMQ,
  21. }
  22. func pubEventClientZeroMq() {
  23. if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
  24. log.Fatal(err)
  25. } else {
  26. if ec := msgClient.Connect(); ec != nil {
  27. log.Fatal(ec)
  28. } else {
  29. client := coredata.NewEventClient(local.New("test"))
  30. //r := rand.New(rand.NewSource(time.Now().UnixNano()))
  31. for i := 0; i < 10; i++ {
  32. //temp := r.Intn(100)
  33. //humd := r.Intn(100)
  34. var testEvent = models.Event{Device: "demo", Created: 123, Modified: 123, Origin: 123}
  35. var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: fmt.Sprintf("%d", i*8)}
  36. var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: fmt.Sprintf("%d", i*9)}
  37. var r3 = models.Reading{Name: "b1"}
  38. if i%2 == 0 {
  39. r3.Value = "true"
  40. } else {
  41. r3.Value = "false"
  42. }
  43. r4 := models.Reading{Name: "i1", Value: fmt.Sprintf("%d", i)}
  44. r5 := models.Reading{Name: "f1", Value: fmt.Sprintf("%.2f", float64(i)/2.0)}
  45. r6 := models.Reading{Name: "ui64", Value: "10796529505058023104"}
  46. testEvent.Readings = append(testEvent.Readings, r1, r2, r3, r4, r5, r6)
  47. data, err := client.MarshalEvent(testEvent)
  48. if err != nil {
  49. fmt.Errorf("unexpected error MarshalEvent %v", err)
  50. } else {
  51. fmt.Println(string(data))
  52. }
  53. env := types.NewMessageEnvelope([]byte(data), context.Background())
  54. env.ContentType = "application/json"
  55. if e := msgClient.Publish(env, "events"); e != nil {
  56. log.Fatal(e)
  57. } else {
  58. fmt.Printf("Pub successful: %s\n", data)
  59. }
  60. time.Sleep(1500 * time.Millisecond)
  61. }
  62. }
  63. }
  64. }
  65. func pubToAnother() {
  66. var msgConfig2 = types.MessageBusConfig{
  67. PublishHost: types.HostInfo{
  68. Host: "*",
  69. Port: 5571,
  70. Protocol: "tcp",
  71. },
  72. Type: messaging.ZeroMQ,
  73. }
  74. if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
  75. log.Fatal(err)
  76. } else {
  77. if ec := msgClient.Connect(); ec != nil {
  78. log.Fatal(ec)
  79. }
  80. client := coredata.NewEventClient(local.New("test1"))
  81. var testEvent = models.Event{Device: "demo1", Created: 123, Modified: 123, Origin: 123}
  82. var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: "20"}
  83. var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: "30"}
  84. testEvent.Readings = append(testEvent.Readings, r1, r2)
  85. data, err := client.MarshalEvent(testEvent)
  86. if err != nil {
  87. fmt.Errorf("unexpected error MarshalEvent %v", err)
  88. } else {
  89. fmt.Println(string(data))
  90. }
  91. env := types.NewMessageEnvelope([]byte(data), context.Background())
  92. env.ContentType = "application/json"
  93. if e := msgClient.Publish(env, "application"); e != nil {
  94. log.Fatal(e)
  95. } else {
  96. fmt.Printf("pubToAnother successful: %s\n", data)
  97. }
  98. time.Sleep(1500 * time.Millisecond)
  99. }
  100. }
  101. func pubArrayMessage() {
  102. var msgConfig2 = types.MessageBusConfig{
  103. PublishHost: types.HostInfo{
  104. Host: "*",
  105. Port: 5563,
  106. Protocol: "tcp",
  107. },
  108. Type: messaging.ZeroMQ,
  109. }
  110. if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
  111. log.Fatal(err)
  112. } else {
  113. if ec := msgClient.Connect(); ec != nil {
  114. log.Fatal(ec)
  115. }
  116. client := coredata.NewEventClient(local.New("test1"))
  117. var testEvent = models.Event{Device: "demo1", Created: 123, Modified: 123, Origin: 123}
  118. var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "bool array", Name: "ba", Value: "[true, true, false]"}
  119. var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "int32 array", Name: "ia", Value: "[30, 40, 50]"}
  120. var r3 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "float64 array", Name: "fa", Value: "[3.14, 3.1415, 3.1415926]"}
  121. testEvent.Readings = append(testEvent.Readings, r1, r2, r3)
  122. data, err := client.MarshalEvent(testEvent)
  123. if err != nil {
  124. fmt.Errorf("unexpected error MarshalEvent %v", err)
  125. } else {
  126. fmt.Println(string(data))
  127. }
  128. env := types.NewMessageEnvelope([]byte(data), context.Background())
  129. env.ContentType = "application/json"
  130. if e := msgClient.Publish(env, "events"); e != nil {
  131. log.Fatal(e)
  132. }
  133. time.Sleep(1500 * time.Millisecond)
  134. }
  135. }
  136. func pubToMQTT(host string) {
  137. var msgConfig2 = types.MessageBusConfig{
  138. PublishHost: types.HostInfo{
  139. Host: host,
  140. Port: 1883,
  141. Protocol: "tcp",
  142. },
  143. Optional: map[string]string{
  144. "ClientId": "0001_client_id",
  145. },
  146. Type: messaging.MQTT,
  147. }
  148. if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
  149. log.Fatal(err)
  150. } else {
  151. if ec := msgClient.Connect(); ec != nil {
  152. log.Fatal(ec)
  153. }
  154. client := coredata.NewEventClient(local.New("test1"))
  155. var testEvent = models.Event{Device: "demo1", Created: 123, Modified: 123, Origin: 123}
  156. var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: "20"}
  157. var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: "30"}
  158. testEvent.Readings = append(testEvent.Readings, r1, r2)
  159. data, err := client.MarshalEvent(testEvent)
  160. if err != nil {
  161. fmt.Errorf("unexpected error MarshalEvent %v", err)
  162. } else {
  163. fmt.Println(string(data))
  164. }
  165. env := types.NewMessageEnvelope([]byte(data), context.Background())
  166. env.ContentType = "application/json"
  167. if e := msgClient.Publish(env, "events"); e != nil {
  168. log.Fatal(e)
  169. } else {
  170. fmt.Printf("pubToAnother successful: %s\n", data)
  171. }
  172. time.Sleep(1500 * time.Millisecond)
  173. }
  174. }
  175. func pubMetaSource() {
  176. if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
  177. log.Fatal(err)
  178. } else {
  179. if ec := msgClient.Connect(); ec != nil {
  180. log.Fatal(ec)
  181. } else {
  182. client := coredata.NewEventClient(local.New("test"))
  183. evtDevice := []string{"demo1", "demo2"}
  184. for i, device := range evtDevice {
  185. j := int64(i) + 1
  186. testEvent := models.Event{Device: device, Created: 11 * j, Modified: 12 * j, Origin: 13 * j}
  187. r1 := models.Reading{Pushed: 22 * j, Created: 23 * j, Origin: 24 * j, Modified: 25 * j, Device: "Temperature sensor", Name: "Temperature", Value: fmt.Sprintf("%d", j*8)}
  188. r2 := models.Reading{Pushed: 32 * j, Created: 33 * j, Origin: 34 * j, Modified: 35 * j, Device: "Humidity sensor", Name: "Humidity", Value: fmt.Sprintf("%d", j*8)}
  189. testEvent.Readings = append(testEvent.Readings, r1, r2)
  190. data, err := client.MarshalEvent(testEvent)
  191. if err != nil {
  192. fmt.Errorf("unexpected error MarshalEvent %v", err)
  193. } else {
  194. fmt.Println(string(data))
  195. }
  196. env := types.NewMessageEnvelope([]byte(data), context.Background())
  197. env.ContentType = "application/json"
  198. if e := msgClient.Publish(env, "events"); e != nil {
  199. log.Fatal(e)
  200. } else {
  201. fmt.Printf("Pub successful: %s\n", data)
  202. }
  203. time.Sleep(1500 * time.Millisecond)
  204. }
  205. }
  206. }
  207. }
  208. func main() {
  209. if len(os.Args) == 1 {
  210. pubEventClientZeroMq()
  211. } else if len(os.Args) == 2 {
  212. if v := os.Args[1]; v == "another" {
  213. pubToAnother()
  214. } else if v == "meta" {
  215. pubMetaSource()
  216. } else if v == "array" {
  217. pubArrayMessage()
  218. }
  219. } else if len(os.Args) == 3 {
  220. if v := os.Args[1]; v == "mqtt" {
  221. //The 2nd parameter is MQTT broker server address
  222. pubToMQTT(os.Args[2])
  223. }
  224. }
  225. }