functions.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package xsql
  2. import (
  3. "errors"
  4. "github.com/emqx/kuiper/xstream/api"
  5. "strings"
  6. )
  7. // ONLY use NewFunctionValuer function to initialize
  8. type FunctionValuer struct {
  9. runtime *funcRuntime
  10. }
  11. //Should only be called by stream to make sure a single instance for an operation
  12. func NewFunctionValuer(p *funcRuntime) *FunctionValuer {
  13. fv := &FunctionValuer{
  14. runtime: p,
  15. }
  16. return fv
  17. }
  18. func (*FunctionValuer) Value(string) (interface{}, bool) {
  19. return nil, false
  20. }
  21. func (*FunctionValuer) Meta(string) (interface{}, bool) {
  22. return nil, false
  23. }
  24. func (*FunctionValuer) AppendAlias(string, interface{}) bool {
  25. return false
  26. }
  27. type FunctionRegister interface {
  28. HasFunction(name string) bool
  29. Function(name string) (api.Function, error)
  30. }
  31. var aggFuncMap = map[string]string{"avg": "",
  32. "count": "",
  33. "max": "", "min": "",
  34. "sum": "",
  35. "collect": "",
  36. "deduplicate": "",
  37. "window_start": "",
  38. "window_end": "",
  39. }
  40. var funcWithAsteriskSupportMap = map[string]string{
  41. "collect": "",
  42. "count": "",
  43. }
  44. var mathFuncMap = map[string]string{"abs": "", "acos": "", "asin": "", "atan": "", "atan2": "",
  45. "bitand": "", "bitor": "", "bitxor": "", "bitnot": "",
  46. "ceil": "", "cos": "", "cosh": "",
  47. "exp": "",
  48. "ln": "", "log": "",
  49. "mod": "",
  50. "power": "",
  51. "rand": "", "round": "",
  52. "sign": "", "sin": "", "sinh": "", "sqrt": "",
  53. "tan": "", "tanh": "",
  54. }
  55. var strFuncMap = map[string]string{"concat": "",
  56. "endswith": "",
  57. "format_time": "",
  58. "indexof": "",
  59. "length": "", "lower": "", "lpad": "", "ltrim": "",
  60. "numbytes": "",
  61. "regexp_matches": "", "regexp_replace": "", "regexp_substr": "", "rpad": "", "rtrim": "",
  62. "substring": "", "startswith": "", "split_value": "",
  63. "trim": "",
  64. "upper": "",
  65. }
  66. var convFuncMap = map[string]string{"concat": "", "cast": "", "chr": "",
  67. "encode": "",
  68. "trunc": "",
  69. }
  70. var hashFuncMap = map[string]string{"md5": "",
  71. "sha1": "", "sha256": "", "sha384": "", "sha512": "",
  72. }
  73. var jsonFuncMap = map[string]string{
  74. "json_path_query": "", "json_path_query_first": "", "json_path_exists": "",
  75. }
  76. var otherFuncMap = map[string]string{"isnull": "",
  77. "newuuid": "", "tstamp": "", "mqtt": "", "meta": "", "cardinality": "",
  78. }
  79. var NotFoundErr = errors.New("not found")
  80. func (fv *FunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
  81. lowerName := strings.ToLower(name)
  82. if _, ok := mathFuncMap[lowerName]; ok {
  83. return mathCall(name, args)
  84. } else if _, ok := strFuncMap[lowerName]; ok {
  85. return strCall(lowerName, args)
  86. } else if _, ok := convFuncMap[lowerName]; ok {
  87. return convCall(lowerName, args)
  88. } else if _, ok := hashFuncMap[lowerName]; ok {
  89. return hashCall(lowerName, args)
  90. } else if _, ok := jsonFuncMap[lowerName]; ok {
  91. return jsonCall(lowerName, args)
  92. } else if _, ok := otherFuncMap[lowerName]; ok {
  93. return otherCall(lowerName, args)
  94. } else if _, ok := aggFuncMap[lowerName]; ok {
  95. return nil, false
  96. } else {
  97. nf, fctx, err := fv.runtime.getCustom(name)
  98. switch err {
  99. case NotFoundErr:
  100. return nil, false
  101. case nil:
  102. // do nothing, continue
  103. default:
  104. return err, false
  105. }
  106. if nf.IsAggregate() {
  107. return nil, false
  108. }
  109. logger := fctx.GetLogger()
  110. logger.Debugf("run func %s", name)
  111. return nf.Exec(args, fctx)
  112. }
  113. }
  114. func IsAggStatement(stmt *SelectStatement) bool {
  115. if stmt.Dimensions != nil {
  116. ds := stmt.Dimensions.GetGroups()
  117. if ds != nil && len(ds) > 0 {
  118. return true
  119. }
  120. }
  121. r := false
  122. WalkFunc(stmt.Fields, func(n Node) bool {
  123. switch f := n.(type) {
  124. case *Call:
  125. if ok := IsAggFunc(f); ok {
  126. r = true
  127. return false
  128. }
  129. }
  130. return true
  131. })
  132. return r
  133. }
  134. func IsAggFunc(f *Call) bool {
  135. fn := strings.ToLower(f.Name)
  136. if _, ok := aggFuncMap[fn]; ok {
  137. return true
  138. } else if _, ok := strFuncMap[fn]; ok {
  139. return false
  140. } else if _, ok := convFuncMap[fn]; ok {
  141. return false
  142. } else if _, ok := hashFuncMap[fn]; ok {
  143. return false
  144. } else if _, ok := otherFuncMap[fn]; ok {
  145. return false
  146. } else if _, ok := mathFuncMap[fn]; ok {
  147. return false
  148. } else {
  149. if nf, _, err := parserFuncRuntime.getCustom(f.Name); err == nil {
  150. if nf.IsAggregate() {
  151. //Add cache
  152. aggFuncMap[fn] = ""
  153. return true
  154. }
  155. }
  156. }
  157. return false
  158. }
  159. func HasAggFuncs(node Node) bool {
  160. if node == nil {
  161. return false
  162. }
  163. var r = false
  164. WalkFunc(node, func(n Node) bool {
  165. if f, ok := n.(*Call); ok {
  166. if ok := IsAggFunc(f); ok {
  167. r = true
  168. return false
  169. }
  170. }
  171. return true
  172. })
  173. return r
  174. }