util.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package planner
  2. import "github.com/emqx/kuiper/pkg/ast"
  3. func getRefSources(node ast.Node) ([]ast.StreamName, bool) {
  4. result := make(map[ast.StreamName]bool)
  5. keys := make([]ast.StreamName, 0, len(result))
  6. if node == nil {
  7. return keys, false
  8. }
  9. hasDefault := false
  10. ast.WalkFunc(node, func(n ast.Node) bool {
  11. if f, ok := n.(*ast.FieldRef); ok {
  12. for _, sn := range f.RefSources() {
  13. if sn == ast.DefaultStream {
  14. hasDefault = true
  15. }
  16. result[sn] = true
  17. }
  18. return false
  19. }
  20. return true
  21. })
  22. for k := range result {
  23. keys = append(keys, k)
  24. }
  25. return keys, hasDefault
  26. }
  27. func combine(l ast.Expr, r ast.Expr) ast.Expr {
  28. if l != nil && r != nil {
  29. return &ast.BinaryExpr{
  30. OP: ast.AND,
  31. LHS: l,
  32. RHS: r,
  33. }
  34. } else if l != nil {
  35. return l
  36. } else {
  37. return r
  38. }
  39. }
  40. func getFields(node ast.Node) []ast.Expr {
  41. result := make([]ast.Expr, 0)
  42. ast.WalkFunc(node, func(n ast.Node) bool {
  43. switch t := n.(type) {
  44. case *ast.FieldRef:
  45. if t.IsColumn() {
  46. result = append(result, t)
  47. }
  48. case *ast.Wildcard:
  49. result = append(result, t)
  50. case *ast.MetaRef:
  51. if t.StreamName != "" {
  52. result = append(result, t)
  53. }
  54. case *ast.SortField:
  55. result = append(result, t)
  56. }
  57. return true
  58. })
  59. return result
  60. }