funcs_array.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package function
  15. import (
  16. "fmt"
  17. "math"
  18. "math/rand"
  19. "strings"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "github.com/lf-edge/ekuiper/pkg/ast"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. )
  // Error values shared by the array built-ins in this file. They are returned
  // as the result of a failed exec call, so each one is created once here and
  // reused rather than rebuilt per call.
  var (
  	// An argument that had to be an array was something else.
  	errorArrayFirstArgumentNotArrayError  = fmt.Errorf("first argument should be array of interface{}")
  	errorArraySecondArgumentNotArrayError = fmt.Errorf("second argument should be array of interface{}")

  	// An argument that had to be an int was something else.
  	errorArrayFirstArgumentNotIntError  = fmt.Errorf("first argument should be int")
  	errorArraySecondArgumentNotIntError = fmt.Errorf("second argument should be int")
  	errorArrayThirdArgumentNotIntError  = fmt.Errorf("third argument should be int")

  	// An argument that had to be a string was something else.
  	errorArrayFirstArgumentNotStringError  = fmt.Errorf("first argument should be string")
  	errorArraySecondArgumentNotStringError = fmt.Errorf("second argument should be string")
  	errorArrayThirdArgumentNotStringError  = fmt.Errorf("third argument should be string")

  	// Problems with an index or with the elements of an array argument.
  	errorArrayIndex                        = fmt.Errorf("index out of range")
  	errorArrayContainsNonNumOrBoolValError = fmt.Errorf("array contain elements that are not numeric or Boolean")
  	errorArrayNotArrayElementError         = fmt.Errorf("array elements should be array")
  	errorArrayNotStringElementError        = fmt.Errorf("array elements should be string")
  )
  38. func registerArrayFunc() {
  39. builtins["array_create"] = builtinFunc{
  40. fType: ast.FuncTypeScalar,
  41. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  42. return args, true
  43. },
  44. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  45. return nil
  46. },
  47. }
  48. builtins["array_position"] = builtinFunc{
  49. fType: ast.FuncTypeScalar,
  50. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  51. array, ok := args[0].([]interface{})
  52. if !ok {
  53. return errorArrayFirstArgumentNotArrayError, false
  54. }
  55. for i, item := range array {
  56. if item == args[1] {
  57. return i, true
  58. }
  59. }
  60. return -1, true
  61. },
  62. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  63. return ValidateLen(2, len(args))
  64. },
  65. }
  66. builtins["element_at"] = builtinFunc{
  67. fType: ast.FuncTypeScalar,
  68. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  69. switch args[0].(type) {
  70. case []interface{}:
  71. array := args[0].([]interface{})
  72. index, err := cast.ToInt(args[1], cast.STRICT)
  73. if err != nil {
  74. return err, false
  75. }
  76. if index >= len(array) || -index > len(array) {
  77. return errorArrayIndex, false
  78. }
  79. if index >= 0 {
  80. return array[index], true
  81. }
  82. return array[len(array)+index], true
  83. case map[string]interface{}:
  84. m := args[0].(map[string]interface{})
  85. key, ok := args[1].(string)
  86. if !ok {
  87. return fmt.Errorf("second argument should be string"), false
  88. }
  89. return m[key], true
  90. default:
  91. return fmt.Errorf("first argument should be []interface{} or map[string]interface{}"), false
  92. }
  93. },
  94. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  95. return ValidateLen(2, len(args))
  96. },
  97. }
  98. builtins["array_contains"] = builtinFunc{
  99. fType: ast.FuncTypeScalar,
  100. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  101. array, ok := args[0].([]interface{})
  102. if !ok {
  103. return errorArrayFirstArgumentNotArrayError, false
  104. }
  105. for _, item := range array {
  106. if item == args[1] {
  107. return true, true
  108. }
  109. }
  110. return false, true
  111. },
  112. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  113. return ValidateLen(2, len(args))
  114. },
  115. }
  116. builtins["array_remove"] = builtinFunc{
  117. fType: ast.FuncTypeScalar,
  118. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  119. array, ok := args[0].([]interface{})
  120. if !ok {
  121. return errorArrayFirstArgumentNotArrayError, false
  122. }
  123. index := 0
  124. for _, item := range array {
  125. if item != args[1] {
  126. array[index] = item
  127. index++
  128. }
  129. }
  130. return array[:index], true
  131. },
  132. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  133. return ValidateLen(2, len(args))
  134. },
  135. }
  136. builtins["array_last_position"] = builtinFunc{
  137. fType: ast.FuncTypeScalar,
  138. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  139. array, ok := args[0].([]interface{})
  140. if !ok {
  141. return errorArrayFirstArgumentNotArrayError, false
  142. }
  143. lastPos := -1
  144. for i := len(array) - 1; i >= 0; i-- {
  145. if array[i] == args[1] {
  146. lastPos = i
  147. break
  148. }
  149. }
  150. return lastPos, true
  151. },
  152. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  153. return ValidateLen(2, len(args))
  154. },
  155. }
  156. builtins["array_contains_any"] = builtinFunc{
  157. fType: ast.FuncTypeScalar,
  158. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  159. array1, ok1 := args[0].([]interface{})
  160. if !ok1 {
  161. return errorArrayFirstArgumentNotArrayError, false
  162. }
  163. array2, ok2 := args[1].([]interface{})
  164. if !ok2 {
  165. return errorArraySecondArgumentNotArrayError, false
  166. }
  167. for _, a := range array1 {
  168. for _, b := range array2 {
  169. if a == b {
  170. return true, true
  171. }
  172. }
  173. }
  174. return false, true
  175. },
  176. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  177. return ValidateLen(2, len(args))
  178. },
  179. }
  180. builtins["array_intersect"] = builtinFunc{
  181. fType: ast.FuncTypeScalar,
  182. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  183. array1, ok1 := args[0].([]interface{})
  184. if !ok1 {
  185. return errorArrayFirstArgumentNotArrayError, false
  186. }
  187. array2, ok2 := args[1].([]interface{})
  188. if !ok2 {
  189. return errorArraySecondArgumentNotArrayError, false
  190. }
  191. capacity := len(array1)
  192. if len(array2) > capacity {
  193. capacity = len(array2)
  194. }
  195. intersection := make([]interface{}, 0, capacity)
  196. set := make(map[interface{}]bool)
  197. for _, a := range array1 {
  198. set[a] = true
  199. }
  200. for _, b := range array2 {
  201. if set[b] {
  202. intersection = append(intersection, b)
  203. set[b] = false
  204. }
  205. }
  206. return intersection, true
  207. },
  208. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  209. return ValidateLen(2, len(args))
  210. },
  211. }
  212. builtins["array_union"] = builtinFunc{
  213. fType: ast.FuncTypeScalar,
  214. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  215. array1, ok1 := args[0].([]interface{})
  216. if !ok1 {
  217. return errorArrayFirstArgumentNotArrayError, false
  218. }
  219. array2, ok2 := args[1].([]interface{})
  220. if !ok2 {
  221. return errorArraySecondArgumentNotArrayError, false
  222. }
  223. union := make([]interface{}, 0, len(array1)+len(array2))
  224. set := make(map[interface{}]bool)
  225. for _, a := range array1 {
  226. if !set[a] {
  227. union = append(union, a)
  228. set[a] = true
  229. }
  230. }
  231. for _, b := range array2 {
  232. if !set[b] {
  233. set[b] = true
  234. union = append(union, b)
  235. }
  236. }
  237. return union, true
  238. },
  239. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  240. return ValidateLen(2, len(args))
  241. },
  242. }
  243. builtins["array_max"] = builtinFunc{
  244. fType: ast.FuncTypeScalar,
  245. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  246. array, ok := args[0].([]interface{})
  247. if !ok {
  248. return errorArrayFirstArgumentNotArrayError, false
  249. }
  250. var res interface{}
  251. var maxVal float64 = math.Inf(-1)
  252. for _, val := range array {
  253. if val == nil {
  254. return nil, true
  255. }
  256. f, err := cast.ToFloat64(val, cast.CONVERT_ALL)
  257. if err != nil {
  258. return errorArrayContainsNonNumOrBoolValError, false
  259. }
  260. if f > maxVal {
  261. maxVal = f
  262. res = val
  263. }
  264. }
  265. return res, true
  266. },
  267. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  268. return ValidateLen(1, len(args))
  269. },
  270. }
  271. builtins["array_min"] = builtinFunc{
  272. fType: ast.FuncTypeScalar,
  273. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  274. array, ok := args[0].([]interface{})
  275. if !ok {
  276. return errorArrayFirstArgumentNotArrayError, false
  277. }
  278. var res interface{}
  279. var min float64 = math.Inf(1)
  280. for _, val := range array {
  281. if val == nil {
  282. return nil, true
  283. }
  284. f, err := cast.ToFloat64(val, cast.CONVERT_ALL)
  285. if err != nil {
  286. return errorArrayContainsNonNumOrBoolValError, false
  287. }
  288. if f < min {
  289. min = f
  290. res = val
  291. }
  292. }
  293. return res, true
  294. },
  295. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  296. return ValidateLen(1, len(args))
  297. },
  298. }
  299. builtins["array_except"] = builtinFunc{
  300. fType: ast.FuncTypeScalar,
  301. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  302. array1, ok1 := args[0].([]interface{})
  303. if !ok1 {
  304. return errorArrayFirstArgumentNotArrayError, false
  305. }
  306. array2, ok2 := args[1].([]interface{})
  307. if !ok2 {
  308. return errorArraySecondArgumentNotArrayError, false
  309. }
  310. except := make([]interface{}, 0, len(array1))
  311. set := make(map[interface{}]bool)
  312. for _, v := range array2 {
  313. set[v] = true
  314. }
  315. for _, v := range array1 {
  316. if !set[v] {
  317. except = append(except, v)
  318. set[v] = true
  319. }
  320. }
  321. return except, true
  322. },
  323. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  324. return ValidateLen(2, len(args))
  325. },
  326. }
  327. builtins["repeat"] = builtinFunc{
  328. fType: ast.FuncTypeScalar,
  329. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  330. elemt, ok := args[0].(interface{})
  331. if !ok {
  332. return errorArrayFirstArgumentNotArrayError, false
  333. }
  334. count, ok := args[1].(int)
  335. if !ok {
  336. return errorArraySecondArgumentNotIntError, false
  337. }
  338. arr := make([]interface{}, count)
  339. for i := range arr {
  340. arr[i] = elemt
  341. }
  342. return arr, true
  343. },
  344. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  345. return ValidateLen(2, len(args))
  346. },
  347. }
  348. builtins["sequence"] = builtinFunc{
  349. fType: ast.FuncTypeScalar,
  350. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  351. var step, start, stop int
  352. var ok bool
  353. start, ok = args[0].(int)
  354. if !ok {
  355. return errorArrayFirstArgumentNotIntError, false
  356. }
  357. stop, ok = args[1].(int)
  358. if !ok {
  359. return errorArraySecondArgumentNotIntError, false
  360. }
  361. if len(args) == 3 {
  362. step, ok = args[2].(int)
  363. if !ok {
  364. return errorArrayThirdArgumentNotIntError, false
  365. }
  366. if step == 0 {
  367. return fmt.Errorf("invalid step: should not be zero"), false
  368. }
  369. } else {
  370. if start < stop {
  371. step = 1
  372. } else {
  373. step = -1
  374. }
  375. }
  376. n := (stop-start)/step + 1
  377. arr := make([]interface{}, n)
  378. for i := range arr {
  379. arr[i] = start + i*step
  380. }
  381. return arr, true
  382. },
  383. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  384. if err := ValidateLen(2, len(args)); err != nil {
  385. if err := ValidateLen(3, len(args)); err != nil {
  386. return fmt.Errorf("Expect two or three arguments but found %d.", len(args))
  387. }
  388. }
  389. return nil
  390. },
  391. }
  392. builtins["array_cardinality"] = builtinFunc{
  393. fType: ast.FuncTypeScalar,
  394. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  395. array, ok := args[0].([]interface{})
  396. if !ok {
  397. return errorArrayFirstArgumentNotArrayError, false
  398. }
  399. return len(array), true
  400. },
  401. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  402. return ValidateLen(1, len(args))
  403. },
  404. }
  405. builtins["array_flatten"] = builtinFunc{
  406. fType: ast.FuncTypeScalar,
  407. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  408. array, ok := args[0].([]interface{})
  409. if !ok {
  410. return errorArrayFirstArgumentNotArrayError, false
  411. }
  412. var output []interface{}
  413. for _, val := range array {
  414. innerArr, ok := val.([]interface{})
  415. if !ok {
  416. return errorArrayNotArrayElementError, false
  417. }
  418. output = append(output, innerArr...)
  419. }
  420. return output, true
  421. },
  422. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  423. return ValidateLen(1, len(args))
  424. },
  425. }
  426. builtins["array_distinct"] = builtinFunc{
  427. fType: ast.FuncTypeScalar,
  428. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  429. array, ok := args[0].([]interface{})
  430. if !ok {
  431. return errorArrayFirstArgumentNotArrayError, false
  432. }
  433. output := make([]interface{}, 0, len(array))
  434. set := make(map[interface{}]bool)
  435. for _, val := range array {
  436. if !set[val] {
  437. output = append(output, val)
  438. set[val] = true
  439. }
  440. }
  441. return output, true
  442. },
  443. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  444. return ValidateLen(1, len(args))
  445. },
  446. }
  447. builtins["array_map"] = builtinFunc{
  448. fType: ast.FuncTypeScalar,
  449. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  450. funcName, ok := args[0].(string)
  451. if !ok {
  452. return errorArrayFirstArgumentNotStringError, false
  453. }
  454. array, ok := args[1].([]interface{})
  455. if !ok {
  456. return errorArraySecondArgumentNotArrayError, false
  457. }
  458. mapped := make([]interface{}, 0, len(array))
  459. var result interface{}
  460. for _, v := range array {
  461. params := []interface{}{v}
  462. fs, ok := builtins[funcName]
  463. if !ok {
  464. return fmt.Errorf("unknown built-in function: %s.", funcName), false
  465. }
  466. if fs.fType != ast.FuncTypeScalar {
  467. return fmt.Errorf("first argument should be a scalar function."), false
  468. }
  469. eargs := make([]ast.Expr, len(params))
  470. if err := fs.val(nil, eargs); err != nil {
  471. return fmt.Errorf("validate function arguments failed."), false
  472. }
  473. result, ok = fs.exec(ctx, params)
  474. if !ok {
  475. return result, false
  476. }
  477. mapped = append(mapped, result)
  478. }
  479. return mapped, true
  480. },
  481. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  482. return ValidateLen(2, len(args))
  483. },
  484. }
  485. builtins["array_join"] = builtinFunc{
  486. fType: ast.FuncTypeScalar,
  487. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  488. array, ok := args[0].([]interface{})
  489. if !ok {
  490. return errorArrayFirstArgumentNotArrayError, false
  491. }
  492. delimiter, ok := args[1].(string)
  493. if !ok {
  494. return errorArraySecondArgumentNotStringError, false
  495. }
  496. var nullReplacement string
  497. if len(args) == 3 {
  498. nullReplacement, ok = args[2].(string)
  499. if !ok {
  500. return errorArrayThirdArgumentNotStringError, false
  501. }
  502. }
  503. var index int
  504. for _, v := range array {
  505. if v == nil {
  506. if len(nullReplacement) != 0 {
  507. array[index] = nullReplacement
  508. index++
  509. }
  510. } else {
  511. array[index], ok = v.(string)
  512. index++
  513. if !ok {
  514. return errorArrayNotStringElementError, false
  515. }
  516. }
  517. }
  518. strs, err := cast.ToStringSlice(array[:index], cast.CONVERT_ALL)
  519. if err != nil {
  520. return err, false
  521. }
  522. return strings.Join(strs, delimiter), true
  523. },
  524. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  525. if err := ValidateLen(2, len(args)); err != nil {
  526. if err := ValidateLen(3, len(args)); err != nil {
  527. return fmt.Errorf("Expect two or three arguments but found %d.", len(args))
  528. }
  529. }
  530. return nil
  531. },
  532. }
  533. builtins["array_shuffle"] = builtinFunc{
  534. fType: ast.FuncTypeScalar,
  535. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  536. array, ok := args[0].([]interface{})
  537. if !ok {
  538. return errorArrayFirstArgumentNotArrayError, false
  539. }
  540. rand.Shuffle(len(array), func(i, j int) {
  541. array[i], array[j] = array[j], array[i]
  542. })
  543. return array, true
  544. },
  545. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  546. return ValidateLen(1, len(args))
  547. },
  548. }
  549. }