funcs_global_state.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package function
  15. import (
  16. "github.com/lf-edge/ekuiper/pkg/api"
  17. "github.com/lf-edge/ekuiper/pkg/ast"
  18. )
  19. const (
  20. countKey = "$$last_hit_count"
  21. timeKey = "$$last_hit_time"
  22. aggCountKey = "$$last_agg_hit_count"
  23. aggTimeKey = "$$last_agg_hit_time"
  24. )
  25. func registerGlobalStateFunc() {
  26. builtins["last_hit_count"] = builtinFunc{
  27. fType: ast.FuncTypeScalar,
  28. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  29. doUpdate := args[0].(bool)
  30. lv, err := ctx.GetCounter(countKey)
  31. if err != nil {
  32. return err, false
  33. }
  34. if doUpdate {
  35. err := ctx.IncrCounter(countKey, 1)
  36. if err != nil {
  37. return nil, false
  38. }
  39. }
  40. return lv, true
  41. },
  42. val: ValidateNoArg,
  43. }
  44. builtins["last_hit_time"] = builtinFunc{
  45. fType: ast.FuncTypeScalar,
  46. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  47. args0 := args[0].(bool)
  48. args1 := args[1].(int64)
  49. lv, err := ctx.GetState(timeKey)
  50. if err != nil {
  51. return err, false
  52. }
  53. if lv == nil {
  54. lv = 0
  55. }
  56. if args0 {
  57. err := ctx.PutState(timeKey, args1)
  58. if err != nil {
  59. return nil, false
  60. }
  61. }
  62. return lv, true
  63. },
  64. val: ValidateNoArg,
  65. }
  66. builtins["last_agg_hit_count"] = builtinFunc{
  67. fType: ast.FuncTypeAgg,
  68. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  69. doUpdate := args[0].(bool)
  70. lv, err := ctx.GetCounter(aggCountKey)
  71. if err != nil {
  72. return err, false
  73. }
  74. if doUpdate {
  75. err := ctx.IncrCounter(aggCountKey, 1)
  76. if err != nil {
  77. return nil, false
  78. }
  79. }
  80. return lv, true
  81. },
  82. val: ValidateNoArg,
  83. }
  84. builtins["last_agg_hit_time"] = builtinFunc{
  85. fType: ast.FuncTypeAgg,
  86. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  87. args0 := args[0].(bool)
  88. args1 := args[1].(int64)
  89. lv, err := ctx.GetState(aggTimeKey)
  90. if err != nil {
  91. return err, false
  92. }
  93. if lv == nil {
  94. lv = 0
  95. }
  96. if args0 {
  97. err := ctx.PutState(aggTimeKey, args1)
  98. if err != nil {
  99. return nil, false
  100. }
  101. }
  102. return lv, true
  103. },
  104. val: ValidateNoArg,
  105. }
  106. }