浏览代码

fix(plans): comply for new stream api

ngjaying 5 年之前
父节点
当前提交
f46a0e66ed
共有 2 个文件被更改,包括 4 次插入5 次删除
  1. 3 4
      xsql/plans/having_operator.go
  2. 1 1
      xsql/processors/xsql_processor.go

+ 3 - 4
xsql/plans/having_operator.go

@@ -1,17 +1,16 @@
 package plans
 
 import (
-	"context"
-	"engine/common"
 	"engine/xsql"
+	"engine/xstream/api"
 )
 
 type HavingPlan struct {
 	Condition xsql.Expr
 }
 
-func (p *HavingPlan) Apply(ctx context.Context, data interface{}) interface{} {
-	log := common.GetLogger(ctx)
+func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
+	log := ctx.GetLogger()
 	log.Debugf("having plan receive %s", data)
 	switch input := data.(type) {
 	case xsql.GroupedTuplesSet:

+ 1 - 1
xsql/processors/xsql_processor.go

@@ -422,7 +422,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			if selectStmt.Having != nil {
 				havingOp := xstream.Transform(&plans.HavingPlan{selectStmt.Having}, "having")
 				tp.AddOperator(inputs, havingOp)
-				inputs = []xstream.Emitter{havingOp}
+				inputs = []api.Emitter{havingOp}
 			}
 
 			if selectStmt.SortFields != nil {