|
@@ -19,10 +19,10 @@ import (
|
|
"fmt"
|
|
"fmt"
|
|
"github.com/lf-edge/ekuiper/internal/binder/io"
|
|
"github.com/lf-edge/ekuiper/internal/binder/io"
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
- "github.com/lf-edge/ekuiper/internal/infra"
|
|
|
|
kctx "github.com/lf-edge/ekuiper/internal/topo/context"
|
|
kctx "github.com/lf-edge/ekuiper/internal/topo/context"
|
|
"github.com/lf-edge/ekuiper/internal/topo/state"
|
|
"github.com/lf-edge/ekuiper/internal/topo/state"
|
|
"github.com/lf-edge/ekuiper/pkg/api"
|
|
"github.com/lf-edge/ekuiper/pkg/api"
|
|
|
|
+ "github.com/lf-edge/ekuiper/pkg/infra"
|
|
"sync"
|
|
"sync"
|
|
)
|
|
)
|
|
|
|
|
|
@@ -75,7 +75,9 @@ func getSourceInstance(node *SourceNode, index int) (*sourceInstance, error) {
|
|
}
|
|
}
|
|
go func() {
|
|
go func() {
|
|
err := infra.SafeRun(func() error {
|
|
err := infra.SafeRun(func() error {
|
|
- si.source.Open(node.ctx.WithInstance(index), si.dataCh.In, si.errorCh)
|
|
|
|
|
|
+ nctx := node.ctx.WithInstance(index)
|
|
|
|
+ defer si.source.Close(nctx)
|
|
|
|
+ si.source.Open(nctx, si.dataCh.In, si.errorCh)
|
|
return nil
|
|
return nil
|
|
})
|
|
})
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -151,7 +153,9 @@ func (p *sourcePool) addInstance(k string, node *SourceNode, source api.Source,
|
|
p.registry[k] = newS
|
|
p.registry[k] = newS
|
|
go func() {
|
|
go func() {
|
|
err := infra.SafeRun(func() error {
|
|
err := infra.SafeRun(func() error {
|
|
- si.source.Open(node.ctx.WithInstance(index), si.dataCh.In, si.errorCh)
|
|
|
|
|
|
+ nctx := node.ctx.WithInstance(index)
|
|
|
|
+ defer si.source.Close(nctx)
|
|
|
|
+ si.source.Open(nctx, si.dataCh.In, si.errorCh)
|
|
return nil
|
|
return nil
|
|
})
|
|
})
|
|
if err != nil {
|
|
if err != nil {
|