funcs_aggregate.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package xsql
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/common/plugin_manager"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "strings"
  8. )
  9. type AggregateFunctionValuer struct{
  10. Data AggregateData
  11. }
  12. func (v AggregateFunctionValuer) Value(key string) (interface{}, bool) {
  13. return nil, false
  14. }
  15. func (v AggregateFunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
  16. lowerName := strings.ToLower(name)
  17. switch lowerName {
  18. case "avg":
  19. arg0 := args[0].([]interface{})
  20. if len(arg0) > 0{
  21. v := getFirstValidArg(arg0)
  22. switch v.(type){
  23. case int:
  24. return sliceIntTotal(arg0)/len(arg0), true
  25. case int64:
  26. return sliceIntTotal(arg0)/len(arg0), true
  27. case float64:
  28. return sliceFloatTotal(arg0)/float64(len(arg0)), true
  29. default:
  30. return fmt.Errorf("invalid data type for avg function"), false
  31. }
  32. }
  33. return 0, true
  34. case "count":
  35. arg0 := args[0].([]interface{})
  36. return len(arg0), true
  37. case "max":
  38. arg0 := args[0].([]interface{})
  39. if len(arg0) > 0{
  40. v := getFirstValidArg(arg0)
  41. switch t := v.(type){
  42. case int:
  43. return sliceIntMax(arg0, t), true
  44. case int64:
  45. return sliceIntMax(arg0, int(t)), true
  46. case float64:
  47. return sliceFloatMax(arg0, t), true
  48. case string:
  49. return sliceStringMax(arg0, t), true
  50. default:
  51. return fmt.Errorf("unsupported data type for avg function"), false
  52. }
  53. }
  54. return fmt.Errorf("empty data for max function"), false
  55. case "min":
  56. arg0 := args[0].([]interface{})
  57. if len(arg0) > 0{
  58. v := getFirstValidArg(arg0)
  59. switch t := v.(type){
  60. case int:
  61. return sliceIntMin(arg0, t), true
  62. case int64:
  63. return sliceIntMin(arg0, int(t)), true
  64. case float64:
  65. return sliceFloatMin(arg0, t), true
  66. case string:
  67. return sliceStringMin(arg0, t), true
  68. default:
  69. return fmt.Errorf("unsupported data type for avg function"), false
  70. }
  71. }
  72. return fmt.Errorf("empty data for max function"), false
  73. case "sum":
  74. arg0 := args[0].([]interface{})
  75. if len(arg0) > 0{
  76. v := getFirstValidArg(arg0)
  77. switch v.(type){
  78. case int:
  79. return sliceIntTotal(arg0), true
  80. case int64:
  81. return sliceIntTotal(arg0), true
  82. case float64:
  83. return sliceFloatTotal(arg0), true
  84. default:
  85. return fmt.Errorf("invalid data type for sum function"), false
  86. }
  87. }
  88. return 0, true
  89. default:
  90. common.Log.Debugf("run aggregate func %s", name)
  91. if nf, err := plugin_manager.GetPlugin(name, "functions"); err != nil {
  92. return nil, false
  93. }else{
  94. f, ok := nf.(api.Function)
  95. if !ok {
  96. return nil, false
  97. }
  98. if !f.IsAggregate(){
  99. return nil, false
  100. }
  101. result, ok := f.Exec(args)
  102. common.Log.Debugf("run custom aggregate function %s, get result %v", name, result)
  103. return result, ok
  104. }
  105. }
  106. }
  107. func (v *AggregateFunctionValuer) GetAllTuples() AggregateData {
  108. return v.Data
  109. }
  110. func getFirstValidArg(s []interface{}) interface{}{
  111. for _, v := range s{
  112. if v != nil{
  113. return v
  114. }
  115. }
  116. return nil
  117. }
  118. func sliceIntTotal(s []interface{}) int{
  119. var total int
  120. for _, v := range s{
  121. if v, ok := v.(int); ok {
  122. total += v
  123. }
  124. }
  125. return total
  126. }
  127. func sliceFloatTotal(s []interface{}) float64{
  128. var total float64
  129. for _, v := range s{
  130. if v, ok := v.(float64); ok {
  131. total += v
  132. }
  133. }
  134. return total
  135. }
  136. func sliceIntMax(s []interface{}, max int) int{
  137. for _, v := range s{
  138. if v, ok := v.(int); ok {
  139. if max < v {
  140. max = v
  141. }
  142. }
  143. }
  144. return max
  145. }
  146. func sliceFloatMax(s []interface{}, max float64) float64{
  147. for _, v := range s{
  148. if v, ok := v.(float64); ok {
  149. if max < v {
  150. max = v
  151. }
  152. }
  153. }
  154. return max
  155. }
  156. func sliceStringMax(s []interface{}, max string) string{
  157. for _, v := range s{
  158. if v, ok := v.(string); ok {
  159. if max < v {
  160. max = v
  161. }
  162. }
  163. }
  164. return max
  165. }
  166. func sliceIntMin(s []interface{}, min int) int{
  167. for _, v := range s{
  168. if v, ok := v.(int); ok {
  169. if min > v {
  170. min = v
  171. }
  172. }
  173. }
  174. return min
  175. }
  176. func sliceFloatMin(s []interface{}, min float64) float64{
  177. for _, v := range s{
  178. if v, ok := v.(float64); ok {
  179. if min > v {
  180. min = v
  181. }
  182. }
  183. }
  184. return min
  185. }
  186. func sliceStringMin(s []interface{}, min string) string{
  187. for _, v := range s{
  188. if v, ok := v.(string); ok {
  189. if min < v {
  190. min = v
  191. }
  192. }
  193. }
  194. return min
  195. }