funcs_aggregate.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. package xsql
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/plugins"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "strings"
  8. )
  9. type AggregateFunctionValuer struct {
  10. data AggregateData
  11. fv *FunctionValuer
  12. plugins map[string]api.Function
  13. }
  14. //Should only be called by stream to make sure a single instance for an operation
  15. func NewAggregateFunctionValuers() (*FunctionValuer, *AggregateFunctionValuer) {
  16. fv := &FunctionValuer{}
  17. return fv, &AggregateFunctionValuer{
  18. fv: fv,
  19. }
  20. }
  21. func (v *AggregateFunctionValuer) SetData(data AggregateData) {
  22. v.data = data
  23. }
  24. func (v *AggregateFunctionValuer) GetSingleCallValuer() CallValuer {
  25. return v.fv
  26. }
  27. func (v *AggregateFunctionValuer) Value(key string) (interface{}, bool) {
  28. return nil, false
  29. }
  30. func (v *AggregateFunctionValuer) Meta(key string) (interface{}, bool) {
  31. return nil, false
  32. }
  33. func (v *AggregateFunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
  34. lowerName := strings.ToLower(name)
  35. switch lowerName {
  36. case "avg":
  37. arg0 := args[0].([]interface{})
  38. if len(arg0) > 0 {
  39. v := getFirstValidArg(arg0)
  40. switch v.(type) {
  41. case int, int64:
  42. if r, err := sliceIntTotal(arg0); err != nil {
  43. return err, false
  44. } else {
  45. return r / len(arg0), true
  46. }
  47. case float64:
  48. if r, err := sliceFloatTotal(arg0); err != nil {
  49. return err, false
  50. } else {
  51. return r / float64(len(arg0)), true
  52. }
  53. default:
  54. return fmt.Errorf("run avg function error: found invalid arg %[1]T(%[1]v)", v), false
  55. }
  56. }
  57. return 0, true
  58. case "count":
  59. arg0 := args[0].([]interface{})
  60. return len(arg0), true
  61. case "max":
  62. arg0 := args[0].([]interface{})
  63. if len(arg0) > 0 {
  64. v := getFirstValidArg(arg0)
  65. switch t := v.(type) {
  66. case int:
  67. if r, err := sliceIntMax(arg0, t); err != nil {
  68. return err, false
  69. } else {
  70. return r, true
  71. }
  72. case int64:
  73. if r, err := sliceIntMax(arg0, int(t)); err != nil {
  74. return err, false
  75. } else {
  76. return r, true
  77. }
  78. case float64:
  79. if r, err := sliceFloatMax(arg0, t); err != nil {
  80. return err, false
  81. } else {
  82. return r, true
  83. }
  84. case string:
  85. if r, err := sliceStringMax(arg0, t); err != nil {
  86. return err, false
  87. } else {
  88. return r, true
  89. }
  90. default:
  91. return fmt.Errorf("run max function error: found invalid arg %[1]T(%[1]v)", v), false
  92. }
  93. }
  94. return fmt.Errorf("run max function error: empty data"), false
  95. case "min":
  96. arg0 := args[0].([]interface{})
  97. if len(arg0) > 0 {
  98. v := getFirstValidArg(arg0)
  99. switch t := v.(type) {
  100. case int:
  101. if r, err := sliceIntMin(arg0, t); err != nil {
  102. return err, false
  103. } else {
  104. return r, true
  105. }
  106. case int64:
  107. if r, err := sliceIntMin(arg0, int(t)); err != nil {
  108. return err, false
  109. } else {
  110. return r, true
  111. }
  112. case float64:
  113. if r, err := sliceFloatMin(arg0, t); err != nil {
  114. return err, false
  115. } else {
  116. return r, true
  117. }
  118. case string:
  119. if r, err := sliceStringMin(arg0, t); err != nil {
  120. return err, false
  121. } else {
  122. return r, true
  123. }
  124. default:
  125. return fmt.Errorf("run min function error: found invalid arg %[1]T(%[1]v)", v), false
  126. }
  127. }
  128. return fmt.Errorf("run min function error: empty data"), false
  129. case "sum":
  130. arg0 := args[0].([]interface{})
  131. if len(arg0) > 0 {
  132. v := getFirstValidArg(arg0)
  133. switch v.(type) {
  134. case int, int64:
  135. if r, err := sliceIntTotal(arg0); err != nil {
  136. return err, false
  137. } else {
  138. return r, true
  139. }
  140. case float64:
  141. if r, err := sliceFloatTotal(arg0); err != nil {
  142. return err, false
  143. } else {
  144. return r, true
  145. }
  146. default:
  147. return fmt.Errorf("run sum function error: found invalid arg %[1]T(%[1]v)", v), false
  148. }
  149. }
  150. return 0, true
  151. default:
  152. common.Log.Debugf("run aggregate func %s", name)
  153. if v.plugins == nil {
  154. v.plugins = make(map[string]api.Function)
  155. }
  156. var (
  157. nf api.Function
  158. ok bool
  159. err error
  160. )
  161. if nf, ok = v.plugins[name]; !ok {
  162. nf, err = plugins.GetFunction(name)
  163. if err != nil {
  164. return err, false
  165. }
  166. v.plugins[name] = nf
  167. }
  168. if !nf.IsAggregate() {
  169. return nil, false
  170. }
  171. result, ok := nf.Exec(args)
  172. common.Log.Debugf("run custom aggregate function %s, get result %v", name, result)
  173. return result, ok
  174. }
  175. }
  176. func (v *AggregateFunctionValuer) GetAllTuples() AggregateData {
  177. return v.data
  178. }
  179. func getFirstValidArg(s []interface{}) interface{} {
  180. for _, v := range s {
  181. if v != nil {
  182. return v
  183. }
  184. }
  185. return nil
  186. }
  187. func sliceIntTotal(s []interface{}) (int, error) {
  188. var total int
  189. for _, v := range s {
  190. if vi, ok := v.(int); ok {
  191. total += vi
  192. } else {
  193. return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
  194. }
  195. }
  196. return total, nil
  197. }
  198. func sliceFloatTotal(s []interface{}) (float64, error) {
  199. var total float64
  200. for _, v := range s {
  201. if vf, ok := v.(float64); ok {
  202. total += vf
  203. } else {
  204. return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
  205. }
  206. }
  207. return total, nil
  208. }
  209. func sliceIntMax(s []interface{}, max int) (int, error) {
  210. for _, v := range s {
  211. if vi, ok := v.(int); ok {
  212. if max < vi {
  213. max = vi
  214. }
  215. } else {
  216. return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
  217. }
  218. }
  219. return max, nil
  220. }
  221. func sliceFloatMax(s []interface{}, max float64) (float64, error) {
  222. for _, v := range s {
  223. if vf, ok := v.(float64); ok {
  224. if max < vf {
  225. max = vf
  226. }
  227. } else {
  228. return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
  229. }
  230. }
  231. return max, nil
  232. }
  233. func sliceStringMax(s []interface{}, max string) (string, error) {
  234. for _, v := range s {
  235. if vs, ok := v.(string); ok {
  236. if max < vs {
  237. max = vs
  238. }
  239. } else {
  240. return "", fmt.Errorf("requires string but found %[1]T(%[1]v)", v)
  241. }
  242. }
  243. return max, nil
  244. }
  245. func sliceIntMin(s []interface{}, min int) (int, error) {
  246. for _, v := range s {
  247. if vi, ok := v.(int); ok {
  248. if min > vi {
  249. min = vi
  250. }
  251. } else {
  252. return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
  253. }
  254. }
  255. return min, nil
  256. }
  257. func sliceFloatMin(s []interface{}, min float64) (float64, error) {
  258. for _, v := range s {
  259. if vf, ok := v.(float64); ok {
  260. if min > vf {
  261. min = vf
  262. }
  263. } else {
  264. return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
  265. }
  266. }
  267. return min, nil
  268. }
  269. func sliceStringMin(s []interface{}, min string) (string, error) {
  270. for _, v := range s {
  271. if vs, ok := v.(string); ok {
  272. if min < vs {
  273. min = vs
  274. }
  275. } else {
  276. return "", fmt.Errorf("requires string but found %[1]T(%[1]v)", v)
  277. }
  278. }
  279. return min, nil
  280. }