funcs_analytic.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package function
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/pkg/api"
  18. "github.com/lf-edge/ekuiper/pkg/ast"
  19. "reflect"
  20. "strconv"
  21. )
  22. // registerAnalyticFunc registers the analytic functions
  23. // The last parameter of the function is always the partition key
  24. func registerAnalyticFunc() {
  25. builtins["changed_col"] = builtinFunc{
  26. fType: ast.FuncTypeScalar,
  27. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  28. ignoreNull, ok := args[0].(bool)
  29. if !ok {
  30. return fmt.Errorf("first arg is not a bool but got %v", args[0]), false
  31. }
  32. if ignoreNull && args[1] == nil {
  33. return nil, true
  34. }
  35. key := args[len(args)-1].(string)
  36. lv, err := ctx.GetState(key)
  37. if err != nil {
  38. return err, false
  39. }
  40. if !reflect.DeepEqual(args[1], lv) {
  41. err := ctx.PutState(key, args[1])
  42. if err != nil {
  43. return err, false
  44. }
  45. return args[1], true
  46. }
  47. return nil, true
  48. },
  49. val: func(_ api.FunctionContext, args []ast.Expr) error {
  50. if err := ValidateLen(2, len(args)); err != nil {
  51. return err
  52. }
  53. if ast.IsNumericArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsStringArg(args[0]) {
  54. return ProduceErrInfo(0, "boolean")
  55. }
  56. return nil
  57. },
  58. }
  59. builtins["had_changed"] = builtinFunc{
  60. fType: ast.FuncTypeScalar,
  61. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  62. l := len(args) - 1
  63. if l <= 1 {
  64. return fmt.Errorf("expect more than one arg but got %d", len(args)), false
  65. }
  66. ignoreNull, ok := args[0].(bool)
  67. if !ok {
  68. return fmt.Errorf("first arg is not a bool but got %v", args[0]), false
  69. }
  70. key := args[l].(string)
  71. result := false
  72. for i := 1; i < l; i++ {
  73. v := args[i]
  74. k := key + strconv.Itoa(i)
  75. if ignoreNull && v == nil {
  76. continue
  77. }
  78. lv, err := ctx.GetState(k)
  79. if err != nil {
  80. return fmt.Errorf("error getting state for %s: %v", k, err), false
  81. }
  82. if !reflect.DeepEqual(v, lv) {
  83. result = true
  84. err := ctx.PutState(k, v)
  85. if err != nil {
  86. return fmt.Errorf("error setting state for %s: %v", k, err), false
  87. }
  88. }
  89. }
  90. return result, true
  91. },
  92. val: func(_ api.FunctionContext, args []ast.Expr) error {
  93. if len(args) <= 1 {
  94. return fmt.Errorf("expect more than one arg but got %d", len(args))
  95. }
  96. if ast.IsNumericArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsStringArg(args[0]) {
  97. return ProduceErrInfo(0, "bool")
  98. }
  99. return nil
  100. },
  101. }
  102. builtins["lag"] = builtinFunc{
  103. fType: ast.FuncTypeScalar,
  104. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  105. l := len(args) - 1
  106. key := args[l].(string)
  107. if l != 1 && l != 2 && l != 3 {
  108. return fmt.Errorf("expect one two or three args but got %d", l), false
  109. }
  110. v, err := ctx.GetState(key)
  111. if err != nil {
  112. return fmt.Errorf("error getting state for %s: %v", key, err), false
  113. }
  114. if v == nil {
  115. size := 0
  116. var dftVal interface{} = nil
  117. if l == 3 {
  118. dftVal = args[2]
  119. }
  120. // first time call, need create state for lag
  121. if l == 1 {
  122. size = 1
  123. } else {
  124. siz, ok := args[1].(int)
  125. if !ok {
  126. return fmt.Errorf("second arg is not a int but got %v", args[1]), false
  127. }
  128. size = siz
  129. }
  130. rq := newRingqueue(size)
  131. rq.fill(dftVal)
  132. rtnVal, _ := rq.fetch()
  133. rq.append(args[0])
  134. err := ctx.PutState(key, rq)
  135. if err != nil {
  136. return fmt.Errorf("error setting state for %s: %v", key, err), false
  137. }
  138. return rtnVal, true
  139. } else {
  140. rq, ok := v.(*ringqueue)
  141. if !ok {
  142. return fmt.Errorf("error getting state for %s: %v", key, err), false
  143. }
  144. rtnVal, _ := rq.fetch()
  145. rq.append(args[0])
  146. err := ctx.PutState(key, rq)
  147. if err != nil {
  148. return fmt.Errorf("error setting state for %s: %v", key, err), false
  149. }
  150. return rtnVal, true
  151. }
  152. },
  153. val: func(_ api.FunctionContext, args []ast.Expr) error {
  154. l := len(args)
  155. if l != 1 && l != 2 && l != 3 {
  156. return fmt.Errorf("expect one two or three args but got %d", l)
  157. }
  158. if l >= 2 {
  159. if ast.IsFloatArg(args[1]) || ast.IsTimeArg(args[1]) || ast.IsBooleanArg(args[1]) || ast.IsStringArg(args[1]) || ast.IsFieldRefArg(args[1]) {
  160. return ProduceErrInfo(1, "int")
  161. }
  162. if s, ok := args[1].(*ast.IntegerLiteral); ok {
  163. if s.Val < 0 {
  164. return fmt.Errorf("the index should not be a nagtive integer")
  165. }
  166. }
  167. }
  168. return nil
  169. },
  170. }
  171. builtins["latest"] = builtinFunc{
  172. fType: ast.FuncTypeScalar,
  173. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  174. l := len(args) - 1
  175. key := args[l].(string)
  176. if l != 1 && l != 2 {
  177. return fmt.Errorf("expect one or two args but got %d", l), false
  178. }
  179. if args[0] == nil {
  180. v, err := ctx.GetState(key)
  181. if err != nil {
  182. return fmt.Errorf("error getting state for %s: %v", key, err), false
  183. }
  184. if v == nil {
  185. if l == 2 {
  186. return args[1], true
  187. } else {
  188. return nil, true
  189. }
  190. } else {
  191. return v, true
  192. }
  193. } else {
  194. ctx.PutState(key, args[0])
  195. return args[0], true
  196. }
  197. },
  198. val: func(_ api.FunctionContext, args []ast.Expr) error {
  199. l := len(args)
  200. if l != 1 && l != 2 {
  201. return fmt.Errorf("expect one or two args but got %d", l)
  202. }
  203. return nil
  204. },
  205. }
  206. }