order_operator.go 733 B

123456789101112131415161718192021222324252627282930
  1. package plans
  2. import (
  3. "github.com/emqx/kuiper/xsql"
  4. "github.com/emqx/kuiper/xstream/api"
  5. )
  6. type OrderPlan struct {
  7. SortFields xsql.SortFields
  8. }
  9. /**
  10. * input: *xsql.Tuple from preprocessor | xsql.WindowTuplesSet from windowOp | xsql.JoinTupleSets from joinOp
  11. * output: *xsql.Tuple | xsql.WindowTuplesSet | xsql.JoinTupleSets
  12. */
  13. func (p *OrderPlan) Apply(ctx api.StreamContext, data interface{}) interface{} {
  14. log := ctx.GetLogger()
  15. log.Debugf("order plan receive %s", data)
  16. sorter := xsql.OrderedBy(p.SortFields)
  17. switch input := data.(type) {
  18. case xsql.Valuer:
  19. return input
  20. case xsql.SortingData:
  21. sorter.Sort(input)
  22. return input
  23. default:
  24. log.Errorf("Expect xsql.Valuer or its array type.")
  25. return nil
  26. }
  27. }