sink.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package memory
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "strings"
  19. "github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
  20. "github.com/lf-edge/ekuiper/internal/topo/transform"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/ast"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. )
  25. type config struct {
  26. Topic string `json:"topic"`
  27. DataTemplate string `json:"dataTemplate"`
  28. RowkindField string `json:"rowkindField"`
  29. KeyField string `json:"keyField"`
  30. Fields []string `json:"fields"`
  31. DataField string `json:"dataField"`
  32. ResendTopic string `json:"resendDestination"`
  33. }
  34. type sink struct {
  35. topic string
  36. hasTransform bool
  37. keyField string
  38. rowkindField string
  39. fields []string
  40. dataField string
  41. resendTopic string
  42. }
  43. func (s *sink) Open(ctx api.StreamContext) error {
  44. ctx.GetLogger().Debugf("Opening memory sink: %v", s.topic)
  45. pubsub.CreatePub(s.topic)
  46. return nil
  47. }
  48. func (s *sink) Configure(props map[string]interface{}) error {
  49. cfg := &config{}
  50. err := cast.MapToStruct(props, cfg)
  51. if err != nil {
  52. return err
  53. }
  54. if strings.ContainsAny(cfg.Topic, "#+") {
  55. return fmt.Errorf("invalid memory topic %s: wildcard found", cfg.Topic)
  56. }
  57. s.topic = cfg.Topic
  58. if cfg.DataTemplate != "" {
  59. s.hasTransform = true
  60. }
  61. s.dataField = cfg.DataField
  62. s.fields = cfg.Fields
  63. s.rowkindField = cfg.RowkindField
  64. s.keyField = cfg.KeyField
  65. if s.rowkindField != "" && s.keyField == "" {
  66. return fmt.Errorf("keyField is required when rowkindField is set")
  67. }
  68. s.resendTopic = cfg.ResendTopic
  69. if s.resendTopic == "" {
  70. s.resendTopic = s.topic
  71. }
  72. return nil
  73. }
  74. func (s *sink) collectWithTopic(ctx api.StreamContext, data interface{}, t string) error {
  75. topic, err := ctx.ParseTemplate(t, data)
  76. if err != nil {
  77. return err
  78. }
  79. if s.hasTransform {
  80. jsonBytes, _, err := ctx.TransformOutput(data)
  81. if err != nil {
  82. return err
  83. }
  84. m := make(map[string]interface{})
  85. err = json.Unmarshal(jsonBytes, &m)
  86. if err != nil {
  87. return fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
  88. }
  89. data = m
  90. } else {
  91. m, _, err := transform.TransItem(data, s.dataField, s.fields)
  92. if err != nil {
  93. return fmt.Errorf("fail to select fields %v for data %v", s.fields, data)
  94. }
  95. data = m
  96. }
  97. switch d := data.(type) {
  98. case []map[string]interface{}:
  99. for _, el := range d {
  100. err := s.publish(ctx, topic, el)
  101. if err != nil {
  102. return fmt.Errorf("fail to publish data %v for error %v", d, err)
  103. }
  104. }
  105. case map[string]interface{}:
  106. err := s.publish(ctx, topic, d)
  107. if err != nil {
  108. return fmt.Errorf("fail to publish data %v for error %v", d, err)
  109. }
  110. default:
  111. return fmt.Errorf("unrecognized format of %s", data)
  112. }
  113. return nil
  114. }
  115. func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
  116. ctx.GetLogger().Debugf("receive %+v", data)
  117. return s.collectWithTopic(ctx, data, s.topic)
  118. }
  119. func (s *sink) CollectResend(ctx api.StreamContext, data interface{}) error {
  120. ctx.GetLogger().Debugf("resend %+v", data)
  121. return s.collectWithTopic(ctx, data, s.resendTopic)
  122. }
  123. func (s *sink) Close(ctx api.StreamContext) error {
  124. ctx.GetLogger().Debugf("closing memory sink")
  125. pubsub.RemovePub(s.topic)
  126. return nil
  127. }
  128. func (s *sink) publish(ctx api.StreamContext, topic string, el map[string]interface{}) error {
  129. if s.rowkindField != "" {
  130. c, ok := el[s.rowkindField]
  131. var rowkind string
  132. if !ok {
  133. rowkind = ast.RowkindUpsert
  134. } else {
  135. rowkind, ok = c.(string)
  136. if !ok {
  137. return fmt.Errorf("rowkind field %s is not a string in data %v", s.rowkindField, el)
  138. }
  139. if rowkind != ast.RowkindInsert && rowkind != ast.RowkindUpdate && rowkind != ast.RowkindDelete && rowkind != ast.RowkindUpsert {
  140. return fmt.Errorf("invalid rowkind %s", rowkind)
  141. }
  142. }
  143. key, ok := el[s.keyField]
  144. if !ok {
  145. return fmt.Errorf("key field %s not found in data %v", s.keyField, el)
  146. }
  147. pubsub.ProduceUpdatable(ctx, topic, el, rowkind, key)
  148. } else {
  149. pubsub.Produce(ctx, topic, el)
  150. }
  151. return nil
  152. }