order_operator.go 960 B

1234567891011121314151617181920212223242526272829303132333435
  1. package operator
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/internal/xsql"
  5. "github.com/emqx/kuiper/pkg/api"
  6. "github.com/emqx/kuiper/pkg/ast"
  7. )
  8. type OrderOp struct {
  9. SortFields ast.SortFields
  10. }
  11. /**
  12. * input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  13. * output: *xsql.Tuple | xsql.WindowTuplesSet | xsql.JoinTupleSets
  14. */
  15. func (p *OrderOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  16. log := ctx.GetLogger()
  17. log.Debugf("order plan receive %s", data)
  18. sorter := xsql.OrderedBy(p.SortFields, fv)
  19. switch input := data.(type) {
  20. case error:
  21. return input
  22. case xsql.Valuer:
  23. return input
  24. case xsql.SortingData:
  25. if err := sorter.Sort(input); err != nil {
  26. return fmt.Errorf("run Order By error: %s", err)
  27. }
  28. return input
  29. default:
  30. return fmt.Errorf("run Order By error: expect xsql.Valuer or its array type")
  31. }
  32. }