stream.go 8.4 KB


  1. package processor
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/lf-edge/ekuiper/internal/conf"
  7. "github.com/lf-edge/ekuiper/internal/xsql"
  8. "github.com/lf-edge/ekuiper/pkg/ast"
  9. "github.com/lf-edge/ekuiper/pkg/errorx"
  10. "github.com/lf-edge/ekuiper/pkg/kv"
  11. "strings"
  12. )
  13. var (
  14. log = conf.Log
  15. )
  16. type StreamProcessor struct {
  17. db kv.KeyValue
  18. }
  19. //@params d : the directory of the DB to save the stream info
  20. func NewStreamProcessor(d string) *StreamProcessor {
  21. processor := &StreamProcessor{
  22. db: kv.GetDefaultKVStore(d),
  23. }
  24. return processor
  25. }
  26. func (p *StreamProcessor) ExecStmt(statement string) (result []string, err error) {
  27. parser := xsql.NewParser(strings.NewReader(statement))
  28. stmt, err := xsql.Language.Parse(parser)
  29. if err != nil {
  30. return nil, err
  31. }
  32. switch s := stmt.(type) {
  33. case *ast.StreamStmt: //Table is also StreamStmt
  34. var r string
  35. err = p.execSave(s, statement, false)
  36. stt := ast.StreamTypeMap[s.StreamType]
  37. if err != nil {
  38. err = fmt.Errorf("Create %s fails: %v.", stt, err)
  39. } else {
  40. r = fmt.Sprintf("%s %s is created.", strings.Title(stt), s.Name)
  41. log.Printf("%s", r)
  42. }
  43. result = append(result, r)
  44. case *ast.ShowStreamsStatement:
  45. result, err = p.execShow(ast.TypeStream)
  46. case *ast.ShowTablesStatement:
  47. result, err = p.execShow(ast.TypeTable)
  48. case *ast.DescribeStreamStatement:
  49. var r string
  50. r, err = p.execDescribe(s, ast.TypeStream)
  51. result = append(result, r)
  52. case *ast.DescribeTableStatement:
  53. var r string
  54. r, err = p.execDescribe(s, ast.TypeTable)
  55. result = append(result, r)
  56. case *ast.ExplainStreamStatement:
  57. var r string
  58. r, err = p.execExplain(s, ast.TypeStream)
  59. result = append(result, r)
  60. case *ast.ExplainTableStatement:
  61. var r string
  62. r, err = p.execExplain(s, ast.TypeTable)
  63. result = append(result, r)
  64. case *ast.DropStreamStatement:
  65. var r string
  66. r, err = p.execDrop(s, ast.TypeStream)
  67. result = append(result, r)
  68. case *ast.DropTableStatement:
  69. var r string
  70. r, err = p.execDrop(s, ast.TypeTable)
  71. result = append(result, r)
  72. default:
  73. return nil, fmt.Errorf("Invalid stream statement: %s", statement)
  74. }
  75. return
  76. }
  77. func (p *StreamProcessor) execSave(stmt *ast.StreamStmt, statement string, replace bool) error {
  78. err := p.db.Open()
  79. if err != nil {
  80. return fmt.Errorf("error when opening db: %v.", err)
  81. }
  82. defer p.db.Close()
  83. s, err := json.Marshal(xsql.StreamInfo{
  84. StreamType: stmt.StreamType,
  85. Statement: statement,
  86. })
  87. if err != nil {
  88. return fmt.Errorf("error when saving to db: %v.", err)
  89. }
  90. if replace {
  91. err = p.db.Set(string(stmt.Name), string(s))
  92. } else {
  93. err = p.db.Setnx(string(stmt.Name), string(s))
  94. }
  95. return err
  96. }
  97. func (p *StreamProcessor) ExecReplaceStream(statement string, st ast.StreamType) (string, error) {
  98. parser := xsql.NewParser(strings.NewReader(statement))
  99. stmt, err := xsql.Language.Parse(parser)
  100. if err != nil {
  101. return "", err
  102. }
  103. stt := ast.StreamTypeMap[st]
  104. switch s := stmt.(type) {
  105. case *ast.StreamStmt:
  106. if s.StreamType != st {
  107. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("%s %s is not found", ast.StreamTypeMap[st], s.Name))
  108. }
  109. err = p.execSave(s, statement, true)
  110. if err != nil {
  111. return "", fmt.Errorf("Replace %s fails: %v.", stt, err)
  112. } else {
  113. info := fmt.Sprintf("%s %s is replaced.", strings.Title(stt), s.Name)
  114. log.Printf("%s", info)
  115. return info, nil
  116. }
  117. default:
  118. return "", fmt.Errorf("Invalid %s statement: %s", stt, statement)
  119. }
  120. }
  121. func (p *StreamProcessor) ExecStreamSql(statement string) (string, error) {
  122. r, err := p.ExecStmt(statement)
  123. if err != nil {
  124. return "", err
  125. } else {
  126. return strings.Join(r, "\n"), err
  127. }
  128. }
  129. func (p *StreamProcessor) execShow(st ast.StreamType) ([]string, error) {
  130. keys, err := p.ShowStream(st)
  131. if len(keys) == 0 {
  132. keys = append(keys, fmt.Sprintf("No %s definitions are found.", ast.StreamTypeMap[st]))
  133. }
  134. return keys, err
  135. }
  136. func (p *StreamProcessor) ShowStream(st ast.StreamType) ([]string, error) {
  137. stt := ast.StreamTypeMap[st]
  138. err := p.db.Open()
  139. if err != nil {
  140. return nil, fmt.Errorf("Show %ss fails, error when opening db: %v.", stt, err)
  141. }
  142. defer p.db.Close()
  143. keys, err := p.db.Keys()
  144. if err != nil {
  145. return nil, fmt.Errorf("Show %ss fails, error when loading data from db: %v.", stt, err)
  146. }
  147. var (
  148. v string
  149. vs = &xsql.StreamInfo{}
  150. result = make([]string, 0)
  151. )
  152. for _, k := range keys {
  153. if ok, _ := p.db.Get(k, &v); ok {
  154. if err := json.Unmarshal([]byte(v), vs); err == nil && vs.StreamType == st {
  155. result = append(result, k)
  156. }
  157. }
  158. }
  159. return result, nil
  160. }
  161. func (p *StreamProcessor) getStream(name string, st ast.StreamType) (string, error) {
  162. vs, err := xsql.GetDataSourceStatement(p.db, name)
  163. if vs != nil && vs.StreamType == st {
  164. return vs.Statement, nil
  165. }
  166. if err != nil {
  167. return "", err
  168. }
  169. return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("%s %s is not found", ast.StreamTypeMap[st], name))
  170. }
  171. func (p *StreamProcessor) execDescribe(stmt ast.NameNode, st ast.StreamType) (string, error) {
  172. streamStmt, err := p.DescStream(stmt.GetName(), st)
  173. if err != nil {
  174. return "", err
  175. }
  176. switch s := streamStmt.(type) {
  177. case *ast.StreamStmt:
  178. var buff bytes.Buffer
  179. buff.WriteString("Fields\n--------------------------------------------------------------------------------\n")
  180. for _, f := range s.StreamFields {
  181. buff.WriteString(f.Name + "\t")
  182. buff.WriteString(printFieldType(f.FieldType))
  183. buff.WriteString("\n")
  184. }
  185. buff.WriteString("\n")
  186. printOptions(s.Options, &buff)
  187. return buff.String(), err
  188. default:
  189. return "%s", fmt.Errorf("Error resolving the %s %s, the data in db may be corrupted.", ast.StreamTypeMap[st], stmt.GetName())
  190. }
  191. }
  192. func printOptions(opts *ast.Options, buff *bytes.Buffer) {
  193. if opts.CONF_KEY != "" {
  194. buff.WriteString(fmt.Sprintf("CONF_KEY: %s\n", opts.CONF_KEY))
  195. }
  196. if opts.DATASOURCE != "" {
  197. buff.WriteString(fmt.Sprintf("DATASOURCE: %s\n", opts.DATASOURCE))
  198. }
  199. if opts.FORMAT != "" {
  200. buff.WriteString(fmt.Sprintf("FORMAT: %s\n", opts.FORMAT))
  201. }
  202. if opts.KEY != "" {
  203. buff.WriteString(fmt.Sprintf("KEY: %s\n", opts.KEY))
  204. }
  205. if opts.RETAIN_SIZE != 0 {
  206. buff.WriteString(fmt.Sprintf("RETAIN_SIZE: %d\n", opts.RETAIN_SIZE))
  207. }
  208. if opts.SHARED {
  209. buff.WriteString(fmt.Sprintf("SHARED: %v\n", opts.SHARED))
  210. }
  211. if opts.STRICT_VALIDATION {
  212. buff.WriteString(fmt.Sprintf("STRICT_VALIDATION: %v\n", opts.STRICT_VALIDATION))
  213. }
  214. if opts.TIMESTAMP != "" {
  215. buff.WriteString(fmt.Sprintf("TIMESTAMP: %s\n", opts.TIMESTAMP))
  216. }
  217. if opts.TIMESTAMP_FORMAT != "" {
  218. buff.WriteString(fmt.Sprintf("TIMESTAMP_FORMAT: %s\n", opts.TIMESTAMP_FORMAT))
  219. }
  220. if opts.TYPE != "" {
  221. buff.WriteString(fmt.Sprintf("TYPE: %s\n", opts.TYPE))
  222. }
  223. }
  224. func (p *StreamProcessor) DescStream(name string, st ast.StreamType) (ast.Statement, error) {
  225. statement, err := p.getStream(name, st)
  226. if err != nil {
  227. return nil, fmt.Errorf("Describe %s fails, %s.", ast.StreamTypeMap[st], err)
  228. }
  229. parser := xsql.NewParser(strings.NewReader(statement))
  230. stream, err := xsql.Language.Parse(parser)
  231. if err != nil {
  232. return nil, err
  233. }
  234. return stream, nil
  235. }
  236. func (p *StreamProcessor) execExplain(stmt ast.NameNode, st ast.StreamType) (string, error) {
  237. _, err := p.getStream(stmt.GetName(), st)
  238. if err != nil {
  239. return "", fmt.Errorf("Explain %s fails, %s.", ast.StreamTypeMap[st], err)
  240. }
  241. return "TO BE SUPPORTED", nil
  242. }
  243. func (p *StreamProcessor) execDrop(stmt ast.NameNode, st ast.StreamType) (string, error) {
  244. s, err := p.DropStream(stmt.GetName(), st)
  245. if err != nil {
  246. return s, fmt.Errorf("Drop %s fails: %s.", ast.StreamTypeMap[st], err)
  247. }
  248. return s, nil
  249. }
  250. func (p *StreamProcessor) DropStream(name string, st ast.StreamType) (string, error) {
  251. defer p.db.Close()
  252. _, err := p.getStream(name, st)
  253. if err != nil {
  254. return "", err
  255. }
  256. err = p.db.Open()
  257. if err != nil {
  258. return "", fmt.Errorf("error when opening db: %v", err)
  259. }
  260. defer p.db.Close()
  261. err = p.db.Delete(name)
  262. if err != nil {
  263. return "", err
  264. } else {
  265. return fmt.Sprintf("%s %s is dropped.", strings.Title(ast.StreamTypeMap[st]), name), nil
  266. }
  267. }
  268. func printFieldType(ft ast.FieldType) (result string) {
  269. switch t := ft.(type) {
  270. case *ast.BasicType:
  271. result = t.Type.String()
  272. case *ast.ArrayType:
  273. result = "array("
  274. if t.FieldType != nil {
  275. result += printFieldType(t.FieldType)
  276. } else {
  277. result += t.Type.String()
  278. }
  279. result += ")"
  280. case *ast.RecType:
  281. result = "struct("
  282. isFirst := true
  283. for _, f := range t.StreamFields {
  284. if isFirst {
  285. isFirst = false
  286. } else {
  287. result += ", "
  288. }
  289. result = result + f.Name + " " + printFieldType(f.FieldType)
  290. }
  291. result += ")"
  292. }
  293. return
  294. }