sink.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package memory
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "strings"
  19. "github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
  20. "github.com/lf-edge/ekuiper/internal/topo/transform"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/ast"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. )
  25. type config struct {
  26. Topic string `json:"topic"`
  27. DataTemplate string `json:"dataTemplate"`
  28. RowkindField string `json:"rowkindField"`
  29. KeyField string `json:"keyField"`
  30. Fields []string `json:"fields"`
  31. DataField string `json:"dataField"`
  32. }
  33. type sink struct {
  34. topic string
  35. hasTransform bool
  36. keyField string
  37. rowkindField string
  38. fields []string
  39. dataField string
  40. }
  41. func (s *sink) Open(ctx api.StreamContext) error {
  42. ctx.GetLogger().Debugf("Opening memory sink: %v", s.topic)
  43. pubsub.CreatePub(s.topic)
  44. return nil
  45. }
  46. func (s *sink) Configure(props map[string]interface{}) error {
  47. cfg := &config{}
  48. err := cast.MapToStruct(props, cfg)
  49. if err != nil {
  50. return err
  51. }
  52. if strings.ContainsAny(cfg.Topic, "#+") {
  53. return fmt.Errorf("invalid memory topic %s: wildcard found", cfg.Topic)
  54. }
  55. s.topic = cfg.Topic
  56. if cfg.DataTemplate != "" {
  57. s.hasTransform = true
  58. }
  59. s.dataField = cfg.DataField
  60. s.fields = cfg.Fields
  61. s.rowkindField = cfg.RowkindField
  62. s.keyField = cfg.KeyField
  63. if s.rowkindField != "" && s.keyField == "" {
  64. return fmt.Errorf("keyField is required when rowkindField is set")
  65. }
  66. return nil
  67. }
  68. func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
  69. ctx.GetLogger().Debugf("receive %+v", data)
  70. topic, err := ctx.ParseTemplate(s.topic, data)
  71. if err != nil {
  72. return err
  73. }
  74. if s.hasTransform {
  75. jsonBytes, _, err := ctx.TransformOutput(data)
  76. if err != nil {
  77. return err
  78. }
  79. m := make(map[string]interface{})
  80. err = json.Unmarshal(jsonBytes, &m)
  81. if err != nil {
  82. return fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
  83. }
  84. data = m
  85. } else {
  86. m, _, err := transform.TransItem(data, s.dataField, s.fields)
  87. if err != nil {
  88. return fmt.Errorf("fail to select fields %v for data %v", s.fields, data)
  89. }
  90. data = m
  91. }
  92. switch d := data.(type) {
  93. case []map[string]interface{}:
  94. for _, el := range d {
  95. err := s.publish(ctx, topic, el)
  96. if err != nil {
  97. return fmt.Errorf("fail to publish data %v for error %v", d, err)
  98. }
  99. }
  100. case map[string]interface{}:
  101. err := s.publish(ctx, topic, d)
  102. if err != nil {
  103. return fmt.Errorf("fail to publish data %v for error %v", d, err)
  104. }
  105. default:
  106. return fmt.Errorf("unrecognized format of %s", data)
  107. }
  108. return nil
  109. }
  110. func (s *sink) Close(ctx api.StreamContext) error {
  111. ctx.GetLogger().Debugf("closing memory sink")
  112. pubsub.RemovePub(s.topic)
  113. return nil
  114. }
  115. func (s *sink) publish(ctx api.StreamContext, topic string, el map[string]interface{}) error {
  116. if s.rowkindField != "" {
  117. c, ok := el[s.rowkindField]
  118. var rowkind string
  119. if !ok {
  120. rowkind = ast.RowkindUpsert
  121. } else {
  122. rowkind, ok = c.(string)
  123. if !ok {
  124. return fmt.Errorf("rowkind field %s is not a string in data %v", s.rowkindField, el)
  125. }
  126. if rowkind != ast.RowkindInsert && rowkind != ast.RowkindUpdate && rowkind != ast.RowkindDelete && rowkind != ast.RowkindUpsert {
  127. return fmt.Errorf("invalid rowkind %s", rowkind)
  128. }
  129. }
  130. key, ok := el[s.keyField]
  131. if !ok {
  132. return fmt.Errorf("key field %s not found in data %v", s.keyField, el)
  133. }
  134. pubsub.ProduceUpdatable(ctx, topic, el, rowkind, key)
  135. } else {
  136. pubsub.Produce(ctx, topic, el)
  137. }
  138. return nil
  139. }