accumulateWordCount.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "fmt"
  17. "strings"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "github.com/lf-edge/ekuiper/pkg/ast"
  20. )
  21. /**
  22. ** A function which will count how many words had been received from the beginning
  23. ** to demonstrate how to use states
  24. ** There are 2 arguments:
  25. ** 0: column, the column to be calculated. The column value type must be string
  26. ** 1: separator, a string literal for word separator
  27. **/
  28. type accumulateWordCountFunc struct{}
  29. func (f *accumulateWordCountFunc) Validate(args []interface{}) error {
  30. if len(args) != 2 {
  31. return fmt.Errorf("wordCount function only supports 2 parameter but got %d", len(args))
  32. }
  33. if arg1, ok := args[1].(ast.Expr); ok {
  34. if _, ok := arg1.(*ast.StringLiteral); !ok {
  35. return fmt.Errorf("the second parameter of wordCount function must be a string literal")
  36. }
  37. }
  38. return nil
  39. }
  40. func (f *accumulateWordCountFunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
  41. logger := ctx.GetLogger()
  42. fmt.Printf("Exec accumulate")
  43. col, ok := args[0].(string)
  44. if !ok {
  45. logger.Debugf("Exec accumulateWordCountFunc with arg0 %s", col)
  46. return fmt.Errorf("args[0] is not a string, got %v", args[0]), false
  47. }
  48. sep, ok := args[1].(string)
  49. if !ok {
  50. logger.Debugf("Exec accumulateWordCountFunc with arg1 %s", sep)
  51. return fmt.Errorf("args[1] is not a string, got %v", args[1]), false
  52. }
  53. err := ctx.IncrCounter("allwordcount", len(strings.Split(col, sep)))
  54. if err != nil {
  55. logger.Debugf("call accumulateWordCountFunc incrCounter error %s", err)
  56. return err, false
  57. }
  58. if c, err := ctx.GetCounter("allwordcount"); err != nil {
  59. logger.Debugf("call accumulateWordCountFunc getCounter error %s", err)
  60. return err, false
  61. } else {
  62. return c, true
  63. }
  64. }
  65. func (f *accumulateWordCountFunc) IsAggregate() bool {
  66. return false
  67. }
  68. func AccumulateWordCount() api.Function {
  69. return &accumulateWordCountFunc{}
  70. }