funcs_agg.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package function
  15. import (
  16. "fmt"
  17. )
  18. func aggCall(name string, args []interface{}) (interface{}, bool) {
  19. switch name {
  20. case "avg":
  21. arg0 := args[0].([]interface{})
  22. c := getCount(arg0)
  23. if c > 0 {
  24. v := getFirstValidArg(arg0)
  25. switch v.(type) {
  26. case int, int64:
  27. if r, err := sliceIntTotal(arg0); err != nil {
  28. return err, false
  29. } else {
  30. return r / c, true
  31. }
  32. case float64:
  33. if r, err := sliceFloatTotal(arg0); err != nil {
  34. return err, false
  35. } else {
  36. return r / float64(c), true
  37. }
  38. case nil:
  39. return nil, true
  40. default:
  41. return fmt.Errorf("run avg function error: found invalid arg %[1]T(%[1]v)", v), false
  42. }
  43. }
  44. return 0, true
  45. case "count":
  46. arg0 := args[0].([]interface{})
  47. return getCount(arg0), true
  48. case "max":
  49. arg0 := args[0].([]interface{})
  50. if len(arg0) > 0 {
  51. v := getFirstValidArg(arg0)
  52. switch t := v.(type) {
  53. case int:
  54. if r, err := sliceIntMax(arg0, t); err != nil {
  55. return err, false
  56. } else {
  57. return r, true
  58. }
  59. case int64:
  60. if r, err := sliceIntMax(arg0, int(t)); err != nil {
  61. return err, false
  62. } else {
  63. return r, true
  64. }
  65. case float64:
  66. if r, err := sliceFloatMax(arg0, t); err != nil {
  67. return err, false
  68. } else {
  69. return r, true
  70. }
  71. case string:
  72. if r, err := sliceStringMax(arg0, t); err != nil {
  73. return err, false
  74. } else {
  75. return r, true
  76. }
  77. case nil:
  78. return nil, true
  79. default:
  80. return fmt.Errorf("run max function error: found invalid arg %[1]T(%[1]v)", v), false
  81. }
  82. }
  83. return fmt.Errorf("run max function error: empty data"), false
  84. case "min":
  85. arg0 := args[0].([]interface{})
  86. if len(arg0) > 0 {
  87. v := getFirstValidArg(arg0)
  88. switch t := v.(type) {
  89. case int:
  90. if r, err := sliceIntMin(arg0, t); err != nil {
  91. return err, false
  92. } else {
  93. return r, true
  94. }
  95. case int64:
  96. if r, err := sliceIntMin(arg0, int(t)); err != nil {
  97. return err, false
  98. } else {
  99. return r, true
  100. }
  101. case float64:
  102. if r, err := sliceFloatMin(arg0, t); err != nil {
  103. return err, false
  104. } else {
  105. return r, true
  106. }
  107. case string:
  108. if r, err := sliceStringMin(arg0, t); err != nil {
  109. return err, false
  110. } else {
  111. return r, true
  112. }
  113. case nil:
  114. return nil, true
  115. default:
  116. return fmt.Errorf("run min function error: found invalid arg %[1]T(%[1]v)", v), false
  117. }
  118. }
  119. return fmt.Errorf("run min function error: empty data"), false
  120. case "sum":
  121. arg0 := args[0].([]interface{})
  122. if len(arg0) > 0 {
  123. v := getFirstValidArg(arg0)
  124. switch v.(type) {
  125. case int, int64:
  126. if r, err := sliceIntTotal(arg0); err != nil {
  127. return err, false
  128. } else {
  129. return r, true
  130. }
  131. case float64:
  132. if r, err := sliceFloatTotal(arg0); err != nil {
  133. return err, false
  134. } else {
  135. return r, true
  136. }
  137. case nil:
  138. return nil, true
  139. default:
  140. return fmt.Errorf("run sum function error: found invalid arg %[1]T(%[1]v)", v), false
  141. }
  142. }
  143. return 0, true
  144. case "collect":
  145. return args[0], true
  146. case "deduplicate":
  147. v1, ok1 := args[0].([]interface{})
  148. v2, ok2 := args[1].([]interface{})
  149. v3a, ok3 := args[2].([]interface{})
  150. if ok1 && ok2 && ok3 && len(v3a) > 0 {
  151. v3, ok4 := getFirstValidArg(v3a).(bool)
  152. if ok4 {
  153. if r, err := dedup(v1, v2, v3); err != nil {
  154. return err, false
  155. } else {
  156. return r, true
  157. }
  158. }
  159. }
  160. return fmt.Errorf("Invalid argument type found."), false
  161. default:
  162. return fmt.Errorf("Unknown aggregate function name."), false
  163. }
  164. }
  165. func getCount(s []interface{}) int {
  166. c := 0
  167. for _, v := range s {
  168. if v != nil {
  169. c++
  170. }
  171. }
  172. return c
  173. }
  174. func getFirstValidArg(s []interface{}) interface{} {
  175. for _, v := range s {
  176. if v != nil {
  177. return v
  178. }
  179. }
  180. return nil
  181. }
  182. func sliceIntTotal(s []interface{}) (int, error) {
  183. var total int
  184. for _, v := range s {
  185. if vi, ok := v.(int); ok {
  186. total += vi
  187. } else if v != nil {
  188. return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
  189. }
  190. }
  191. return total, nil
  192. }
  193. func sliceFloatTotal(s []interface{}) (float64, error) {
  194. var total float64
  195. for _, v := range s {
  196. if vf, ok := v.(float64); ok {
  197. total += vf
  198. } else if v != nil {
  199. return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
  200. }
  201. }
  202. return total, nil
  203. }
  204. func sliceIntMax(s []interface{}, max int) (int, error) {
  205. for _, v := range s {
  206. if vi, ok := v.(int); ok {
  207. if max < vi {
  208. max = vi
  209. }
  210. } else if v != nil {
  211. return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
  212. }
  213. }
  214. return max, nil
  215. }
  216. func sliceFloatMax(s []interface{}, max float64) (float64, error) {
  217. for _, v := range s {
  218. if vf, ok := v.(float64); ok {
  219. if max < vf {
  220. max = vf
  221. }
  222. } else if v != nil {
  223. return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
  224. }
  225. }
  226. return max, nil
  227. }
  228. func sliceStringMax(s []interface{}, max string) (string, error) {
  229. for _, v := range s {
  230. if vs, ok := v.(string); ok {
  231. if max < vs {
  232. max = vs
  233. }
  234. } else if v != nil {
  235. return "", fmt.Errorf("requires string but found %[1]T(%[1]v)", v)
  236. }
  237. }
  238. return max, nil
  239. }
  240. func sliceIntMin(s []interface{}, min int) (int, error) {
  241. for _, v := range s {
  242. if vi, ok := v.(int); ok {
  243. if min > vi {
  244. min = vi
  245. }
  246. } else if v != nil {
  247. return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
  248. }
  249. }
  250. return min, nil
  251. }
  252. func sliceFloatMin(s []interface{}, min float64) (float64, error) {
  253. for _, v := range s {
  254. if vf, ok := v.(float64); ok {
  255. if min > vf {
  256. min = vf
  257. }
  258. } else if v != nil {
  259. return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
  260. }
  261. }
  262. return min, nil
  263. }
  264. func sliceStringMin(s []interface{}, min string) (string, error) {
  265. for _, v := range s {
  266. if vs, ok := v.(string); ok {
  267. if min < vs {
  268. min = vs
  269. }
  270. } else if v != nil {
  271. return "", fmt.Errorf("requires string but found %[1]T(%[1]v)", v)
  272. }
  273. }
  274. return min, nil
  275. }
  276. func dedup(r []interface{}, col []interface{}, all bool) (interface{}, error) {
  277. keyset := make(map[string]bool)
  278. result := make([]interface{}, 0)
  279. for i, m := range col {
  280. key := fmt.Sprintf("%v", m)
  281. if _, ok := keyset[key]; !ok {
  282. if all {
  283. result = append(result, r[i])
  284. } else if i == len(col)-1 {
  285. result = append(result, r[i])
  286. }
  287. keyset[key] = true
  288. }
  289. }
  290. if !all {
  291. if len(result) == 0 {
  292. return nil, nil
  293. } else {
  294. return result[0], nil
  295. }
  296. } else {
  297. return result, nil
  298. }
  299. }