funcs_misc.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package function
  15. import (
  16. "crypto/md5"
  17. "crypto/sha1"
  18. "crypto/sha256"
  19. "crypto/sha512"
  20. b64 "encoding/base64"
  21. "encoding/json"
  22. "fmt"
  23. "github.com/google/uuid"
  24. "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/pkg/api"
  26. "github.com/lf-edge/ekuiper/pkg/ast"
  27. "github.com/lf-edge/ekuiper/pkg/cast"
  28. "io"
  29. "math"
  30. "reflect"
  31. "strconv"
  32. "strings"
  33. )
  34. func registerMiscFunc() {
  35. builtins["cast"] = builtinFunc{
  36. fType: ast.FuncTypeScalar,
  37. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  38. if v, ok := args[1].(string); ok {
  39. switch v {
  40. case "bigint":
  41. if v1, ok1 := args[0].(int); ok1 {
  42. return v1, true
  43. } else if v1, ok1 := args[0].(float64); ok1 {
  44. return int(v1), true
  45. } else if v1, ok1 := args[0].(string); ok1 {
  46. if temp, err := strconv.Atoi(v1); err == nil {
  47. return temp, true
  48. } else {
  49. return err, false
  50. }
  51. } else if v1, ok1 := args[0].(bool); ok1 {
  52. if v1 {
  53. return 1, true
  54. } else {
  55. return 0, true
  56. }
  57. } else {
  58. return fmt.Errorf("Not supported type conversion."), false
  59. }
  60. case "float":
  61. if v1, ok1 := args[0].(int); ok1 {
  62. return float64(v1), true
  63. } else if v1, ok1 := args[0].(float64); ok1 {
  64. return v1, true
  65. } else if v1, ok1 := args[0].(string); ok1 {
  66. if temp, err := strconv.ParseFloat(v1, 64); err == nil {
  67. return temp, true
  68. } else {
  69. return err, false
  70. }
  71. } else if v1, ok1 := args[0].(bool); ok1 {
  72. if v1 {
  73. return 1.0, true
  74. } else {
  75. return 0.0, true
  76. }
  77. } else {
  78. return fmt.Errorf("Not supported type conversion."), false
  79. }
  80. case "string":
  81. r, e := cast.ToString(args[0], cast.CONVERT_ALL)
  82. if e != nil {
  83. return fmt.Errorf("Not supported type conversion, got error %v.", e), false
  84. } else {
  85. return r, true
  86. }
  87. case "boolean":
  88. if v1, ok1 := args[0].(int); ok1 {
  89. if v1 == 0 {
  90. return false, true
  91. } else {
  92. return true, true
  93. }
  94. } else if v1, ok1 := args[0].(float64); ok1 {
  95. if v1 == 0.0 {
  96. return false, true
  97. } else {
  98. return true, true
  99. }
  100. } else if v1, ok1 := args[0].(string); ok1 {
  101. if temp, err := strconv.ParseBool(v1); err == nil {
  102. return temp, true
  103. } else {
  104. return err, false
  105. }
  106. } else if v1, ok1 := args[0].(bool); ok1 {
  107. return v1, true
  108. } else {
  109. return fmt.Errorf("Not supported type conversion."), false
  110. }
  111. case "datetime":
  112. dt, err := cast.InterfaceToTime(args[0], "")
  113. if err != nil {
  114. return err, false
  115. } else {
  116. return dt, true
  117. }
  118. default:
  119. return fmt.Errorf("Unknow type, only support bigint, float, string, boolean and datetime."), false
  120. }
  121. } else {
  122. return fmt.Errorf("Expect string type for the 2nd parameter."), false
  123. }
  124. },
  125. val: func(_ api.FunctionContext, args []ast.Expr) error {
  126. if err := ValidateLen(2, len(args)); err != nil {
  127. return err
  128. }
  129. a := args[1]
  130. if !ast.IsStringArg(a) {
  131. return ProduceErrInfo(1, "string")
  132. }
  133. if av, ok := a.(*ast.StringLiteral); ok {
  134. if !(av.Val == "bigint" || av.Val == "float" || av.Val == "string" || av.Val == "boolean" || av.Val == "datetime") {
  135. return fmt.Errorf("Expect one of following value for the 2nd parameter: bigint, float, string, boolean, datetime.")
  136. }
  137. }
  138. return nil
  139. },
  140. }
  141. builtins["to_json"] = builtinFunc{
  142. fType: ast.FuncTypeScalar,
  143. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  144. if args[0] == nil {
  145. return "null", true
  146. }
  147. rr, err := json.Marshal(args[0])
  148. if err != nil {
  149. return fmt.Errorf("fail to convert %v to json", args[0]), false
  150. }
  151. return string(rr), true
  152. },
  153. val: ValidateOneArg,
  154. }
  155. builtins["parse_json"] = builtinFunc{
  156. fType: ast.FuncTypeScalar,
  157. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  158. if args[0] == nil || args[0] == "null" {
  159. return nil, true
  160. }
  161. text, err := cast.ToString(args[0], cast.CONVERT_SAMEKIND)
  162. if err != nil {
  163. return fmt.Errorf("fail to convert %v to string", args[0]), false
  164. }
  165. var data interface{}
  166. err = json.Unmarshal([]byte(text), &data)
  167. if err != nil {
  168. return fmt.Errorf("fail to parse json: %v", err), false
  169. }
  170. return data, true
  171. },
  172. val: ValidateOneStrArg,
  173. }
  174. builtins["chr"] = builtinFunc{
  175. fType: ast.FuncTypeScalar,
  176. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  177. if v, ok := args[0].(int); ok {
  178. return rune(v), true
  179. } else if v, ok := args[0].(float64); ok {
  180. temp := int(v)
  181. return rune(temp), true
  182. } else if v, ok := args[0].(string); ok {
  183. if len(v) > 1 {
  184. return fmt.Errorf("Parameter length cannot larger than 1."), false
  185. }
  186. r := []rune(v)
  187. return r[0], true
  188. } else {
  189. return fmt.Errorf("Only bigint, float and string type can be convert to char type."), false
  190. }
  191. },
  192. val: func(_ api.FunctionContext, args []ast.Expr) error {
  193. if err := ValidateLen(1, len(args)); err != nil {
  194. return err
  195. }
  196. if ast.IsFloatArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsBooleanArg(args[0]) {
  197. return ProduceErrInfo(0, "int")
  198. }
  199. return nil
  200. },
  201. }
  202. builtins["encode"] = builtinFunc{
  203. fType: ast.FuncTypeScalar,
  204. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  205. if v, ok := args[1].(string); ok {
  206. if strings.EqualFold(v, "base64") {
  207. if v1, ok1 := args[0].(string); ok1 {
  208. return b64.StdEncoding.EncodeToString([]byte(v1)), true
  209. } else {
  210. return fmt.Errorf("Only string type can be encoded."), false
  211. }
  212. } else {
  213. return fmt.Errorf("Only base64 encoding is supported."), false
  214. }
  215. }
  216. return nil, false
  217. },
  218. val: func(_ api.FunctionContext, args []ast.Expr) error {
  219. if err := ValidateLen(2, len(args)); err != nil {
  220. return err
  221. }
  222. if ast.IsNumericArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsBooleanArg(args[0]) {
  223. return ProduceErrInfo(0, "string")
  224. }
  225. a := args[1]
  226. if !ast.IsStringArg(a) {
  227. return ProduceErrInfo(1, "string")
  228. }
  229. if av, ok := a.(*ast.StringLiteral); ok {
  230. if av.Val != "base64" {
  231. return fmt.Errorf("Only base64 is supported for the 2nd parameter.")
  232. }
  233. }
  234. return nil
  235. },
  236. }
  237. builtins["decode"] = builtinFunc{
  238. fType: ast.FuncTypeScalar,
  239. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  240. if v, ok := args[1].(string); ok {
  241. if strings.EqualFold(v, "base64") {
  242. if v1, ok1 := args[0].(string); ok1 {
  243. r, e := b64.StdEncoding.DecodeString(v1)
  244. if e != nil {
  245. return fmt.Errorf("fail to decode base64 string: %v", e), false
  246. }
  247. return r, true
  248. } else {
  249. return fmt.Errorf("Only string type can be decoded."), false
  250. }
  251. } else {
  252. return fmt.Errorf("Only base64 decoding is supported."), false
  253. }
  254. }
  255. return nil, false
  256. },
  257. val: func(_ api.FunctionContext, args []ast.Expr) error {
  258. if err := ValidateLen(2, len(args)); err != nil {
  259. return err
  260. }
  261. if ast.IsNumericArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsBooleanArg(args[0]) {
  262. return ProduceErrInfo(0, "string")
  263. }
  264. a := args[1]
  265. if !ast.IsStringArg(a) {
  266. return ProduceErrInfo(1, "string")
  267. }
  268. if av, ok := a.(*ast.StringLiteral); ok {
  269. if av.Val != "base64" {
  270. return fmt.Errorf("Only base64 is supported for the 2nd parameter.")
  271. }
  272. }
  273. return nil
  274. },
  275. }
  276. builtins["trunc"] = builtinFunc{
  277. fType: ast.FuncTypeScalar,
  278. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  279. var v0 float64
  280. if v1, ok := args[0].(int); ok {
  281. v0 = float64(v1)
  282. } else if v1, ok := args[0].(float64); ok {
  283. v0 = v1
  284. } else {
  285. return fmt.Errorf("Only int and float type can be truncated."), false
  286. }
  287. if v2, ok := args[1].(int); ok {
  288. return toFixed(v0, v2), true
  289. } else {
  290. return fmt.Errorf("The 2nd parameter must be int value."), false
  291. }
  292. },
  293. val: func(_ api.FunctionContext, args []ast.Expr) error {
  294. if err := ValidateLen(2, len(args)); err != nil {
  295. return err
  296. }
  297. if ast.IsTimeArg(args[0]) || ast.IsBooleanArg(args[0]) || ast.IsStringArg(args[0]) {
  298. return ProduceErrInfo(0, "number - float or int")
  299. }
  300. if ast.IsFloatArg(args[1]) || ast.IsTimeArg(args[1]) || ast.IsBooleanArg(args[1]) || ast.IsStringArg(args[1]) {
  301. return ProduceErrInfo(1, "int")
  302. }
  303. return nil
  304. },
  305. }
  306. builtins["md5"] = builtinFunc{
  307. fType: ast.FuncTypeScalar,
  308. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  309. if args[0] == nil {
  310. return nil, true
  311. }
  312. arg0 := cast.ToStringAlways(args[0])
  313. h := md5.New()
  314. _, err := io.WriteString(h, arg0)
  315. if err != nil {
  316. return err, false
  317. }
  318. return fmt.Sprintf("%x", h.Sum(nil)), true
  319. },
  320. val: ValidateOneStrArg,
  321. }
  322. builtins["sha1"] = builtinFunc{
  323. fType: ast.FuncTypeScalar,
  324. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  325. if args[0] == nil {
  326. return nil, true
  327. }
  328. arg0 := cast.ToStringAlways(args[0])
  329. h := sha1.New()
  330. _, err := io.WriteString(h, arg0)
  331. if err != nil {
  332. return err, false
  333. }
  334. return fmt.Sprintf("%x", h.Sum(nil)), true
  335. },
  336. val: ValidateOneStrArg,
  337. }
  338. builtins["sha256"] = builtinFunc{
  339. fType: ast.FuncTypeScalar,
  340. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  341. if args[0] == nil {
  342. return nil, true
  343. }
  344. arg0 := cast.ToStringAlways(args[0])
  345. h := sha256.New()
  346. _, err := io.WriteString(h, arg0)
  347. if err != nil {
  348. return err, false
  349. }
  350. return fmt.Sprintf("%x", h.Sum(nil)), true
  351. },
  352. val: ValidateOneStrArg,
  353. }
  354. builtins["sha384"] = builtinFunc{
  355. fType: ast.FuncTypeScalar,
  356. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  357. if args[0] == nil {
  358. return nil, true
  359. }
  360. arg0 := cast.ToStringAlways(args[0])
  361. h := sha512.New384()
  362. _, err := io.WriteString(h, arg0)
  363. if err != nil {
  364. return err, false
  365. }
  366. return fmt.Sprintf("%x", h.Sum(nil)), true
  367. },
  368. val: ValidateOneStrArg,
  369. }
  370. builtins["sha512"] = builtinFunc{
  371. fType: ast.FuncTypeScalar,
  372. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  373. if args[0] == nil {
  374. return nil, true
  375. }
  376. arg0 := cast.ToStringAlways(args[0])
  377. h := sha512.New()
  378. _, err := io.WriteString(h, arg0)
  379. if err != nil {
  380. return err, false
  381. }
  382. return fmt.Sprintf("%x", h.Sum(nil)), true
  383. },
  384. val: ValidateOneStrArg,
  385. }
  386. builtinStatfulFuncs["compress"] = func() api.Function {
  387. conf.Log.Infof("initializing compress function")
  388. return &compressFunc{}
  389. }
  390. builtinStatfulFuncs["decompress"] = func() api.Function {
  391. conf.Log.Infof("initializing decompress function")
  392. return &decompressFunc{}
  393. }
  394. builtins["isnull"] = builtinFunc{
  395. fType: ast.FuncTypeScalar,
  396. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  397. if args[0] == nil {
  398. return true, true
  399. } else {
  400. v := reflect.ValueOf(args[0])
  401. switch v.Kind() {
  402. case reflect.Slice, reflect.Map:
  403. return v.IsNil(), true
  404. default:
  405. return false, true
  406. }
  407. }
  408. },
  409. val: ValidateOneArg,
  410. }
  411. builtins["coalesce"] = builtinFunc{
  412. fType: ast.FuncTypeScalar,
  413. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  414. for _, arg := range args {
  415. if arg != nil {
  416. return arg, true
  417. }
  418. }
  419. return nil, true
  420. },
  421. val: func(_ api.FunctionContext, args []ast.Expr) error {
  422. if len(args) == 0 {
  423. return fmt.Errorf("The arguments should be at least one.")
  424. }
  425. return nil
  426. },
  427. }
  428. builtins["newuuid"] = builtinFunc{
  429. fType: ast.FuncTypeScalar,
  430. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  431. if newUUID, err := uuid.NewUUID(); err != nil {
  432. return err, false
  433. } else {
  434. return newUUID.String(), true
  435. }
  436. },
  437. val: ValidateNoArg,
  438. }
  439. builtins["tstamp"] = builtinFunc{
  440. fType: ast.FuncTypeScalar,
  441. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  442. return conf.GetNowInMilli(), true
  443. },
  444. val: ValidateNoArg,
  445. }
  446. builtins["mqtt"] = builtinFunc{
  447. fType: ast.FuncTypeScalar,
  448. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  449. if v, ok := args[0].(string); ok {
  450. return v, true
  451. }
  452. return nil, false
  453. },
  454. val: func(_ api.FunctionContext, args []ast.Expr) error {
  455. if err := ValidateLen(1, len(args)); err != nil {
  456. return err
  457. }
  458. if ast.IsIntegerArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsBooleanArg(args[0]) || ast.IsStringArg(args[0]) || ast.IsFloatArg(args[0]) {
  459. return ProduceErrInfo(0, "meta reference")
  460. }
  461. if p, ok := args[0].(*ast.MetaRef); ok {
  462. name := strings.ToLower(p.Name)
  463. if name != "topic" && name != "messageid" {
  464. return fmt.Errorf("Parameter of mqtt function can be only topic or messageid.")
  465. }
  466. }
  467. return nil
  468. },
  469. }
  470. builtins["meta"] = builtinFunc{
  471. fType: ast.FuncTypeScalar,
  472. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  473. return args[0], true
  474. },
  475. val: func(_ api.FunctionContext, args []ast.Expr) error {
  476. if err := ValidateLen(1, len(args)); err != nil {
  477. return err
  478. }
  479. if _, ok := args[0].(*ast.MetaRef); ok {
  480. return nil
  481. }
  482. expr := args[0]
  483. for {
  484. if be, ok := expr.(*ast.BinaryExpr); ok {
  485. if _, ok := be.LHS.(*ast.MetaRef); ok && be.OP == ast.ARROW {
  486. return nil
  487. }
  488. expr = be.LHS
  489. } else {
  490. break
  491. }
  492. }
  493. return ProduceErrInfo(0, "meta reference")
  494. },
  495. }
  496. builtins["cardinality"] = builtinFunc{
  497. fType: ast.FuncTypeScalar,
  498. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  499. val := reflect.ValueOf(args[0])
  500. if val.Kind() == reflect.Slice {
  501. return val.Len(), true
  502. }
  503. return 0, true
  504. },
  505. val: ValidateOneArg,
  506. }
  507. builtins["json_path_query"] = builtinFunc{
  508. fType: ast.FuncTypeScalar,
  509. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  510. result, err := jsonCall(ctx, args)
  511. if err != nil {
  512. return err, false
  513. }
  514. return result, true
  515. },
  516. val: ValidateJsonFunc,
  517. }
  518. builtins["json_path_query_first"] = builtinFunc{
  519. fType: ast.FuncTypeScalar,
  520. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  521. result, err := jsonCall(ctx, args)
  522. if err != nil {
  523. return err, false
  524. }
  525. if arr, ok := result.([]interface{}); ok {
  526. return arr[0], true
  527. } else {
  528. return fmt.Errorf("query result (%v) is not an array", result), false
  529. }
  530. },
  531. val: ValidateJsonFunc,
  532. }
  533. builtins["json_path_exists"] = builtinFunc{
  534. fType: ast.FuncTypeScalar,
  535. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  536. result, err := jsonCall(ctx, args)
  537. if err != nil {
  538. return false, true
  539. }
  540. if result == nil {
  541. return false, true
  542. }
  543. e := true
  544. switch reflect.TypeOf(result).Kind() {
  545. case reflect.Slice, reflect.Array:
  546. e = reflect.ValueOf(result).Len() > 0
  547. default:
  548. e = result != nil
  549. }
  550. return e, true
  551. },
  552. val: ValidateJsonFunc,
  553. }
  554. builtins["window_start"] = builtinFunc{
  555. fType: ast.FuncTypeScalar,
  556. exec: nil, // directly return in the valuer
  557. val: ValidateNoArg,
  558. }
  559. builtins["window_end"] = builtinFunc{
  560. fType: ast.FuncTypeScalar,
  561. exec: nil, // directly return in the valuer
  562. val: ValidateNoArg,
  563. }
  564. builtins["object_construct"] = builtinFunc{
  565. fType: ast.FuncTypeScalar,
  566. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  567. result := make(map[string]interface{})
  568. for i := 0; i < len(args); i += 2 {
  569. if args[i+1] != nil {
  570. s, err := cast.ToString(args[i], cast.CONVERT_SAMEKIND)
  571. if err != nil {
  572. return fmt.Errorf("key %v is not a string", args[i]), false
  573. }
  574. result[s] = args[i+1]
  575. }
  576. }
  577. return result, true
  578. },
  579. val: func(_ api.FunctionContext, args []ast.Expr) error {
  580. if len(args)%2 != 0 {
  581. return fmt.Errorf("the args must be key value pairs")
  582. }
  583. for i, arg := range args {
  584. if i%2 == 0 {
  585. if ast.IsNumericArg(arg) || ast.IsTimeArg(arg) || ast.IsBooleanArg(arg) {
  586. return ProduceErrInfo(i, "string")
  587. }
  588. }
  589. }
  590. return nil
  591. },
  592. }
  593. }
  594. func round(num float64) int {
  595. return int(num + math.Copysign(0.5, num))
  596. }
  597. func toFixed(num float64, precision int) float64 {
  598. output := math.Pow(10, float64(precision))
  599. return float64(round(num*output)) / output
  600. }
  601. func jsonCall(ctx api.StreamContext, args []interface{}) (interface{}, error) {
  602. jp, ok := args[1].(string)
  603. if !ok {
  604. return nil, fmt.Errorf("invalid jsonPath, must be a string but got %v", args[1])
  605. }
  606. return ctx.ParseJsonPath(jp, args[0])
  607. }
  608. // page Rotate storage for in memory cache
  609. // Not thread safe!
  610. type ringqueue struct {
  611. data []interface{}
  612. h int
  613. t int
  614. l int
  615. size int
  616. }
  617. func newRingqueue(size int) *ringqueue {
  618. return &ringqueue{
  619. data: make([]interface{}, size),
  620. h: 0, // When deleting, head++, if tail == head, it is empty
  621. t: 0, // When append, tail++, if tail== head, it is full
  622. size: size,
  623. }
  624. }
  625. // fill item will fill the queue with item value
  626. func (p *ringqueue) fill(item interface{}) {
  627. for {
  628. if !p.append(item) {
  629. return
  630. }
  631. }
  632. }
  633. // append item if list is not full and return true; otherwise return false
  634. func (p *ringqueue) append(item interface{}) bool {
  635. if p.l == p.size { // full
  636. return false
  637. }
  638. p.data[p.t] = item
  639. p.t++
  640. if p.t == p.size {
  641. p.t = 0
  642. }
  643. p.l++
  644. return true
  645. }
  646. // fetch get the first item in the cache and remove
  647. func (p *ringqueue) fetch() (interface{}, bool) {
  648. if p.l == 0 {
  649. return nil, false
  650. }
  651. result := p.data[p.h]
  652. p.h++
  653. if p.h == p.size {
  654. p.h = 0
  655. }
  656. p.l--
  657. return result, true
  658. }
  659. // peek get the first item in the cache but keep it
  660. func (p *ringqueue) peek() (interface{}, bool) {
  661. if p.l == 0 {
  662. return nil, false
  663. }
  664. result := p.data[p.h]
  665. return result, true
  666. }
  667. func (p *ringqueue) isFull() bool {
  668. return p.l == p.size
  669. }