|
@@ -37,7 +37,6 @@ import (
|
|
|
|
|
|
type SinkConf struct {
|
|
|
Concurrency int `json:"concurrency"`
|
|
|
- RunAsync bool `json:"runAsync"` // deprecated, will remove in the next release
|
|
|
Omitempty bool `json:"omitIfEmpty"`
|
|
|
SendSingle bool `json:"sendSingle"`
|
|
|
DataTemplate string `json:"dataTemplate"`
|
|
@@ -187,9 +186,6 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
|
|
|
}
|
|
|
stats.SetBufferLength(int64(len(m.input)))
|
|
|
stats.IncTotalRecordsIn()
|
|
|
- if sconf.RunAsync {
|
|
|
- conf.Log.Warnf("RunAsync is deprecated and ignored.")
|
|
|
- }
|
|
|
err := doCollect(ctx, sink, data, sendManager, stats, sconf)
|
|
|
if err != nil {
|
|
|
logger.Warnf("sink collect error: %v", err)
|
|
@@ -204,10 +200,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
|
|
|
}
|
|
|
} else {
|
|
|
logger.Infof("Creating sink cache")
|
|
|
- if sconf.RunAsync { // async mode, the ack must have an id
|
|
|
- // is not supported and validated in the configure, should not go here
|
|
|
- return fmt.Errorf("async mode is not supported for cache sink")
|
|
|
- } else { // sync mode, the ack is already in order
|
|
|
+ { // sync mode, the ack is already in order
|
|
|
dataCh := make(chan []map[string]interface{}, sconf.BufferLength)
|
|
|
c := cache.NewSyncCache(ctx, dataCh, result, stats, &sconf.SinkConf, sconf.BufferLength)
|
|
|
for {
|
|
@@ -256,7 +249,6 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
})
|
|
|
if panicOrError != nil {
|
|
@@ -275,7 +267,6 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
|
|
|
func (m *SinkNode) parseConf(logger api.Logger) (*SinkConf, error) {
|
|
|
sconf := &SinkConf{
|
|
|
Concurrency: 1,
|
|
|
- RunAsync: false,
|
|
|
Omitempty: false,
|
|
|
SendSingle: false,
|
|
|
DataTemplate: "",
|
|
@@ -301,10 +292,6 @@ func (m *SinkNode) parseConf(logger api.Logger) (*SinkConf, error) {
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("read properties %v to cache conf fail with error: %v", m.options, err)
|
|
|
}
|
|
|
- if sconf.SinkConf.EnableCache && sconf.RunAsync {
|
|
|
- conf.Log.Warnf("RunAsync is deprecated and ignored.")
|
|
|
- return nil, fmt.Errorf("cache is not supported for async sink, do not use enableCache and runAsync properties together")
|
|
|
- }
|
|
|
if sconf.DataField == "" {
|
|
|
if v, ok := m.options["tableDataField"]; ok {
|
|
|
sconf.DataField = v.(string)
|