accumulateWordCount.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/pkg/api"
  5. "github.com/emqx/kuiper/pkg/ast"
  6. "strings"
  7. )
  8. /**
  9. ** A function which will count how many words had been received from the beginning
  10. ** to demonstrate how to use states
  11. ** There are 2 arguments:
  12. ** 0: column, the column to be calculated. The column value type must be string
  13. ** 1: separator, a string literal for word separator
  14. **/
  15. type accumulateWordCountFunc struct {
  16. }
  17. func (f *accumulateWordCountFunc) Validate(args []interface{}) error {
  18. if len(args) != 2 {
  19. return fmt.Errorf("wordCount function only supports 2 parameter but got %d", len(args))
  20. }
  21. if arg1, ok := args[1].(ast.Expr); ok {
  22. if _, ok := arg1.(*ast.StringLiteral); !ok {
  23. return fmt.Errorf("the second parameter of wordCount function must be a string literal")
  24. }
  25. }
  26. return nil
  27. }
  28. func (f *accumulateWordCountFunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
  29. logger := ctx.GetLogger()
  30. fmt.Printf("Exec accumulate")
  31. col, ok := args[0].(string)
  32. if !ok {
  33. logger.Debugf("Exec accumulateWordCountFunc with arg0 %s", col)
  34. return fmt.Errorf("args[0] is not a string, got %v", args[0]), false
  35. }
  36. sep, ok := args[1].(string)
  37. if !ok {
  38. logger.Debugf("Exec accumulateWordCountFunc with arg1 %s", sep)
  39. return fmt.Errorf("args[1] is not a string, got %v", args[0]), false
  40. }
  41. err := ctx.IncrCounter("allwordcount", len(strings.Split(col, sep)))
  42. if err != nil {
  43. logger.Debugf("call accumulateWordCountFunc incrCounter error %s", err)
  44. return err, false
  45. }
  46. if c, err := ctx.GetCounter("allwordcount"); err != nil {
  47. logger.Debugf("call accumulateWordCountFunc getCounter error %s", err)
  48. return err, false
  49. } else {
  50. return c, true
  51. }
  52. }
  53. func (f *accumulateWordCountFunc) IsAggregate() bool {
  54. return false
  55. }
  56. func AccumulateWordCount() api.Function {
  57. return &accumulateWordCountFunc{}
  58. }