funcs_analytic.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package function
  15. import (
  16. "fmt"
  17. "math"
  18. "reflect"
  19. "strconv"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "github.com/lf-edge/ekuiper/pkg/ast"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. )
  24. // registerAnalyticFunc registers the analytic functions
  25. // The last parameter of the function is always the partition key
  26. func registerAnalyticFunc() {
  27. builtins["changed_col"] = builtinFunc{
  28. fType: ast.FuncTypeScalar,
  29. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  30. ignoreNull, ok := args[0].(bool)
  31. if !ok {
  32. return fmt.Errorf("first arg is not a bool but got %v", args[0]), false
  33. }
  34. if ignoreNull && args[1] == nil {
  35. return nil, true
  36. }
  37. validData, ok := args[len(args)-2].(bool)
  38. if !ok {
  39. return fmt.Errorf("when arg is not a bool but got %v", args[len(args)-2]), false
  40. }
  41. if !validData {
  42. return nil, true
  43. }
  44. key := args[len(args)-1].(string)
  45. lv, err := ctx.GetState(key)
  46. if err != nil {
  47. return err, false
  48. }
  49. if !reflect.DeepEqual(args[1], lv) {
  50. err := ctx.PutState(key, args[1])
  51. if err != nil {
  52. return err, false
  53. }
  54. return args[1], true
  55. }
  56. return nil, true
  57. },
  58. val: func(_ api.FunctionContext, args []ast.Expr) error {
  59. if err := ValidateLen(2, len(args)); err != nil {
  60. return err
  61. }
  62. if ast.IsNumericArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsStringArg(args[0]) {
  63. return ProduceErrInfo(0, "boolean")
  64. }
  65. return nil
  66. },
  67. }
  68. builtins["had_changed"] = builtinFunc{
  69. fType: ast.FuncTypeScalar,
  70. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  71. l := len(args) - 2
  72. if l <= 1 {
  73. return fmt.Errorf("expect more than one arg but got %d", len(args)), false
  74. }
  75. validData, ok := args[len(args)-2].(bool)
  76. if !ok {
  77. return fmt.Errorf("when arg is not a bool but got %v", args[len(args)-2]), false
  78. }
  79. if !validData {
  80. return false, true
  81. }
  82. ignoreNull, ok := args[0].(bool)
  83. if !ok {
  84. return fmt.Errorf("first arg is not a bool but got %v", args[0]), false
  85. }
  86. key := args[len(args)-1].(string)
  87. paraLen := len(args) - 2
  88. result := false
  89. for i := 1; i < paraLen; i++ {
  90. v := args[i]
  91. k := key + strconv.Itoa(i)
  92. if ignoreNull && v == nil {
  93. continue
  94. }
  95. lv, err := ctx.GetState(k)
  96. if err != nil {
  97. return fmt.Errorf("error getting state for %s: %v", k, err), false
  98. }
  99. if !reflect.DeepEqual(v, lv) {
  100. result = true
  101. err := ctx.PutState(k, v)
  102. if err != nil {
  103. return fmt.Errorf("error setting state for %s: %v", k, err), false
  104. }
  105. }
  106. }
  107. return result, true
  108. },
  109. val: func(_ api.FunctionContext, args []ast.Expr) error {
  110. if len(args) <= 1 {
  111. return fmt.Errorf("expect more than one arg but got %d", len(args))
  112. }
  113. if ast.IsNumericArg(args[0]) || ast.IsTimeArg(args[0]) || ast.IsStringArg(args[0]) {
  114. return ProduceErrInfo(0, "bool")
  115. }
  116. return nil
  117. },
  118. }
  119. builtins["lag"] = builtinFunc{
  120. fType: ast.FuncTypeScalar,
  121. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  122. l := len(args) - 2
  123. if l != 1 && l != 2 && l != 3 && l != 4 {
  124. return fmt.Errorf("expect one two or three args but got %d", l), false
  125. }
  126. key := args[len(args)-1].(string)
  127. v, err := ctx.GetState(key)
  128. if err != nil {
  129. return fmt.Errorf("error getting state for %s: %v", key, err), false
  130. }
  131. validData, ok := args[len(args)-2].(bool)
  132. if !ok {
  133. return fmt.Errorf("when arg is not a bool but got %v", args[len(args)-2]), false
  134. }
  135. ignoreNull := false
  136. if l == 4 {
  137. ignoreNull, ok = args[3].(bool)
  138. if !ok {
  139. return fmt.Errorf("The fourth arg is not a bool but got %v", args[0]), false
  140. }
  141. }
  142. paraLen := len(args) - 2
  143. var rq *ringqueue = nil
  144. var rtnVal interface{} = nil
  145. // first time call, need create state for lag
  146. if v == nil {
  147. size := 0
  148. var dftVal interface{} = nil
  149. if paraLen == 3 {
  150. dftVal = args[2]
  151. }
  152. if paraLen == 1 {
  153. size = 1
  154. } else {
  155. size, err = cast.ToInt(args[1], cast.STRICT)
  156. if err != nil {
  157. return fmt.Errorf("error converting second arg %v to int: %v", args[1], err), false
  158. }
  159. }
  160. rq = newRingqueue(size)
  161. rq.fill(dftVal)
  162. err := ctx.PutState(key, rq)
  163. if err != nil {
  164. return fmt.Errorf("error setting state for %s: %v", key, err), false
  165. }
  166. } else {
  167. rq, _ = v.(*ringqueue)
  168. }
  169. rtnVal, _ = rq.peek()
  170. if validData {
  171. if !ignoreNull || args[0] != nil {
  172. rtnVal, _ = rq.fetch()
  173. rq.append(args[0])
  174. err := ctx.PutState(key, rq)
  175. if err != nil {
  176. return fmt.Errorf("error setting state for %s: %v", key, err), false
  177. }
  178. }
  179. }
  180. return rtnVal, true
  181. },
  182. val: func(_ api.FunctionContext, args []ast.Expr) error {
  183. l := len(args)
  184. if l != 1 && l != 2 && l != 3 && l != 4 {
  185. return fmt.Errorf("expect one two or three args but got %d", l)
  186. }
  187. if l >= 2 {
  188. if ast.IsFloatArg(args[1]) || ast.IsTimeArg(args[1]) || ast.IsBooleanArg(args[1]) || ast.IsStringArg(args[1]) || ast.IsFieldRefArg(args[1]) {
  189. return ProduceErrInfo(1, "int")
  190. }
  191. if s, ok := args[1].(*ast.IntegerLiteral); ok {
  192. if s.Val < 0 {
  193. return fmt.Errorf("the index should not be a nagtive integer")
  194. }
  195. }
  196. }
  197. if l == 4 {
  198. if ast.IsNumericArg(args[3]) || ast.IsTimeArg(args[3]) || ast.IsStringArg(args[3]) {
  199. return ProduceErrInfo(3, "bool")
  200. }
  201. }
  202. return nil
  203. },
  204. }
  205. builtins["latest"] = builtinFunc{
  206. fType: ast.FuncTypeScalar,
  207. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  208. l := len(args) - 2
  209. if l != 1 && l != 2 {
  210. return fmt.Errorf("expect one or two args but got %d", l), false
  211. }
  212. paraLen := len(args) - 2
  213. key := args[len(args)-1].(string)
  214. validData, ok := args[len(args)-2].(bool)
  215. if !ok {
  216. return fmt.Errorf("when arg is not a bool but got %v", args[len(args)-2]), false
  217. }
  218. // notice nil is ignored in latest
  219. if validData && args[0] != nil {
  220. ctx.PutState(key, args[0])
  221. return args[0], true
  222. } else {
  223. v, err := ctx.GetState(key)
  224. if err != nil {
  225. return fmt.Errorf("error getting state for %s: %v", key, err), false
  226. }
  227. if v == nil {
  228. if paraLen == 2 {
  229. return args[1], true
  230. } else {
  231. return nil, true
  232. }
  233. } else {
  234. return v, true
  235. }
  236. }
  237. },
  238. val: func(_ api.FunctionContext, args []ast.Expr) error {
  239. l := len(args)
  240. if l != 1 && l != 2 {
  241. return fmt.Errorf("expect one or two args but got %d", l)
  242. }
  243. return nil
  244. },
  245. }
  246. }
  247. func registerGlobalAggFunc() {
  248. builtins["acc_avg"] = builtinFunc{
  249. fType: ast.FuncTypeScalar,
  250. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  251. key := args[len(args)-1].(string)
  252. keyCount := fmt.Sprintf("%s_count", key)
  253. keySum := fmt.Sprintf("%s_sum", key)
  254. keyAvg := fmt.Sprintf("%s_avg", key)
  255. vCount, err := ctx.GetState(keyCount)
  256. if err != nil {
  257. return err, false
  258. }
  259. vSum, err := ctx.GetState(keySum)
  260. if err != nil {
  261. return err, false
  262. }
  263. vAvg, err := ctx.GetState(keyAvg)
  264. if err != nil {
  265. return err, false
  266. }
  267. validData, ok := args[len(args)-2].(bool)
  268. if !ok {
  269. return fmt.Errorf("when arg is not a bool but got %v", args[len(args)-2]), false
  270. }
  271. if vSum == nil || vCount == nil || vAvg == nil {
  272. vSum = float64(0)
  273. vCount = float64(0)
  274. vAvg = float64(0)
  275. }
  276. if args[0] == nil || !validData {
  277. return vAvg.(float64), true
  278. }
  279. count := vCount.(float64)
  280. sum := vSum.(float64)
  281. count = count + 1
  282. switch v := args[0].(type) {
  283. case int:
  284. sum += float64(v)
  285. case int32:
  286. sum += float64(v)
  287. case int64:
  288. sum += float64(v)
  289. case float32:
  290. sum += float64(v)
  291. case float64:
  292. sum += v
  293. default:
  294. return fmt.Errorf("the value should be number"), false
  295. }
  296. if err := ctx.PutState(keyCount, count); err != nil {
  297. return err, false
  298. }
  299. if err := ctx.PutState(keySum, sum); err != nil {
  300. return err, false
  301. }
  302. if err := ctx.PutState(keyAvg, sum/count); err != nil {
  303. return err, false
  304. }
  305. return sum / count, true
  306. },
  307. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  308. return nil
  309. },
  310. }
  311. builtins["acc_max"] = builtinFunc{
  312. fType: ast.FuncTypeScalar,
  313. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  314. key := args[len(args)-1].(string)
  315. val, err := ctx.GetState(key)
  316. if err != nil {
  317. return err, false
  318. }
  319. validData, ok := args[len(args)-2].(bool)
  320. if !ok {
  321. return fmt.Errorf("when arg is not a bool but got %v", args[len(args)-2]), false
  322. }
  323. if val == nil {
  324. if !validData {
  325. return 0, true
  326. }
  327. val = float64(math.MinInt64)
  328. }
  329. m := val.(float64)
  330. if !validData {
  331. return m, true
  332. }
  333. switch v := args[0].(type) {
  334. case int:
  335. v1 := float64(v)
  336. m = getMax(m, v1)
  337. case int32:
  338. v1 := float64(v)
  339. m = getMax(m, v1)
  340. case int64:
  341. v1 := float64(v)
  342. m = getMax(m, v1)
  343. case float32:
  344. v1 := float64(v)
  345. m = getMax(m, v1)
  346. case float64:
  347. m = getMax(m, v)
  348. default:
  349. return fmt.Errorf("the value should be number"), false
  350. }
  351. if err := ctx.PutState(key, m); err != nil {
  352. return err, false
  353. }
  354. return m, true
  355. },
  356. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  357. return ValidateLen(1, len(args))
  358. },
  359. }
  360. builtins["acc_min"] = builtinFunc{
  361. fType: ast.FuncTypeScalar,
  362. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  363. key := args[len(args)-1].(string)
  364. val, err := ctx.GetState(key)
  365. if err != nil {
  366. return err, false
  367. }
  368. validData, ok := args[len(args)-2].(bool)
  369. if !ok {
  370. return fmt.Errorf("when arg is not a bool but got %v", args[len(args)-2]), false
  371. }
  372. if val == nil {
  373. if !validData {
  374. return 0, true
  375. }
  376. val = float64(math.MaxInt64)
  377. }
  378. m := val.(float64)
  379. if !validData {
  380. return m, true
  381. }
  382. switch v := args[0].(type) {
  383. case int:
  384. v1 := float64(v)
  385. m = getMin(m, v1)
  386. case int32:
  387. v1 := float64(v)
  388. m = getMin(m, v1)
  389. case int64:
  390. v1 := float64(v)
  391. m = getMin(m, v1)
  392. case float32:
  393. v1 := float64(v)
  394. m = getMin(m, v1)
  395. case float64:
  396. m = getMin(m, v)
  397. default:
  398. return fmt.Errorf("the value should be number"), false
  399. }
  400. if err := ctx.PutState(key, m); err != nil {
  401. return err, false
  402. }
  403. return m, true
  404. },
  405. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  406. return ValidateLen(1, len(args))
  407. },
  408. }
  409. builtins["acc_sum"] = builtinFunc{
  410. fType: ast.FuncTypeScalar,
  411. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  412. key := args[len(args)-1].(string)
  413. val, err := ctx.GetState(key)
  414. if err != nil {
  415. return err, false
  416. }
  417. validData, ok := args[len(args)-2].(bool)
  418. if !ok {
  419. return fmt.Errorf("when arg is not a bool but got %v", args[len(args)-2]), false
  420. }
  421. if val == nil {
  422. val = float64(0)
  423. }
  424. accu := val.(float64)
  425. if !validData {
  426. return accu, true
  427. }
  428. switch sumValue := args[0].(type) {
  429. case int:
  430. accu += float64(sumValue)
  431. case int32:
  432. accu += float64(sumValue)
  433. case int64:
  434. accu += float64(sumValue)
  435. case float32:
  436. accu += float64(sumValue)
  437. case float64:
  438. accu += sumValue
  439. default:
  440. return fmt.Errorf("the value should be number"), false
  441. }
  442. if err := ctx.PutState(key, accu); err != nil {
  443. return err, false
  444. }
  445. return accu, true
  446. },
  447. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  448. return ValidateLen(1, len(args))
  449. },
  450. }
  451. builtins["acc_count"] = builtinFunc{
  452. fType: ast.FuncTypeScalar,
  453. exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
  454. key := args[len(args)-1].(string)
  455. val, err := ctx.GetState(key)
  456. if err != nil {
  457. return err, false
  458. }
  459. validData, ok := args[len(args)-2].(bool)
  460. if !ok {
  461. return fmt.Errorf("when arg is not a bool but got %v", args[len(args)-2]), false
  462. }
  463. if val == nil {
  464. val = 0
  465. }
  466. cnt := val.(int)
  467. if !validData {
  468. return cnt, true
  469. }
  470. if args[0] != nil {
  471. cnt = cnt + 1
  472. }
  473. if err := ctx.PutState(key, cnt); err != nil {
  474. return err, false
  475. }
  476. return cnt, true
  477. },
  478. val: func(ctx api.FunctionContext, args []ast.Expr) error {
  479. return ValidateLen(1, len(args))
  480. },
  481. }
  482. }
  483. func getMax(a, b float64) float64 {
  484. if a > b {
  485. return a
  486. }
  487. return b
  488. }
  489. func getMin(a, b float64) float64 {
  490. if a < b {
  491. return a
  492. }
  493. return b
  494. }