order_operator.go 762 B

1234567891011121314151617181920212223242526272829303132
  1. package plans
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/api"
  6. )
  7. type OrderPlan struct {
  8. SortFields xsql.SortFields
  9. }
  10. /**
  11. * input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  12. * output: *xsql.Tuple | xsql.WindowTuplesSet | xsql.JoinTupleSets
  13. */
  14. func (p *OrderPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
  15. log := ctx.GetLogger()
  16. log.Debugf("order plan receive %s", data)
  17. sorter := xsql.OrderedBy(p.SortFields)
  18. switch input := data.(type) {
  19. case error:
  20. return input
  21. case xsql.Valuer:
  22. return input
  23. case xsql.SortingData:
  24. sorter.Sort(input)
  25. return input
  26. default:
  27. return fmt.Errorf("Expect xsql.Valuer or its array type.")
  28. }
  29. }