functions.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package xsql
  2. import (
  3. "errors"
  4. "github.com/emqx/kuiper/xstream/api"
  5. "strings"
  6. )
  7. // ONLY use NewFunctionValuer function to initialize
  8. type FunctionValuer struct {
  9. runtime *funcRuntime
  10. }
  11. //Should only be called by stream to make sure a single instance for an operation
  12. func NewFunctionValuer(p *funcRuntime) *FunctionValuer {
  13. fv := &FunctionValuer{
  14. runtime: p,
  15. }
  16. return fv
  17. }
  18. func (*FunctionValuer) Value(_ string) (interface{}, bool) {
  19. return nil, false
  20. }
  21. func (*FunctionValuer) Meta(_ string) (interface{}, bool) {
  22. return nil, false
  23. }
  24. type FunctionRegister interface {
  25. HasFunction(name string) bool
  26. Function(name string) (api.Function, error)
  27. }
  28. var aggFuncMap = map[string]string{"avg": "",
  29. "count": "",
  30. "max": "", "min": "",
  31. "sum": "",
  32. "collect": "",
  33. "deduplicate": "",
  34. }
  35. var funcWithAsteriskSupportMap = map[string]string{
  36. "collect": "",
  37. "count": "",
  38. }
  39. var mathFuncMap = map[string]string{"abs": "", "acos": "", "asin": "", "atan": "", "atan2": "",
  40. "bitand": "", "bitor": "", "bitxor": "", "bitnot": "",
  41. "ceil": "", "cos": "", "cosh": "",
  42. "exp": "",
  43. "ln": "", "log": "",
  44. "mod": "",
  45. "power": "",
  46. "rand": "", "round": "",
  47. "sign": "", "sin": "", "sinh": "", "sqrt": "",
  48. "tan": "", "tanh": "",
  49. }
  50. var strFuncMap = map[string]string{"concat": "",
  51. "endswith": "",
  52. "format_time": "",
  53. "indexof": "",
  54. "length": "", "lower": "", "lpad": "", "ltrim": "",
  55. "numbytes": "",
  56. "regexp_matches": "", "regexp_replace": "", "regexp_substr": "", "rpad": "", "rtrim": "",
  57. "substring": "", "startswith": "", "split_value": "",
  58. "trim": "",
  59. "upper": "",
  60. }
  61. var convFuncMap = map[string]string{"concat": "", "cast": "", "chr": "",
  62. "encode": "",
  63. "trunc": "",
  64. }
  65. var hashFuncMap = map[string]string{"md5": "",
  66. "sha1": "", "sha256": "", "sha384": "", "sha512": "",
  67. }
  68. var jsonFuncMap = map[string]string{
  69. "json_path_query": "", "json_path_query_first": "", "json_path_exists": "",
  70. }
  71. var otherFuncMap = map[string]string{"isnull": "",
  72. "newuuid": "", "tstamp": "", "mqtt": "", "meta": "", "cardinality": "",
  73. }
  74. var NotFoundErr = errors.New("not found")
  75. func (fv *FunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {
  76. lowerName := strings.ToLower(name)
  77. if _, ok := mathFuncMap[lowerName]; ok {
  78. return mathCall(name, args)
  79. } else if _, ok := strFuncMap[lowerName]; ok {
  80. return strCall(lowerName, args)
  81. } else if _, ok := convFuncMap[lowerName]; ok {
  82. return convCall(lowerName, args)
  83. } else if _, ok := hashFuncMap[lowerName]; ok {
  84. return hashCall(lowerName, args)
  85. } else if _, ok := jsonFuncMap[lowerName]; ok {
  86. return jsonCall(lowerName, args)
  87. } else if _, ok := otherFuncMap[lowerName]; ok {
  88. return otherCall(lowerName, args)
  89. } else if _, ok := aggFuncMap[lowerName]; ok {
  90. return nil, false
  91. } else {
  92. nf, fctx, err := fv.runtime.getCustom(name)
  93. switch err {
  94. case NotFoundErr:
  95. return nil, false
  96. case nil:
  97. // do nothing, continue
  98. default:
  99. return err, false
  100. }
  101. if nf.IsAggregate() {
  102. return nil, false
  103. }
  104. logger := fctx.GetLogger()
  105. logger.Debugf("run func %s", name)
  106. return nf.Exec(args, fctx)
  107. }
  108. }
  109. func IsAggStatement(stmt *SelectStatement) bool {
  110. if stmt.Dimensions != nil {
  111. ds := stmt.Dimensions.GetGroups()
  112. if ds != nil && len(ds) > 0 {
  113. return true
  114. }
  115. }
  116. r := false
  117. WalkFunc(stmt.Fields, func(n Node) {
  118. switch f := n.(type) {
  119. case *Call:
  120. if ok := isAggFunc(f); ok {
  121. r = true
  122. return
  123. }
  124. }
  125. })
  126. return r
  127. }
  128. func isAggFunc(f *Call) bool {
  129. fn := strings.ToLower(f.Name)
  130. if _, ok := aggFuncMap[fn]; ok {
  131. return true
  132. } else if _, ok := strFuncMap[fn]; ok {
  133. return false
  134. } else if _, ok := convFuncMap[fn]; ok {
  135. return false
  136. } else if _, ok := hashFuncMap[fn]; ok {
  137. return false
  138. } else if _, ok := otherFuncMap[fn]; ok {
  139. return false
  140. } else if _, ok := mathFuncMap[fn]; ok {
  141. return false
  142. } else {
  143. if nf, _, err := parserFuncRuntime.getCustom(f.Name); err == nil {
  144. if nf.IsAggregate() {
  145. //Add cache
  146. aggFuncMap[fn] = ""
  147. return true
  148. }
  149. }
  150. }
  151. return false
  152. }
  153. func HasAggFuncs(node Node) bool {
  154. if node == nil {
  155. return false
  156. }
  157. var r = false
  158. WalkFunc(node, func(n Node) {
  159. if f, ok := n.(*Call); ok {
  160. if ok := isAggFunc(f); ok {
  161. r = true
  162. return
  163. }
  164. }
  165. })
  166. return r
  167. }
  168. func HasNoAggFuncs(node Node) bool {
  169. if node == nil {
  170. return false
  171. }
  172. var r = false
  173. WalkFunc(node, func(n Node) {
  174. if f, ok := n.(*Call); ok {
  175. if ok := isAggFunc(f); !ok {
  176. r = true
  177. return
  178. }
  179. }
  180. })
  181. return r
  182. }