functions.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package xsql
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/plugins"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "strings"
  7. )
  8. type FunctionValuer struct {
  9. plugins map[string]api.Function
  10. }
  11. func (*FunctionValuer) Value(key string) (interface{}, bool) {
  12. return nil, false
  13. }
  14. func (*FunctionValuer) Meta(key string) (interface{}, bool) {
  15. return nil, false
  16. }
  17. var aggFuncMap = map[string]string{"avg": "",
  18. "count": "",
  19. "max": "", "min": "",
  20. "sum": "",
  21. }
  22. var mathFuncMap = map[string]string{"abs": "", "acos": "", "asin": "", "atan": "", "atan2": "",
  23. "bitand": "", "bitor": "", "bitxor": "", "bitnot": "",
  24. "ceil": "", "cos": "", "cosh": "",
  25. "exp": "",
  26. "ln": "", "log": "",
  27. "mod": "",
  28. "power": "",
  29. "rand": "", "round": "",
  30. "sign": "", "sin": "", "sinh": "", "sqrt": "",
  31. "tan": "", "tanh": "",
  32. }
  33. var strFuncMap = map[string]string{"concat": "",
  34. "endswith": "",
  35. "format_time": "",
  36. "indexof": "",
  37. "length": "", "lower": "", "lpad": "", "ltrim": "",
  38. "numbytes": "",
  39. "regexp_matches": "", "regexp_replace": "", "regexp_substr": "", "rpad": "", "rtrim": "",
  40. "substring": "", "startswith": "", "split_value": "",
  41. "trim": "",
  42. "upper": "",
  43. }
  44. var convFuncMap = map[string]string{"concat": "", "cast": "", "chr": "",
  45. "encode": "",
  46. "trunc": "",
  47. }
  48. var hashFuncMap = map[string]string{"md5": "",
  49. "sha1": "", "sha256": "", "sha384": "", "sha512": "",
  50. }
  51. var jsonFuncMap = map[string]string{
  52. "json_path_query": "", "json_path_query_first": "", "json_path_exists": "",
  53. }
  54. var otherFuncMap = map[string]string{"isnull": "",
  55. "newuuid": "", "tstamp": "", "mqtt": "", "meta": "",
  56. }
  57. func (fv *FunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
  58. lowerName := strings.ToLower(name)
  59. if _, ok := mathFuncMap[lowerName]; ok {
  60. return mathCall(name, args)
  61. } else if _, ok := strFuncMap[lowerName]; ok {
  62. return strCall(lowerName, args)
  63. } else if _, ok := convFuncMap[lowerName]; ok {
  64. return convCall(lowerName, args)
  65. } else if _, ok := hashFuncMap[lowerName]; ok {
  66. return hashCall(lowerName, args)
  67. } else if _, ok := jsonFuncMap[lowerName]; ok {
  68. return jsonCall(lowerName, args)
  69. } else if _, ok := otherFuncMap[lowerName]; ok {
  70. return otherCall(lowerName, args)
  71. } else if _, ok := aggFuncMap[lowerName]; ok {
  72. return nil, false
  73. } else {
  74. common.Log.Debugf("run func %s", name)
  75. if fv.plugins == nil {
  76. fv.plugins = make(map[string]api.Function)
  77. }
  78. var (
  79. nf api.Function
  80. ok bool
  81. err error
  82. )
  83. if nf, ok = fv.plugins[name]; !ok {
  84. nf, err = plugins.GetFunction(name)
  85. if err != nil {
  86. return err, false
  87. }
  88. fv.plugins[name] = nf
  89. }
  90. if nf.IsAggregate() {
  91. return nil, false
  92. }
  93. result, ok := nf.Exec(args)
  94. common.Log.Debugf("run custom function %s, get result %v", name, result)
  95. return result, ok
  96. }
  97. }