|
@@ -1,4 +1,4 @@
|
|
|
-// Copyright 2022 EMQ Technologies Co., Ltd.
|
|
|
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
|
|
|
//
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
// you may not use this file except in compliance with the License.
|
|
@@ -17,6 +17,7 @@ package planner
|
|
|
import (
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
+
|
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
|
store2 "github.com/lf-edge/ekuiper/internal/pkg/store"
|
|
|
"github.com/lf-edge/ekuiper/internal/topo"
|
|
@@ -198,6 +199,9 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
|
|
|
default:
|
|
|
return nil, 0, fmt.Errorf("unknown logical plan %v", t)
|
|
|
}
|
|
|
+ if err != nil {
|
|
|
+ return nil, 0, err
|
|
|
+ }
|
|
|
if uop, ok := op.(*node.UnaryOperator); ok {
|
|
|
uop.SetConcurrency(options.Concurrency)
|
|
|
}
|