funcs_misc.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package function
  15. import (
  16. "crypto/md5"
  17. "crypto/sha1"
  18. "crypto/sha256"
  19. "crypto/sha512"
  20. b64 "encoding/base64"
  21. "encoding/json"
  22. "fmt"
  23. "io"
  24. "math"
  25. "reflect"
  26. "strconv"
  27. "strings"
  28. "time"
  29. "github.com/google/uuid"
  30. "github.com/lf-edge/ekuiper/internal/conf"
  31. "github.com/lf-edge/ekuiper/internal/keyedstate"
  32. "github.com/lf-edge/ekuiper/pkg/api"
  33. "github.com/lf-edge/ekuiper/pkg/ast"
  34. "github.com/lf-edge/ekuiper/pkg/cast"
  35. )
  36. func registerMiscFunc() {
  37. builtins["cast"] = builtinFunc{
  38. fType: ast.FuncTypeScalar,
  39. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  40. value := args[0]
  41. newType := args[1]
  42. return cast.ToType(value, newType)
  43. },
  44. val: func(_ api.FunctionContext, args []ast.Expr) error {
  45. if err := ValidateLen(2, len(args)); err != nil {
  46. return err
  47. }
  48. a := args[1]
  49. if ast.IsNumericArg(a) || ast.IsTimeArg(a) || ast.IsBooleanArg(a) {
  50. return ProduceErrInfo(0, "string")
  51. }
  52. if av, ok := a.(*ast.StringLiteral); ok {
  53. if !(av.Val == "bigint" || av.Val == "float" || av.Val == "string" || av.Val == "boolean" || av.Val == "datetime") {
  54. return fmt.Errorf("Expect one of following value for the 2nd parameter: bigint, float, string, boolean, datetime.")
  55. }
  56. }
  57. return nil
  58. },
  59. check: returnNilIfHasAnyNil,
  60. }
  61. builtins["convert_tz"] = builtinFunc{
  62. fType: ast.FuncTypeScalar,
  63. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  64. arg0, err := cast.InterfaceToTime(args[0], "")
  65. if err != nil {
  66. return err, false
  67. }
  68. arg1 := cast.ToStringAlways(args[1])
  69. loc, err := time.LoadLocation(arg1)
  70. if err != nil {
  71. return err, false
  72. }
  73. return arg0.In(loc), true
  74. },
  75. val: func(_ api.FunctionContext, args []ast.Expr) error {
  76. if err := ValidateLen(2, len(args)); err != nil {
  77. return err
  78. }
  79. if ast.IsNumericArg(args[0]) || ast.IsStringArg(args[0]) || ast.IsBooleanArg(args[0]) {
  80. return ProduceErrInfo(0, "datetime")
  81. }
  82. if ast.IsNumericArg(args[1]) || ast.IsTimeArg(args[1]) || ast.IsBooleanArg(args[1]) {
  83. return ProduceErrInfo(1, "string")
  84. }
  85. return nil
  86. },
  87. check: returnNilIfHasAnyNil,
  88. }
  89. builtins["to_json"] = builtinFunc{
  90. fType: ast.FuncTypeScalar,
  91. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  92. rr, err := json.Marshal(args[0])
  93. if err != nil {
  94. return fmt.Errorf("fail to convert %v to json", args[0]), false
  95. }
  96. return string(rr), true
  97. },
  98. val: ValidateOneArg,
  99. check: returnNilIfHasAnyNil,
  100. }
  101. builtins["parse_json"] = builtinFunc{
  102. fType: ast.FuncTypeScalar,
  103. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  104. if args[0] == nil || args[0] == "null" {
  105. return nil, true
  106. }
  107. text, err := cast.ToString(args[0], cast.CONVERT_SAMEKIND)
  108. if err != nil {
  109. return fmt.Errorf("fail to convert %v to string", args[0]), false
  110. }
  111. var data interface{}
  112. err = json.Unmarshal(cast.StringToBytes(text), &data)
  113. if err != nil {
  114. return fmt.Errorf("fail to parse json: %v", err), false
  115. }
  116. return data, true
  117. },
  118. val: ValidateOneStrArg,
  119. }
  120. builtins["chr"] = builtinFunc{
  121. fType: ast.FuncTypeScalar,
  122. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  123. if v, ok := args[0].(int); ok {
  124. return rune(v), true
  125. } else if v, ok := args[0].(float64); ok {
  126. temp := int(v)
  127. return rune(temp), true
  128. } else if v, ok := args[0].(string); ok {
  129. if len(v) > 1 {
  130. return fmt.Errorf("Parameter length cannot larger than 1."), false
  131. }
  132. r := []rune(v)
  133. return r[0], true
  134. } else {
  135. return fmt.Errorf("Only bigint, float and string type can be convert to char type."), false
  136. }
  137. },
  138. val: func(_ api.FunctionContext, args []ast.Expr) error {
  139. if err := ValidateLen(1, len(args)); err != nil {
  140. return err
  141. }
  142. if ast.IsFloatArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsBooleanArg(args[0]) {
  143. return ProduceErrInfo(0, "int")
  144. }
  145. return nil
  146. },
  147. check: returnNilIfHasAnyNil,
  148. }
  149. builtins["encode"] = builtinFunc{
  150. fType: ast.FuncTypeScalar,
  151. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  152. if v, ok := args[1].(string); ok {
  153. if strings.EqualFold(v, "base64") {
  154. if v1, ok1 := args[0].(string); ok1 {
  155. return b64.StdEncoding.EncodeToString([]byte(v1)), true
  156. } else {
  157. return fmt.Errorf("Only string type can be encoded."), false
  158. }
  159. } else {
  160. return fmt.Errorf("Only base64 encoding is supported."), false
  161. }
  162. }
  163. return nil, false
  164. },
  165. val: func(_ api.FunctionContext, args []ast.Expr) error {
  166. if err := ValidateLen(2, len(args)); err != nil {
  167. return err
  168. }
  169. if ast.IsNumericArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsBooleanArg(args[0]) {
  170. return ProduceErrInfo(0, "string")
  171. }
  172. a := args[1]
  173. if !ast.IsStringArg(a) {
  174. return ProduceErrInfo(1, "string")
  175. }
  176. if av, ok := a.(*ast.StringLiteral); ok {
  177. if av.Val != "base64" {
  178. return fmt.Errorf("Only base64 is supported for the 2nd parameter.")
  179. }
  180. }
  181. return nil
  182. },
  183. check: returnNilIfHasAnyNil,
  184. }
  185. builtins["decode"] = builtinFunc{
  186. fType: ast.FuncTypeScalar,
  187. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  188. if v, ok := args[1].(string); ok {
  189. if strings.EqualFold(v, "base64") {
  190. if v1, ok1 := args[0].(string); ok1 {
  191. r, e := b64.StdEncoding.DecodeString(v1)
  192. if e != nil {
  193. return fmt.Errorf("fail to decode base64 string: %v", e), false
  194. }
  195. return r, true
  196. } else {
  197. return fmt.Errorf("Only string type can be decoded."), false
  198. }
  199. } else {
  200. return fmt.Errorf("Only base64 decoding is supported."), false
  201. }
  202. }
  203. return nil, false
  204. },
  205. val: func(_ api.FunctionContext, args []ast.Expr) error {
  206. if err := ValidateLen(2, len(args)); err != nil {
  207. return err
  208. }
  209. if ast.IsNumericArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsBooleanArg(args[0]) {
  210. return ProduceErrInfo(0, "string")
  211. }
  212. a := args[1]
  213. if !ast.IsStringArg(a) {
  214. return ProduceErrInfo(1, "string")
  215. }
  216. if av, ok := a.(*ast.StringLiteral); ok {
  217. if av.Val != "base64" {
  218. return fmt.Errorf("Only base64 is supported for the 2nd parameter.")
  219. }
  220. }
  221. return nil
  222. },
  223. check: returnNilIfHasAnyNil,
  224. }
  225. builtins["trunc"] = builtinFunc{
  226. fType: ast.FuncTypeScalar,
  227. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  228. var v0 float64
  229. if v1, ok := args[0].(int); ok {
  230. v0 = float64(v1)
  231. } else if v1, ok := args[0].(float64); ok {
  232. v0 = v1
  233. } else {
  234. return fmt.Errorf("Only int and float type can be truncated."), false
  235. }
  236. if v2, ok := args[1].(int); ok {
  237. return toFixed(v0, v2), true
  238. } else {
  239. return fmt.Errorf("The 2nd parameter must be int value."), false
  240. }
  241. },
  242. val: func(_ api.FunctionContext, args []ast.Expr) error {
  243. if err := ValidateLen(2, len(args)); err != nil {
  244. return err
  245. }
  246. if ast.IsTimeArg(args[0]) || ast.IsBooleanArg(args[0]) || ast.IsStringArg(args[0]) {
  247. return ProduceErrInfo(0, "number - float or int")
  248. }
  249. if ast.IsFloatArg(args[1]) || ast.IsTimeArg(args[1]) || ast.IsBooleanArg(args[1]) || ast.IsStringArg(args[1]) {
  250. return ProduceErrInfo(1, "int")
  251. }
  252. return nil
  253. },
  254. check: returnNilIfHasAnyNil,
  255. }
  256. builtins["md5"] = builtinFunc{
  257. fType: ast.FuncTypeScalar,
  258. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  259. arg0 := cast.ToStringAlways(args[0])
  260. h := md5.New()
  261. _, err := io.WriteString(h, arg0)
  262. if err != nil {
  263. return err, false
  264. }
  265. return fmt.Sprintf("%x", h.Sum(nil)), true
  266. },
  267. val: ValidateOneStrArg,
  268. check: returnNilIfHasAnyNil,
  269. }
  270. builtins["sha1"] = builtinFunc{
  271. fType: ast.FuncTypeScalar,
  272. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  273. arg0 := cast.ToStringAlways(args[0])
  274. h := sha1.New()
  275. _, err := io.WriteString(h, arg0)
  276. if err != nil {
  277. return err, false
  278. }
  279. return fmt.Sprintf("%x", h.Sum(nil)), true
  280. },
  281. val: ValidateOneStrArg,
  282. check: returnNilIfHasAnyNil,
  283. }
  284. builtins["sha256"] = builtinFunc{
  285. fType: ast.FuncTypeScalar,
  286. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  287. arg0 := cast.ToStringAlways(args[0])
  288. h := sha256.New()
  289. _, err := io.WriteString(h, arg0)
  290. if err != nil {
  291. return err, false
  292. }
  293. return fmt.Sprintf("%x", h.Sum(nil)), true
  294. },
  295. val: ValidateOneStrArg,
  296. check: returnNilIfHasAnyNil,
  297. }
  298. builtins["sha384"] = builtinFunc{
  299. fType: ast.FuncTypeScalar,
  300. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  301. arg0 := cast.ToStringAlways(args[0])
  302. h := sha512.New384()
  303. _, err := io.WriteString(h, arg0)
  304. if err != nil {
  305. return err, false
  306. }
  307. return fmt.Sprintf("%x", h.Sum(nil)), true
  308. },
  309. val: ValidateOneStrArg,
  310. check: returnNilIfHasAnyNil,
  311. }
  312. builtins["sha512"] = builtinFunc{
  313. fType: ast.FuncTypeScalar,
  314. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  315. arg0 := cast.ToStringAlways(args[0])
  316. h := sha512.New()
  317. _, err := io.WriteString(h, arg0)
  318. if err != nil {
  319. return err, false
  320. }
  321. return fmt.Sprintf("%x", h.Sum(nil)), true
  322. },
  323. val: ValidateOneStrArg,
  324. check: returnNilIfHasAnyNil,
  325. }
  326. builtinStatfulFuncs["compress"] = func() api.Function {
  327. conf.Log.Infof("initializing compress function")
  328. return &compressFunc{}
  329. }
  330. builtinStatfulFuncs["decompress"] = func() api.Function {
  331. conf.Log.Infof("initializing decompress function")
  332. return &decompressFunc{}
  333. }
  334. builtins["isnull"] = builtinFunc{
  335. fType: ast.FuncTypeScalar,
  336. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  337. if args[0] == nil {
  338. return true, true
  339. } else {
  340. v := reflect.ValueOf(args[0])
  341. switch v.Kind() {
  342. case reflect.Slice, reflect.Map:
  343. return v.IsNil(), true
  344. default:
  345. return false, true
  346. }
  347. }
  348. },
  349. val: ValidateOneArg,
  350. }
  351. builtins["coalesce"] = builtinFunc{
  352. fType: ast.FuncTypeScalar,
  353. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  354. for _, arg := range args {
  355. if arg != nil {
  356. return arg, true
  357. }
  358. }
  359. return nil, true
  360. },
  361. val: func(_ api.FunctionContext, args []ast.Expr) error {
  362. if len(args) == 0 {
  363. return fmt.Errorf("The arguments should be at least one.")
  364. }
  365. return nil
  366. },
  367. }
  368. builtins["newuuid"] = builtinFunc{
  369. fType: ast.FuncTypeScalar,
  370. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  371. if newUUID, err := uuid.NewUUID(); err != nil {
  372. return err, false
  373. } else {
  374. return newUUID.String(), true
  375. }
  376. },
  377. val: ValidateNoArg,
  378. }
  379. builtins["tstamp"] = builtinFunc{
  380. fType: ast.FuncTypeScalar,
  381. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  382. return conf.GetNowInMilli(), true
  383. },
  384. val: ValidateNoArg,
  385. }
  386. builtins["mqtt"] = builtinFunc{
  387. fType: ast.FuncTypeScalar,
  388. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  389. if v, ok := args[0].(string); ok {
  390. return v, true
  391. }
  392. return nil, false
  393. },
  394. val: func(_ api.FunctionContext, args []ast.Expr) error {
  395. if err := ValidateLen(1, len(args)); err != nil {
  396. return err
  397. }
  398. if ast.IsIntegerArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsBooleanArg(args[0]) || ast.IsStringArg(args[0]) || ast.IsFloatArg(args[0]) {
  399. return ProduceErrInfo(0, "meta reference")
  400. }
  401. if p, ok := args[0].(*ast.MetaRef); ok {
  402. name := strings.ToLower(p.Name)
  403. if name != "topic" && name != "messageid" {
  404. return fmt.Errorf("Parameter of mqtt function can be only topic or messageid.")
  405. }
  406. }
  407. return nil
  408. },
  409. check: returnNilIfHasAnyNil,
  410. }
  411. builtins["rule_id"] = builtinFunc{
  412. fType: ast.FuncTypeScalar,
  413. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  414. return ctx.GetRuleId(), true
  415. },
  416. val: ValidateNoArg,
  417. }
  418. builtins["meta"] = builtinFunc{
  419. fType: ast.FuncTypeScalar,
  420. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  421. return args[0], true
  422. },
  423. val: func(_ api.FunctionContext, args []ast.Expr) error {
  424. if err := ValidateLen(1, len(args)); err != nil {
  425. return err
  426. }
  427. if _, ok := args[0].(*ast.MetaRef); ok {
  428. return nil
  429. }
  430. expr := args[0]
  431. for {
  432. if be, ok := expr.(*ast.BinaryExpr); ok {
  433. if _, ok := be.LHS.(*ast.MetaRef); ok && be.OP == ast.ARROW {
  434. return nil
  435. }
  436. expr = be.LHS
  437. } else {
  438. break
  439. }
  440. }
  441. return ProduceErrInfo(0, "meta reference")
  442. },
  443. }
  444. builtins["cardinality"] = builtinFunc{
  445. fType: ast.FuncTypeScalar,
  446. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  447. val := reflect.ValueOf(args[0])
  448. if val.Kind() == reflect.Slice {
  449. return val.Len(), true
  450. }
  451. return 0, true
  452. },
  453. val: ValidateOneArg,
  454. check: return0IfHasAnyNil,
  455. }
  456. builtins["json_path_query"] = builtinFunc{
  457. fType: ast.FuncTypeScalar,
  458. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  459. result, err := jsonCall(ctx, args)
  460. if err != nil {
  461. return err, false
  462. }
  463. return result, true
  464. },
  465. val: ValidateJsonFunc,
  466. }
  467. builtins["json_path_query_first"] = builtinFunc{
  468. fType: ast.FuncTypeScalar,
  469. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  470. result, err := jsonCall(ctx, args)
  471. if err != nil {
  472. return err, false
  473. }
  474. if arr, ok := result.([]interface{}); ok {
  475. return arr[0], true
  476. } else {
  477. return fmt.Errorf("query result (%v) is not an array", result), false
  478. }
  479. },
  480. val: ValidateJsonFunc,
  481. }
  482. builtins["json_path_exists"] = builtinFunc{
  483. fType: ast.FuncTypeScalar,
  484. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  485. result, err := jsonCall(ctx, args)
  486. if err != nil {
  487. return false, true
  488. }
  489. if result == nil {
  490. return false, true
  491. }
  492. e := true
  493. switch reflect.TypeOf(result).Kind() {
  494. case reflect.Slice, reflect.Array:
  495. e = reflect.ValueOf(result).Len() > 0
  496. default:
  497. e = result != nil
  498. }
  499. return e, true
  500. },
  501. val: ValidateJsonFunc,
  502. }
  503. builtins["window_start"] = builtinFunc{
  504. fType: ast.FuncTypeScalar,
  505. exec: nil, // directly return in the valuer
  506. val: ValidateNoArg,
  507. }
  508. builtins["window_end"] = builtinFunc{
  509. fType: ast.FuncTypeScalar,
  510. exec: nil, // directly return in the valuer
  511. val: ValidateNoArg,
  512. }
  513. builtins["event_time"] = builtinFunc{
  514. fType: ast.FuncTypeScalar,
  515. exec: nil, // directly return in the valuer
  516. val: ValidateNoArg,
  517. }
  518. builtins["object_construct"] = builtinFunc{
  519. fType: ast.FuncTypeScalar,
  520. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  521. result := make(map[string]interface{})
  522. for i := 0; i < len(args); i += 2 {
  523. if args[i+1] != nil {
  524. s, err := cast.ToString(args[i], cast.CONVERT_SAMEKIND)
  525. if err != nil {
  526. return fmt.Errorf("key %v is not a string", args[i]), false
  527. }
  528. result[s] = args[i+1]
  529. }
  530. }
  531. return result, true
  532. },
  533. val: func(_ api.FunctionContext, args []ast.Expr) error {
  534. if len(args)%2 != 0 {
  535. return fmt.Errorf("the args must be key value pairs")
  536. }
  537. for i, arg := range args {
  538. if i%2 == 0 {
  539. if ast.IsNumericArg(arg) || ast.IsTimeArg(arg) || ast.IsBooleanArg(arg) {
  540. return ProduceErrInfo(i, "string")
  541. }
  542. }
  543. }
  544. return nil
  545. },
  546. check: returnNilIfHasAnyNil,
  547. }
  548. builtins["delay"] = builtinFunc{
  549. fType: ast.FuncTypeScalar,
  550. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  551. d, err := cast.ToInt(args[0], cast.CONVERT_SAMEKIND)
  552. if err != nil {
  553. return err, false
  554. }
  555. time.Sleep(time.Duration(d) * time.Millisecond)
  556. return args[1], true
  557. },
  558. val: func(_ api.FunctionContext, args []ast.Expr) error {
  559. if err := ValidateLen(2, len(args)); err != nil {
  560. return err
  561. }
  562. if ast.IsStringArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsBooleanArg(args[0]) {
  563. return ProduceErrInfo(0, "number - float or int")
  564. }
  565. return nil
  566. },
  567. check: returnNilIfHasAnyNil,
  568. }
  569. builtins["get_keyed_state"] = builtinFunc{
  570. fType: ast.FuncTypeScalar,
  571. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  572. if len(args) != 3 {
  573. return fmt.Errorf("the args must be two or three"), false
  574. }
  575. key, ok := args[0].(string)
  576. if !ok {
  577. return fmt.Errorf("key %v is not a string", args[0]), false
  578. }
  579. value, err := keyedstate.GetKeyedState(key)
  580. if err != nil {
  581. return args[2], true
  582. }
  583. return cast.ToType(value, args[1])
  584. },
  585. val: func(_ api.FunctionContext, args []ast.Expr) error {
  586. if err := ValidateLen(3, len(args)); err != nil {
  587. return err
  588. }
  589. a := args[1]
  590. if ast.IsNumericArg(a) || ast.IsTimeArg(a) || ast.IsBooleanArg(a) {
  591. return ProduceErrInfo(0, "string")
  592. }
  593. if av, ok := a.(*ast.StringLiteral); ok {
  594. if !(av.Val == "bigint" || av.Val == "float" || av.Val == "string" || av.Val == "boolean" || av.Val == "datetime") {
  595. return fmt.Errorf("expect one of following value for the 2nd parameter: bigint, float, string, boolean, datetime")
  596. }
  597. }
  598. return nil
  599. },
  600. check: returnNilIfHasAnyNil,
  601. }
  602. builtins["hex2dec"] = builtinFunc{
  603. fType: ast.FuncTypeScalar,
  604. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  605. hex, ok := args[0].(string)
  606. if !ok {
  607. return fmt.Errorf("invalid input type: %v please input hex string", args[0]), false
  608. }
  609. hex = strings.TrimPrefix(hex, "0x")
  610. dec, err := strconv.ParseInt(hex, 16, 64)
  611. if err != nil {
  612. return fmt.Errorf("invalid hexadecimal value: %v", hex), false
  613. }
  614. return dec, true
  615. },
  616. val: ValidateOneStrArg,
  617. check: returnNilIfHasAnyNil,
  618. }
  619. builtins["dec2hex"] = builtinFunc{
  620. fType: ast.FuncTypeScalar,
  621. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  622. dec, err := cast.ToInt(args[0], cast.STRICT)
  623. if err != nil {
  624. return err, false
  625. }
  626. hex := "0x" + strconv.FormatInt(int64(dec), 16)
  627. return hex, true
  628. },
  629. val: ValidateOneStrArg,
  630. check: returnNilIfHasAnyNil,
  631. }
  632. }
  633. func round(num float64) int {
  634. return int(num + math.Copysign(0.5, num))
  635. }
  636. func toFixed(num float64, precision int) float64 {
  637. output := math.Pow(10, float64(precision))
  638. return float64(round(num*output)) / output
  639. }
  640. func jsonCall(ctx api.StreamContext, args []interface{}) (interface{}, error) {
  641. jp, ok := args[1].(string)
  642. if !ok {
  643. return nil, fmt.Errorf("invalid jsonPath, must be a string but got %v", args[1])
  644. }
  645. return ctx.ParseJsonPath(jp, args[0])
  646. }
  647. // page Rotate storage for in memory cache
  648. // Not thread safe!
  649. type ringqueue struct {
  650. data []interface{}
  651. h int
  652. t int
  653. l int
  654. size int
  655. }
  656. func newRingqueue(size int) *ringqueue {
  657. return &ringqueue{
  658. data: make([]interface{}, size),
  659. h: 0, // When deleting, head++, if tail == head, it is empty
  660. t: 0, // When append, tail++, if tail== head, it is full
  661. size: size,
  662. }
  663. }
  664. // fill item will fill the queue with item value
  665. func (p *ringqueue) fill(item interface{}) {
  666. for {
  667. if !p.append(item) {
  668. return
  669. }
  670. }
  671. }
  672. // append item if list is not full and return true; otherwise return false
  673. func (p *ringqueue) append(item interface{}) bool {
  674. if p.l == p.size { // full
  675. return false
  676. }
  677. p.data[p.t] = item
  678. p.t++
  679. if p.t == p.size {
  680. p.t = 0
  681. }
  682. p.l++
  683. return true
  684. }
  685. // fetch get the first item in the cache and remove
  686. func (p *ringqueue) fetch() (interface{}, bool) {
  687. if p.l == 0 {
  688. return nil, false
  689. }
  690. result := p.data[p.h]
  691. p.h++
  692. if p.h == p.size {
  693. p.h = 0
  694. }
  695. p.l--
  696. return result, true
  697. }
  698. // peek get the first item in the cache but keep it
  699. func (p *ringqueue) peek() (interface{}, bool) {
  700. if p.l == 0 {
  701. return nil, false
  702. }
  703. result := p.data[p.h]
  704. return result, true
  705. }
  706. func (p *ringqueue) isFull() bool {
  707. return p.l == p.size
  708. }