|
@@ -16,6 +16,9 @@ package node
|
|
|
|
|
|
import (
|
|
import (
|
|
"fmt"
|
|
"fmt"
|
|
|
|
+ "strings"
|
|
|
|
+ "sync"
|
|
|
|
+
|
|
"github.com/lf-edge/ekuiper/internal/binder/io"
|
|
"github.com/lf-edge/ekuiper/internal/binder/io"
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
"github.com/lf-edge/ekuiper/internal/topo/context"
|
|
"github.com/lf-edge/ekuiper/internal/topo/context"
|
|
@@ -29,13 +32,11 @@ import (
|
|
"github.com/lf-edge/ekuiper/pkg/errorx"
|
|
"github.com/lf-edge/ekuiper/pkg/errorx"
|
|
"github.com/lf-edge/ekuiper/pkg/infra"
|
|
"github.com/lf-edge/ekuiper/pkg/infra"
|
|
"github.com/lf-edge/ekuiper/pkg/message"
|
|
"github.com/lf-edge/ekuiper/pkg/message"
|
|
- "strings"
|
|
|
|
- "sync"
|
|
|
|
)
|
|
)
|
|
|
|
|
|
type SinkConf struct {
|
|
type SinkConf struct {
|
|
Concurrency int `json:"concurrency"`
|
|
Concurrency int `json:"concurrency"`
|
|
- RunAsync bool `json:"runAsync"`
|
|
|
|
|
|
+ RunAsync bool `json:"runAsync"` // deprecated, will remove in the next release
|
|
Omitempty bool `json:"omitIfEmpty"`
|
|
Omitempty bool `json:"omitIfEmpty"`
|
|
SendSingle bool `json:"sendSingle"`
|
|
SendSingle bool `json:"sendSingle"`
|
|
DataTemplate string `json:"dataTemplate"`
|
|
DataTemplate string `json:"dataTemplate"`
|
|
@@ -165,23 +166,11 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
|
|
stats.SetBufferLength(int64(len(m.input)))
|
|
stats.SetBufferLength(int64(len(m.input)))
|
|
stats.IncTotalRecordsIn()
|
|
stats.IncTotalRecordsIn()
|
|
if sconf.RunAsync {
|
|
if sconf.RunAsync {
|
|
- go func() {
|
|
|
|
- p := infra.SafeRun(func() error {
|
|
|
|
- err := doCollect(ctx, sink, data, stats, sconf)
|
|
|
|
- if err != nil {
|
|
|
|
- logger.Warnf("sink collect error: %v", err)
|
|
|
|
- }
|
|
|
|
- return nil
|
|
|
|
- })
|
|
|
|
- if p != nil {
|
|
|
|
- infra.DrainError(ctx, p, result)
|
|
|
|
- }
|
|
|
|
- }()
|
|
|
|
- } else {
|
|
|
|
- err := doCollect(ctx, sink, data, stats, sconf)
|
|
|
|
- if err != nil {
|
|
|
|
- logger.Warnf("sink collect error: %v", err)
|
|
|
|
- }
|
|
|
|
|
|
+ conf.Log.Warnf("RunAsync is deprecated and ignored.")
|
|
|
|
+ }
|
|
|
|
+ err := doCollect(ctx, sink, data, stats, sconf)
|
|
|
|
+ if err != nil {
|
|
|
|
+ logger.Warnf("sink collect error: %v", err)
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
logger.Infof("sink node %s instance %d done", m.name, instance)
|
|
logger.Infof("sink node %s instance %d done", m.name, instance)
|
|
@@ -291,6 +280,7 @@ func (m *SinkNode) parseConf(logger api.Logger) (*SinkConf, error) {
|
|
return nil, fmt.Errorf("read properties %v to cache conf fail with error: %v", m.options, err)
|
|
return nil, fmt.Errorf("read properties %v to cache conf fail with error: %v", m.options, err)
|
|
}
|
|
}
|
|
if sconf.SinkConf.EnableCache && sconf.RunAsync {
|
|
if sconf.SinkConf.EnableCache && sconf.RunAsync {
|
|
|
|
+ conf.Log.Warnf("RunAsync is deprecated and ignored.")
|
|
return nil, fmt.Errorf("cache is not supported for async sink, do not use enableCache and runAsync properties together")
|
|
return nil, fmt.Errorf("cache is not supported for async sink, do not use enableCache and runAsync properties together")
|
|
}
|
|
}
|
|
err = sconf.SinkConf.Validate()
|
|
err = sconf.SinkConf.Validate()
|