funcs_array.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package function
import (
	"fmt"
	"math"
	"reflect"

	"github.com/lf-edge/ekuiper/pkg/api"
	"github.com/lf-edge/ekuiper/pkg/ast"
	"github.com/lf-edge/ekuiper/pkg/cast"
)
  22. var (
  23. errorArrayFirstArgumentNotArrayError = fmt.Errorf("first argument should be array of interface{}")
  24. errorArrayIndex = fmt.Errorf("index out of range")
  25. errorArraySecondArgumentNotArrayError = fmt.Errorf("second argument should be array of interface{}")
  26. errorArrayFirstArgumentNotIntError = fmt.Errorf("first argument should be int")
  27. errorArraySecondArgumentNotIntError = fmt.Errorf("second argument should be int")
  28. errorArrayThirdArgumentNotIntError = fmt.Errorf("third argument should be int")
  29. errorArrayContainsNonNumOrBoolValError = fmt.Errorf("array contain elements that are not numeric or Boolean")
  30. )
  31. func registerArrayFunc() {
  32. builtins["array_create"] = builtinFunc{
  33. fType: ast.FuncTypeScalar,
  34. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  35. return args, true
  36. },
  37. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  38. return nil
  39. },
  40. }
  41. builtins["array_position"] = builtinFunc{
  42. fType: ast.FuncTypeScalar,
  43. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  44. array, ok := args[0].([]interface{})
  45. if !ok {
  46. return errorArrayFirstArgumentNotArrayError, false
  47. }
  48. for i, item := range array {
  49. if item == args[1] {
  50. return i, true
  51. }
  52. }
  53. return -1, true
  54. },
  55. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  56. return ValidateLen(2, len(args))
  57. },
  58. }
  59. builtins["element_at"] = builtinFunc{
  60. fType: ast.FuncTypeScalar,
  61. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  62. switch args[0].(type) {
  63. case []interface{}:
  64. array := args[0].([]interface{})
  65. index, err := cast.ToInt(args[1], cast.STRICT)
  66. if err != nil {
  67. return err, false
  68. }
  69. if index >= len(array) || -index > len(array) {
  70. return errorArrayIndex, false
  71. }
  72. if index >= 0 {
  73. return array[index], true
  74. }
  75. return array[len(array)+index], true
  76. case map[string]interface{}:
  77. m := args[0].(map[string]interface{})
  78. key, ok := args[1].(string)
  79. if !ok {
  80. return fmt.Errorf("second argument should be string"), false
  81. }
  82. return m[key], true
  83. default:
  84. return fmt.Errorf("first argument should be []interface{} or map[string]interface{}"), false
  85. }
  86. },
  87. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  88. return ValidateLen(2, len(args))
  89. },
  90. }
  91. builtins["array_contains"] = builtinFunc{
  92. fType: ast.FuncTypeScalar,
  93. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  94. array, ok := args[0].([]interface{})
  95. if !ok {
  96. return errorArrayFirstArgumentNotArrayError, false
  97. }
  98. for _, item := range array {
  99. if item == args[1] {
  100. return true, true
  101. }
  102. }
  103. return false, true
  104. },
  105. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  106. return ValidateLen(2, len(args))
  107. },
  108. }
  109. builtins["array_remove"] = builtinFunc{
  110. fType: ast.FuncTypeScalar,
  111. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  112. array, ok := args[0].([]interface{})
  113. if !ok {
  114. return errorArrayFirstArgumentNotArrayError, false
  115. }
  116. index := 0
  117. for _, item := range array {
  118. if item != args[1] {
  119. array[index] = item
  120. index++
  121. }
  122. }
  123. return array[:index], true
  124. },
  125. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  126. return ValidateLen(2, len(args))
  127. },
  128. }
  129. builtins["array_last_position"] = builtinFunc{
  130. fType: ast.FuncTypeScalar,
  131. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  132. array, ok := args[0].([]interface{})
  133. if !ok {
  134. return errorArrayFirstArgumentNotArrayError, false
  135. }
  136. lastPos := -1
  137. for i := len(array) - 1; i >= 0; i-- {
  138. if array[i] == args[1] {
  139. lastPos = i
  140. break
  141. }
  142. }
  143. return lastPos, true
  144. },
  145. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  146. return ValidateLen(2, len(args))
  147. },
  148. }
  149. builtins["array_contains_any"] = builtinFunc{
  150. fType: ast.FuncTypeScalar,
  151. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  152. array1, ok1 := args[0].([]interface{})
  153. if !ok1 {
  154. return errorArrayFirstArgumentNotArrayError, false
  155. }
  156. array2, ok2 := args[1].([]interface{})
  157. if !ok2 {
  158. return errorArraySecondArgumentNotArrayError, false
  159. }
  160. for _, a := range array1 {
  161. for _, b := range array2 {
  162. if a == b {
  163. return true, true
  164. }
  165. }
  166. }
  167. return false, true
  168. },
  169. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  170. return ValidateLen(2, len(args))
  171. },
  172. }
  173. builtins["array_intersect"] = builtinFunc{
  174. fType: ast.FuncTypeScalar,
  175. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  176. array1, ok1 := args[0].([]interface{})
  177. if !ok1 {
  178. return errorArrayFirstArgumentNotArrayError, false
  179. }
  180. array2, ok2 := args[1].([]interface{})
  181. if !ok2 {
  182. return errorArraySecondArgumentNotArrayError, false
  183. }
  184. capacity := len(array1)
  185. if len(array2) > capacity {
  186. capacity = len(array2)
  187. }
  188. intersection := make([]interface{}, 0, capacity)
  189. set := make(map[interface{}]bool)
  190. for _, a := range array1 {
  191. set[a] = true
  192. }
  193. for _, b := range array2 {
  194. if set[b] {
  195. intersection = append(intersection, b)
  196. set[b] = false
  197. }
  198. }
  199. return intersection, true
  200. },
  201. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  202. return ValidateLen(2, len(args))
  203. },
  204. }
  205. builtins["array_union"] = builtinFunc{
  206. fType: ast.FuncTypeScalar,
  207. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  208. array1, ok1 := args[0].([]interface{})
  209. if !ok1 {
  210. return errorArrayFirstArgumentNotArrayError, false
  211. }
  212. array2, ok2 := args[1].([]interface{})
  213. if !ok2 {
  214. return errorArraySecondArgumentNotArrayError, false
  215. }
  216. union := make([]interface{}, 0, len(array1)+len(array2))
  217. set := make(map[interface{}]bool)
  218. for _, a := range array1 {
  219. if !set[a] {
  220. union = append(union, a)
  221. set[a] = true
  222. }
  223. }
  224. for _, b := range array2 {
  225. if !set[b] {
  226. set[b] = true
  227. union = append(union, b)
  228. }
  229. }
  230. return union, true
  231. },
  232. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  233. return ValidateLen(2, len(args))
  234. },
  235. }
  236. builtins["array_max"] = builtinFunc{
  237. fType: ast.FuncTypeScalar,
  238. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  239. array, ok := args[0].([]interface{})
  240. if !ok {
  241. return errorArrayFirstArgumentNotArrayError, false
  242. }
  243. var res interface{}
  244. var maxVal float64 = math.Inf(-1)
  245. for _, val := range array {
  246. if val == nil {
  247. return nil, true
  248. }
  249. f, err := cast.ToFloat64(val, cast.CONVERT_ALL)
  250. if err != nil {
  251. return errorArrayContainsNonNumOrBoolValError, false
  252. }
  253. if f > maxVal {
  254. maxVal = f
  255. res = val
  256. }
  257. }
  258. return res, true
  259. },
  260. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  261. return ValidateLen(1, len(args))
  262. },
  263. }
  264. builtins["array_min"] = builtinFunc{
  265. fType: ast.FuncTypeScalar,
  266. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  267. array, ok := args[0].([]interface{})
  268. if !ok {
  269. return errorArrayFirstArgumentNotArrayError, false
  270. }
  271. var res interface{}
  272. var min float64 = math.Inf(1)
  273. for _, val := range array {
  274. if val == nil {
  275. return nil, true
  276. }
  277. f, err := cast.ToFloat64(val, cast.CONVERT_ALL)
  278. if err != nil {
  279. return errorArrayContainsNonNumOrBoolValError, false
  280. }
  281. if f < min {
  282. min = f
  283. res = val
  284. }
  285. }
  286. return res, true
  287. },
  288. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  289. return ValidateLen(1, len(args))
  290. },
  291. }
  292. builtins["array_except"] = builtinFunc{
  293. fType: ast.FuncTypeScalar,
  294. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  295. array1, ok1 := args[0].([]interface{})
  296. if !ok1 {
  297. return errorArrayFirstArgumentNotArrayError, false
  298. }
  299. array2, ok2 := args[1].([]interface{})
  300. if !ok2 {
  301. return errorArraySecondArgumentNotArrayError, false
  302. }
  303. except := make([]interface{}, 0, len(array1))
  304. set := make(map[interface{}]bool)
  305. for _, v := range array2 {
  306. set[v] = true
  307. }
  308. for _, v := range array1 {
  309. if !set[v] {
  310. except = append(except, v)
  311. set[v] = true
  312. }
  313. }
  314. return except, true
  315. },
  316. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  317. return ValidateLen(2, len(args))
  318. },
  319. }
  320. builtins["repeat"] = builtinFunc{
  321. fType: ast.FuncTypeScalar,
  322. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  323. elemt, ok := args[0].(interface{})
  324. if !ok {
  325. return errorArrayFirstArgumentNotArrayError, false
  326. }
  327. count, ok := args[1].(int)
  328. if !ok {
  329. return errorArraySecondArgumentNotIntError, false
  330. }
  331. arr := make([]interface{}, count)
  332. for i := range arr {
  333. arr[i] = elemt
  334. }
  335. return arr, true
  336. },
  337. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  338. return ValidateLen(2, len(args))
  339. },
  340. }
  341. builtins["sequence"] = builtinFunc{
  342. fType: ast.FuncTypeScalar,
  343. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  344. var step, start, stop int
  345. var ok bool
  346. start, ok = args[0].(int)
  347. if !ok {
  348. return errorArrayFirstArgumentNotIntError, false
  349. }
  350. stop, ok = args[1].(int)
  351. if !ok {
  352. return errorArraySecondArgumentNotIntError, false
  353. }
  354. if len(args) == 3 {
  355. step, ok = args[2].(int)
  356. if !ok {
  357. return errorArrayThirdArgumentNotIntError, false
  358. }
  359. if step == 0 {
  360. return fmt.Errorf("invalid step: should not be zero"), false
  361. }
  362. } else {
  363. if start < stop {
  364. step = 1
  365. } else {
  366. step = -1
  367. }
  368. }
  369. n := (stop-start)/step + 1
  370. arr := make([]interface{}, n)
  371. for i := range arr {
  372. arr[i] = start + i*step
  373. }
  374. return arr, true
  375. },
  376. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  377. if err := ValidateLen(2, len(args)); err != nil {
  378. if err := ValidateLen(3, len(args)); err != nil {
  379. return fmt.Errorf("Expect two or three arguments but found %d.", len(args))
  380. }
  381. }
  382. return nil
  383. },
  384. }
  385. }