edgex_sink.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. // +build edgex
  2. package sink
  3. import (
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
  8. "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
  9. "github.com/edgexfoundry/go-mod-messaging/v2/messaging"
  10. "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
  11. "github.com/emqx/kuiper/internal/conf"
  12. "github.com/emqx/kuiper/pkg/api"
  13. "github.com/emqx/kuiper/pkg/cast"
  14. "reflect"
  15. )
  16. type EdgexMsgBusSink struct {
  17. protocol string
  18. host string
  19. port int
  20. ptype string
  21. topic string
  22. contentType string
  23. deviceName string
  24. profileName string
  25. sourceName string
  26. metadata string
  27. optional map[string]string
  28. client messaging.MessageClient
  29. }
  30. func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
  31. ems.host = "*"
  32. ems.protocol = "tcp"
  33. ems.port = 5573
  34. ems.topic = "events"
  35. ems.contentType = "application/json"
  36. ems.ptype = messaging.ZeroMQ
  37. if host, ok := ps["host"]; ok {
  38. ems.host = host.(string)
  39. } else {
  40. conf.Log.Infof("Not find host conf, will use default value '*'.")
  41. }
  42. if pro, ok := ps["protocol"]; ok {
  43. ems.protocol = pro.(string)
  44. } else {
  45. conf.Log.Infof("Not find protocol conf, will use default value 'tcp'.")
  46. }
  47. if port, ok := ps["port"]; ok {
  48. if pv, ok := port.(float64); ok {
  49. ems.port = int(pv)
  50. } else if pv, ok := port.(float32); ok {
  51. ems.port = int(pv)
  52. } else {
  53. conf.Log.Infof("Not valid port value, will use default value '5563'.")
  54. }
  55. } else {
  56. conf.Log.Infof("Not find port conf, will use default value '5563'.")
  57. }
  58. if topic, ok := ps["topic"]; ok {
  59. ems.topic = topic.(string)
  60. } else {
  61. conf.Log.Infof("Not find topic conf, will use default value 'events'.")
  62. }
  63. if contentType, ok := ps["contentType"]; ok {
  64. ems.contentType = contentType.(string)
  65. } else {
  66. conf.Log.Infof("Not find contentType conf, will use default value 'application/json'.")
  67. }
  68. if ptype, ok := ps["type"]; ok {
  69. ems.ptype = ptype.(string)
  70. if ems.ptype != messaging.ZeroMQ && ems.ptype != messaging.MQTT && ems.ptype != messaging.Redis {
  71. conf.Log.Infof("Specified wrong message type value %s, will use zeromq messagebus.\n", ems.ptype)
  72. ems.ptype = messaging.ZeroMQ
  73. }
  74. }
  75. if dname, ok := ps["deviceName"]; ok {
  76. ems.deviceName = dname.(string)
  77. }
  78. if pname, ok := ps["profileName"]; ok {
  79. ems.profileName = pname.(string)
  80. }
  81. if metadata, ok := ps["metadata"]; ok {
  82. ems.metadata = metadata.(string)
  83. }
  84. if optIntf, ok := ps["optional"]; ok {
  85. if opt, ok1 := optIntf.(map[string]interface{}); ok1 {
  86. optional := make(map[string]string)
  87. for k, v := range opt {
  88. if sv, ok2 := v.(string); ok2 {
  89. optional[k] = sv
  90. } else {
  91. info := fmt.Sprintf("Only string value is allowed for optional value, the value for key %s is not a string.", k)
  92. conf.Log.Infof(info)
  93. return fmt.Errorf(info)
  94. }
  95. }
  96. ems.optional = optional
  97. }
  98. }
  99. return nil
  100. }
  101. func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
  102. log := ctx.GetLogger()
  103. conf := types.MessageBusConfig{
  104. PublishHost: types.HostInfo{
  105. Host: ems.host,
  106. Port: ems.port,
  107. Protocol: ems.protocol,
  108. },
  109. Type: ems.ptype,
  110. Optional: ems.optional,
  111. }
  112. log.Infof("Using configuration for EdgeX message bus sink: %+v", conf)
  113. if msgClient, err := messaging.NewMessageClient(conf); err != nil {
  114. return err
  115. } else {
  116. if ec := msgClient.Connect(); ec != nil {
  117. return ec
  118. } else {
  119. ems.client = msgClient
  120. }
  121. }
  122. return nil
  123. }
  124. func (ems *EdgexMsgBusSink) produceEvents(ctx api.StreamContext, result []byte) (*dtos.Event, error) {
  125. var m []map[string]interface{}
  126. if err := json.Unmarshal(result, &m); err == nil {
  127. m1 := ems.getMeta(m)
  128. event := m1.createEvent()
  129. //Override the devicename if user specified the value
  130. if ems.deviceName != "" {
  131. event.DeviceName = ems.deviceName
  132. }
  133. if ems.profileName != "" {
  134. event.ProfileName = ems.profileName
  135. }
  136. if event.SourceName == "" {
  137. event.SourceName = ems.topic
  138. }
  139. for _, v := range m {
  140. for k1, v1 := range v {
  141. if k1 == ems.metadata {
  142. continue
  143. } else {
  144. var (
  145. vt string
  146. vv interface{}
  147. )
  148. mm1 := m1.readingMeta(ctx, k1)
  149. if mm1 != nil && mm1.valueType != nil {
  150. vt = *mm1.valueType
  151. vv, err = getValueByType(v1, vt)
  152. } else {
  153. vt, vv, err = getValueType(v1)
  154. }
  155. if err != nil {
  156. ctx.GetLogger().Errorf("%v", err)
  157. continue
  158. }
  159. switch vt {
  160. case v2.ValueTypeBinary:
  161. // default media type
  162. event.AddBinaryReading(k1, vv.([]byte), "application/text")
  163. default:
  164. err = event.AddSimpleReading(k1, vt, vv)
  165. }
  166. if err != nil {
  167. ctx.GetLogger().Errorf("%v", err)
  168. continue
  169. }
  170. r := event.Readings[len(event.Readings)-1]
  171. if mm1 != nil {
  172. event.Readings[len(event.Readings)-1] = mm1.decorate(&r)
  173. }
  174. }
  175. }
  176. }
  177. return event, nil
  178. } else {
  179. return nil, err
  180. }
  181. }
  182. func getValueType(v interface{}) (string, interface{}, error) {
  183. k := reflect.TypeOf(v).Kind()
  184. switch k {
  185. case reflect.Bool:
  186. return v2.ValueTypeBool, v, nil
  187. case reflect.String:
  188. return v2.ValueTypeString, v, nil
  189. case reflect.Int64:
  190. return v2.ValueTypeInt64, v, nil
  191. case reflect.Int:
  192. return v2.ValueTypeInt64, v, nil
  193. case reflect.Float64:
  194. return v2.ValueTypeFloat64, v, nil
  195. case reflect.Slice:
  196. switch arrayValue := v.(type) {
  197. case []interface{}:
  198. if len(arrayValue) > 0 {
  199. ka := reflect.TypeOf(arrayValue[0]).Kind()
  200. switch ka {
  201. case reflect.Bool:
  202. result := make([]bool, len(arrayValue))
  203. for i, av := range arrayValue {
  204. temp, ok := av.(bool)
  205. if !ok {
  206. return "", nil, fmt.Errorf("unable to cast value to []bool for %v", v)
  207. }
  208. result[i] = temp
  209. }
  210. return v2.ValueTypeBoolArray, result, nil
  211. case reflect.String:
  212. result := make([]string, len(arrayValue))
  213. for i, av := range arrayValue {
  214. temp, ok := av.(string)
  215. if !ok {
  216. return "", nil, fmt.Errorf("unable to cast value to []string for %v", v)
  217. }
  218. result[i] = temp
  219. }
  220. return v2.ValueTypeStringArray, result, nil
  221. case reflect.Int64, reflect.Int:
  222. result := make([]int64, len(arrayValue))
  223. for i, av := range arrayValue {
  224. temp, ok := av.(int64)
  225. if !ok {
  226. return "", nil, fmt.Errorf("unable to cast value to []int64 for %v", v)
  227. }
  228. result[i] = temp
  229. }
  230. return v2.ValueTypeInt64Array, result, nil
  231. case reflect.Float64:
  232. result := make([]float64, len(arrayValue))
  233. for i, av := range arrayValue {
  234. temp, ok := av.(float64)
  235. if !ok {
  236. return "", nil, fmt.Errorf("unable to cast value to []float64 for %v", v)
  237. }
  238. result[i] = temp
  239. }
  240. return v2.ValueTypeFloat64Array, result, nil
  241. }
  242. } else { // default to string array
  243. return v2.ValueTypeStringArray, []string{}, nil
  244. }
  245. case []byte:
  246. return v2.ValueTypeBinary, v, nil
  247. default:
  248. return "", nil, fmt.Errorf("unable to cast value to []interface{} for %v", v)
  249. }
  250. }
  251. return "", nil, fmt.Errorf("unsupported value %v(%s)", v, k)
  252. }
  253. func getValueByType(v interface{}, vt string) (interface{}, error) {
  254. switch vt {
  255. case v2.ValueTypeBool:
  256. return cast.ToBool(v, cast.CONVERT_SAMEKIND)
  257. case v2.ValueTypeInt8:
  258. return cast.ToInt8(v, cast.CONVERT_SAMEKIND)
  259. case v2.ValueTypeInt16:
  260. return cast.ToInt16(v, cast.CONVERT_SAMEKIND)
  261. case v2.ValueTypeInt32:
  262. return cast.ToInt32(v, cast.CONVERT_SAMEKIND)
  263. case v2.ValueTypeInt64:
  264. return cast.ToInt64(v, cast.CONVERT_SAMEKIND)
  265. case v2.ValueTypeUint8:
  266. return cast.ToUint8(v, cast.CONVERT_SAMEKIND)
  267. case v2.ValueTypeUint16:
  268. return cast.ToUint16(v, cast.CONVERT_SAMEKIND)
  269. case v2.ValueTypeUint32:
  270. return cast.ToUint32(v, cast.CONVERT_SAMEKIND)
  271. case v2.ValueTypeUint64:
  272. return cast.ToUint64(v, cast.CONVERT_SAMEKIND)
  273. case v2.ValueTypeFloat32:
  274. return cast.ToFloat32(v, cast.CONVERT_SAMEKIND)
  275. case v2.ValueTypeFloat64:
  276. return cast.ToFloat64(v, cast.CONVERT_SAMEKIND)
  277. case v2.ValueTypeString:
  278. return cast.ToString(v, cast.CONVERT_SAMEKIND)
  279. case v2.ValueTypeBoolArray:
  280. return cast.ToBoolSlice(v, cast.CONVERT_SAMEKIND)
  281. case v2.ValueTypeInt8Array:
  282. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  283. return cast.ToInt8(input, sn)
  284. }, "int8", cast.CONVERT_SAMEKIND)
  285. case v2.ValueTypeInt16Array:
  286. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  287. return cast.ToInt16(input, sn)
  288. }, "int16", cast.CONVERT_SAMEKIND)
  289. case v2.ValueTypeInt32Array:
  290. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  291. return cast.ToInt32(input, sn)
  292. }, "int32", cast.CONVERT_SAMEKIND)
  293. case v2.ValueTypeInt64Array:
  294. return cast.ToInt64Slice(v, cast.CONVERT_SAMEKIND)
  295. case v2.ValueTypeUint8Array:
  296. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  297. return cast.ToUint8(input, sn)
  298. }, "uint8", cast.CONVERT_SAMEKIND)
  299. case v2.ValueTypeUint16Array:
  300. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  301. return cast.ToUint16(input, sn)
  302. }, "uint16", cast.CONVERT_SAMEKIND)
  303. case v2.ValueTypeUint32Array:
  304. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  305. return cast.ToUint32(input, sn)
  306. }, "uint32", cast.CONVERT_SAMEKIND)
  307. case v2.ValueTypeUint64Array:
  308. return cast.ToUint64Slice(v, cast.CONVERT_SAMEKIND)
  309. case v2.ValueTypeFloat32Array:
  310. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  311. return cast.ToFloat32(input, sn)
  312. }, "float32", cast.CONVERT_SAMEKIND)
  313. case v2.ValueTypeFloat64Array:
  314. return cast.ToFloat64Slice(v, cast.CONVERT_SAMEKIND)
  315. case v2.ValueTypeStringArray:
  316. return cast.ToStringSlice(v, cast.CONVERT_SAMEKIND)
  317. case v2.ValueTypeBinary:
  318. var (
  319. bv []byte
  320. err error
  321. )
  322. switch vv := v.(type) {
  323. case string:
  324. if bv, err = base64.StdEncoding.DecodeString(vv); err != nil {
  325. return nil, fmt.Errorf("fail to decode binary value from %s: %v", vv, err)
  326. }
  327. case []byte:
  328. bv = vv
  329. default:
  330. return nil, fmt.Errorf("fail to decode binary value from %v: not binary type", vv)
  331. }
  332. return bv, nil
  333. default:
  334. return nil, fmt.Errorf("unsupported type %v", vt)
  335. }
  336. }
  337. func (ems *EdgexMsgBusSink) getMeta(result []map[string]interface{}) *meta {
  338. if ems.metadata == "" {
  339. return newMetaFromMap(nil)
  340. }
  341. //Try to get the meta field
  342. for _, v := range result {
  343. if m, ok := v[ems.metadata]; ok {
  344. if m1, ok1 := m.(map[string]interface{}); ok1 {
  345. return newMetaFromMap(m1)
  346. } else {
  347. conf.Log.Infof("Specified a meta field, but the field does not contains any EdgeX metadata.")
  348. }
  349. }
  350. }
  351. return newMetaFromMap(nil)
  352. }
  353. func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) error {
  354. logger := ctx.GetLogger()
  355. if payload, ok := item.([]byte); ok {
  356. logger.Debugf("EdgeX message bus sink: %s\n", payload)
  357. evt, err := ems.produceEvents(ctx, payload)
  358. if err != nil {
  359. return fmt.Errorf("Failed to convert to EdgeX event: %s.", err.Error())
  360. }
  361. data, err := json.Marshal(evt)
  362. if err != nil {
  363. return fmt.Errorf("unexpected error MarshalEvent %v", err)
  364. }
  365. env := types.NewMessageEnvelope([]byte(data), ctx)
  366. env.ContentType = ems.contentType
  367. if e := ems.client.Publish(env, ems.topic); e != nil {
  368. logger.Errorf("Found error %s when publish to EdgeX message bus.\n", e)
  369. return e
  370. }
  371. } else {
  372. return fmt.Errorf("Unkown type %t, the message cannot be published.\n", item)
  373. }
  374. return nil
  375. }
  376. func (ems *EdgexMsgBusSink) Close(ctx api.StreamContext) error {
  377. logger := ctx.GetLogger()
  378. logger.Infof("Closing edgex sink")
  379. if ems.client != nil {
  380. if e := ems.client.Disconnect(); e != nil {
  381. return e
  382. }
  383. }
  384. return nil
  385. }
  386. type eventMeta struct {
  387. id *string
  388. deviceName string
  389. profileName string
  390. sourceName string
  391. origin *int64
  392. tags map[string]string
  393. }
  394. type readingMeta struct {
  395. id *string
  396. deviceName *string
  397. profileName *string
  398. resourceName *string
  399. origin *int64
  400. valueType *string
  401. mediaType *string
  402. }
  403. func (m *readingMeta) decorate(r *dtos.BaseReading) dtos.BaseReading {
  404. if m.id != nil {
  405. r.Id = *m.id
  406. }
  407. if m.deviceName != nil {
  408. r.DeviceName = *m.deviceName
  409. }
  410. if m.profileName != nil {
  411. r.ProfileName = *m.profileName
  412. }
  413. if m.origin != nil {
  414. r.Origin = *m.origin
  415. }
  416. if m.valueType != nil {
  417. r.ValueType = *m.valueType
  418. }
  419. if m.mediaType != nil {
  420. r.MediaType = *m.mediaType
  421. }
  422. return *r
  423. }
  424. type meta struct {
  425. eventMeta
  426. readingMetas map[string]interface{}
  427. }
  428. func newMetaFromMap(m1 map[string]interface{}) *meta {
  429. result := &meta{
  430. eventMeta: eventMeta{
  431. profileName: "kuiperProfile",
  432. },
  433. }
  434. for k, v := range m1 {
  435. switch k {
  436. case "id":
  437. if v1, ok := v.(string); ok {
  438. result.id = &v1
  439. }
  440. case "deviceName":
  441. if v1, ok := v.(string); ok {
  442. result.deviceName = v1
  443. }
  444. case "profileName":
  445. if v1, ok := v.(string); ok {
  446. result.profileName = v1
  447. }
  448. case "sourceName":
  449. if v1, ok := v.(string); ok {
  450. result.sourceName = v1
  451. }
  452. case "origin":
  453. if v1, ok := v.(float64); ok {
  454. temp := int64(v1)
  455. result.origin = &temp
  456. }
  457. case "tags":
  458. if v1, ok1 := v.(map[string]interface{}); ok1 {
  459. r := make(map[string]string)
  460. for k, vi := range v1 {
  461. s, ok := vi.(string)
  462. if ok {
  463. r[k] = s
  464. }
  465. }
  466. result.tags = r
  467. }
  468. default:
  469. if result.readingMetas == nil {
  470. result.readingMetas = make(map[string]interface{})
  471. }
  472. result.readingMetas[k] = v
  473. }
  474. }
  475. return result
  476. }
  477. func (m *meta) readingMeta(ctx api.StreamContext, readingName string) *readingMeta {
  478. vi, ok := m.readingMetas[readingName]
  479. if !ok {
  480. return nil
  481. }
  482. m1, ok := vi.(map[string]interface{})
  483. if !ok {
  484. ctx.GetLogger().Errorf("reading %s meta is not a map, but %v", readingName, vi)
  485. return nil
  486. }
  487. result := &readingMeta{}
  488. for k, v := range m1 {
  489. switch k {
  490. case "id":
  491. if v1, ok := v.(string); ok {
  492. result.id = &v1
  493. }
  494. case "deviceName":
  495. if v1, ok := v.(string); ok {
  496. result.deviceName = &v1
  497. }
  498. case "profileName":
  499. if v1, ok := v.(string); ok {
  500. result.profileName = &v1
  501. }
  502. case "resourceName":
  503. if v1, ok := v.(string); ok {
  504. result.resourceName = &v1
  505. }
  506. case "origin":
  507. if v1, ok := v.(float64); ok {
  508. temp := int64(v1)
  509. result.origin = &temp
  510. }
  511. case "valueType":
  512. if v1, ok := v.(string); ok {
  513. result.valueType = &v1
  514. }
  515. case "mediaType":
  516. if v1, ok := v.(string); ok {
  517. result.mediaType = &v1
  518. }
  519. default:
  520. ctx.GetLogger().Warnf("reading %s meta got unknown field %s of value %v", readingName, k, v)
  521. }
  522. }
  523. return result
  524. }
  525. func (m *meta) createEvent() *dtos.Event {
  526. event := dtos.NewEvent(m.profileName, m.deviceName, m.sourceName)
  527. if m.id != nil {
  528. event.Id = *m.id
  529. }
  530. if m.origin != nil {
  531. event.Origin = *m.origin
  532. }
  533. if m.tags != nil {
  534. event.Tags = m.tags
  535. }
  536. return &event
  537. }