edgex_sink.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // +build edgex
  15. package sink
  16. import (
  17. "encoding/base64"
  18. "encoding/json"
  19. "fmt"
  20. v2 "github.com/edgexfoundry/go-mod-core-contracts/v2/common"
  21. "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
  22. "github.com/edgexfoundry/go-mod-messaging/v2/messaging"
  23. "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
  24. "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/pkg/api"
  26. "github.com/lf-edge/ekuiper/pkg/cast"
  27. "reflect"
  28. )
  29. type EdgexMsgBusSink struct {
  30. protocol string
  31. host string
  32. port int
  33. ptype string
  34. topic string
  35. contentType string
  36. deviceName string
  37. profileName string
  38. sourceName string
  39. metadata string
  40. optional map[string]string
  41. client messaging.MessageClient
  42. }
  43. func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
  44. ems.host = "*"
  45. ems.protocol = "tcp"
  46. ems.port = 5573
  47. ems.topic = "events"
  48. ems.contentType = "application/json"
  49. ems.ptype = messaging.ZeroMQ
  50. if host, ok := ps["host"]; ok {
  51. ems.host = host.(string)
  52. } else {
  53. conf.Log.Infof("Not find host conf, will use default value '*'.")
  54. }
  55. if pro, ok := ps["protocol"]; ok {
  56. ems.protocol = pro.(string)
  57. } else {
  58. conf.Log.Infof("Not find protocol conf, will use default value 'tcp'.")
  59. }
  60. if port, ok := ps["port"]; ok {
  61. if pv, ok := port.(float64); ok {
  62. ems.port = int(pv)
  63. } else if pv, ok := port.(float32); ok {
  64. ems.port = int(pv)
  65. } else {
  66. conf.Log.Infof("Not valid port value, will use default value '5563'.")
  67. }
  68. } else {
  69. conf.Log.Infof("Not find port conf, will use default value '5563'.")
  70. }
  71. if topic, ok := ps["topic"]; ok {
  72. ems.topic = topic.(string)
  73. } else {
  74. conf.Log.Infof("Not find topic conf, will use default value 'events'.")
  75. }
  76. if contentType, ok := ps["contentType"]; ok {
  77. ems.contentType = contentType.(string)
  78. } else {
  79. conf.Log.Infof("Not find contentType conf, will use default value 'application/json'.")
  80. }
  81. if ptype, ok := ps["type"]; ok {
  82. ems.ptype = ptype.(string)
  83. if ems.ptype != messaging.ZeroMQ && ems.ptype != messaging.MQTT && ems.ptype != messaging.Redis {
  84. conf.Log.Infof("Specified wrong message type value %s, will use zeromq messagebus.\n", ems.ptype)
  85. ems.ptype = messaging.ZeroMQ
  86. }
  87. }
  88. if dname, ok := ps["deviceName"]; ok {
  89. ems.deviceName = dname.(string)
  90. }
  91. if pname, ok := ps["profileName"]; ok {
  92. ems.profileName = pname.(string)
  93. }
  94. if metadata, ok := ps["metadata"]; ok {
  95. ems.metadata = metadata.(string)
  96. }
  97. if optIntf, ok := ps["optional"]; ok {
  98. if opt, ok1 := optIntf.(map[string]interface{}); ok1 {
  99. optional := make(map[string]string)
  100. for k, v := range opt {
  101. if sv, ok2 := v.(string); ok2 {
  102. optional[k] = sv
  103. } else {
  104. info := fmt.Sprintf("Only string value is allowed for optional value, the value for key %s is not a string.", k)
  105. conf.Log.Infof(info)
  106. return fmt.Errorf(info)
  107. }
  108. }
  109. ems.optional = optional
  110. }
  111. }
  112. return nil
  113. }
  114. func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
  115. log := ctx.GetLogger()
  116. conf := types.MessageBusConfig{
  117. PublishHost: types.HostInfo{
  118. Host: ems.host,
  119. Port: ems.port,
  120. Protocol: ems.protocol,
  121. },
  122. Type: ems.ptype,
  123. Optional: ems.optional,
  124. }
  125. log.Infof("Using configuration for EdgeX message bus sink: %+v", conf)
  126. if msgClient, err := messaging.NewMessageClient(conf); err != nil {
  127. return err
  128. } else {
  129. if ec := msgClient.Connect(); ec != nil {
  130. return ec
  131. } else {
  132. ems.client = msgClient
  133. }
  134. }
  135. return nil
  136. }
  137. func (ems *EdgexMsgBusSink) produceEvents(ctx api.StreamContext, result []byte) (*dtos.Event, error) {
  138. var m []map[string]interface{}
  139. if err := json.Unmarshal(result, &m); err == nil {
  140. m1 := ems.getMeta(m)
  141. event := m1.createEvent()
  142. //Override the devicename if user specified the value
  143. if ems.deviceName != "" {
  144. event.DeviceName = ems.deviceName
  145. }
  146. if ems.profileName != "" {
  147. event.ProfileName = ems.profileName
  148. }
  149. if event.SourceName == "" {
  150. event.SourceName = ems.topic
  151. }
  152. for _, v := range m {
  153. for k1, v1 := range v {
  154. if k1 == ems.metadata {
  155. continue
  156. } else {
  157. var (
  158. vt string
  159. vv interface{}
  160. )
  161. mm1 := m1.readingMeta(ctx, k1)
  162. if mm1 != nil && mm1.valueType != nil {
  163. vt = *mm1.valueType
  164. vv, err = getValueByType(v1, vt)
  165. } else {
  166. vt, vv, err = getValueType(v1)
  167. }
  168. if err != nil {
  169. ctx.GetLogger().Errorf("%v", err)
  170. continue
  171. }
  172. switch vt {
  173. case v2.ValueTypeBinary:
  174. // default media type
  175. event.AddBinaryReading(k1, vv.([]byte), "application/text")
  176. default:
  177. err = event.AddSimpleReading(k1, vt, vv)
  178. }
  179. if err != nil {
  180. ctx.GetLogger().Errorf("%v", err)
  181. continue
  182. }
  183. r := event.Readings[len(event.Readings)-1]
  184. if mm1 != nil {
  185. event.Readings[len(event.Readings)-1] = mm1.decorate(&r)
  186. }
  187. }
  188. }
  189. }
  190. return event, nil
  191. } else {
  192. return nil, err
  193. }
  194. }
  195. func getValueType(v interface{}) (string, interface{}, error) {
  196. k := reflect.TypeOf(v).Kind()
  197. switch k {
  198. case reflect.Bool:
  199. return v2.ValueTypeBool, v, nil
  200. case reflect.String:
  201. return v2.ValueTypeString, v, nil
  202. case reflect.Int64:
  203. return v2.ValueTypeInt64, v, nil
  204. case reflect.Int:
  205. return v2.ValueTypeInt64, v, nil
  206. case reflect.Float64:
  207. return v2.ValueTypeFloat64, v, nil
  208. case reflect.Slice:
  209. switch arrayValue := v.(type) {
  210. case []interface{}:
  211. if len(arrayValue) > 0 {
  212. ka := reflect.TypeOf(arrayValue[0]).Kind()
  213. switch ka {
  214. case reflect.Bool:
  215. result := make([]bool, len(arrayValue))
  216. for i, av := range arrayValue {
  217. temp, ok := av.(bool)
  218. if !ok {
  219. return "", nil, fmt.Errorf("unable to cast value to []bool for %v", v)
  220. }
  221. result[i] = temp
  222. }
  223. return v2.ValueTypeBoolArray, result, nil
  224. case reflect.String:
  225. result := make([]string, len(arrayValue))
  226. for i, av := range arrayValue {
  227. temp, ok := av.(string)
  228. if !ok {
  229. return "", nil, fmt.Errorf("unable to cast value to []string for %v", v)
  230. }
  231. result[i] = temp
  232. }
  233. return v2.ValueTypeStringArray, result, nil
  234. case reflect.Int64, reflect.Int:
  235. result := make([]int64, len(arrayValue))
  236. for i, av := range arrayValue {
  237. temp, ok := av.(int64)
  238. if !ok {
  239. return "", nil, fmt.Errorf("unable to cast value to []int64 for %v", v)
  240. }
  241. result[i] = temp
  242. }
  243. return v2.ValueTypeInt64Array, result, nil
  244. case reflect.Float64:
  245. result := make([]float64, len(arrayValue))
  246. for i, av := range arrayValue {
  247. temp, ok := av.(float64)
  248. if !ok {
  249. return "", nil, fmt.Errorf("unable to cast value to []float64 for %v", v)
  250. }
  251. result[i] = temp
  252. }
  253. return v2.ValueTypeFloat64Array, result, nil
  254. }
  255. } else { // default to string array
  256. return v2.ValueTypeStringArray, []string{}, nil
  257. }
  258. case []byte:
  259. return v2.ValueTypeBinary, v, nil
  260. default:
  261. return "", nil, fmt.Errorf("unable to cast value to []interface{} for %v", v)
  262. }
  263. }
  264. return "", nil, fmt.Errorf("unsupported value %v(%s)", v, k)
  265. }
  266. func getValueByType(v interface{}, vt string) (interface{}, error) {
  267. switch vt {
  268. case v2.ValueTypeBool:
  269. return cast.ToBool(v, cast.CONVERT_SAMEKIND)
  270. case v2.ValueTypeInt8:
  271. return cast.ToInt8(v, cast.CONVERT_SAMEKIND)
  272. case v2.ValueTypeInt16:
  273. return cast.ToInt16(v, cast.CONVERT_SAMEKIND)
  274. case v2.ValueTypeInt32:
  275. return cast.ToInt32(v, cast.CONVERT_SAMEKIND)
  276. case v2.ValueTypeInt64:
  277. return cast.ToInt64(v, cast.CONVERT_SAMEKIND)
  278. case v2.ValueTypeUint8:
  279. return cast.ToUint8(v, cast.CONVERT_SAMEKIND)
  280. case v2.ValueTypeUint16:
  281. return cast.ToUint16(v, cast.CONVERT_SAMEKIND)
  282. case v2.ValueTypeUint32:
  283. return cast.ToUint32(v, cast.CONVERT_SAMEKIND)
  284. case v2.ValueTypeUint64:
  285. return cast.ToUint64(v, cast.CONVERT_SAMEKIND)
  286. case v2.ValueTypeFloat32:
  287. return cast.ToFloat32(v, cast.CONVERT_SAMEKIND)
  288. case v2.ValueTypeFloat64:
  289. return cast.ToFloat64(v, cast.CONVERT_SAMEKIND)
  290. case v2.ValueTypeString:
  291. return cast.ToString(v, cast.CONVERT_SAMEKIND)
  292. case v2.ValueTypeBoolArray:
  293. return cast.ToBoolSlice(v, cast.CONVERT_SAMEKIND)
  294. case v2.ValueTypeInt8Array:
  295. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  296. return cast.ToInt8(input, sn)
  297. }, "int8", cast.CONVERT_SAMEKIND)
  298. case v2.ValueTypeInt16Array:
  299. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  300. return cast.ToInt16(input, sn)
  301. }, "int16", cast.CONVERT_SAMEKIND)
  302. case v2.ValueTypeInt32Array:
  303. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  304. return cast.ToInt32(input, sn)
  305. }, "int32", cast.CONVERT_SAMEKIND)
  306. case v2.ValueTypeInt64Array:
  307. return cast.ToInt64Slice(v, cast.CONVERT_SAMEKIND)
  308. case v2.ValueTypeUint8Array:
  309. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  310. return cast.ToUint8(input, sn)
  311. }, "uint8", cast.CONVERT_SAMEKIND)
  312. case v2.ValueTypeUint16Array:
  313. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  314. return cast.ToUint16(input, sn)
  315. }, "uint16", cast.CONVERT_SAMEKIND)
  316. case v2.ValueTypeUint32Array:
  317. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  318. return cast.ToUint32(input, sn)
  319. }, "uint32", cast.CONVERT_SAMEKIND)
  320. case v2.ValueTypeUint64Array:
  321. return cast.ToUint64Slice(v, cast.CONVERT_SAMEKIND)
  322. case v2.ValueTypeFloat32Array:
  323. return cast.ToTypedSlice(v, func(input interface{}, sn cast.Strictness) (interface{}, error) {
  324. return cast.ToFloat32(input, sn)
  325. }, "float32", cast.CONVERT_SAMEKIND)
  326. case v2.ValueTypeFloat64Array:
  327. return cast.ToFloat64Slice(v, cast.CONVERT_SAMEKIND)
  328. case v2.ValueTypeStringArray:
  329. return cast.ToStringSlice(v, cast.CONVERT_SAMEKIND)
  330. case v2.ValueTypeBinary:
  331. var (
  332. bv []byte
  333. err error
  334. )
  335. switch vv := v.(type) {
  336. case string:
  337. if bv, err = base64.StdEncoding.DecodeString(vv); err != nil {
  338. return nil, fmt.Errorf("fail to decode binary value from %s: %v", vv, err)
  339. }
  340. case []byte:
  341. bv = vv
  342. default:
  343. return nil, fmt.Errorf("fail to decode binary value from %v: not binary type", vv)
  344. }
  345. return bv, nil
  346. default:
  347. return nil, fmt.Errorf("unsupported type %v", vt)
  348. }
  349. }
  350. func (ems *EdgexMsgBusSink) getMeta(result []map[string]interface{}) *meta {
  351. if ems.metadata == "" {
  352. return newMetaFromMap(nil)
  353. }
  354. //Try to get the meta field
  355. for _, v := range result {
  356. if m, ok := v[ems.metadata]; ok {
  357. if m1, ok1 := m.(map[string]interface{}); ok1 {
  358. return newMetaFromMap(m1)
  359. } else {
  360. conf.Log.Infof("Specified a meta field, but the field does not contains any EdgeX metadata.")
  361. }
  362. }
  363. }
  364. return newMetaFromMap(nil)
  365. }
  366. func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) error {
  367. logger := ctx.GetLogger()
  368. if payload, ok := item.([]byte); ok {
  369. logger.Debugf("EdgeX message bus sink: %s\n", payload)
  370. evt, err := ems.produceEvents(ctx, payload)
  371. if err != nil {
  372. return fmt.Errorf("Failed to convert to EdgeX event: %s.", err.Error())
  373. }
  374. data, err := json.Marshal(evt)
  375. if err != nil {
  376. return fmt.Errorf("unexpected error MarshalEvent %v", err)
  377. }
  378. env := types.NewMessageEnvelope([]byte(data), ctx)
  379. env.ContentType = ems.contentType
  380. if e := ems.client.Publish(env, ems.topic); e != nil {
  381. logger.Errorf("Found error %s when publish to EdgeX message bus.\n", e)
  382. return e
  383. }
  384. } else {
  385. return fmt.Errorf("Unkown type %t, the message cannot be published.\n", item)
  386. }
  387. return nil
  388. }
  389. func (ems *EdgexMsgBusSink) Close(ctx api.StreamContext) error {
  390. logger := ctx.GetLogger()
  391. logger.Infof("Closing edgex sink")
  392. if ems.client != nil {
  393. if e := ems.client.Disconnect(); e != nil {
  394. return e
  395. }
  396. }
  397. return nil
  398. }
  399. type eventMeta struct {
  400. id *string
  401. deviceName string
  402. profileName string
  403. sourceName string
  404. origin *int64
  405. tags map[string]string
  406. }
  407. type readingMeta struct {
  408. id *string
  409. deviceName *string
  410. profileName *string
  411. resourceName *string
  412. origin *int64
  413. valueType *string
  414. mediaType *string
  415. }
  416. func (m *readingMeta) decorate(r *dtos.BaseReading) dtos.BaseReading {
  417. if m.id != nil {
  418. r.Id = *m.id
  419. }
  420. if m.deviceName != nil {
  421. r.DeviceName = *m.deviceName
  422. }
  423. if m.profileName != nil {
  424. r.ProfileName = *m.profileName
  425. }
  426. if m.origin != nil {
  427. r.Origin = *m.origin
  428. }
  429. if m.valueType != nil {
  430. r.ValueType = *m.valueType
  431. }
  432. if m.mediaType != nil {
  433. r.MediaType = *m.mediaType
  434. }
  435. return *r
  436. }
  437. type meta struct {
  438. eventMeta
  439. readingMetas map[string]interface{}
  440. }
  441. func newMetaFromMap(m1 map[string]interface{}) *meta {
  442. result := &meta{
  443. eventMeta: eventMeta{
  444. profileName: "kuiperProfile",
  445. },
  446. }
  447. for k, v := range m1 {
  448. switch k {
  449. case "id":
  450. if v1, ok := v.(string); ok {
  451. result.id = &v1
  452. }
  453. case "deviceName":
  454. if v1, ok := v.(string); ok {
  455. result.deviceName = v1
  456. }
  457. case "profileName":
  458. if v1, ok := v.(string); ok {
  459. result.profileName = v1
  460. }
  461. case "sourceName":
  462. if v1, ok := v.(string); ok {
  463. result.sourceName = v1
  464. }
  465. case "origin":
  466. if v1, ok := v.(float64); ok {
  467. temp := int64(v1)
  468. result.origin = &temp
  469. }
  470. case "tags":
  471. if v1, ok1 := v.(map[string]interface{}); ok1 {
  472. r := make(map[string]string)
  473. for k, vi := range v1 {
  474. s, ok := vi.(string)
  475. if ok {
  476. r[k] = s
  477. }
  478. }
  479. result.tags = r
  480. }
  481. default:
  482. if result.readingMetas == nil {
  483. result.readingMetas = make(map[string]interface{})
  484. }
  485. result.readingMetas[k] = v
  486. }
  487. }
  488. return result
  489. }
  490. func (m *meta) readingMeta(ctx api.StreamContext, readingName string) *readingMeta {
  491. vi, ok := m.readingMetas[readingName]
  492. if !ok {
  493. return nil
  494. }
  495. m1, ok := vi.(map[string]interface{})
  496. if !ok {
  497. ctx.GetLogger().Errorf("reading %s meta is not a map, but %v", readingName, vi)
  498. return nil
  499. }
  500. result := &readingMeta{}
  501. for k, v := range m1 {
  502. switch k {
  503. case "id":
  504. if v1, ok := v.(string); ok {
  505. result.id = &v1
  506. }
  507. case "deviceName":
  508. if v1, ok := v.(string); ok {
  509. result.deviceName = &v1
  510. }
  511. case "profileName":
  512. if v1, ok := v.(string); ok {
  513. result.profileName = &v1
  514. }
  515. case "resourceName":
  516. if v1, ok := v.(string); ok {
  517. result.resourceName = &v1
  518. }
  519. case "origin":
  520. if v1, ok := v.(float64); ok {
  521. temp := int64(v1)
  522. result.origin = &temp
  523. }
  524. case "valueType":
  525. if v1, ok := v.(string); ok {
  526. result.valueType = &v1
  527. }
  528. case "mediaType":
  529. if v1, ok := v.(string); ok {
  530. result.mediaType = &v1
  531. }
  532. default:
  533. ctx.GetLogger().Warnf("reading %s meta got unknown field %s of value %v", readingName, k, v)
  534. }
  535. }
  536. return result
  537. }
  538. func (m *meta) createEvent() *dtos.Event {
  539. event := dtos.NewEvent(m.profileName, m.deviceName, m.sourceName)
  540. if m.id != nil {
  541. event.Id = *m.id
  542. }
  543. if m.origin != nil {
  544. event.Origin = *m.origin
  545. }
  546. if m.tags != nil {
  547. event.Tags = m.tags
  548. }
  549. return &event
  550. }