kafka.go

// Copyright 2023 carlclone@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main
import (
    "context"
    "fmt"
    "strings"
    "time"

    "github.com/lf-edge/ekuiper/pkg/api"
    "github.com/lf-edge/ekuiper/pkg/cast"
    kafkago "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl"
    "github.com/segmentio/kafka-go/sasl/plain"
    "github.com/segmentio/kafka-go/sasl/scram"
)
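// kafkaSink publishes eKuiper rule output to a Kafka topic through a
// segmentio/kafka-go Writer.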
type kafkaSink struct {
    writer *kafkago.Writer
    c      *sinkConf
}
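// Supported values for the saslAuthType property.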
const (
    SASL_NONE  = "none"
    SASL_PLAIN = "plain"
    SASL_SCRAM = "scram"
)
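// sinkConf holds the sink properties. A sample properties map (values are
// illustrative, not defaults) might look like:
//
//	{
//	  "brokers": "localhost:9092,localhost:9093",
//	  "topic": "result",
//	  "saslAuthType": "plain",
//	  "saslUserName": "admin",
//	  "saslPassword": "secret"
//	}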
type sinkConf struct {
    Brokers      string `json:"brokers"`
    Topic        string `json:"topic"`
    SaslAuthType string `json:"saslAuthType"`
    SaslUserName string `json:"saslUserName"`
    SaslPassword string `json:"saslPassword"`
}
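// Configure validates the user-supplied properties and applies defaults.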
func (m *kafkaSink) Configure(props map[string]interface{}) error {
    c := &sinkConf{
        Brokers:      "localhost:9092",
        Topic:        "",
        SaslAuthType: SASL_NONE,
    }
    if err := cast.MapToStruct(props, c); err != nil {
        return err
    }
    // strings.Split never returns an empty slice, so check the raw string
    // rather than the length of the split result.
    if c.Brokers == "" {
        return fmt.Errorf("brokers can not be empty")
    }
    if c.Topic == "" {
        return fmt.Errorf("topic can not be empty")
    }
    if !(c.SaslAuthType == SASL_NONE || c.SaslAuthType == SASL_SCRAM || c.SaslAuthType == SASL_PLAIN) {
        return fmt.Errorf("saslAuthType incorrect")
    }
    if (c.SaslAuthType == SASL_SCRAM || c.SaslAuthType == SASL_PLAIN) && (c.SaslUserName == "" || c.SaslPassword == "") {
        return fmt.Errorf("username and password can not be empty")
    }
    m.c = c
    return nil
}
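// Open builds the configured SASL mechanism and creates the kafka-go Writer
// used by Collect.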
func (m *kafkaSink) Open(ctx api.StreamContext) error {
    ctx.GetLogger().Debug("Opening kafka sink")
    var err error
    var mechanism sasl.Mechanism
    // Build the SASL authentication mechanism, if any.
    switch m.c.SaslAuthType {
    case SASL_PLAIN:
        mechanism = plain.Mechanism{
            Username: m.c.SaslUserName,
            Password: m.c.SaslPassword,
        }
    case SASL_SCRAM:
        mechanism, err = scram.Mechanism(scram.SHA512, m.c.SaslUserName, m.c.SaslPassword)
        if err != nil {
            return err
        }
    default:
        mechanism = nil
    }
    brokers := strings.Split(m.c.Brokers, ",")
    w := &kafkago.Writer{
        Addr:                   kafkago.TCP(brokers...),
        Topic:                  m.c.Topic,
        Balancer:               &kafkago.LeastBytes{},
        Async:                  false,
        AllowAutoTopicCreation: true,
        MaxAttempts:            10,
        RequiredAcks:           kafkago.RequireAll, // -1: wait for all in-sync replicas
        Transport: &kafkago.Transport{
            SASL: mechanism,
        },
    }
    m.writer = w
    return nil
}
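// Collect transforms one rule result (a single map or a slice of maps) into
// Kafka messages and writes them with bounded retries.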
func (m *kafkaSink) Collect(ctx api.StreamContext, item interface{}) error {
    logger := ctx.GetLogger()
    logger.Debugf("kafka sink receive %v", item)
    var messages []kafkago.Message
    switch d := item.(type) {
    case []map[string]interface{}:
        for _, el := range d {
            decodedBytes, _, err := ctx.TransformOutput(el)
            if err != nil {
                return fmt.Errorf("kafka sink transform data error: %v", err)
            }
            messages = append(messages, kafkago.Message{Value: decodedBytes})
        }
    case map[string]interface{}:
        decodedBytes, _, err := ctx.TransformOutput(d)
        if err != nil {
            return fmt.Errorf("kafka sink transform data error: %v", err)
        }
        messages = append(messages, kafkago.Message{Value: decodedBytes})
    default:
        return fmt.Errorf("unrecognized format of %v", item)
    }
    vctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()
    err := m.kafkaWriteWithBackoff(vctx, ctx.GetLogger().Errorf, 100*time.Millisecond, time.Second, messages...)
    if err != nil {
        return err
    }
    logger.Debug("insert data into kafka success")
    return nil
}
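// kafkaWriteWithBackoff retries WriteMessages with exponential backoff,
// doubling the interval up to maxInterval. Temporary errors are retried;
// permanent errors or a cancelled context abort the batch.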
func (m *kafkaSink) kafkaWriteWithBackoff(ctx context.Context, log func(string, ...interface{}), interval, maxInterval time.Duration, messages ...kafkago.Message) error {
    var berr error
    tries := 0
retry:
    for {
        tries++
        err := m.writer.WriteMessages(ctx, messages...)
        switch err := err.(type) {
        case nil:
            return nil
        case kafkago.Error:
            berr = err
            if !err.Temporary() {
                break retry
            }
        case kafkago.WriteErrors:
            // Keep only the messages that failed with a temporary error and
            // retry just those; any permanent failure fails the whole batch.
            var remaining []kafkago.Message
            for i, msg := range messages {
                switch err := err[i].(type) {
                case nil:
                    continue
                case kafkago.Error:
                    if err.Temporary() {
                        remaining = append(remaining, msg)
                        continue
                    }
                }
                return fmt.Errorf("failed to deliver messages: %v", err)
            }
            messages = remaining
            berr = err
        default:
            if berr == nil || err != context.DeadlineExceeded {
                berr = err
            }
            break retry
        }
        log("temporary write error: %v", err)
        // Exponential backoff, capped at maxInterval.
        interval *= 2
        if interval > maxInterval {
            interval = maxInterval
        }
        timer := time.NewTimer(interval)
        select {
        case <-timer.C:
        case <-ctx.Done():
            timer.Stop()
            break retry
        }
    }
    return fmt.Errorf("failed to deliver messages after %d tries: %v", tries, berr)
}
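// Close flushes any pending writes and releases the underlying writer.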
func (m *kafkaSink) Close(ctx api.StreamContext) error {
    return m.writer.Close()
}
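// Kafka is the exported constructor the eKuiper plugin runtime looks up to
// instantiate this sink.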
func Kafka() api.Sink {
    return &kafkaSink{}
}
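
// A sketch of building this file as an eKuiper native sink plugin (output
// path and version suffix are illustrative):
//
//	go build -trimpath --buildmode=plugin -o plugins/sinks/Kafka@v1.0.0.so kafka.go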