@@ -5,8 +5,8 @@ import (
 	"fmt"
 	"github.com/emqx/kuiper/internal/conf"
 	"github.com/emqx/kuiper/internal/topo"
-	"github.com/emqx/kuiper/internal/topo/nodes"
-	"github.com/emqx/kuiper/internal/topo/operators"
+	"github.com/emqx/kuiper/internal/topo/node"
+	"github.com/emqx/kuiper/internal/topo/operator"
 	"github.com/emqx/kuiper/internal/xsql"
 	"github.com/emqx/kuiper/pkg/api"
 	"github.com/emqx/kuiper/pkg/ast"
@@ -19,7 +19,7 @@ func Plan(rule *api.Rule, storePath string) (*topo.Topo, error) {
 }
 
 // For test only
-func PlanWithSourcesAndSinks(rule *api.Rule, storePath string, sources []*nodes.SourceNode, sinks []*nodes.SinkNode) (*topo.Topo, error) {
+func PlanWithSourcesAndSinks(rule *api.Rule, storePath string, sources []*node.SourceNode, sinks []*node.SinkNode) (*topo.Topo, error) {
 	sql := rule.Sql
 
 	conf.Log.Infof("Init rule with options %+v", rule.Options)
@@ -53,7 +53,7 @@ func PlanWithSourcesAndSinks(rule *api.Rule, storePath string, sources []*nodes.
 	return tp, nil
 }
 
-func createTopo(rule *api.Rule, lp LogicalPlan, sources []*nodes.SourceNode, sinks []*nodes.SinkNode, streamsFromStmt []string) (*topo.Topo, error) {
+func createTopo(rule *api.Rule, lp LogicalPlan, sources []*node.SourceNode, sinks []*node.SinkNode, streamsFromStmt []string) (*topo.Topo, error) {
 	// Create topology
 	tp, err := topo.NewWithNameAndQos(rule.Id, rule.Options.Qos, rule.Options.CheckpointInterval)
 	if err != nil {
@@ -77,7 +77,7 @@ func createTopo(rule *api.Rule, lp LogicalPlan, sources []*nodes.SourceNode, sin
 				if !ok {
 					return nil, fmt.Errorf("expect map[string]interface{} type for the action properties, but found %v", action)
 				}
-				tp.AddSink(inputs, nodes.NewSinkNode(fmt.Sprintf("%s_%d", name, i), name, props))
+				tp.AddSink(inputs, node.NewSinkNode(fmt.Sprintf("%s_%d", name, i), name, props))
 			}
 		}
 	}
@@ -85,7 +85,7 @@ func createTopo(rule *api.Rule, lp LogicalPlan, sources []*nodes.SourceNode, sin
 	return tp, nil
 }
 
-func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []*nodes.SourceNode, streamsFromStmt []string, index int) (api.Emitter, int, error) {
+func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []*node.SourceNode, streamsFromStmt []string, index int) (api.Emitter, int, error) {
 	var inputs []api.Emitter
 	newIndex := index
 	for _, c := range lp.Children() {
@@ -98,20 +98,20 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 	}
 	newIndex++
 	var (
-		op  nodes.OperatorNode
+		op  node.OperatorNode
 		err error
 	)
 	switch t := lp.(type) {
 	case *DataSourcePlan:
 		switch t.streamStmt.StreamType {
 		case ast.TypeStream:
-			pp, err := operators.NewPreprocessor(t.streamFields, t.allMeta, t.metaFields, t.iet, t.timestampField, t.timestampFormat, t.isBinary)
+			pp, err := operator.NewPreprocessor(t.streamFields, t.allMeta, t.metaFields, t.iet, t.timestampField, t.timestampFormat, t.isBinary)
			if err != nil {
				return nil, 0, err
			}
-			var srcNode *nodes.SourceNode
+			var srcNode *node.SourceNode
 			if len(sources) == 0 {
-				node := nodes.NewSourceNode(string(t.name), t.streamStmt.StreamType, t.streamStmt.Options)
+				node := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, t.streamStmt.Options)
 				srcNode = node
 			} else {
 				srcNode = getMockSource(sources, string(t.name))
@@ -123,16 +123,16 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 			op = Transform(pp, fmt.Sprintf("%d_preprocessor_%s", newIndex, t.name), options)
 			inputs = []api.Emitter{srcNode}
 		case ast.TypeTable:
-			pp, err := operators.NewTableProcessor(string(t.name), t.streamFields, t.streamStmt.Options)
+			pp, err := operator.NewTableProcessor(string(t.name), t.streamFields, t.streamStmt.Options)
 			if err != nil {
 				return nil, 0, err
 			}
-			var srcNode *nodes.SourceNode
+			var srcNode *node.SourceNode
 			if len(sources) > 0 {
 				srcNode = getMockSource(sources, string(t.name))
 			}
 			if srcNode == nil {
-				srcNode = nodes.NewSourceNode(string(t.name), t.streamStmt.StreamType, t.streamStmt.Options)
+				srcNode = node.NewSourceNode(string(t.name), t.streamStmt.StreamType, t.streamStmt.Options)
 			}
 			tp.AddSrc(srcNode)
 			op = Transform(pp, fmt.Sprintf("%d_tableprocessor_%s", newIndex, t.name), options)
@@ -140,13 +140,13 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 		}
 	case *WindowPlan:
 		if t.condition != nil {
-			wfilterOp := Transform(&operators.FilterOp{Condition: t.condition}, fmt.Sprintf("%d_windowFilter", newIndex), options)
+			wfilterOp := Transform(&operator.FilterOp{Condition: t.condition}, fmt.Sprintf("%d_windowFilter", newIndex), options)
 			wfilterOp.SetConcurrency(options.Concurrency)
 			tp.AddOperator(inputs, wfilterOp)
 			inputs = []api.Emitter{wfilterOp}
 		}
 
-		op, err = nodes.NewWindowOp(fmt.Sprintf("%d_window", newIndex), nodes.WindowConfig{
+		op, err = node.NewWindowOp(fmt.Sprintf("%d_window", newIndex), node.WindowConfig{
 			Type:     t.wtype,
 			Length:   t.length,
 			Interval: t.interval,
@@ -155,30 +155,30 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 			return nil, 0, err
 		}
 	case *JoinAlignPlan:
-		op, err = nodes.NewJoinAlignNode(fmt.Sprintf("%d_join_aligner", newIndex), t.Emitters, options)
+		op, err = node.NewJoinAlignNode(fmt.Sprintf("%d_join_aligner", newIndex), t.Emitters, options)
 	case *JoinPlan:
-		op = Transform(&operators.JoinOp{Joins: t.joins, From: t.from}, fmt.Sprintf("%d_join", newIndex), options)
+		op = Transform(&operator.JoinOp{Joins: t.joins, From: t.from}, fmt.Sprintf("%d_join", newIndex), options)
 	case *FilterPlan:
-		op = Transform(&operators.FilterOp{Condition: t.condition}, fmt.Sprintf("%d_filter", newIndex), options)
+		op = Transform(&operator.FilterOp{Condition: t.condition}, fmt.Sprintf("%d_filter", newIndex), options)
 	case *AggregatePlan:
-		op = Transform(&operators.AggregateOp{Dimensions: t.dimensions}, fmt.Sprintf("%d_aggregate", newIndex), options)
+		op = Transform(&operator.AggregateOp{Dimensions: t.dimensions}, fmt.Sprintf("%d_aggregate", newIndex), options)
 	case *HavingPlan:
-		op = Transform(&operators.HavingOp{Condition: t.condition}, fmt.Sprintf("%d_having", newIndex), options)
+		op = Transform(&operator.HavingOp{Condition: t.condition}, fmt.Sprintf("%d_having", newIndex), options)
 	case *OrderPlan:
-		op = Transform(&operators.OrderOp{SortFields: t.SortFields}, fmt.Sprintf("%d_order", newIndex), options)
+		op = Transform(&operator.OrderOp{SortFields: t.SortFields}, fmt.Sprintf("%d_order", newIndex), options)
 	case *ProjectPlan:
-		op = Transform(&operators.ProjectOp{Fields: t.fields, IsAggregate: t.isAggregate, SendMeta: t.sendMeta}, fmt.Sprintf("%d_project", newIndex), options)
+		op = Transform(&operator.ProjectOp{Fields: t.fields, IsAggregate: t.isAggregate, SendMeta: t.sendMeta}, fmt.Sprintf("%d_project", newIndex), options)
 	default:
 		return nil, 0, fmt.Errorf("unknown logical plan %v", t)
 	}
-	if uop, ok := op.(*nodes.UnaryOperator); ok {
+	if uop, ok := op.(*node.UnaryOperator); ok {
 		uop.SetConcurrency(options.Concurrency)
 	}
 	tp.AddOperator(inputs, op)
 	return op, newIndex, nil
 }
 
-func getMockSource(sources []*nodes.SourceNode, name string) *nodes.SourceNode {
+func getMockSource(sources []*node.SourceNode, name string) *node.SourceNode {
 	for _, source := range sources {
 		if name == source.GetName() {
 			return source
@@ -311,8 +311,8 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.
 	return optimize(p)
 }
 
-func Transform(op nodes.UnOperation, name string, options *api.RuleOption) *nodes.UnaryOperator {
-	operator := nodes.New(name, xsql.FuncRegisters, options)
+func Transform(op node.UnOperation, name string, options *api.RuleOption) *node.UnaryOperator {
+	operator := node.New(name, xsql.FuncRegisters, options)
 	operator.SetOperation(op)
 	return operator
 }
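
For context, a minimal sketch (not part of the patch) of how the renamed packages fit together after this change: an operator.FilterOp is wrapped into a node.UnaryOperator by the Transform helper shown in the diff. The buildFilter helper, the planner package name, and the ast.Expr condition type are illustrative assumptions, not code from this commit.

// Hypothetical helper mirroring how buildOps handles *FilterPlan after the
// nodes -> node and operators -> operator renames.
package planner

import (
	"fmt"

	"github.com/emqx/kuiper/internal/topo/node"
	"github.com/emqx/kuiper/internal/topo/operator"
	"github.com/emqx/kuiper/pkg/api"
	"github.com/emqx/kuiper/pkg/ast"
)

func buildFilter(condition ast.Expr, index int, options *api.RuleOption) *node.UnaryOperator {
	// Transform (defined in the diff above) wraps the filter operator into a node.UnaryOperator.
	op := Transform(&operator.FilterOp{Condition: condition}, fmt.Sprintf("%d_filter", index), options)
	// As in buildOps, apply the rule's configured concurrency to the unary operator.
	op.SetConcurrency(options.Concurrency)
	return op
}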