|
@@ -24,7 +24,7 @@ import (
|
|
"github.com/lf-edge/ekuiper/internal/plugin/portable/runtime"
|
|
"github.com/lf-edge/ekuiper/internal/plugin/portable/runtime"
|
|
"github.com/lf-edge/ekuiper/internal/processor"
|
|
"github.com/lf-edge/ekuiper/internal/processor"
|
|
"github.com/lf-edge/ekuiper/internal/topo"
|
|
"github.com/lf-edge/ekuiper/internal/topo"
|
|
- "github.com/lf-edge/ekuiper/internal/topo/memory"
|
|
|
|
|
|
+ "github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
|
|
"github.com/lf-edge/ekuiper/internal/topo/planner"
|
|
"github.com/lf-edge/ekuiper/internal/topo/planner"
|
|
"github.com/lf-edge/ekuiper/internal/topo/topotest"
|
|
"github.com/lf-edge/ekuiper/internal/topo/topotest"
|
|
"github.com/lf-edge/ekuiper/pkg/api"
|
|
"github.com/lf-edge/ekuiper/pkg/api"
|
|
@@ -111,7 +111,7 @@ func TestSourceAndFunc(t *testing.T) {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
var mm [][]map[string]interface{}
|
|
var mm [][]map[string]interface{}
|
|
- ch := memory.CreateSub("cache", nil, fmt.Sprintf("cache%d", i+1), 10)
|
|
|
|
|
|
+ ch := pubsub.CreateSub("cache", nil, fmt.Sprintf("cache%d", i+1), 10)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Second)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Second)
|
|
go func(ctx context.Context) {
|
|
go func(ctx context.Context) {
|
|
select {
|
|
select {
|