sorter.go 4.8 KB

// Copyright 2022 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
  14. package xsql
  15. import (
  16. "fmt"
  17. "sort"
  18. "github.com/lf-edge/ekuiper/pkg/ast"
  19. "github.com/lf-edge/ekuiper/pkg/cast"
  20. )
  21. // MultiSorter implements the Sort interface, sorting the changes within.
  22. type MultiSorter struct {
  23. SortingData
  24. fields ast.SortFields
  25. valuer *FunctionValuer
  26. aggValuer *AggregateFunctionValuer
  27. values []map[string]interface{}
  28. }
  29. // OrderedBy returns a Sorter that sorts using the less functions, in order.
  30. // Call its Sort method to sort the data.
  31. func OrderedBy(fields ast.SortFields, fv *FunctionValuer, afv *AggregateFunctionValuer) *MultiSorter {
  32. return &MultiSorter{
  33. fields: fields,
  34. valuer: fv,
  35. aggValuer: afv,
  36. }
  37. }
  38. // Less is part of sort.Interface. It is implemented by looping along the
  39. // less functions until it finds a comparison that discriminates between
  40. // the two items (one is less than the other). Note that it can call the
  41. // less functions twice per call. We could change the functions to return
  42. // -1, 0, 1 and reduce the number of calls for greater efficiency: an
  43. // exercise for the reader.
  44. func (ms *MultiSorter) Less(i, j int) bool {
  45. p, q := ms.values[i], ms.values[j]
  46. v := &ValuerEval{Valuer: MultiValuer(ms.valuer)}
  47. for _, field := range ms.fields {
  48. n := field.Uname
  49. vp, _ := p[n]
  50. vq, _ := q[n]
  51. if vp == nil && vq != nil {
  52. return false
  53. } else if vp != nil && vq == nil {
  54. ms.valueSwap(true, i, j)
  55. return true
  56. } else if vp == nil && vq == nil {
  57. return false
  58. }
  59. switch {
  60. case v.simpleDataEval(vp, vq, ast.LT):
  61. ms.valueSwap(field.Ascending, i, j)
  62. return field.Ascending
  63. case v.simpleDataEval(vq, vp, ast.LT):
  64. ms.valueSwap(!field.Ascending, i, j)
  65. return !field.Ascending
  66. }
  67. }
  68. return false
  69. }
  70. func (ms *MultiSorter) valueSwap(s bool, i, j int) {
  71. if s {
  72. ms.values[i], ms.values[j] = ms.values[j], ms.values[i]
  73. }
  74. }
  75. // Sort sorts the argument slice according to the less functions passed to OrderedBy.
  76. func (ms *MultiSorter) Sort(data SortingData) error {
  77. ms.SortingData = data
  78. types := make([]string, len(ms.fields))
  79. ms.values = make([]map[string]interface{}, data.Len())
  80. switch input := data.(type) {
  81. case error:
  82. return input
  83. case SingleCollection:
  84. err := input.RangeSet(func(i int, row Row) (bool, error) {
  85. ms.values[i] = make(map[string]interface{})
  86. vep := &ValuerEval{Valuer: MultiValuer(ms.valuer, row, ms.valuer, &WildcardValuer{Data: row})}
  87. for j, field := range ms.fields {
  88. vp := vep.Eval(field.FieldExpr)
  89. if types[j] == "" && vp != nil {
  90. types[j] = fmt.Sprintf("%T", vp)
  91. }
  92. if err := validate(types[j], vp); err != nil {
  93. return false, err
  94. } else {
  95. ms.values[i][field.Uname] = vp
  96. }
  97. }
  98. return true, nil
  99. })
  100. if err != nil {
  101. return err
  102. }
  103. case GroupedCollection:
  104. err := input.GroupRange(func(i int, aggRow CollectionRow) (bool, error) {
  105. ms.values[i] = make(map[string]interface{})
  106. ms.aggValuer.SetData(aggRow)
  107. vep := &ValuerEval{Valuer: MultiAggregateValuer(aggRow, ms.valuer, aggRow, ms.aggValuer, &WildcardValuer{Data: aggRow})}
  108. for j, field := range ms.fields {
  109. vp := vep.Eval(field.FieldExpr)
  110. if types[j] == "" && vp != nil {
  111. types[j] = fmt.Sprintf("%T", vp)
  112. }
  113. if err := validate(types[j], vp); err != nil {
  114. return false, err
  115. } else {
  116. ms.values[i][field.Uname] = vp
  117. }
  118. }
  119. return true, nil
  120. })
  121. if err != nil {
  122. return err
  123. }
  124. }
  125. sort.Sort(ms)
  126. return nil
  127. }
  128. func validate(t string, v interface{}) error {
  129. if v == nil || t == "" {
  130. return nil
  131. }
  132. vt := fmt.Sprintf("%T", v)
  133. switch t {
  134. case "int", "int64", "float64", "uint64":
  135. if vt == "int" || vt == "int64" || vt == "float64" || vt == "uint64" {
  136. return nil
  137. } else {
  138. return fmt.Errorf("incompatible types for comparison: %s and %s", t, vt)
  139. }
  140. case "bool":
  141. if vt == "bool" {
  142. return nil
  143. } else {
  144. return fmt.Errorf("incompatible types for comparison: %s and %s", t, vt)
  145. }
  146. case "string":
  147. if vt == "string" {
  148. return nil
  149. } else {
  150. return fmt.Errorf("incompatible types for comparison: %s and %s", t, vt)
  151. }
  152. case "time.Time":
  153. _, err := cast.InterfaceToTime(v, "")
  154. if err != nil {
  155. return fmt.Errorf("incompatible types for comparison: %s and %s", t, vt)
  156. } else {
  157. return nil
  158. }
  159. default:
  160. return fmt.Errorf("incompatible types for comparison: %s and %s", t, vt)
  161. }
  162. }