sink.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package memory
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "strings"
  19. "github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
  20. "github.com/lf-edge/ekuiper/internal/topo/transform"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/ast"
  23. "github.com/lf-edge/ekuiper/pkg/cast"
  24. )
  25. type config struct {
  26. Topic string `json:"topic"`
  27. DataTemplate string `json:"dataTemplate"`
  28. RowkindField string `json:"rowkindField"`
  29. KeyField string `json:"keyField"`
  30. Fields []string `json:"fields"`
  31. DataField string `json:"dataField"`
  32. ResendTopic string `json:"resendDestination"`
  33. }
  34. type sink struct {
  35. topic string
  36. hasTransform bool
  37. keyField string
  38. rowkindField string
  39. fields []string
  40. dataField string
  41. resendTopic string
  42. }
  43. func (s *sink) Open(ctx api.StreamContext) error {
  44. ctx.GetLogger().Debugf("Opening memory sink: %v", s.topic)
  45. pubsub.CreatePub(s.topic)
  46. return nil
  47. }
  48. func (s *sink) Configure(props map[string]interface{}) error {
  49. cfg := &config{}
  50. err := cast.MapToStruct(props, cfg)
  51. if err != nil {
  52. return err
  53. }
  54. if strings.ContainsAny(cfg.Topic, "#+") {
  55. return fmt.Errorf("invalid memory topic %s: wildcard found", cfg.Topic)
  56. }
  57. s.topic = cfg.Topic
  58. if cfg.DataTemplate != "" {
  59. s.hasTransform = true
  60. }
  61. s.dataField = cfg.DataField
  62. s.fields = cfg.Fields
  63. s.rowkindField = cfg.RowkindField
  64. s.keyField = cfg.KeyField
  65. if s.rowkindField != "" && s.keyField == "" {
  66. return fmt.Errorf("keyField is required when rowkindField is set")
  67. }
  68. s.resendTopic = cfg.ResendTopic
  69. return nil
  70. }
  71. func (s *sink) collectWithTopic(ctx api.StreamContext, data interface{}, t string) error {
  72. topic, err := ctx.ParseTemplate(t, data)
  73. if err != nil {
  74. return err
  75. }
  76. if s.hasTransform {
  77. jsonBytes, _, err := ctx.TransformOutput(data)
  78. if err != nil {
  79. return err
  80. }
  81. m := make(map[string]interface{})
  82. err = json.Unmarshal(jsonBytes, &m)
  83. if err != nil {
  84. return fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
  85. }
  86. data = m
  87. } else {
  88. m, _, err := transform.TransItem(data, s.dataField, s.fields)
  89. if err != nil {
  90. return fmt.Errorf("fail to select fields %v for data %v", s.fields, data)
  91. }
  92. data = m
  93. }
  94. switch d := data.(type) {
  95. case []map[string]interface{}:
  96. for _, el := range d {
  97. err := s.publish(ctx, topic, el)
  98. if err != nil {
  99. return fmt.Errorf("fail to publish data %v for error %v", d, err)
  100. }
  101. }
  102. case map[string]interface{}:
  103. err := s.publish(ctx, topic, d)
  104. if err != nil {
  105. return fmt.Errorf("fail to publish data %v for error %v", d, err)
  106. }
  107. default:
  108. return fmt.Errorf("unrecognized format of %s", data)
  109. }
  110. return nil
  111. }
  112. func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
  113. ctx.GetLogger().Debugf("receive %+v", data)
  114. return s.collectWithTopic(ctx, data, s.topic)
  115. }
  116. func (s *sink) CollectResend(ctx api.StreamContext, data interface{}) error {
  117. ctx.GetLogger().Debugf("resend %+v", data)
  118. return s.collectWithTopic(ctx, data, s.resendTopic)
  119. }
  120. func (s *sink) Close(ctx api.StreamContext) error {
  121. ctx.GetLogger().Debugf("closing memory sink")
  122. pubsub.RemovePub(s.topic)
  123. return nil
  124. }
  125. func (s *sink) publish(ctx api.StreamContext, topic string, el map[string]interface{}) error {
  126. if s.rowkindField != "" {
  127. c, ok := el[s.rowkindField]
  128. var rowkind string
  129. if !ok {
  130. rowkind = ast.RowkindUpsert
  131. } else {
  132. rowkind, ok = c.(string)
  133. if !ok {
  134. return fmt.Errorf("rowkind field %s is not a string in data %v", s.rowkindField, el)
  135. }
  136. if rowkind != ast.RowkindInsert && rowkind != ast.RowkindUpdate && rowkind != ast.RowkindDelete && rowkind != ast.RowkindUpsert {
  137. return fmt.Errorf("invalid rowkind %s", rowkind)
  138. }
  139. }
  140. key, ok := el[s.keyField]
  141. if !ok {
  142. return fmt.Errorf("key field %s not found in data %v", s.keyField, el)
  143. }
  144. pubsub.ProduceUpdatable(ctx, topic, el, rowkind, key)
  145. } else {
  146. pubsub.Produce(ctx, topic, el)
  147. }
  148. return nil
  149. }