xsql_processor.go 14 KB


  1. package processors
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "engine/common"
  6. "engine/xsql"
  7. "engine/xsql/plans"
  8. "engine/xstream"
  9. "engine/xstream/api"
  10. "engine/xstream/extensions"
  11. "engine/xstream/nodes"
  12. "engine/xstream/operators"
  13. "engine/xstream/sinks"
  14. "fmt"
  15. "github.com/dgraph-io/badger"
  16. "github.com/go-yaml/yaml"
  17. "path"
  18. "plugin"
  19. "strings"
  20. )
  21. var log = common.Log
  // StreamProcessor executes one stream-management SQL statement against the
  // stream definitions stored in a badger DB.
  type StreamProcessor struct {
  	// statement is the raw SQL text that Exec parses and runs.
  	statement string
  	// badgerDir is the directory handed to common.DbOpen by Exec.
  	badgerDir string
  }
  26. //@params s : the sql string of create stream statement
  27. //@params d : the directory of the badger DB to save the stream info
  28. func NewStreamProcessor(s, d string) *StreamProcessor {
  29. processor := &StreamProcessor{
  30. statement: s,
  31. badgerDir: d,
  32. }
  33. return processor
  34. }
  35. func (p *StreamProcessor) Exec() (result []string, err error) {
  36. parser := xsql.NewParser(strings.NewReader(p.statement))
  37. stmt, err := xsql.Language.Parse(parser)
  38. if err != nil {
  39. return
  40. }
  41. db, err := common.DbOpen(p.badgerDir)
  42. if err != nil {
  43. return
  44. }
  45. defer common.DbClose(db)
  46. switch s := stmt.(type) {
  47. case *xsql.StreamStmt:
  48. var r string
  49. r, err = p.execCreateStream(s, db)
  50. result = append(result, r)
  51. case *xsql.ShowStreamsStatement:
  52. result, err = p.execShowStream(s, db)
  53. case *xsql.DescribeStreamStatement:
  54. var r string
  55. r, err = p.execDescribeStream(s, db)
  56. result = append(result, r)
  57. case *xsql.ExplainStreamStatement:
  58. var r string
  59. r, err = p.execExplainStream(s, db)
  60. result = append(result, r)
  61. case *xsql.DropStreamStatement:
  62. var r string
  63. r, err = p.execDropStream(s, db)
  64. result = append(result, r)
  65. }
  66. return
  67. }
  68. func (p *StreamProcessor) execCreateStream(stmt *xsql.StreamStmt, db *badger.DB) (string, error) {
  69. err := common.DbSet(db, string(stmt.Name), p.statement)
  70. if err != nil {
  71. return "", err
  72. }else{
  73. return fmt.Sprintf("Stream %s is created.", stmt.Name), nil
  74. }
  75. }
  76. func (p *StreamProcessor) execShowStream(stmt *xsql.ShowStreamsStatement, db *badger.DB) ([]string,error) {
  77. keys, err := common.DbKeys(db)
  78. if len(keys) == 0 {
  79. keys = append(keys, "No stream definitions are found.")
  80. }
  81. return keys, err
  82. }
  83. func (p *StreamProcessor) execDescribeStream(stmt *xsql.DescribeStreamStatement, db *badger.DB) (string,error) {
  84. s, err := common.DbGet(db, string(stmt.Name))
  85. if err != nil {
  86. return "", fmt.Errorf("Stream %s is not found.", stmt.Name)
  87. }
  88. parser := xsql.NewParser(strings.NewReader(s))
  89. stream, err := xsql.Language.Parse(parser)
  90. streamStmt, ok := stream.(*xsql.StreamStmt)
  91. if !ok{
  92. return "", fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.", stmt.Name)
  93. }
  94. var buff bytes.Buffer
  95. buff.WriteString("Fields\n--------------------------------------------------------------------------------\n")
  96. for _, f := range streamStmt.StreamFields {
  97. buff.WriteString(f.Name + "\t")
  98. xsql.PrintFieldType(f.FieldType, &buff)
  99. buff.WriteString("\n")
  100. }
  101. buff.WriteString("\n")
  102. common.PrintMap(streamStmt.Options, &buff)
  103. return buff.String(), err
  104. }
  105. func (p *StreamProcessor) execExplainStream(stmt *xsql.ExplainStreamStatement, db *badger.DB) (string,error) {
  106. _, err := common.DbGet(db, string(stmt.Name))
  107. if err != nil{
  108. return "", fmt.Errorf("Stream %s is not found.", stmt.Name)
  109. }
  110. return "TO BE SUPPORTED", nil
  111. }
  112. func (p *StreamProcessor) execDropStream(stmt *xsql.DropStreamStatement, db *badger.DB) (string, error) {
  113. err := common.DbDelete(db, string(stmt.Name))
  114. if err != nil {
  115. return "", err
  116. }else{
  117. return fmt.Sprintf("Stream %s is dropped.", stmt.Name), nil
  118. }
  119. }
  120. func GetStream(db *badger.DB, name string) (stmt *xsql.StreamStmt, err error){
  121. s, err := common.DbGet(db, name)
  122. if err != nil {
  123. return
  124. }
  125. parser := xsql.NewParser(strings.NewReader(s))
  126. stream, err := xsql.Language.Parse(parser)
  127. stmt, ok := stream.(*xsql.StreamStmt)
  128. if !ok{
  129. err = fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.", name)
  130. }
  131. return
  132. }
  // RuleProcessor creates, looks up, lists and drops rules, and builds the
  // processing topology for a rule.
  type RuleProcessor struct {
  	// badgerDir is the base directory; the methods open the "rule" and
  	// "stream" sub-directories beneath it as badger DBs.
  	badgerDir string
  }
  136. func NewRuleProcessor(d string) *RuleProcessor {
  137. processor := &RuleProcessor{
  138. badgerDir: d,
  139. }
  140. return processor
  141. }
  142. func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
  143. rule, err := p.getRuleByJson(name, ruleJson)
  144. if err != nil {
  145. return nil, err
  146. }
  147. db, err := common.DbOpen(path.Join(p.badgerDir, "rule"))
  148. if err != nil {
  149. return nil, err
  150. }
  151. err = common.DbSet(db, string(name), ruleJson)
  152. if err != nil {
  153. common.DbClose(db)
  154. return nil, err
  155. }else{
  156. log.Infof("Rule %s is created.", name)
  157. common.DbClose(db)
  158. }
  159. return rule, nil
  160. }
  161. func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
  162. db, err := common.DbOpen(path.Join(p.badgerDir, "rule"))
  163. if err != nil {
  164. return nil, err
  165. }
  166. defer common.DbClose(db)
  167. s, err := common.DbGet(db, string(name))
  168. if err != nil {
  169. return nil, fmt.Errorf("Rule %s is not found.", name)
  170. }
  171. return p.getRuleByJson(name, s)
  172. }
  173. func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error) {
  174. var rule api.Rule
  175. if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
  176. return nil, fmt.Errorf("Parse rule %s error : %s.", ruleJson, err)
  177. }
  178. rule.Id = name
  179. //validation
  180. if name == ""{
  181. return nil, fmt.Errorf("Missing rule id.")
  182. }
  183. if rule.Sql == ""{
  184. return nil, fmt.Errorf("Missing rule SQL.")
  185. }
  186. if rule.Actions == nil || len(rule.Actions) == 0{
  187. return nil, fmt.Errorf("Missing rule actions.")
  188. }
  189. return &rule, nil
  190. }
  191. func (p *RuleProcessor) ExecInitRule(rule *api.Rule) (*xstream.TopologyNew, error) {
  192. if tp, inputs, err := p.createTopo(rule); err != nil {
  193. return nil, err
  194. }else{
  195. for _, m := range rule.Actions {
  196. for name, action := range m {
  197. if s, err := getSink(name, action); err != nil{
  198. return nil, err
  199. }else{
  200. tp.AddSink(inputs, nodes.NewSinkNode("sink_" + name, s))
  201. }
  202. }
  203. }
  204. return tp, nil
  205. }
  206. }
  207. func (p *RuleProcessor) ExecQuery(ruleid, sql string) (*xstream.TopologyNew, error) {
  208. if tp, inputs, err := p.createTopo(&api.Rule{Id: ruleid, Sql: sql}); err != nil {
  209. return nil, err
  210. } else {
  211. tp.AddSink(inputs, nodes.NewSinkNode("sink_memory_log", sinks.NewLogSinkToMemory()))
  212. go func() {
  213. select {
  214. case err := <-tp.Open():
  215. log.Println(err)
  216. tp.Cancel()
  217. }
  218. }()
  219. return tp, nil
  220. }
  221. }
  222. func (p *RuleProcessor) ExecDesc(name string) (string, error) {
  223. db, err := common.DbOpen(path.Join(p.badgerDir, "rule"))
  224. if err != nil {
  225. return "", err
  226. }
  227. defer common.DbClose(db)
  228. s, err := common.DbGet(db, string(name))
  229. if err != nil {
  230. return "", fmt.Errorf("Rule %s is not found.", name)
  231. }
  232. dst := &bytes.Buffer{}
  233. if err := json.Indent(dst, []byte(s), "", " "); err != nil {
  234. return "", err
  235. }
  236. return fmt.Sprintln(dst.String()), nil
  237. }
  238. func (p *RuleProcessor) ExecShow() (string, error) {
  239. keys, err := p.GetAllRules()
  240. if err != nil{
  241. return "", err
  242. }
  243. if len(keys) == 0 {
  244. keys = append(keys, "No rule definitions are found.")
  245. }
  246. var result string
  247. for _, c := range keys{
  248. result = result + fmt.Sprintln(c)
  249. }
  250. return result, nil
  251. }
  252. func (p *RuleProcessor) GetAllRules() ([]string, error) {
  253. db, err := common.DbOpen(path.Join(p.badgerDir, "rule"))
  254. if err != nil {
  255. return nil, err
  256. }
  257. defer common.DbClose(db)
  258. return common.DbKeys(db)
  259. }
  260. func (p *RuleProcessor) ExecDrop(name string) (string, error) {
  261. db, err := common.DbOpen(path.Join(p.badgerDir, "rule"))
  262. if err != nil {
  263. return "", err
  264. }
  265. defer common.DbClose(db)
  266. err = common.DbDelete(db, string(name))
  267. if err != nil {
  268. return "", err
  269. }else{
  270. return fmt.Sprintf("Rule %s is dropped.", name), nil
  271. }
  272. }
  273. func (p *RuleProcessor) createTopo(rule *api.Rule) (*xstream.TopologyNew, []api.Emitter, error) {
  274. return p.createTopoWithSources(rule, nil)
  275. }
  276. //For test to mock source
  277. func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.SourceNode) (*xstream.TopologyNew, []api.Emitter, error){
  278. name := rule.Id
  279. sql := rule.Sql
  280. var isEventTime bool
  281. var lateTol int64
  282. if iet, ok := rule.Options["isEventTime"]; ok{
  283. isEventTime, ok = iet.(bool)
  284. if !ok{
  285. return nil, nil, fmt.Errorf("Invalid rule option isEventTime %v, bool type is required.", iet)
  286. }
  287. }
  288. if isEventTime {
  289. if l, ok := rule.Options["lateTolerance"]; ok{
  290. if fl, ok := l.(float64); ok{
  291. lateTol = int64(fl)
  292. }else{
  293. return nil, nil, fmt.Errorf("Invalid rule option lateTolerance %v, int type is required.", l)
  294. }
  295. }
  296. }
  297. shouldCreateSource := sources == nil
  298. parser := xsql.NewParser(strings.NewReader(sql))
  299. if stmt, err := xsql.Language.Parse(parser); err != nil{
  300. return nil, nil, fmt.Errorf("Parse SQL %s error: %s.", sql , err)
  301. }else {
  302. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  303. return nil, nil, fmt.Errorf("SQL %s is not a select statement.", sql)
  304. } else {
  305. tp := xstream.NewWithName(name)
  306. var inputs []api.Emitter
  307. streamsFromStmt := xsql.GetStreams(selectStmt)
  308. if !shouldCreateSource && len(streamsFromStmt) != len(sources){
  309. return nil, nil, fmt.Errorf("Invalid parameter sources or streams, the length cannot match the statement, expect %d sources.", len(streamsFromStmt))
  310. }
  311. db, err := common.DbOpen(path.Join(p.badgerDir, "stream"))
  312. if err != nil {
  313. return nil, nil, err
  314. }
  315. defer common.DbClose(db)
  316. for i, s := range streamsFromStmt {
  317. streamStmt, err := GetStream(db, s)
  318. if err != nil {
  319. return nil, nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
  320. }
  321. pp, err := plans.NewPreprocessor(streamStmt, selectStmt.Fields, isEventTime)
  322. if err != nil{
  323. return nil, nil, err
  324. }
  325. if shouldCreateSource{
  326. src, err := getSource(streamStmt)
  327. if err != nil {
  328. return nil, nil, fmt.Errorf("fail to get source: %v", err)
  329. }
  330. node := nodes.NewSourceNode(s, src)
  331. tp.AddSrc(node)
  332. preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
  333. tp.AddOperator([]api.Emitter{node}, preprocessorOp)
  334. inputs = append(inputs, preprocessorOp)
  335. } else {
  336. tp.AddSrc(sources[i])
  337. preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
  338. tp.AddOperator([]api.Emitter{sources[i]}, preprocessorOp)
  339. inputs = append(inputs, preprocessorOp)
  340. }
  341. }
  342. dimensions := selectStmt.Dimensions
  343. var w *xsql.Window
  344. if dimensions != nil {
  345. w = dimensions.GetWindow()
  346. if w != nil {
  347. wop, err := operators.NewWindowOp("window", w, isEventTime, lateTol, streamsFromStmt)
  348. if err != nil {
  349. return nil, nil, err
  350. }
  351. tp.AddOperator(inputs, wop)
  352. inputs = []api.Emitter{wop}
  353. }
  354. }
  355. if w != nil && selectStmt.Joins != nil {
  356. joinOp := xstream.Transform(&plans.JoinPlan{Joins: selectStmt.Joins, From: selectStmt.Sources[0].(*xsql.Table)}, "join")
  357. //TODO concurrency setting by command
  358. //joinOp.SetConcurrency(3)
  359. tp.AddOperator(inputs, joinOp)
  360. inputs = []api.Emitter{joinOp}
  361. }
  362. if selectStmt.Condition != nil {
  363. filterOp := xstream.Transform(&plans.FilterPlan{Condition: selectStmt.Condition}, "filter")
  364. //TODO concurrency setting by command
  365. // filterOp.SetConcurrency(3)
  366. tp.AddOperator(inputs, filterOp)
  367. inputs = []api.Emitter{filterOp}
  368. }
  369. var ds xsql.Dimensions
  370. if dimensions != nil {
  371. ds = dimensions.GetGroups()
  372. if ds != nil && len(ds) > 0 {
  373. aggregateOp := xstream.Transform(&plans.AggregatePlan{Dimensions: ds}, "aggregate")
  374. tp.AddOperator(inputs, aggregateOp)
  375. inputs = []api.Emitter{aggregateOp}
  376. }
  377. }
  378. if selectStmt.Having != nil {
  379. havingOp := xstream.Transform(&plans.HavingPlan{selectStmt.Having}, "having")
  380. tp.AddOperator(inputs, havingOp)
  381. inputs = []api.Emitter{havingOp}
  382. }
  383. if selectStmt.SortFields != nil {
  384. orderOp := xstream.Transform(&plans.OrderPlan{SortFields: selectStmt.SortFields}, "order")
  385. tp.AddOperator(inputs, orderOp)
  386. inputs = []api.Emitter{orderOp}
  387. }
  388. if selectStmt.Fields != nil {
  389. projectOp := xstream.Transform(&plans.ProjectPlan{Fields: selectStmt.Fields, IsAggregate: xsql.IsAggStatement(selectStmt)}, "project")
  390. tp.AddOperator(inputs, projectOp)
  391. inputs = []api.Emitter{projectOp}
  392. }
  393. return tp, inputs, nil
  394. }
  395. }
  396. }
  397. func getSource(streamStmt *xsql.StreamStmt) (api.Source, error) {
  398. t, ok := streamStmt.Options["TYPE"]
  399. if !ok{
  400. t = "mqtt"
  401. }
  402. switch t {
  403. case "mqtt":
  404. mqs, err := extensions.NewMQTTSource(streamStmt.Options["DATASOURCE"], streamStmt.Options["CONF_KEY"])
  405. if err != nil {
  406. return nil, err
  407. }
  408. log.Tracef("Source mqtt created")
  409. return mqs, nil
  410. default:
  411. nf, err := getPlugin(t, "sources")
  412. if err != nil {
  413. return nil, err
  414. }
  415. s, ok := nf.(api.Source)
  416. if !ok {
  417. return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
  418. }
  419. props := getConf(t, streamStmt.Options["CONF_KEY"])
  420. s.Configure(streamStmt.Options["DATASOURCE"], props)
  421. log.Tracef("Source %s created", t)
  422. return s, nil
  423. }
  424. }
  425. func getConf(t string, confkey string) map[string]interface{} {
  426. conf, err := common.LoadConf(t + ".yaml")
  427. props := make(map[string]interface{})
  428. if err == nil {
  429. cfg := make(map[string]map[string]interface{})
  430. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  431. log.Warnf("fail to parse yaml for source %s. Return an empty configuration", t)
  432. } else {
  433. var ok bool
  434. props, ok = cfg[confkey]
  435. if !ok {
  436. log.Warnf("conf for conf_key %s not found, use default conf instead", confkey)
  437. props = cfg["default"]
  438. }
  439. }
  440. } else {
  441. log.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", t)
  442. }
  443. log.Debugf("get conf for %s with conf key %s: %v", t, confkey, props)
  444. return props
  445. }
  // getPlugin opens the shared object "plugins/<ptype>/<t>.so" and looks up the
  // exported symbol named t in it.
  //
  // NOTE(review): t is placed into the path unchanged; the callers in this file
  // pass a stream TYPE option or a rule action name — confirm those are
  // validated before reaching this point.
  func getPlugin(t string, ptype string) (plugin.Symbol, error) {
  	soPath := "plugins/" + ptype + "/" + t + ".so"
  	handle, openErr := plugin.Open(soPath)
  	if openErr != nil {
  		return nil, fmt.Errorf("cannot open %s: %v", soPath, openErr)
  	}
  	if sym, lookupErr := handle.Lookup(t); lookupErr == nil {
  		return sym, nil
  	}
  	return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
  }
  458. func getSink(name string, action interface{}) (api.Sink, error) {
  459. log.Tracef("trying to get sink %s with action %v", name, action)
  460. switch name {
  461. case "log":
  462. return sinks.NewLogSink(), nil
  463. case "mqtt":
  464. return sinks.NewMqttSink(action)
  465. default:
  466. nf, err := getPlugin(name, "sinks")
  467. if err != nil {
  468. return nil, err
  469. }
  470. s, ok := nf.(api.Sink)
  471. if !ok {
  472. return nil, fmt.Errorf("exported symbol %s is not type of api.Sink", name)
  473. }
  474. props := getConf(name, "default")
  475. s.Configure(props)
  476. log.Tracef("Sink %s created", name)
  477. return s, nil
  478. }
  479. }