edgex_sink.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. // +build edgex
  2. package sinks
  3. import (
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
  8. "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
  9. "github.com/edgexfoundry/go-mod-messaging/v2/messaging"
  10. "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
  11. "github.com/emqx/kuiper/common"
  12. "github.com/emqx/kuiper/xstream/api"
  13. "reflect"
  14. )
  15. type EdgexMsgBusSink struct {
  16. protocol string
  17. host string
  18. port int
  19. ptype string
  20. topic string
  21. contentType string
  22. deviceName string
  23. profileName string
  24. sourceName string
  25. metadata string
  26. optional map[string]string
  27. client messaging.MessageClient
  28. }
  29. func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
  30. ems.host = "*"
  31. ems.protocol = "tcp"
  32. ems.port = 5573
  33. ems.topic = "events"
  34. ems.contentType = "application/json"
  35. ems.ptype = messaging.ZeroMQ
  36. if host, ok := ps["host"]; ok {
  37. ems.host = host.(string)
  38. } else {
  39. common.Log.Infof("Not find host conf, will use default value '*'.")
  40. }
  41. if pro, ok := ps["protocol"]; ok {
  42. ems.protocol = pro.(string)
  43. } else {
  44. common.Log.Infof("Not find protocol conf, will use default value 'tcp'.")
  45. }
  46. if port, ok := ps["port"]; ok {
  47. if pv, ok := port.(float64); ok {
  48. ems.port = int(pv)
  49. } else if pv, ok := port.(float32); ok {
  50. ems.port = int(pv)
  51. } else {
  52. common.Log.Infof("Not valid port value, will use default value '5563'.")
  53. }
  54. } else {
  55. common.Log.Infof("Not find port conf, will use default value '5563'.")
  56. }
  57. if topic, ok := ps["topic"]; ok {
  58. ems.topic = topic.(string)
  59. } else {
  60. common.Log.Infof("Not find topic conf, will use default value 'events'.")
  61. }
  62. if contentType, ok := ps["contentType"]; ok {
  63. ems.contentType = contentType.(string)
  64. } else {
  65. common.Log.Infof("Not find contentType conf, will use default value 'application/json'.")
  66. }
  67. if ptype, ok := ps["type"]; ok {
  68. ems.ptype = ptype.(string)
  69. if ems.ptype != messaging.ZeroMQ && ems.ptype != messaging.MQTT && ems.ptype != messaging.Redis {
  70. common.Log.Infof("Specified wrong message type value %s, will use zeromq messagebus.\n", ems.ptype)
  71. ems.ptype = messaging.ZeroMQ
  72. }
  73. }
  74. if dname, ok := ps["deviceName"]; ok {
  75. ems.deviceName = dname.(string)
  76. }
  77. if pname, ok := ps["profileName"]; ok {
  78. ems.profileName = pname.(string)
  79. }
  80. if metadata, ok := ps["metadata"]; ok {
  81. ems.metadata = metadata.(string)
  82. }
  83. if optIntf, ok := ps["optional"]; ok {
  84. if opt, ok1 := optIntf.(map[string]interface{}); ok1 {
  85. optional := make(map[string]string)
  86. for k, v := range opt {
  87. if sv, ok2 := v.(string); ok2 {
  88. optional[k] = sv
  89. } else {
  90. info := fmt.Sprintf("Only string value is allowed for optional value, the value for key %s is not a string.", k)
  91. common.Log.Infof(info)
  92. return fmt.Errorf(info)
  93. }
  94. }
  95. ems.optional = optional
  96. }
  97. }
  98. return nil
  99. }
  100. func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
  101. log := ctx.GetLogger()
  102. conf := types.MessageBusConfig{
  103. PublishHost: types.HostInfo{
  104. Host: ems.host,
  105. Port: ems.port,
  106. Protocol: ems.protocol,
  107. },
  108. Type: ems.ptype,
  109. Optional: ems.optional,
  110. }
  111. log.Infof("Using configuration for EdgeX message bus sink: %+v", conf)
  112. if msgClient, err := messaging.NewMessageClient(conf); err != nil {
  113. return err
  114. } else {
  115. if ec := msgClient.Connect(); ec != nil {
  116. return ec
  117. } else {
  118. ems.client = msgClient
  119. }
  120. }
  121. return nil
  122. }
  123. func (ems *EdgexMsgBusSink) produceEvents(ctx api.StreamContext, result []byte) (*dtos.Event, error) {
  124. var m []map[string]interface{}
  125. if err := json.Unmarshal(result, &m); err == nil {
  126. m1 := ems.getMeta(m)
  127. event := m1.createEvent()
  128. //Override the devicename if user specified the value
  129. if ems.deviceName != "" {
  130. event.DeviceName = ems.deviceName
  131. }
  132. if ems.profileName != "" {
  133. event.ProfileName = ems.profileName
  134. }
  135. if event.SourceName == "" {
  136. event.SourceName = ems.topic
  137. }
  138. for _, v := range m {
  139. for k1, v1 := range v {
  140. if k1 == ems.metadata {
  141. continue
  142. } else {
  143. var (
  144. vt string
  145. vv interface{}
  146. )
  147. mm1 := m1.readingMeta(ctx, k1)
  148. if mm1 != nil && mm1.valueType != nil {
  149. vt = *mm1.valueType
  150. vv, err = getValueByType(v1, vt)
  151. } else {
  152. vt, vv, err = getValueType(v1)
  153. }
  154. if err != nil {
  155. ctx.GetLogger().Errorf("%v", err)
  156. continue
  157. }
  158. switch vt {
  159. case v2.ValueTypeBinary:
  160. // default media type
  161. event.AddBinaryReading(k1, vv.([]byte), "application/text")
  162. default:
  163. err = event.AddSimpleReading(k1, vt, vv)
  164. }
  165. if err != nil {
  166. ctx.GetLogger().Errorf("%v", err)
  167. continue
  168. }
  169. r := event.Readings[len(event.Readings)-1]
  170. if mm1 != nil {
  171. event.Readings[len(event.Readings)-1] = mm1.decorate(&r)
  172. }
  173. }
  174. }
  175. }
  176. return event, nil
  177. } else {
  178. return nil, err
  179. }
  180. }
  181. func getValueType(v interface{}) (string, interface{}, error) {
  182. k := reflect.TypeOf(v).Kind()
  183. switch k {
  184. case reflect.Bool:
  185. return v2.ValueTypeBool, v, nil
  186. case reflect.String:
  187. return v2.ValueTypeString, v, nil
  188. case reflect.Int64:
  189. return v2.ValueTypeInt64, v, nil
  190. case reflect.Int:
  191. return v2.ValueTypeInt64, v, nil
  192. case reflect.Float64:
  193. return v2.ValueTypeFloat64, v, nil
  194. case reflect.Slice:
  195. switch arrayValue := v.(type) {
  196. case []interface{}:
  197. if len(arrayValue) > 0 {
  198. ka := reflect.TypeOf(arrayValue[0]).Kind()
  199. switch ka {
  200. case reflect.Bool:
  201. result := make([]bool, len(arrayValue))
  202. for i, av := range arrayValue {
  203. temp, ok := av.(bool)
  204. if !ok {
  205. return "", nil, fmt.Errorf("unable to cast value to []bool for %v", v)
  206. }
  207. result[i] = temp
  208. }
  209. return v2.ValueTypeBoolArray, result, nil
  210. case reflect.String:
  211. result := make([]string, len(arrayValue))
  212. for i, av := range arrayValue {
  213. temp, ok := av.(string)
  214. if !ok {
  215. return "", nil, fmt.Errorf("unable to cast value to []string for %v", v)
  216. }
  217. result[i] = temp
  218. }
  219. return v2.ValueTypeStringArray, result, nil
  220. case reflect.Int64, reflect.Int:
  221. result := make([]int64, len(arrayValue))
  222. for i, av := range arrayValue {
  223. temp, ok := av.(int64)
  224. if !ok {
  225. return "", nil, fmt.Errorf("unable to cast value to []int64 for %v", v)
  226. }
  227. result[i] = temp
  228. }
  229. return v2.ValueTypeInt64Array, result, nil
  230. case reflect.Float64:
  231. result := make([]float64, len(arrayValue))
  232. for i, av := range arrayValue {
  233. temp, ok := av.(float64)
  234. if !ok {
  235. return "", nil, fmt.Errorf("unable to cast value to []float64 for %v", v)
  236. }
  237. result[i] = temp
  238. }
  239. return v2.ValueTypeFloat64Array, result, nil
  240. }
  241. } else { // default to string array
  242. return v2.ValueTypeStringArray, []string{}, nil
  243. }
  244. case []byte:
  245. return v2.ValueTypeBinary, v, nil
  246. default:
  247. return "", nil, fmt.Errorf("unable to cast value to []interface{} for %v", v)
  248. }
  249. }
  250. return "", nil, fmt.Errorf("unsupported value %v(%s)", v, k)
  251. }
  252. func getValueByType(v interface{}, vt string) (interface{}, error) {
  253. switch vt {
  254. case v2.ValueTypeBool:
  255. return common.ToBool(v, common.CONVERT_SAMEKIND)
  256. case v2.ValueTypeInt8:
  257. return common.ToInt8(v, common.CONVERT_SAMEKIND)
  258. case v2.ValueTypeInt16:
  259. return common.ToInt16(v, common.CONVERT_SAMEKIND)
  260. case v2.ValueTypeInt32:
  261. return common.ToInt32(v, common.CONVERT_SAMEKIND)
  262. case v2.ValueTypeInt64:
  263. return common.ToInt64(v, common.CONVERT_SAMEKIND)
  264. case v2.ValueTypeUint8:
  265. return common.ToUint8(v, common.CONVERT_SAMEKIND)
  266. case v2.ValueTypeUint16:
  267. return common.ToUint16(v, common.CONVERT_SAMEKIND)
  268. case v2.ValueTypeUint32:
  269. return common.ToUint32(v, common.CONVERT_SAMEKIND)
  270. case v2.ValueTypeUint64:
  271. return common.ToUint64(v, common.CONVERT_SAMEKIND)
  272. case v2.ValueTypeFloat32:
  273. return common.ToFloat32(v, common.CONVERT_SAMEKIND)
  274. case v2.ValueTypeFloat64:
  275. return common.ToFloat64(v, common.CONVERT_SAMEKIND)
  276. case v2.ValueTypeString:
  277. return common.ToString(v, common.CONVERT_SAMEKIND)
  278. case v2.ValueTypeBoolArray:
  279. return common.ToBoolSlice(v, common.CONVERT_SAMEKIND)
  280. case v2.ValueTypeInt8Array:
  281. return common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  282. return common.ToInt8(input, sn)
  283. }, "int8", common.CONVERT_SAMEKIND)
  284. case v2.ValueTypeInt16Array:
  285. return common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  286. return common.ToInt16(input, sn)
  287. }, "int16", common.CONVERT_SAMEKIND)
  288. case v2.ValueTypeInt32Array:
  289. return common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  290. return common.ToInt32(input, sn)
  291. }, "int32", common.CONVERT_SAMEKIND)
  292. case v2.ValueTypeInt64Array:
  293. return common.ToInt64Slice(v, common.CONVERT_SAMEKIND)
  294. case v2.ValueTypeUint8Array:
  295. return common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  296. return common.ToUint8(input, sn)
  297. }, "uint8", common.CONVERT_SAMEKIND)
  298. case v2.ValueTypeUint16Array:
  299. return common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  300. return common.ToUint16(input, sn)
  301. }, "uint16", common.CONVERT_SAMEKIND)
  302. case v2.ValueTypeUint32Array:
  303. return common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  304. return common.ToUint32(input, sn)
  305. }, "uint32", common.CONVERT_SAMEKIND)
  306. case v2.ValueTypeUint64Array:
  307. return common.ToUint64Slice(v, common.CONVERT_SAMEKIND)
  308. case v2.ValueTypeFloat32Array:
  309. return common.ToTypedSlice(v, func(input interface{}, sn common.Strictness) (interface{}, error) {
  310. return common.ToFloat32(input, sn)
  311. }, "float32", common.CONVERT_SAMEKIND)
  312. case v2.ValueTypeFloat64Array:
  313. return common.ToFloat64Slice(v, common.CONVERT_SAMEKIND)
  314. case v2.ValueTypeStringArray:
  315. return common.ToStringSlice(v, common.CONVERT_SAMEKIND)
  316. case v2.ValueTypeBinary:
  317. var (
  318. bv []byte
  319. err error
  320. )
  321. switch vv := v.(type) {
  322. case string:
  323. if bv, err = base64.StdEncoding.DecodeString(vv); err != nil {
  324. return nil, fmt.Errorf("fail to decode binary value from %s: %v", vv, err)
  325. }
  326. case []byte:
  327. bv = vv
  328. default:
  329. return nil, fmt.Errorf("fail to decode binary value from %v: not binary type", vv)
  330. }
  331. return bv, nil
  332. default:
  333. return nil, fmt.Errorf("unsupported type %v", vt)
  334. }
  335. }
  336. func (ems *EdgexMsgBusSink) getMeta(result []map[string]interface{}) *meta {
  337. if ems.metadata == "" {
  338. return newMetaFromMap(nil)
  339. }
  340. //Try to get the meta field
  341. for _, v := range result {
  342. if m, ok := v[ems.metadata]; ok {
  343. if m1, ok1 := m.(map[string]interface{}); ok1 {
  344. return newMetaFromMap(m1)
  345. } else {
  346. common.Log.Infof("Specified a meta field, but the field does not contains any EdgeX metadata.")
  347. }
  348. }
  349. }
  350. return newMetaFromMap(nil)
  351. }
  352. func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) error {
  353. logger := ctx.GetLogger()
  354. if payload, ok := item.([]byte); ok {
  355. logger.Debugf("EdgeX message bus sink: %s\n", payload)
  356. evt, err := ems.produceEvents(ctx, payload)
  357. if err != nil {
  358. return fmt.Errorf("Failed to convert to EdgeX event: %s.", err.Error())
  359. }
  360. data, err := json.Marshal(evt)
  361. if err != nil {
  362. return fmt.Errorf("unexpected error MarshalEvent %v", err)
  363. }
  364. env := types.NewMessageEnvelope([]byte(data), ctx)
  365. env.ContentType = ems.contentType
  366. if e := ems.client.Publish(env, ems.topic); e != nil {
  367. logger.Errorf("Found error %s when publish to EdgeX message bus.\n", e)
  368. return e
  369. }
  370. } else {
  371. return fmt.Errorf("Unkown type %t, the message cannot be published.\n", item)
  372. }
  373. return nil
  374. }
  375. func (ems *EdgexMsgBusSink) Close(ctx api.StreamContext) error {
  376. logger := ctx.GetLogger()
  377. logger.Infof("Closing edgex sink")
  378. if ems.client != nil {
  379. if e := ems.client.Disconnect(); e != nil {
  380. return e
  381. }
  382. }
  383. return nil
  384. }
  385. type eventMeta struct {
  386. id *string
  387. deviceName string
  388. profileName string
  389. sourceName string
  390. origin *int64
  391. tags map[string]string
  392. }
  393. type readingMeta struct {
  394. id *string
  395. deviceName *string
  396. profileName *string
  397. resourceName *string
  398. origin *int64
  399. valueType *string
  400. mediaType *string
  401. }
  402. func (m *readingMeta) decorate(r *dtos.BaseReading) dtos.BaseReading {
  403. if m.id != nil {
  404. r.Id = *m.id
  405. }
  406. if m.deviceName != nil {
  407. r.DeviceName = *m.deviceName
  408. }
  409. if m.profileName != nil {
  410. r.ProfileName = *m.profileName
  411. }
  412. if m.origin != nil {
  413. r.Origin = *m.origin
  414. }
  415. if m.valueType != nil {
  416. r.ValueType = *m.valueType
  417. }
  418. if m.mediaType != nil {
  419. r.MediaType = *m.mediaType
  420. }
  421. return *r
  422. }
  423. type meta struct {
  424. eventMeta
  425. readingMetas map[string]interface{}
  426. }
  427. func newMetaFromMap(m1 map[string]interface{}) *meta {
  428. result := &meta{
  429. eventMeta: eventMeta{
  430. profileName: "kuiperProfile",
  431. },
  432. }
  433. for k, v := range m1 {
  434. switch k {
  435. case "id":
  436. if v1, ok := v.(string); ok {
  437. result.id = &v1
  438. }
  439. case "deviceName":
  440. if v1, ok := v.(string); ok {
  441. result.deviceName = v1
  442. }
  443. case "profileName":
  444. if v1, ok := v.(string); ok {
  445. result.profileName = v1
  446. }
  447. case "sourceName":
  448. if v1, ok := v.(string); ok {
  449. result.sourceName = v1
  450. }
  451. case "origin":
  452. if v1, ok := v.(float64); ok {
  453. temp := int64(v1)
  454. result.origin = &temp
  455. }
  456. case "tags":
  457. if v1, ok1 := v.(map[string]interface{}); ok1 {
  458. r := make(map[string]string)
  459. for k, vi := range v1 {
  460. s, ok := vi.(string)
  461. if ok {
  462. r[k] = s
  463. }
  464. }
  465. result.tags = r
  466. }
  467. default:
  468. if result.readingMetas == nil {
  469. result.readingMetas = make(map[string]interface{})
  470. }
  471. result.readingMetas[k] = v
  472. }
  473. }
  474. return result
  475. }
  476. func (m *meta) readingMeta(ctx api.StreamContext, readingName string) *readingMeta {
  477. vi, ok := m.readingMetas[readingName]
  478. if !ok {
  479. return nil
  480. }
  481. m1, ok := vi.(map[string]interface{})
  482. if !ok {
  483. ctx.GetLogger().Errorf("reading %s meta is not a map, but %v", readingName, vi)
  484. return nil
  485. }
  486. result := &readingMeta{}
  487. for k, v := range m1 {
  488. switch k {
  489. case "id":
  490. if v1, ok := v.(string); ok {
  491. result.id = &v1
  492. }
  493. case "deviceName":
  494. if v1, ok := v.(string); ok {
  495. result.deviceName = &v1
  496. }
  497. case "profileName":
  498. if v1, ok := v.(string); ok {
  499. result.profileName = &v1
  500. }
  501. case "resourceName":
  502. if v1, ok := v.(string); ok {
  503. result.resourceName = &v1
  504. }
  505. case "origin":
  506. if v1, ok := v.(float64); ok {
  507. temp := int64(v1)
  508. result.origin = &temp
  509. }
  510. case "valueType":
  511. if v1, ok := v.(string); ok {
  512. result.valueType = &v1
  513. }
  514. case "mediaType":
  515. if v1, ok := v.(string); ok {
  516. result.mediaType = &v1
  517. }
  518. default:
  519. ctx.GetLogger().Warnf("reading %s meta got unknown field %s of value %v", readingName, k, v)
  520. }
  521. }
  522. return result
  523. }
  524. func (m *meta) createEvent() *dtos.Event {
  525. event := dtos.NewEvent(m.profileName, m.deviceName, m.sourceName)
  526. if m.id != nil {
  527. event.Id = *m.id
  528. }
  529. if m.origin != nil {
  530. event.Origin = *m.origin
  531. }
  532. if m.tags != nil {
  533. event.Tags = m.tags
  534. }
  535. return &event
  536. }