123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563 |
- // Copyright 2022-2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package node
- import (
- "fmt"
- "strings"
- "sync"
- "github.com/lf-edge/ekuiper/internal/binder/io"
- "github.com/lf-edge/ekuiper/internal/conf"
- sinkUtil "github.com/lf-edge/ekuiper/internal/io/sink"
- "github.com/lf-edge/ekuiper/internal/topo/context"
- "github.com/lf-edge/ekuiper/internal/topo/node/cache"
- nodeConf "github.com/lf-edge/ekuiper/internal/topo/node/conf"
- "github.com/lf-edge/ekuiper/internal/topo/node/metric"
- "github.com/lf-edge/ekuiper/internal/topo/transform"
- "github.com/lf-edge/ekuiper/internal/xsql"
- "github.com/lf-edge/ekuiper/pkg/api"
- "github.com/lf-edge/ekuiper/pkg/cast"
- "github.com/lf-edge/ekuiper/pkg/errorx"
- "github.com/lf-edge/ekuiper/pkg/infra"
- "github.com/lf-edge/ekuiper/pkg/message"
- )
// SinkConf holds the common properties of a sink node, decoded from the rule's
// sink options by parseConf. The embedded conf.SinkConf carries the cache and
// resend settings; parseConf seeds it from the global conf.Config.Sink before
// overlaying the per-rule options.
type SinkConf struct {
	// Concurrency is the number of sink worker instances; parseConf resets
	// non-positive values to 1.
	Concurrency int `json:"concurrency"`
	// Omitempty (option key "omitIfEmpty") skips sending empty results.
	Omitempty bool `json:"omitIfEmpty"`
	// SendSingle sends each map of a result separately instead of the whole slice.
	SendSingle bool `json:"sendSingle"`
	// DataTemplate, Format, SchemaId, Delimiter, DataField and Fields are
	// handed to transform.GenTransform to build the output transformer.
	DataTemplate string `json:"dataTemplate"`
	Format       string `json:"format"`
	SchemaId     string `json:"schemaId"`
	Delimiter    string `json:"delimiter"`
	// BufferLength sizes the worker's internal data/resend channels and is
	// passed to the caches (parseConf default: 1024).
	BufferLength int      `json:"bufferLength"`
	Fields       []string `json:"fields"`
	DataField    string   `json:"dataField"`
	// BatchSize and LingerInterval enable batching when either is positive
	// (see isBatchSinkEnabled).
	BatchSize      int `json:"batchSize"`
	LingerInterval int `json:"lingerInterval"`
	conf.SinkConf
}
- func (sc *SinkConf) isBatchSinkEnabled() bool {
- if sc.BatchSize > 0 || sc.LingerInterval > 0 {
- return true
- }
- return false
- }
// SinkNode is the node that delivers rule results to external sinks. It reads
// items from its input channel and hands them to one api.Sink per worker
// (see Open). It cannot have outputs of its own (see AddOutput).
type SinkNode struct {
	*defaultSinkNode
	// static
	sinkType string
	// mutex guards sinks and statManagers, which the workers started in Open
	// append to concurrently.
	mutex sync.RWMutex
	// configs (also static for sinks)
	options map[string]interface{}
	// isMock is set by NewSinkNodeWithSink: the sinks are injected up front
	// instead of being created from sinkType, and reset keeps them.
	isMock bool
	// states varies after restart
	sinks []api.Sink
}
- func NewSinkNode(name string, sinkType string, props map[string]interface{}) *SinkNode {
- bufferLength := 1024
- if c, ok := props["bufferLength"]; ok {
- if t, err := cast.ToInt(c, cast.STRICT); err != nil || t <= 0 {
- // invalid property bufferLength
- } else {
- bufferLength = t
- }
- }
- return &SinkNode{
- defaultSinkNode: &defaultSinkNode{
- input: make(chan interface{}, bufferLength),
- defaultNode: &defaultNode{
- name: name,
- concurrency: 1,
- ctx: nil,
- },
- },
- sinkType: sinkType,
- options: props,
- }
- }
- // NewSinkNodeWithSink Only for mock source, do not use it in production
- func NewSinkNodeWithSink(name string, sink api.Sink, props map[string]interface{}) *SinkNode {
- return &SinkNode{
- defaultSinkNode: &defaultSinkNode{
- input: make(chan interface{}, 1024),
- defaultNode: &defaultNode{
- name: name,
- concurrency: 1,
- ctx: nil,
- },
- },
- sinks: []api.Sink{sink},
- options: props,
- isMock: true,
- }
- }
- func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
- m.ctx = ctx
- logger := ctx.GetLogger()
- logger.Debugf("open sink node %s", m.name)
- go func() {
- err := infra.SafeRun(func() error {
- sconf, err := m.parseConf(logger)
- if err != nil {
- return err
- }
- tf, err := transform.GenTransform(sconf.DataTemplate, sconf.Format, sconf.SchemaId, sconf.Delimiter, sconf.DataField, sconf.Fields)
- if err != nil {
- msg := fmt.Sprintf("property dataTemplate %v is invalid: %v", sconf.DataTemplate, err)
- logger.Warnf(msg)
- return fmt.Errorf(msg)
- }
- ctx = context.WithValue(ctx.(*context.DefaultContext), context.TransKey, tf)
- m.reset()
- logger.Infof("open sink node %d instances", m.concurrency)
- for i := 0; i < m.concurrency; i++ { // workers
- go func(instance int) {
- panicOrError := infra.SafeRun(func() error {
- var (
- sink api.Sink
- err error
- )
- if !m.isMock {
- logger.Debugf("Trying to get sink for rule %s with options %v\n", ctx.GetRuleId(), m.options)
- sink, err = getSink(m.sinkType, m.options)
- if err != nil {
- return err
- }
- logger.Debugf("Successfully get the sink %s", m.sinkType)
- m.mutex.Lock()
- m.sinks = append(m.sinks, sink)
- m.mutex.Unlock()
- logger.Debugf("Now is to open sink for rule %s.\n", ctx.GetRuleId())
- if err := sink.Open(ctx); err != nil {
- return err
- }
- logger.Debugf("Successfully open sink for rule %s.\n", ctx.GetRuleId())
- } else {
- sink = m.sinks[instance]
- }
- stats, err := metric.NewStatManager(ctx, "sink")
- if err != nil {
- return err
- }
- m.mutex.Lock()
- m.statManagers = append(m.statManagers, stats)
- m.mutex.Unlock()
- // The sink flow is: receive -> batch -> cache -> send.
- // In the outside loop, send received data to batch/cache by dataCh and receive data be dataOutCh
- // Only need to deal with dataOutCh in the outer loop
- dataCh := make(chan []map[string]interface{}, sconf.BufferLength)
- var (
- dataOutCh <-chan []map[string]interface{}
- resendCh chan []map[string]interface{}
- sendManager *sinkUtil.SendManager
- c *cache.SyncCache
- rq *cache.SyncCache
- )
- if sconf.isBatchSinkEnabled() {
- sendManager, err = sinkUtil.NewSendManager(sconf.BatchSize, sconf.LingerInterval)
- if err != nil {
- return err
- }
- go sendManager.Run(ctx)
- }
- if !sconf.EnableCache {
- if sendManager != nil {
- dataOutCh = sendManager.GetOutputChan()
- } else {
- dataOutCh = dataCh
- }
- } else {
- if sendManager != nil {
- c = cache.NewSyncCache(ctx, sendManager.GetOutputChan(), result, &sconf.SinkConf, sconf.BufferLength)
- } else {
- c = cache.NewSyncCache(ctx, dataCh, result, &sconf.SinkConf, sconf.BufferLength)
- }
- if sconf.ResendAlterQueue {
- resendCh = make(chan []map[string]interface{}, sconf.BufferLength)
- rq = cache.NewSyncCache(ctx, resendCh, result, &sconf.SinkConf, sconf.BufferLength)
- }
- dataOutCh = c.Out
- }
- receiveQ := func(data interface{}) {
- processed := false
- if data, processed = m.preprocess(data); processed {
- return
- }
- stats.IncTotalRecordsIn()
- stats.SetBufferLength(bufferLen(dataCh, c, rq))
- outs := itemToMap(data)
- if sconf.Omitempty && (data == nil || len(outs) == 0) {
- ctx.GetLogger().Debugf("receive empty in sink")
- return
- }
- if sconf.isBatchSinkEnabled() {
- for _, out := range outs {
- sendManager.RecvData(out)
- }
- } else {
- select {
- case dataCh <- outs:
- case <-ctx.Done():
- }
- }
- if resendCh != nil {
- select {
- case resendCh <- nil:
- case <-ctx.Done():
- }
- }
- }
- normalQ := func(data []map[string]interface{}) {
- stats.ProcessTimeStart()
- stats.SetBufferLength(bufferLen(dataCh, c, rq))
- ctx.GetLogger().Debugf("sending data: %v", data)
- err := doCollectMaps(ctx, sink, sconf, data, stats, false)
- if sconf.EnableCache {
- ack := checkAck(ctx, data, err)
- if sconf.ResendAlterQueue {
- // If ack is false, add it to the resend queue
- if !ack {
- select {
- case resendCh <- data:
- case <-ctx.Done():
- }
- }
- // Always ack for the normal queue as fail items are handled by the resend queue
- select {
- case c.Ack <- true:
- case <-ctx.Done():
- }
- } else {
- select {
- case c.Ack <- ack:
- case <-ctx.Done():
- }
- }
- }
- stats.ProcessTimeEnd()
- }
- resendQ := func(data []map[string]interface{}) {
- ctx.GetLogger().Debugf("resend data: %v", data)
- stats.SetBufferLength(bufferLen(dataCh, c, rq))
- if sconf.ResendIndicatorField != "" {
- for _, item := range data {
- item[sconf.ResendIndicatorField] = true
- }
- }
- err := doCollectMaps(ctx, sink, sconf, data, stats, true)
- ack := checkAck(ctx, data, err)
- select {
- case rq.Ack <- ack:
- case <-ctx.Done():
- }
- }
- doneQ := func() {
- logger.Infof("sink node %s instance %d done", m.name, instance)
- if err := sink.Close(ctx); err != nil {
- logger.Warnf("close sink node %s instance %d fails: %v", m.name, instance, err)
- }
- }
- if resendCh == nil { // no resend strategy
- for {
- select {
- case data := <-m.input:
- receiveQ(data)
- case data := <-dataOutCh:
- normalQ(data)
- case <-ctx.Done():
- doneQ()
- return nil
- }
- }
- } else {
- if sconf.ResendPriority == 0 {
- for {
- select {
- case data := <-m.input:
- receiveQ(data)
- case data := <-dataOutCh:
- normalQ(data)
- case data := <-rq.Out:
- resendQ(data)
- case <-ctx.Done():
- doneQ()
- return nil
- }
- }
- } else if sconf.ResendPriority < 0 { // normal queue has higher priority
- for {
- select {
- case data := <-m.input:
- receiveQ(data)
- case data := <-dataOutCh:
- normalQ(data)
- case <-ctx.Done():
- doneQ()
- return nil
- default:
- select {
- case data := <-dataOutCh:
- normalQ(data)
- case data := <-rq.Out:
- resendQ(data)
- }
- }
- }
- } else {
- for {
- select {
- case data := <-m.input:
- receiveQ(data)
- case data := <-rq.Out:
- resendQ(data)
- case <-ctx.Done():
- doneQ()
- return nil
- default:
- select {
- case data := <-dataOutCh:
- normalQ(data)
- case data := <-rq.Out:
- resendQ(data)
- }
- }
- }
- }
- }
- })
- if panicOrError != nil {
- infra.DrainError(ctx, panicOrError, result)
- }
- }(i)
- }
- return nil
- })
- if err != nil {
- infra.DrainError(ctx, err, result)
- }
- }()
- }
- func bufferLen(dataCh chan []map[string]interface{}, c *cache.SyncCache, rq *cache.SyncCache) int64 {
- l := len(dataCh)
- if c != nil {
- l += c.CacheLength
- }
- if rq != nil {
- l += rq.CacheLength
- }
- return int64(l)
- }
- func checkAck(ctx api.StreamContext, data interface{}, err error) bool {
- if err != nil {
- if strings.HasPrefix(err.Error(), errorx.IOErr) { // do not log to prevent a lot of logs!
- return false
- } else {
- ctx.GetLogger().Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), data, err)
- }
- } else {
- ctx.GetLogger().Debugf("sent data: %v", data)
- }
- return true
- }
- func (m *SinkNode) parseConf(logger api.Logger) (*SinkConf, error) {
- sconf := &SinkConf{
- Concurrency: 1,
- Omitempty: false,
- SendSingle: false,
- DataTemplate: "",
- SinkConf: *conf.Config.Sink,
- BufferLength: 1024,
- }
- err := cast.MapToStruct(m.options, sconf)
- if err != nil {
- return nil, fmt.Errorf("read properties %v fail with error: %v", m.options, err)
- }
- if sconf.Concurrency <= 0 {
- logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", sconf.Concurrency)
- sconf.Concurrency = 1
- }
- m.concurrency = sconf.Concurrency
- if sconf.Format == "" {
- sconf.Format = "json"
- } else if sconf.Format != message.FormatJson && sconf.Format != message.FormatProtobuf && sconf.Format != message.FormatBinary && sconf.Format != message.FormatCustom && sconf.Format != message.FormatDelimited {
- logger.Warnf("invalid type for format property, should be json protobuf or binary but found %s", sconf.Format)
- sconf.Format = "json"
- }
- err = cast.MapToStruct(m.options, &sconf.SinkConf)
- if err != nil {
- return nil, fmt.Errorf("read properties %v to cache conf fail with error: %v", m.options, err)
- }
- if sconf.DataField == "" {
- if v, ok := m.options["tableDataField"]; ok {
- sconf.DataField = v.(string)
- }
- }
- err = sconf.SinkConf.Validate()
- if err != nil {
- return nil, fmt.Errorf("invalid cache properties: %v", err)
- }
- return sconf, err
- }
- func (m *SinkNode) reset() {
- if !m.isMock {
- m.sinks = nil
- }
- m.statManagers = nil
- }
- func doCollectMaps(ctx api.StreamContext, sink api.Sink, sconf *SinkConf, outs []map[string]interface{}, stats metric.StatManager, isResend bool) error {
- if !sconf.SendSingle {
- return doCollectData(ctx, sink, outs, stats, isResend)
- } else {
- var err error
- for _, d := range outs {
- if sconf.Omitempty && (d == nil || len(d) == 0) {
- ctx.GetLogger().Debugf("receive empty in sink")
- continue
- }
- newErr := doCollectData(ctx, sink, d, stats, isResend)
- if newErr != nil {
- err = newErr
- }
- }
- return err
- }
- }
- func itemToMap(item interface{}) []map[string]interface{} {
- var outs []map[string]interface{}
- switch val := item.(type) {
- case error:
- outs = []map[string]interface{}{
- {"error": val.Error()},
- }
- break
- case xsql.Collection: // The order is important here, because some element is both a collection and a row, such as WindowTuples, JoinTuples, etc.
- outs = val.ToMaps()
- break
- case xsql.Row:
- outs = []map[string]interface{}{
- val.ToMap(),
- }
- break
- case []map[string]interface{}: // for test only
- outs = val
- break
- case *xsql.WatermarkTuple:
- // just ignore
- default:
- outs = []map[string]interface{}{
- {"error": fmt.Sprintf("result is not a map slice but found %#v", val)},
- }
- }
- return outs
- }
- // doCollectData outData must be map or []map
- func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, stats metric.StatManager, isResend bool) error {
- select {
- case <-ctx.Done():
- ctx.GetLogger().Infof("sink node %s instance %d stops data resending", ctx.GetOpId(), ctx.GetInstanceId())
- return nil
- default:
- if isResend {
- return resendDataToSink(ctx, sink, outData, stats)
- } else {
- return sendDataToSink(ctx, sink, outData, stats)
- }
- }
- }
- func sendDataToSink(ctx api.StreamContext, sink api.Sink, outData interface{}, stats metric.StatManager) error {
- if err := sink.Collect(ctx, outData); err != nil {
- stats.IncTotalExceptions(err.Error())
- return err
- } else {
- ctx.GetLogger().Debugf("success")
- stats.IncTotalRecordsOut()
- return nil
- }
- }
- func resendDataToSink(ctx api.StreamContext, sink api.Sink, outData interface{}, stats metric.StatManager) error {
- var err error
- switch st := sink.(type) {
- case api.ResendSink:
- err = st.CollectResend(ctx, outData)
- default:
- err = st.Collect(ctx, outData)
- }
- if err != nil {
- stats.IncTotalExceptions(err.Error())
- return err
- } else {
- ctx.GetLogger().Debugf("success resend")
- return nil
- }
- }
- func getSink(name string, action map[string]interface{}) (api.Sink, error) {
- var (
- s api.Sink
- err error
- )
- s, err = io.Sink(name)
- if s != nil {
- newAction := nodeConf.GetSinkConf(name, action)
- err = s.Configure(newAction)
- if err != nil {
- return nil, err
- }
- return s, nil
- } else {
- if err != nil {
- return nil, err
- } else {
- return nil, fmt.Errorf("sink %s not found", name)
- }
- }
- }
- // AddOutput Override defaultNode
- func (m *SinkNode) AddOutput(_ chan<- interface{}, name string) error {
- return fmt.Errorf("fail to add output %s, sink %s cannot add output", name, m.name)
- }
- // Broadcast Override defaultNode
- func (m *SinkNode) Broadcast(_ interface{}) error {
- return fmt.Errorf("sink %s cannot add broadcast", m.name)
- }
|