// Copyright 2023-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
    "fmt"
    "strings"

    kafkago "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl"
    "github.com/segmentio/kafka-go/sasl/plain"
    "github.com/segmentio/kafka-go/sasl/scram"

    "github.com/lf-edge/ekuiper/pkg/api"
    "github.com/lf-edge/ekuiper/pkg/cast"
    "github.com/lf-edge/ekuiper/pkg/errorx"
)
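
// kafkaSink writes rule output to a Kafka topic through a kafka-go Writer.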
type kafkaSink struct {
    writer *kafkago.Writer
    c      *sinkConf
}
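
// Supported SASL authentication types.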
const (
    SASL_NONE  = "none"
    SASL_PLAIN = "plain"
    SASL_SCRAM = "scram"
)
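
// sinkConf holds the user-facing sink properties, populated from the rule's
// action configuration.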
type sinkConf struct {
    Brokers      string `json:"brokers"`
    Topic        string `json:"topic"`
    SaslAuthType string `json:"saslAuthType"`
    SaslUserName string `json:"saslUserName"`
    SaslPassword string `json:"saslPassword"`
}
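
// Configure applies defaults, maps the action properties onto sinkConf, and
// validates them.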
func (m *kafkaSink) Configure(props map[string]interface{}) error {
    c := &sinkConf{
        Brokers:      "localhost:9092",
        Topic:        "",
        SaslAuthType: SASL_NONE,
    }
    if err := cast.MapToStruct(props, c); err != nil {
        return err
    }
    // strings.Split never returns an empty slice, so validate the raw string
    // rather than the length of the split result.
    if strings.TrimSpace(c.Brokers) == "" {
        return fmt.Errorf("brokers can not be empty")
    }
    if c.Topic == "" {
        return fmt.Errorf("topic can not be empty")
    }
    if !(c.SaslAuthType == SASL_NONE || c.SaslAuthType == SASL_SCRAM || c.SaslAuthType == SASL_PLAIN) {
        return fmt.Errorf("saslAuthType incorrect")
    }
    if (c.SaslAuthType == SASL_SCRAM || c.SaslAuthType == SASL_PLAIN) && (c.SaslUserName == "" || c.SaslPassword == "") {
        return fmt.Errorf("username and password can not be empty")
    }
    m.c = c
    return nil
}
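
// Open builds the SASL mechanism selected in the configuration and creates
// the kafka-go Writer used by Collect.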
func (m *kafkaSink) Open(ctx api.StreamContext) error {
    ctx.GetLogger().Debug("Opening kafka sink")
    var err error
    var mechanism sasl.Mechanism
    // Build the SASL mechanism matching the configured authentication type.
    switch m.c.SaslAuthType {
    case SASL_PLAIN:
        mechanism = plain.Mechanism{
            Username: m.c.SaslUserName,
            Password: m.c.SaslPassword,
        }
    case SASL_SCRAM:
        mechanism, err = scram.Mechanism(scram.SHA512, m.c.SaslUserName, m.c.SaslPassword)
        if err != nil {
            return err
        }
    default:
        mechanism = nil
    }
    brokers := strings.Split(m.c.Brokers, ",")
    w := &kafkago.Writer{
        Addr:                   kafkago.TCP(brokers...),
        Topic:                  m.c.Topic,
        Balancer:               &kafkago.LeastBytes{},
        Async:                  false, // synchronous writes so Collect can return send errors
        AllowAutoTopicCreation: true,
        MaxAttempts:            1, // single attempt; retries are left to the rule runtime
        RequiredAcks:           -1, // wait for all in-sync replicas (kafkago.RequireAll)
        BatchSize:              1,
        Transport: &kafkago.Transport{
            SASL: mechanism,
        },
    }
    m.writer = w
    return nil
}
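
// Collect transforms the incoming tuple(s) with the rule's output transform
// and sends one Kafka message per map. Temporary failures are wrapped in
// errorx.IOErr so the eKuiper runtime can retry them.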
func (m *kafkaSink) Collect(ctx api.StreamContext, item interface{}) error {
    logger := ctx.GetLogger()
    logger.Debugf("kafka sink receive %s", item)
    var messages []kafkago.Message
    switch d := item.(type) {
    case []map[string]interface{}:
        for _, el := range d {
            decodedBytes, _, err := ctx.TransformOutput(el)
            if err != nil {
                return fmt.Errorf("kafka sink transform data error: %v", err)
            }
            messages = append(messages, kafkago.Message{Value: decodedBytes})
        }
    case map[string]interface{}:
        decodedBytes, _, err := ctx.TransformOutput(d)
        if err != nil {
            return fmt.Errorf("kafka sink transform data error: %v", err)
        }
        messages = append(messages, kafkago.Message{Value: decodedBytes})
    default:
        return fmt.Errorf("unrecognized format of %s", item)
    }
    err := m.writer.WriteMessages(ctx, messages...)
    switch err := err.(type) {
    case kafkago.Error:
        // A temporary broker error is reported as an IO error so the runtime
        // can retry the whole batch.
        if err.Temporary() {
            return fmt.Errorf(`%s: kafka sink fails to send out the data. %v`, errorx.IOErr, err)
        }
    case kafkago.WriteErrors:
        // WriteErrors carries one error per message; treat the batch as
        // retryable only if every message failed with a temporary error.
        count := 0
        for i := range messages {
            switch err := err[i].(type) {
            case nil:
                continue
            case kafkago.Error:
                if err.Temporary() {
                    count++
                    continue
                }
            }
        }
        if count == len(messages) {
            return fmt.Errorf(`%s: kafka sink fails to send out the data. %v`, errorx.IOErr, err)
        }
    }
    return err
}
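
// Close releases the underlying writer and its connections.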
func (m *kafkaSink) Close(ctx api.StreamContext) error {
    return m.writer.Close()
}
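
// Kafka is the exported entry point looked up by the eKuiper plugin loader.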
func Kafka() api.Sink {
    return &kafkaSink{}
}
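
// A minimal usage sketch, not part of the sink itself: assuming the standard
// eKuiper native-plugin flow, this file is compiled in Go plugin build mode
// and the exported Kafka symbol is resolved by the runtime (the output path
// and version suffix below are illustrative):
//
//	go build -trimpath --buildmode=plugin -o plugins/sinks/Kafka@v1.0.0.so kafka.go
//
// A rule action could then reference the sink by name, for example:
//
//	{"actions": [{"kafka": {"brokers": "localhost:9092", "topic": "demo", "saslAuthType": "none"}}]}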