order_operator.go 923 B

12345678910111213141516171819202122232425262728293031323334
  1. package plans
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/api"
  6. )
  7. type OrderPlan struct {
  8. SortFields xsql.SortFields
  9. }
  10. /**
  11. * input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  12. * output: *xsql.Tuple | xsql.WindowTuplesSet | xsql.JoinTupleSets
  13. */
  14. func (p *OrderPlan) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
  15. log := ctx.GetLogger()
  16. log.Debugf("order plan receive %s", data)
  17. sorter := xsql.OrderedBy(p.SortFields, fv)
  18. switch input := data.(type) {
  19. case error:
  20. return input
  21. case xsql.Valuer:
  22. return input
  23. case xsql.SortingData:
  24. if err := sorter.Sort(input); err != nil {
  25. return fmt.Errorf("run Order By error: %s", err)
  26. }
  27. return input
  28. default:
  29. return fmt.Errorf("run Order By error: expect xsql.Valuer or its array type")
  30. }
  31. }