funcs_aggregate.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. package xsql
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/plugins"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "strings"
  8. )
  9. type AggregateFunctionValuer struct {
  10. Data AggregateData
  11. plugins map[string]api.Function
  12. }
  13. func (v *AggregateFunctionValuer) Value(key string) (interface{}, bool) {
  14. return nil, false
  15. }
  16. func (v *AggregateFunctionValuer) Meta(key string) (interface{}, bool) {
  17. return nil, false
  18. }
  19. func (v *AggregateFunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
  20. lowerName := strings.ToLower(name)
  21. switch lowerName {
  22. case "avg":
  23. arg0 := args[0].([]interface{})
  24. if len(arg0) > 0 {
  25. v := getFirstValidArg(arg0)
  26. switch v.(type) {
  27. case int, int64:
  28. if r, err := sliceIntTotal(arg0); err != nil {
  29. return err, false
  30. } else {
  31. return r / len(arg0), true
  32. }
  33. case float64:
  34. if r, err := sliceFloatTotal(arg0); err != nil {
  35. return err, false
  36. } else {
  37. return r / float64(len(arg0)), true
  38. }
  39. default:
  40. return fmt.Errorf("run avg function error: found invalid arg %[1]T(%[1]v)", v), false
  41. }
  42. }
  43. return 0, true
  44. case "count":
  45. arg0 := args[0].([]interface{})
  46. return len(arg0), true
  47. case "max":
  48. arg0 := args[0].([]interface{})
  49. if len(arg0) > 0 {
  50. v := getFirstValidArg(arg0)
  51. switch t := v.(type) {
  52. case int:
  53. if r, err := sliceIntMax(arg0, t); err != nil {
  54. return err, false
  55. } else {
  56. return r, true
  57. }
  58. case int64:
  59. if r, err := sliceIntMax(arg0, int(t)); err != nil {
  60. return err, false
  61. } else {
  62. return r, true
  63. }
  64. case float64:
  65. if r, err := sliceFloatMax(arg0, t); err != nil {
  66. return err, false
  67. } else {
  68. return r, true
  69. }
  70. case string:
  71. if r, err := sliceStringMax(arg0, t); err != nil {
  72. return err, false
  73. } else {
  74. return r, true
  75. }
  76. default:
  77. return fmt.Errorf("run max function error: found invalid arg %[1]T(%[1]v)", v), false
  78. }
  79. }
  80. return fmt.Errorf("run max function error: empty data"), false
  81. case "min":
  82. arg0 := args[0].([]interface{})
  83. if len(arg0) > 0 {
  84. v := getFirstValidArg(arg0)
  85. switch t := v.(type) {
  86. case int:
  87. if r, err := sliceIntMin(arg0, t); err != nil {
  88. return err, false
  89. } else {
  90. return r, true
  91. }
  92. case int64:
  93. if r, err := sliceIntMin(arg0, int(t)); err != nil {
  94. return err, false
  95. } else {
  96. return r, true
  97. }
  98. case float64:
  99. if r, err := sliceFloatMin(arg0, t); err != nil {
  100. return err, false
  101. } else {
  102. return r, true
  103. }
  104. case string:
  105. if r, err := sliceStringMin(arg0, t); err != nil {
  106. return err, false
  107. } else {
  108. return r, true
  109. }
  110. default:
  111. return fmt.Errorf("run min function error: found invalid arg %[1]T(%[1]v)", v), false
  112. }
  113. }
  114. return fmt.Errorf("run min function error: empty data"), false
  115. case "sum":
  116. arg0 := args[0].([]interface{})
  117. if len(arg0) > 0 {
  118. v := getFirstValidArg(arg0)
  119. switch v.(type) {
  120. case int, int64:
  121. if r, err := sliceIntTotal(arg0); err != nil {
  122. return err, false
  123. } else {
  124. return r, true
  125. }
  126. case float64:
  127. if r, err := sliceFloatTotal(arg0); err != nil {
  128. return err, false
  129. } else {
  130. return r, true
  131. }
  132. default:
  133. return fmt.Errorf("run sum function error: found invalid arg %[1]T(%[1]v)", v), false
  134. }
  135. }
  136. return 0, true
  137. default:
  138. common.Log.Debugf("run aggregate func %s", name)
  139. var (
  140. nf api.Function
  141. ok bool
  142. err error
  143. )
  144. if nf, ok = v.plugins[name]; !ok {
  145. nf, err = plugins.GetFunction(name)
  146. if err != nil {
  147. return err, false
  148. }
  149. v.plugins[name] = nf
  150. }
  151. if !nf.IsAggregate() {
  152. return nil, false
  153. }
  154. result, ok := nf.Exec(args)
  155. common.Log.Debugf("run custom aggregate function %s, get result %v", name, result)
  156. return result, ok
  157. }
  158. }
  159. func (v *AggregateFunctionValuer) GetAllTuples() AggregateData {
  160. return v.Data
  161. }
  162. func getFirstValidArg(s []interface{}) interface{} {
  163. for _, v := range s {
  164. if v != nil {
  165. return v
  166. }
  167. }
  168. return nil
  169. }
  170. func sliceIntTotal(s []interface{}) (int, error) {
  171. var total int
  172. for _, v := range s {
  173. if vi, ok := v.(int); ok {
  174. total += vi
  175. } else {
  176. return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
  177. }
  178. }
  179. return total, nil
  180. }
  181. func sliceFloatTotal(s []interface{}) (float64, error) {
  182. var total float64
  183. for _, v := range s {
  184. if vf, ok := v.(float64); ok {
  185. total += vf
  186. } else {
  187. return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
  188. }
  189. }
  190. return total, nil
  191. }
  192. func sliceIntMax(s []interface{}, max int) (int, error) {
  193. for _, v := range s {
  194. if vi, ok := v.(int); ok {
  195. if max < vi {
  196. max = vi
  197. }
  198. } else {
  199. return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
  200. }
  201. }
  202. return max, nil
  203. }
  204. func sliceFloatMax(s []interface{}, max float64) (float64, error) {
  205. for _, v := range s {
  206. if vf, ok := v.(float64); ok {
  207. if max < vf {
  208. max = vf
  209. }
  210. } else {
  211. return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
  212. }
  213. }
  214. return max, nil
  215. }
  216. func sliceStringMax(s []interface{}, max string) (string, error) {
  217. for _, v := range s {
  218. if vs, ok := v.(string); ok {
  219. if max < vs {
  220. max = vs
  221. }
  222. } else {
  223. return "", fmt.Errorf("requires string but found %[1]T(%[1]v)", v)
  224. }
  225. }
  226. return max, nil
  227. }
  228. func sliceIntMin(s []interface{}, min int) (int, error) {
  229. for _, v := range s {
  230. if vi, ok := v.(int); ok {
  231. if min > vi {
  232. min = vi
  233. }
  234. } else {
  235. return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
  236. }
  237. }
  238. return min, nil
  239. }
  240. func sliceFloatMin(s []interface{}, min float64) (float64, error) {
  241. for _, v := range s {
  242. if vf, ok := v.(float64); ok {
  243. if min > vf {
  244. min = vf
  245. }
  246. } else {
  247. return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
  248. }
  249. }
  250. return min, nil
  251. }
  252. func sliceStringMin(s []interface{}, min string) (string, error) {
  253. for _, v := range s {
  254. if vs, ok := v.(string); ok {
  255. if min < vs {
  256. min = vs
  257. }
  258. } else {
  259. return "", fmt.Errorf("requires string but found %[1]T(%[1]v)", v)
  260. }
  261. }
  262. return min, nil
  263. }