xsql_processor.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package processors
  2. import (
  3. "bytes"
  4. "engine/common"
  5. "engine/xsql"
  6. "engine/xsql/plans"
  7. "engine/xstream"
  8. "engine/xstream/collectors"
  9. "engine/xstream/extensions"
  10. "fmt"
  11. "github.com/dgraph-io/badger"
  12. "strings"
  13. )
  14. var log = common.Log
  // StreamProcessor pairs one stream-management SQL statement with the badger
  // DB directory that Exec opens to run it.
  type StreamProcessor struct {
      // statement is the raw SQL text supplied to NewStreamProcessor.
      statement string
      // badgerDir is the badger DB directory supplied to NewStreamProcessor.
      badgerDir string
  }

  // NewStreamProcessor returns a StreamProcessor holding the given statement
  // and DB directory. Nothing is parsed or opened until Exec is called.
  //
  //@params s : the sql string of create stream statement
  //@params d : the directory of the badger DB to save the stream info
  func NewStreamProcessor(s, d string) *StreamProcessor {
      return &StreamProcessor{statement: s, badgerDir: d}
  }
  28. func (p *StreamProcessor) Exec() (result []string, err error) {
  29. parser := xsql.NewParser(strings.NewReader(p.statement))
  30. stmt, err := xsql.Language.Parse(parser)
  31. if err != nil {
  32. return
  33. }
  34. db, err := common.DbOpen(p.badgerDir)
  35. if err != nil {
  36. return
  37. }
  38. defer common.DbClose(db)
  39. switch s := stmt.(type) {
  40. case *xsql.StreamStmt:
  41. var r string
  42. r, err = p.execCreateStream(s, db)
  43. result = append(result, r)
  44. case *xsql.ShowStreamsStatement:
  45. result, err = p.execShowStream(s, db)
  46. case *xsql.DescribeStreamStatement:
  47. var r string
  48. r, err = p.execDescribeStream(s, db)
  49. result = append(result, r)
  50. case *xsql.ExplainStreamStatement:
  51. var r string
  52. r, err = p.execExplainStream(s, db)
  53. result = append(result, r)
  54. case *xsql.DropStreamStatement:
  55. var r string
  56. r, err = p.execDropStream(s, db)
  57. result = append(result, r)
  58. }
  59. return
  60. }
  61. func (p *StreamProcessor) execCreateStream(stmt *xsql.StreamStmt, db *badger.DB) (string, error) {
  62. err := common.DbSet(db, string(stmt.Name), p.statement)
  63. if err != nil {
  64. return "", err
  65. }else{
  66. return fmt.Sprintf("stream %s created", stmt.Name), nil
  67. }
  68. }
  69. func (p *StreamProcessor) execShowStream(stmt *xsql.ShowStreamsStatement, db *badger.DB) ([]string,error) {
  70. keys, err := common.DbKeys(db)
  71. if len(keys) == 0 {
  72. keys = append(keys, "no stream definition found")
  73. }
  74. return keys, err
  75. }
  76. func (p *StreamProcessor) execDescribeStream(stmt *xsql.DescribeStreamStatement, db *badger.DB) (string,error) {
  77. s, err := common.DbGet(db, string(stmt.Name))
  78. if err != nil {
  79. return "", fmt.Errorf("stream %s not found", stmt.Name)
  80. }
  81. parser := xsql.NewParser(strings.NewReader(s))
  82. stream, err := xsql.Language.Parse(parser)
  83. streamStmt, ok := stream.(*xsql.StreamStmt)
  84. if !ok{
  85. return "", fmt.Errorf("error resolving the stream %s, the data in db may be corrupted", stmt.Name)
  86. }
  87. var buff bytes.Buffer
  88. buff.WriteString("Fields\n--------------------------------------------------------------------------------\n")
  89. for _, f := range streamStmt.StreamFields {
  90. buff.WriteString(f.Name + "\t")
  91. xsql.PrintFieldType(f.FieldType, &buff)
  92. buff.WriteString("\n")
  93. }
  94. buff.WriteString("\n")
  95. common.PrintMap(streamStmt.Options, &buff)
  96. return buff.String(), err
  97. }
  98. func (p *StreamProcessor) execExplainStream(stmt *xsql.ExplainStreamStatement, db *badger.DB) (string,error) {
  99. _, err := common.DbGet(db, string(stmt.Name))
  100. if err != nil{
  101. return "", fmt.Errorf("stream %s not found", stmt.Name)
  102. }
  103. return "TO BE SUPPORTED", nil
  104. }
  105. func (p *StreamProcessor) execDropStream(stmt *xsql.DropStreamStatement, db *badger.DB) (string, error) {
  106. err := common.DbDelete(db, string(stmt.Name))
  107. if err != nil {
  108. return "", err
  109. }else{
  110. return fmt.Sprintf("stream %s dropped", stmt.Name), nil
  111. }
  112. }
  113. func GetStream(db *badger.DB, name string) (stmt *xsql.StreamStmt, err error){
  114. s, err := common.DbGet(db, string(name))
  115. if err != nil {
  116. return
  117. }
  118. parser := xsql.NewParser(strings.NewReader(s))
  119. stream, err := xsql.Language.Parse(parser)
  120. stmt, ok := stream.(*xsql.StreamStmt)
  121. if !ok{
  122. err = fmt.Errorf("error resolving the stream %s, the data in db may be corrupted", name)
  123. }
  124. return
  125. }
  // RuleProcessor pairs the SQL text of a rule with the badger DB directory
  // that Exec opens to look up the streams the rule reads from.
  type RuleProcessor struct {
      // sql is the rule's SQL text supplied to NewRuleProcessor.
      sql string
      // actions string
      // badgerDir is the badger DB directory supplied to NewRuleProcessor.
      badgerDir string
  }

  // NewRuleProcessor returns a RuleProcessor holding the SQL text s and the
  // DB directory d. Nothing is parsed or opened until Exec is called.
  func NewRuleProcessor(s, d string) *RuleProcessor {
      return &RuleProcessor{sql: s, badgerDir: d}
  }
  138. func (p *RuleProcessor) Exec() error {
  139. parser := xsql.NewParser(strings.NewReader(p.sql))
  140. if stmt, err := xsql.Language.Parse(parser); err != nil{
  141. return fmt.Errorf("parse sql %s error: %s", p.sql , err)
  142. }else{
  143. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok{
  144. return fmt.Errorf("sql %s is not a select statement", p.sql)
  145. }else{
  146. //TODO Validation here or in the cli?
  147. tp := xstream.New()
  148. //create sources and preprocessor
  149. db, err := common.DbOpen(p.badgerDir)
  150. if err != nil {
  151. return err
  152. }
  153. defer common.DbClose(db)
  154. var inputs []xstream.Emitter
  155. for _, s := range selectStmt.Sources {
  156. switch t := s.(type){
  157. case *xsql.Table:
  158. if streamStmt, err := GetStream(db, t.Name); err != nil{
  159. return err
  160. } else {
  161. mqs, err := extensions.NewWithName(string(streamStmt.Name), streamStmt.Options["DATASOURCE"], streamStmt.Options["CONF_KEY"])
  162. if err != nil {
  163. return err
  164. }
  165. tp.AddSrc(mqs)
  166. preprocessorOp := xstream.Transform(&plans.Preprocessor{StreamStmt: streamStmt}, "preprocessor_" +t.Name)
  167. tp.AddOperator([]xstream.Emitter{mqs}, preprocessorOp)
  168. inputs = append(inputs, preprocessorOp)
  169. }
  170. default:
  171. return fmt.Errorf("unsupported source type %T", s)
  172. }
  173. }
  174. //if selectStmt.Joins != nil {
  175. // for _, join := range selectStmt.Joins {
  176. // if streamStmt, err := GetStream(db, join.Name); err != nil{
  177. // return err
  178. // } else {
  179. // mqs, err := extensions.NewWithName(string(streamStmt.Name), streamStmt.Options["DATASOURCE"], streamStmt.Options["CONF_KEY"])
  180. // if err != nil {
  181. // return err
  182. // }
  183. // tp.AddSrc(mqs)
  184. //
  185. // preprocessorOp := xstream.Transform(&plans.Preprocessor{StreamStmt: streamStmt}, "preprocessor_" + join.Name)
  186. // tp.AddOperator([]xstream.Emitter{mqs}, preprocessorOp)
  187. // inputs = append(inputs, preprocessorOp)
  188. // }
  189. // }
  190. //
  191. // joinOp := xstream.Transform(&plans.JoinPlan{Joins:selectStmt.Joins, Dimensions: selectStmt.Dimensions}, "join")
  192. // //TODO concurrency setting by command
  193. // //joinOp.SetConcurrency(3)
  194. // //TODO Read the ticker from dimension statement
  195. // joinOp.SetTicker(time.Second * 5)
  196. // tp.AddOperator(inputs, joinOp)
  197. // inputs = []xstream.Emitter{joinOp}
  198. //}
  199. if selectStmt.Condition != nil {
  200. filterOp := xstream.Transform(&plans.FilterPlan{Condition: selectStmt.Condition}, "filter")
  201. //TODO concurrency setting by command
  202. // filterOp.SetConcurrency(3)
  203. tp.AddOperator(inputs, filterOp)
  204. inputs = []xstream.Emitter{filterOp}
  205. }
  206. if selectStmt.Fields != nil {
  207. projectOp := xstream.Transform(&plans.ProjectPlan{Fields: selectStmt.Fields}, "project")
  208. tp.AddOperator(inputs, projectOp)
  209. inputs = []xstream.Emitter{projectOp}
  210. }
  211. //TODO hard coded sink now. parameterize it
  212. tp.AddSink(inputs, collectors.Func(func(data interface{}) error {
  213. fmt.Printf("Sink: %s\n", data)
  214. return nil
  215. }))
  216. if err := <-tp.Open(); err != nil {
  217. return err
  218. }
  219. }
  220. }
  221. return nil
  222. }