xsql_processor.go 12 KB


  1. package processors
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "engine/common"
  6. "engine/xsql"
  7. "engine/xsql/plans"
  8. "engine/xstream"
  9. "engine/xstream/api"
  10. "engine/xstream/extensions"
  11. "engine/xstream/nodes"
  12. "engine/xstream/operators"
  13. "engine/xstream/sinks"
  14. "fmt"
  15. "github.com/dgraph-io/badger"
  16. "path"
  17. "strings"
  18. )
// log is the package-level logger; it is the logger exposed as common.Log.
var log = common.Log
// StreamProcessor executes one stream statement (create, show, describe,
// explain or drop) against the badger DB found in badgerDir.
type StreamProcessor struct {
	// statement is the SQL text parsed by Exec; for a create stream
	// statement it is also the value stored in the DB.
	statement string
	// badgerDir is the directory handed to common.DbOpen.
	badgerDir string
}
  24. //@params s : the sql string of create stream statement
  25. //@params d : the directory of the badger DB to save the stream info
  26. func NewStreamProcessor(s, d string) *StreamProcessor {
  27. processor := &StreamProcessor{
  28. statement: s,
  29. badgerDir: d,
  30. }
  31. return processor
  32. }
  33. func (p *StreamProcessor) Exec() (result []string, err error) {
  34. parser := xsql.NewParser(strings.NewReader(p.statement))
  35. stmt, err := xsql.Language.Parse(parser)
  36. if err != nil {
  37. return
  38. }
  39. db, err := common.DbOpen(p.badgerDir)
  40. if err != nil {
  41. return
  42. }
  43. defer common.DbClose(db)
  44. switch s := stmt.(type) {
  45. case *xsql.StreamStmt:
  46. var r string
  47. r, err = p.execCreateStream(s, db)
  48. result = append(result, r)
  49. case *xsql.ShowStreamsStatement:
  50. result, err = p.execShowStream(s, db)
  51. case *xsql.DescribeStreamStatement:
  52. var r string
  53. r, err = p.execDescribeStream(s, db)
  54. result = append(result, r)
  55. case *xsql.ExplainStreamStatement:
  56. var r string
  57. r, err = p.execExplainStream(s, db)
  58. result = append(result, r)
  59. case *xsql.DropStreamStatement:
  60. var r string
  61. r, err = p.execDropStream(s, db)
  62. result = append(result, r)
  63. }
  64. return
  65. }
  66. func (p *StreamProcessor) execCreateStream(stmt *xsql.StreamStmt, db *badger.DB) (string, error) {
  67. err := common.DbSet(db, string(stmt.Name), p.statement)
  68. if err != nil {
  69. return "", err
  70. }else{
  71. return fmt.Sprintf("stream %s created", stmt.Name), nil
  72. }
  73. }
  74. func (p *StreamProcessor) execShowStream(stmt *xsql.ShowStreamsStatement, db *badger.DB) ([]string,error) {
  75. keys, err := common.DbKeys(db)
  76. if len(keys) == 0 {
  77. keys = append(keys, "no stream definition found")
  78. }
  79. return keys, err
  80. }
  81. func (p *StreamProcessor) execDescribeStream(stmt *xsql.DescribeStreamStatement, db *badger.DB) (string,error) {
  82. s, err := common.DbGet(db, string(stmt.Name))
  83. if err != nil {
  84. return "", fmt.Errorf("stream %s not found", stmt.Name)
  85. }
  86. parser := xsql.NewParser(strings.NewReader(s))
  87. stream, err := xsql.Language.Parse(parser)
  88. streamStmt, ok := stream.(*xsql.StreamStmt)
  89. if !ok{
  90. return "", fmt.Errorf("error resolving the stream %s, the data in db may be corrupted", stmt.Name)
  91. }
  92. var buff bytes.Buffer
  93. buff.WriteString("Fields\n--------------------------------------------------------------------------------\n")
  94. for _, f := range streamStmt.StreamFields {
  95. buff.WriteString(f.Name + "\t")
  96. xsql.PrintFieldType(f.FieldType, &buff)
  97. buff.WriteString("\n")
  98. }
  99. buff.WriteString("\n")
  100. common.PrintMap(streamStmt.Options, &buff)
  101. return buff.String(), err
  102. }
  103. func (p *StreamProcessor) execExplainStream(stmt *xsql.ExplainStreamStatement, db *badger.DB) (string,error) {
  104. _, err := common.DbGet(db, string(stmt.Name))
  105. if err != nil{
  106. return "", fmt.Errorf("stream %s not found", stmt.Name)
  107. }
  108. return "TO BE SUPPORTED", nil
  109. }
  110. func (p *StreamProcessor) execDropStream(stmt *xsql.DropStreamStatement, db *badger.DB) (string, error) {
  111. err := common.DbDelete(db, string(stmt.Name))
  112. if err != nil {
  113. return "", err
  114. }else{
  115. return fmt.Sprintf("stream %s dropped", stmt.Name), nil
  116. }
  117. }
  118. func GetStream(db *badger.DB, name string) (stmt *xsql.StreamStmt, err error){
  119. s, err := common.DbGet(db, name)
  120. if err != nil {
  121. return
  122. }
  123. parser := xsql.NewParser(strings.NewReader(s))
  124. stream, err := xsql.Language.Parse(parser)
  125. stmt, ok := stream.(*xsql.StreamStmt)
  126. if !ok{
  127. err = fmt.Errorf("error resolving the stream %s, the data in db may be corrupted", name)
  128. }
  129. return
  130. }
// RuleProcessor creates, looks up, lists and drops rules, and builds
// topologies from them.
type RuleProcessor struct {
	// badgerDir is the base directory; the methods open the "rule" and
	// "stream" sub-directories beneath it.
	badgerDir string
}
  134. func NewRuleProcessor(d string) *RuleProcessor {
  135. processor := &RuleProcessor{
  136. badgerDir: d,
  137. }
  138. return processor
  139. }
  140. func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
  141. rule, err := p.getRuleByJson(name, ruleJson)
  142. if err != nil {
  143. return nil, err
  144. }
  145. db, err := common.DbOpen(path.Join(p.badgerDir, "rule"))
  146. if err != nil {
  147. return nil, err
  148. }
  149. err = common.DbSet(db, string(name), ruleJson)
  150. if err != nil {
  151. common.DbClose(db)
  152. return nil, err
  153. }else{
  154. log.Infof("rule %s created", name)
  155. common.DbClose(db)
  156. }
  157. return rule, nil
  158. }
  159. func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
  160. db, err := common.DbOpen(path.Join(p.badgerDir, "rule"))
  161. if err != nil {
  162. return nil, err
  163. }
  164. defer common.DbClose(db)
  165. s, err := common.DbGet(db, string(name))
  166. if err != nil {
  167. return nil, fmt.Errorf("rule %s not found", name)
  168. }
  169. return p.getRuleByJson(name, s)
  170. }
  171. func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error) {
  172. var rule api.Rule
  173. if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
  174. return nil, fmt.Errorf("parse rule %s error : %s", ruleJson, err)
  175. }
  176. rule.Id = name
  177. //validation
  178. if name == ""{
  179. return nil, fmt.Errorf("missing rule id")
  180. }
  181. if rule.Sql == ""{
  182. return nil, fmt.Errorf("missing rule sql")
  183. }
  184. if rule.Actions == nil || len(rule.Actions) == 0{
  185. return nil, fmt.Errorf("missing rule actions")
  186. }
  187. return &rule, nil
  188. }
  189. func (p *RuleProcessor) ExecInitRule(rule *api.Rule) (*xstream.TopologyNew, error) {
  190. if tp, inputs, err := p.createTopo(rule); err != nil {
  191. return nil, err
  192. }else{
  193. for _, m := range rule.Actions {
  194. for name, action := range m {
  195. switch name {
  196. case "log":
  197. log.Printf("Create log sink with %s", action)
  198. tp.AddSink(inputs, nodes.NewSinkNode("sink_log", sinks.NewLogSink()))
  199. case "mqtt":
  200. log.Printf("Create mqtt sink with %s", action)
  201. if ms, err := sinks.NewMqttSink(action); err != nil{
  202. return nil, err
  203. }else{
  204. tp.AddSink(inputs, nodes.NewSinkNode("sink_mqtt", ms))
  205. }
  206. default:
  207. return nil, fmt.Errorf("unsupported action: %s", name)
  208. }
  209. }
  210. }
  211. return tp, nil
  212. }
  213. }
  214. func (p *RuleProcessor) ExecQuery(ruleid, sql string) (*xstream.TopologyNew, error) {
  215. if tp, inputs, err := p.createTopo(&api.Rule{Id: ruleid, Sql: sql}); err != nil {
  216. return nil, err
  217. } else {
  218. tp.AddSink(inputs, nodes.NewSinkNode("sink_memory_log", sinks.NewLogSinkToMemory()))
  219. go func() {
  220. select {
  221. case err := <-tp.Open():
  222. log.Println(err)
  223. tp.Cancel()
  224. }
  225. }()
  226. return tp, nil
  227. }
  228. }
  229. func (p *RuleProcessor) ExecDesc(name string) (string, error) {
  230. db, err := common.DbOpen(path.Join(p.badgerDir, "rule"))
  231. if err != nil {
  232. return "", err
  233. }
  234. defer common.DbClose(db)
  235. s, err := common.DbGet(db, string(name))
  236. if err != nil {
  237. return "", fmt.Errorf("rule %s not found", name)
  238. }
  239. dst := &bytes.Buffer{}
  240. if err := json.Indent(dst, []byte(s), "", " "); err != nil {
  241. return "", err
  242. }
  243. return fmt.Sprintln(dst.String()), nil
  244. }
  245. func (p *RuleProcessor) ExecShow() (string, error) {
  246. keys, err := p.GetAllRules()
  247. if err != nil{
  248. return "", err
  249. }
  250. if len(keys) == 0 {
  251. keys = append(keys, "no rule definition found")
  252. }
  253. var result string
  254. for _, c := range keys{
  255. result = result + fmt.Sprintln(c)
  256. }
  257. return result, nil
  258. }
  259. func (p *RuleProcessor) GetAllRules() ([]string, error) {
  260. db, err := common.DbOpen(path.Join(p.badgerDir, "rule"))
  261. if err != nil {
  262. return nil, err
  263. }
  264. defer common.DbClose(db)
  265. return common.DbKeys(db)
  266. }
  267. func (p *RuleProcessor) ExecDrop(name string) (string, error) {
  268. db, err := common.DbOpen(path.Join(p.badgerDir, "rule"))
  269. if err != nil {
  270. return "", err
  271. }
  272. defer common.DbClose(db)
  273. err = common.DbDelete(db, string(name))
  274. if err != nil {
  275. return "", err
  276. }else{
  277. return fmt.Sprintf("rule %s dropped", name), nil
  278. }
  279. }
// createTopo builds the topology for rule by calling createTopoWithSources
// with nil sources.
func (p *RuleProcessor) createTopo(rule *api.Rule) (*xstream.TopologyNew, []api.Emitter, error) {
	return p.createTopoWithSources(rule, nil)
}
  283. //For test to mock source
  284. func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.SourceNode) (*xstream.TopologyNew, []api.Emitter, error){
  285. name := rule.Id
  286. sql := rule.Sql
  287. var isEventTime bool
  288. var lateTol int64
  289. if iet, ok := rule.Options["isEventTime"]; ok{
  290. isEventTime, ok = iet.(bool)
  291. if !ok{
  292. return nil, nil, fmt.Errorf("invalid rule option isEventTime %v, bool type required", iet)
  293. }
  294. }
  295. if isEventTime {
  296. if l, ok := rule.Options["lateTolerance"]; ok{
  297. if fl, ok := l.(float64); ok{
  298. lateTol = int64(fl)
  299. }else{
  300. return nil, nil, fmt.Errorf("invalid rule option lateTolerance %v, int type required", l)
  301. }
  302. }
  303. }
  304. shouldCreateSource := sources == nil
  305. parser := xsql.NewParser(strings.NewReader(sql))
  306. if stmt, err := xsql.Language.Parse(parser); err != nil{
  307. return nil, nil, fmt.Errorf("parse sql %s error: %s", sql , err)
  308. }else {
  309. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  310. return nil, nil, fmt.Errorf("sql %s is not a select statement", sql)
  311. } else {
  312. tp := xstream.NewWithName(name)
  313. var inputs []api.Emitter
  314. streamsFromStmt := xsql.GetStreams(selectStmt)
  315. if !shouldCreateSource && len(streamsFromStmt) != len(sources){
  316. return nil, nil, fmt.Errorf("invalid parameter sources or streams, the length cannot match the statement, expect %d sources", len(streamsFromStmt))
  317. }
  318. db, err := common.DbOpen(path.Join(p.badgerDir, "stream"))
  319. if err != nil {
  320. return nil, nil, err
  321. }
  322. defer common.DbClose(db)
  323. for i, s := range streamsFromStmt {
  324. streamStmt, err := GetStream(db, s)
  325. if err != nil {
  326. return nil, nil, err
  327. }
  328. pp, err := plans.NewPreprocessor(streamStmt, selectStmt.Fields, isEventTime)
  329. if err != nil{
  330. return nil, nil, err
  331. }
  332. if shouldCreateSource{
  333. mqs, err := extensions.NewMQTTSource(streamStmt.Options["DATASOURCE"], streamStmt.Options["CONF_KEY"])
  334. if err != nil {
  335. return nil, nil, err
  336. }
  337. node := nodes.NewSourceNode(string(streamStmt.Name), mqs)
  338. tp.AddSrc(node)
  339. preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
  340. tp.AddOperator([]api.Emitter{node}, preprocessorOp)
  341. inputs = append(inputs, preprocessorOp)
  342. }else{
  343. tp.AddSrc(sources[i])
  344. preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
  345. tp.AddOperator([]api.Emitter{sources[i]}, preprocessorOp)
  346. inputs = append(inputs, preprocessorOp)
  347. }
  348. }
  349. dimensions := selectStmt.Dimensions
  350. var w *xsql.Window
  351. if dimensions != nil {
  352. w = dimensions.GetWindow()
  353. if w != nil {
  354. wop, err := operators.NewWindowOp("window", w, isEventTime, lateTol, streamsFromStmt)
  355. if err != nil {
  356. return nil, nil, err
  357. }
  358. tp.AddOperator(inputs, wop)
  359. inputs = []api.Emitter{wop}
  360. }
  361. }
  362. if w != nil && selectStmt.Joins != nil {
  363. joinOp := xstream.Transform(&plans.JoinPlan{Joins: selectStmt.Joins, From: selectStmt.Sources[0].(*xsql.Table)}, "join")
  364. //TODO concurrency setting by command
  365. //joinOp.SetConcurrency(3)
  366. tp.AddOperator(inputs, joinOp)
  367. inputs = []api.Emitter{joinOp}
  368. }
  369. if selectStmt.Condition != nil {
  370. filterOp := xstream.Transform(&plans.FilterPlan{Condition: selectStmt.Condition}, "filter")
  371. //TODO concurrency setting by command
  372. // filterOp.SetConcurrency(3)
  373. tp.AddOperator(inputs, filterOp)
  374. inputs = []api.Emitter{filterOp}
  375. }
  376. var ds xsql.Dimensions
  377. if dimensions != nil {
  378. ds = dimensions.GetGroups()
  379. if ds != nil && len(ds) > 0 {
  380. aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds}, "aggregate")
  381. tp.AddOperator(inputs, aggregateOp)
  382. inputs = []api.Emitter{aggregateOp}
  383. }
  384. }
  385. if selectStmt.Having != nil {
  386. havingOp := xstream.Transform(&plans.HavingPlan{selectStmt.Having}, "having")
  387. tp.AddOperator(inputs, havingOp)
  388. inputs = []xstream.Emitter{havingOp}
  389. }
  390. if selectStmt.SortFields != nil {
  391. orderOp := xstream.Transform(&plans.OrderPlan{SortFields:selectStmt.SortFields}, "order")
  392. tp.AddOperator(inputs, orderOp)
  393. inputs = []api.Emitter{orderOp}
  394. }
  395. if selectStmt.Fields != nil {
  396. projectOp := xstream.Transform(&plans.ProjectPlan{Fields: selectStmt.Fields, IsAggregate: xsql.IsAggStatement(selectStmt)}, "project")
  397. tp.AddOperator(inputs, projectOp)
  398. inputs = []api.Emitter{projectOp}
  399. }
  400. return tp, inputs, nil
  401. }
  402. }
  403. }