stmtx.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package xsql
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/pkg/ast"
  6. "github.com/emqx/kuiper/pkg/errorx"
  7. "github.com/emqx/kuiper/pkg/kv"
  8. "strings"
  9. )
  10. func GetStreams(stmt *ast.SelectStatement) (result []string) {
  11. if stmt == nil {
  12. return nil
  13. }
  14. for _, source := range stmt.Sources {
  15. if s, ok := source.(*ast.Table); ok {
  16. result = append(result, s.Name)
  17. }
  18. }
  19. for _, join := range stmt.Joins {
  20. result = append(result, join.Name)
  21. }
  22. return
  23. }
  24. func GetStatementFromSql(sql string) (*ast.SelectStatement, error) {
  25. parser := NewParser(strings.NewReader(sql))
  26. if stmt, err := Language.Parse(parser); err != nil {
  27. return nil, fmt.Errorf("Parse SQL %s error: %s.", sql, err)
  28. } else {
  29. if r, ok := stmt.(*ast.SelectStatement); !ok {
  30. return nil, fmt.Errorf("SQL %s is not a select statement.", sql)
  31. } else {
  32. return r, nil
  33. }
  34. }
  35. }
  36. type StreamInfo struct {
  37. StreamType ast.StreamType `json:"streamType"`
  38. Statement string `json:"statement"`
  39. }
  40. func GetDataSourceStatement(m kv.KeyValue, name string) (*StreamInfo, error) {
  41. var (
  42. v string
  43. vs = &StreamInfo{}
  44. )
  45. err := m.Open()
  46. if err != nil {
  47. return nil, fmt.Errorf("error when opening db: %v", err)
  48. }
  49. defer m.Close()
  50. if ok, _ := m.Get(name, &v); ok {
  51. if err := json.Unmarshal([]byte(v), vs); err != nil {
  52. return nil, fmt.Errorf("error unmarshall %s, the data in db may be corrupted", name)
  53. } else {
  54. return vs, nil
  55. }
  56. }
  57. return nil, errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("%s is not found", name))
  58. }
  59. func GetDataSource(m kv.KeyValue, name string) (stmt *ast.StreamStmt, err error) {
  60. info, err := GetDataSourceStatement(m, name)
  61. if err != nil {
  62. return nil, err
  63. }
  64. parser := NewParser(strings.NewReader(info.Statement))
  65. stream, err := Language.Parse(parser)
  66. stmt, ok := stream.(*ast.StreamStmt)
  67. if !ok {
  68. err = fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.", name)
  69. }
  70. return
  71. }