util.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package xsql
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/common/kv"
  7. "strings"
  8. )
  9. func PrintFieldType(ft FieldType) (result string) {
  10. switch t := ft.(type) {
  11. case *BasicType:
  12. result = t.Type.String()
  13. case *ArrayType:
  14. result = "array("
  15. if t.FieldType != nil {
  16. result += PrintFieldType(t.FieldType)
  17. } else {
  18. result += t.Type.String()
  19. }
  20. result += ")"
  21. case *RecType:
  22. result = "struct("
  23. isFirst := true
  24. for _, f := range t.StreamFields {
  25. if isFirst {
  26. isFirst = false
  27. } else {
  28. result += ", "
  29. }
  30. result = result + f.Name + " " + PrintFieldType(f.FieldType)
  31. }
  32. result += ")"
  33. }
  34. return
  35. }
  36. func PrintFieldTypeForJson(ft FieldType) (result interface{}) {
  37. r, q := doPrintFieldTypeForJson(ft)
  38. if q {
  39. return r
  40. } else {
  41. return json.RawMessage(r)
  42. }
  43. }
  44. func doPrintFieldTypeForJson(ft FieldType) (result string, isLiteral bool) {
  45. switch t := ft.(type) {
  46. case *BasicType:
  47. return t.Type.String(), true
  48. case *ArrayType:
  49. var (
  50. fieldType string
  51. q bool
  52. )
  53. if t.FieldType != nil {
  54. fieldType, q = doPrintFieldTypeForJson(t.FieldType)
  55. } else {
  56. fieldType, q = t.Type.String(), true
  57. }
  58. if q {
  59. result = fmt.Sprintf(`{"Type":"array","ElementType":"%s"}`, fieldType)
  60. } else {
  61. result = fmt.Sprintf(`{"Type":"array","ElementType":%s}`, fieldType)
  62. }
  63. case *RecType:
  64. result = `{"Type":"struct","Fields":[`
  65. isFirst := true
  66. for _, f := range t.StreamFields {
  67. if isFirst {
  68. isFirst = false
  69. } else {
  70. result += ","
  71. }
  72. fieldType, q := doPrintFieldTypeForJson(f.FieldType)
  73. if q {
  74. result = fmt.Sprintf(`%s{"FieldType":"%s","Name":"%s"}`, result, fieldType, f.Name)
  75. } else {
  76. result = fmt.Sprintf(`%s{"FieldType":"%s","Name":"%s"}`, result, fieldType, f.Name)
  77. }
  78. }
  79. result += `]}`
  80. }
  81. return result, false
  82. }
  83. func GetStreams(stmt *SelectStatement) (result []string) {
  84. if stmt == nil {
  85. return nil
  86. }
  87. for _, source := range stmt.Sources {
  88. if s, ok := source.(*Table); ok {
  89. result = append(result, s.Name)
  90. }
  91. }
  92. for _, join := range stmt.Joins {
  93. result = append(result, join.Name)
  94. }
  95. return
  96. }
  97. func LowercaseKeyMap(m map[string]interface{}) map[string]interface{} {
  98. m1 := make(map[string]interface{})
  99. for k, v := range m {
  100. if m2, ok := v.(map[string]interface{}); ok {
  101. m1[strings.ToLower(k)] = LowercaseKeyMap(m2)
  102. } else {
  103. m1[strings.ToLower(k)] = v
  104. }
  105. }
  106. return m1
  107. }
  108. func GetStatementFromSql(sql string) (*SelectStatement, error) {
  109. parser := NewParser(strings.NewReader(sql))
  110. if stmt, err := Language.Parse(parser); err != nil {
  111. return nil, fmt.Errorf("Parse SQL %s error: %s.", sql, err)
  112. } else {
  113. if r, ok := stmt.(*SelectStatement); !ok {
  114. return nil, fmt.Errorf("SQL %s is not a select statement.", sql)
  115. } else {
  116. return r, nil
  117. }
  118. }
  119. }
// StreamInfo is the JSON record that GetDataSourceStatement decodes from the
// key-value store for a named data source.
type StreamInfo struct {
	// StreamType is serialized under the JSON key "streamType".
	StreamType StreamType `json:"streamType"`
	// Statement is the statement text that GetDataSource feeds to the parser;
	// it is serialized under the JSON key "statement".
	Statement string `json:"statement"`
}
  124. func GetDataSourceStatement(m kv.KeyValue, name string) (*StreamInfo, error) {
  125. var (
  126. v string
  127. vs = &StreamInfo{}
  128. )
  129. err := m.Open()
  130. if err != nil {
  131. return nil, fmt.Errorf("error when opening db: %v", err)
  132. }
  133. defer m.Close()
  134. if ok, _ := m.Get(name, &v); ok {
  135. if err := json.Unmarshal([]byte(v), vs); err != nil {
  136. return nil, fmt.Errorf("error unmarshall %s, the data in db may be corrupted", name)
  137. } else {
  138. return vs, nil
  139. }
  140. }
  141. return nil, common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("%s is not found", name))
  142. }
  143. func GetDataSource(m kv.KeyValue, name string) (stmt *StreamStmt, err error) {
  144. info, err := GetDataSourceStatement(m, name)
  145. if err != nil {
  146. return nil, err
  147. }
  148. parser := NewParser(strings.NewReader(info.Statement))
  149. stream, err := Language.Parse(parser)
  150. stmt, ok := stream.(*StreamStmt)
  151. if !ok {
  152. err = fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.", name)
  153. }
  154. return
  155. }