funcs_aggregate.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. package xsql
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/plugins"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "strings"
  8. )
  9. type AggregateFunctionValuer struct {
  10. data AggregateData
  11. fv *FunctionValuer
  12. plugins map[string]api.Function
  13. }
  14. //Should only be called by stream to make sure a single instance for an operation
  15. func NewAggregateFunctionValuers() (*FunctionValuer, *AggregateFunctionValuer) {
  16. fv := &FunctionValuer{}
  17. return fv, &AggregateFunctionValuer{
  18. fv: fv,
  19. }
  20. }
  21. func (v *AggregateFunctionValuer) SetData(data AggregateData) {
  22. v.data = data
  23. }
  24. func (v *AggregateFunctionValuer) GetSingleCallValuer() CallValuer {
  25. return v.fv
  26. }
  27. func (v *AggregateFunctionValuer) Value(key string) (interface{}, bool) {
  28. return nil, false
  29. }
  30. func (v *AggregateFunctionValuer) Meta(key string) (interface{}, bool) {
  31. return nil, false
  32. }
  33. func (v *AggregateFunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
  34. lowerName := strings.ToLower(name)
  35. switch lowerName {
  36. case "avg":
  37. arg0 := args[0].([]interface{})
  38. c := getCount(arg0)
  39. if c > 0 {
  40. v := getFirstValidArg(arg0)
  41. switch v.(type) {
  42. case int, int64:
  43. if r, err := sliceIntTotal(arg0); err != nil {
  44. return err, false
  45. } else {
  46. return r / c, true
  47. }
  48. case float64:
  49. if r, err := sliceFloatTotal(arg0); err != nil {
  50. return err, false
  51. } else {
  52. return r / float64(c), true
  53. }
  54. case nil:
  55. return nil, true
  56. default:
  57. return fmt.Errorf("run avg function error: found invalid arg %[1]T(%[1]v)", v), false
  58. }
  59. }
  60. return 0, true
  61. case "count":
  62. arg0 := args[0].([]interface{})
  63. return getCount(arg0), true
  64. case "max":
  65. arg0 := args[0].([]interface{})
  66. if len(arg0) > 0 {
  67. v := getFirstValidArg(arg0)
  68. switch t := v.(type) {
  69. case int:
  70. if r, err := sliceIntMax(arg0, t); err != nil {
  71. return err, false
  72. } else {
  73. return r, true
  74. }
  75. case int64:
  76. if r, err := sliceIntMax(arg0, int(t)); err != nil {
  77. return err, false
  78. } else {
  79. return r, true
  80. }
  81. case float64:
  82. if r, err := sliceFloatMax(arg0, t); err != nil {
  83. return err, false
  84. } else {
  85. return r, true
  86. }
  87. case string:
  88. if r, err := sliceStringMax(arg0, t); err != nil {
  89. return err, false
  90. } else {
  91. return r, true
  92. }
  93. case nil:
  94. return nil, true
  95. default:
  96. return fmt.Errorf("run max function error: found invalid arg %[1]T(%[1]v)", v), false
  97. }
  98. }
  99. return fmt.Errorf("run max function error: empty data"), false
  100. case "min":
  101. arg0 := args[0].([]interface{})
  102. if len(arg0) > 0 {
  103. v := getFirstValidArg(arg0)
  104. switch t := v.(type) {
  105. case int:
  106. if r, err := sliceIntMin(arg0, t); err != nil {
  107. return err, false
  108. } else {
  109. return r, true
  110. }
  111. case int64:
  112. if r, err := sliceIntMin(arg0, int(t)); err != nil {
  113. return err, false
  114. } else {
  115. return r, true
  116. }
  117. case float64:
  118. if r, err := sliceFloatMin(arg0, t); err != nil {
  119. return err, false
  120. } else {
  121. return r, true
  122. }
  123. case string:
  124. if r, err := sliceStringMin(arg0, t); err != nil {
  125. return err, false
  126. } else {
  127. return r, true
  128. }
  129. case nil:
  130. return nil, true
  131. default:
  132. return fmt.Errorf("run min function error: found invalid arg %[1]T(%[1]v)", v), false
  133. }
  134. }
  135. return fmt.Errorf("run min function error: empty data"), false
  136. case "sum":
  137. arg0 := args[0].([]interface{})
  138. if len(arg0) > 0 {
  139. v := getFirstValidArg(arg0)
  140. switch v.(type) {
  141. case int, int64:
  142. if r, err := sliceIntTotal(arg0); err != nil {
  143. return err, false
  144. } else {
  145. return r, true
  146. }
  147. case float64:
  148. if r, err := sliceFloatTotal(arg0); err != nil {
  149. return err, false
  150. } else {
  151. return r, true
  152. }
  153. case nil:
  154. return nil, true
  155. default:
  156. return fmt.Errorf("run sum function error: found invalid arg %[1]T(%[1]v)", v), false
  157. }
  158. }
  159. return 0, true
  160. default:
  161. common.Log.Debugf("run aggregate func %s", name)
  162. if v.plugins == nil {
  163. v.plugins = make(map[string]api.Function)
  164. }
  165. var (
  166. nf api.Function
  167. ok bool
  168. err error
  169. )
  170. if nf, ok = v.plugins[name]; !ok {
  171. nf, err = plugins.GetFunction(name)
  172. if err != nil {
  173. return err, false
  174. }
  175. v.plugins[name] = nf
  176. }
  177. if !nf.IsAggregate() {
  178. return nil, false
  179. }
  180. result, ok := nf.Exec(args)
  181. common.Log.Debugf("run custom aggregate function %s, get result %v", name, result)
  182. return result, ok
  183. }
  184. }
  185. func getCount(s []interface{}) int {
  186. c := 0
  187. for _, v := range s {
  188. if v != nil {
  189. c++
  190. }
  191. }
  192. return c
  193. }
  194. func (v *AggregateFunctionValuer) GetAllTuples() AggregateData {
  195. return v.data
  196. }
  197. func getFirstValidArg(s []interface{}) interface{} {
  198. for _, v := range s {
  199. if v != nil {
  200. return v
  201. }
  202. }
  203. return nil
  204. }
  205. func sliceIntTotal(s []interface{}) (int, error) {
  206. var total int
  207. for _, v := range s {
  208. if vi, ok := v.(int); ok {
  209. total += vi
  210. } else if v != nil {
  211. return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
  212. }
  213. }
  214. return total, nil
  215. }
  216. func sliceFloatTotal(s []interface{}) (float64, error) {
  217. var total float64
  218. for _, v := range s {
  219. if vf, ok := v.(float64); ok {
  220. total += vf
  221. } else if v != nil {
  222. return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
  223. }
  224. }
  225. return total, nil
  226. }
  227. func sliceIntMax(s []interface{}, max int) (int, error) {
  228. for _, v := range s {
  229. if vi, ok := v.(int); ok {
  230. if max < vi {
  231. max = vi
  232. }
  233. } else if v != nil {
  234. return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
  235. }
  236. }
  237. return max, nil
  238. }
  239. func sliceFloatMax(s []interface{}, max float64) (float64, error) {
  240. for _, v := range s {
  241. if vf, ok := v.(float64); ok {
  242. if max < vf {
  243. max = vf
  244. }
  245. } else if v != nil {
  246. return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
  247. }
  248. }
  249. return max, nil
  250. }
  251. func sliceStringMax(s []interface{}, max string) (string, error) {
  252. for _, v := range s {
  253. if vs, ok := v.(string); ok {
  254. if max < vs {
  255. max = vs
  256. }
  257. } else if v != nil {
  258. return "", fmt.Errorf("requires string but found %[1]T(%[1]v)", v)
  259. }
  260. }
  261. return max, nil
  262. }
  263. func sliceIntMin(s []interface{}, min int) (int, error) {
  264. for _, v := range s {
  265. if vi, ok := v.(int); ok {
  266. if min > vi {
  267. min = vi
  268. }
  269. } else if v != nil {
  270. return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
  271. }
  272. }
  273. return min, nil
  274. }
  275. func sliceFloatMin(s []interface{}, min float64) (float64, error) {
  276. for _, v := range s {
  277. if vf, ok := v.(float64); ok {
  278. if min > vf {
  279. min = vf
  280. }
  281. } else if v != nil {
  282. return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
  283. }
  284. }
  285. return min, nil
  286. }
  287. func sliceStringMin(s []interface{}, min string) (string, error) {
  288. for _, v := range s {
  289. if vs, ok := v.(string); ok {
  290. if min < vs {
  291. min = vs
  292. }
  293. } else if v != nil {
  294. return "", fmt.Errorf("requires string but found %[1]T(%[1]v)", v)
  295. }
  296. }
  297. return min, nil
  298. }