xsql_processor.go 14 KB


  1. package processors
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/common/kv"
  8. "github.com/emqx/kuiper/xsql"
  9. "github.com/emqx/kuiper/xstream"
  10. "github.com/emqx/kuiper/xstream/api"
  11. "github.com/emqx/kuiper/xstream/nodes"
  12. "github.com/emqx/kuiper/xstream/planner"
  13. "os"
  14. "path"
  15. "strings"
  16. )
  17. var (
  18. log = common.Log
  19. )
  20. type StreamProcessor struct {
  21. db kv.KeyValue
  22. }
  23. //@params d : the directory of the DB to save the stream info
  24. func NewStreamProcessor(d string) *StreamProcessor {
  25. processor := &StreamProcessor{
  26. db: kv.GetDefaultKVStore(d),
  27. }
  28. return processor
  29. }
  30. func (p *StreamProcessor) ExecStmt(statement string) (result []string, err error) {
  31. parser := xsql.NewParser(strings.NewReader(statement))
  32. stmt, err := xsql.Language.Parse(parser)
  33. if err != nil {
  34. return nil, err
  35. }
  36. switch s := stmt.(type) {
  37. case *xsql.StreamStmt: //Table is also StreamStmt
  38. var r string
  39. err = p.execSave(s, statement, false)
  40. stt := xsql.StreamTypeMap[s.StreamType]
  41. if err != nil {
  42. err = fmt.Errorf("Create %s fails: %v.", stt, err)
  43. } else {
  44. r = fmt.Sprintf("%s %s is created.", strings.Title(stt), s.Name)
  45. log.Printf("%s", r)
  46. }
  47. result = append(result, r)
  48. case *xsql.ShowStreamsStatement:
  49. result, err = p.execShow(xsql.TypeStream)
  50. case *xsql.ShowTablesStatement:
  51. result, err = p.execShow(xsql.TypeTable)
  52. case *xsql.DescribeStreamStatement:
  53. var r string
  54. r, err = p.execDescribe(s, xsql.TypeStream)
  55. result = append(result, r)
  56. case *xsql.DescribeTableStatement:
  57. var r string
  58. r, err = p.execDescribe(s, xsql.TypeTable)
  59. result = append(result, r)
  60. case *xsql.ExplainStreamStatement:
  61. var r string
  62. r, err = p.execExplain(s, xsql.TypeStream)
  63. result = append(result, r)
  64. case *xsql.ExplainTableStatement:
  65. var r string
  66. r, err = p.execExplain(s, xsql.TypeTable)
  67. result = append(result, r)
  68. case *xsql.DropStreamStatement:
  69. var r string
  70. r, err = p.execDrop(s, xsql.TypeStream)
  71. result = append(result, r)
  72. case *xsql.DropTableStatement:
  73. var r string
  74. r, err = p.execDrop(s, xsql.TypeTable)
  75. result = append(result, r)
  76. default:
  77. return nil, fmt.Errorf("Invalid stream statement: %s", statement)
  78. }
  79. return
  80. }
  81. func (p *StreamProcessor) execSave(stmt *xsql.StreamStmt, statement string, replace bool) error {
  82. err := p.db.Open()
  83. if err != nil {
  84. return fmt.Errorf("error when opening db: %v.", err)
  85. }
  86. defer p.db.Close()
  87. s, err := json.Marshal(xsql.StreamInfo{
  88. StreamType: stmt.StreamType,
  89. Statement: statement,
  90. })
  91. if err != nil {
  92. return fmt.Errorf("error when saving to db: %v.", err)
  93. }
  94. if replace {
  95. err = p.db.Set(string(stmt.Name), string(s))
  96. } else {
  97. err = p.db.Setnx(string(stmt.Name), string(s))
  98. }
  99. return err
  100. }
  101. func (p *StreamProcessor) ExecReplaceStream(statement string, st xsql.StreamType) (string, error) {
  102. parser := xsql.NewParser(strings.NewReader(statement))
  103. stmt, err := xsql.Language.Parse(parser)
  104. if err != nil {
  105. return "", err
  106. }
  107. stt := xsql.StreamTypeMap[st]
  108. switch s := stmt.(type) {
  109. case *xsql.StreamStmt:
  110. if s.StreamType != st {
  111. return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("%s %s is not found", xsql.StreamTypeMap[st], s.Name))
  112. }
  113. err = p.execSave(s, statement, true)
  114. if err != nil {
  115. return "", fmt.Errorf("Replace %s fails: %v.", stt, err)
  116. } else {
  117. info := fmt.Sprintf("%s %s is replaced.", strings.Title(stt), s.Name)
  118. log.Printf("%s", info)
  119. return info, nil
  120. }
  121. default:
  122. return "", fmt.Errorf("Invalid %s statement: %s", stt, statement)
  123. }
  124. }
  125. func (p *StreamProcessor) ExecStreamSql(statement string) (string, error) {
  126. r, err := p.ExecStmt(statement)
  127. if err != nil {
  128. return "", err
  129. } else {
  130. return strings.Join(r, "\n"), err
  131. }
  132. }
  133. func (p *StreamProcessor) execShow(st xsql.StreamType) ([]string, error) {
  134. keys, err := p.ShowStream(st)
  135. if len(keys) == 0 {
  136. keys = append(keys, fmt.Sprintf("No %s definitions are found.", xsql.StreamTypeMap[st]))
  137. }
  138. return keys, err
  139. }
  140. func (p *StreamProcessor) ShowStream(st xsql.StreamType) ([]string, error) {
  141. stt := xsql.StreamTypeMap[st]
  142. err := p.db.Open()
  143. if err != nil {
  144. return nil, fmt.Errorf("Show %ss fails, error when opening db: %v.", stt, err)
  145. }
  146. defer p.db.Close()
  147. keys, err := p.db.Keys()
  148. if err != nil {
  149. return nil, fmt.Errorf("Show %ss fails, error when loading data from db: %v.", stt, err)
  150. }
  151. var (
  152. v string
  153. vs = &xsql.StreamInfo{}
  154. result = make([]string, 0)
  155. )
  156. for _, k := range keys {
  157. if ok, _ := p.db.Get(k, &v); ok {
  158. if err := json.Unmarshal([]byte(v), vs); err == nil && vs.StreamType == st {
  159. result = append(result, k)
  160. }
  161. }
  162. }
  163. return result, nil
  164. }
  165. func (p *StreamProcessor) getStream(name string, st xsql.StreamType) (string, error) {
  166. vs, err := xsql.GetDataSourceStatement(p.db, name)
  167. if vs != nil && vs.StreamType == st {
  168. return vs.Statement, nil
  169. }
  170. if err != nil {
  171. return "", err
  172. }
  173. return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("%s %s is not found", xsql.StreamTypeMap[st], name))
  174. }
  175. func (p *StreamProcessor) execDescribe(stmt xsql.NameNode, st xsql.StreamType) (string, error) {
  176. streamStmt, err := p.DescStream(stmt.GetName(), st)
  177. if err != nil {
  178. return "", err
  179. }
  180. switch s := streamStmt.(type) {
  181. case *xsql.StreamStmt:
  182. var buff bytes.Buffer
  183. buff.WriteString("Fields\n--------------------------------------------------------------------------------\n")
  184. for _, f := range s.StreamFields {
  185. buff.WriteString(f.Name + "\t")
  186. buff.WriteString(xsql.PrintFieldType(f.FieldType))
  187. buff.WriteString("\n")
  188. }
  189. buff.WriteString("\n")
  190. printOptions(s.Options, &buff)
  191. return buff.String(), err
  192. default:
  193. return "%s", fmt.Errorf("Error resolving the %s %s, the data in db may be corrupted.", xsql.StreamTypeMap[st], stmt.GetName())
  194. }
  195. }
  196. func printOptions(opts *xsql.Options, buff *bytes.Buffer) {
  197. if opts.CONF_KEY != "" {
  198. buff.WriteString(fmt.Sprintf("CONF_KEY: %s\n", opts.CONF_KEY))
  199. }
  200. if opts.DATASOURCE != "" {
  201. buff.WriteString(fmt.Sprintf("DATASOURCE: %s\n", opts.DATASOURCE))
  202. }
  203. if opts.FORMAT != "" {
  204. buff.WriteString(fmt.Sprintf("FORMAT: %s\n", opts.FORMAT))
  205. }
  206. if opts.KEY != "" {
  207. buff.WriteString(fmt.Sprintf("KEY: %s\n", opts.KEY))
  208. }
  209. if opts.RETAIN_SIZE != 0 {
  210. buff.WriteString(fmt.Sprintf("RETAIN_SIZE: %d\n", opts.RETAIN_SIZE))
  211. }
  212. if opts.STRICT_VALIDATION {
  213. buff.WriteString(fmt.Sprintf("STRICT_VALIDATION: %v\n", opts.STRICT_VALIDATION))
  214. }
  215. if opts.TIMESTAMP != "" {
  216. buff.WriteString(fmt.Sprintf("TIMESTAMP: %s\n", opts.TIMESTAMP))
  217. }
  218. if opts.TIMESTAMP_FORMAT != "" {
  219. buff.WriteString(fmt.Sprintf("TIMESTAMP_FORMAT: %s\n", opts.TIMESTAMP_FORMAT))
  220. }
  221. if opts.TYPE != "" {
  222. buff.WriteString(fmt.Sprintf("TYPE: %s\n", opts.TYPE))
  223. }
  224. }
  225. func (p *StreamProcessor) DescStream(name string, st xsql.StreamType) (xsql.Statement, error) {
  226. statement, err := p.getStream(name, st)
  227. if err != nil {
  228. return nil, fmt.Errorf("Describe %s fails, %s.", xsql.StreamTypeMap[st], err)
  229. }
  230. parser := xsql.NewParser(strings.NewReader(statement))
  231. stream, err := xsql.Language.Parse(parser)
  232. if err != nil {
  233. return nil, err
  234. }
  235. return stream, nil
  236. }
  237. func (p *StreamProcessor) execExplain(stmt xsql.NameNode, st xsql.StreamType) (string, error) {
  238. _, err := p.getStream(stmt.GetName(), st)
  239. if err != nil {
  240. return "", fmt.Errorf("Explain %s fails, %s.", xsql.StreamTypeMap[st], err)
  241. }
  242. return "TO BE SUPPORTED", nil
  243. }
  244. func (p *StreamProcessor) execDrop(stmt xsql.NameNode, st xsql.StreamType) (string, error) {
  245. s, err := p.DropStream(stmt.GetName(), st)
  246. if err != nil {
  247. return s, fmt.Errorf("Drop %s fails: %s.", xsql.StreamTypeMap[st], err)
  248. }
  249. return s, nil
  250. }
  251. func (p *StreamProcessor) DropStream(name string, st xsql.StreamType) (string, error) {
  252. defer p.db.Close()
  253. _, err := p.getStream(name, st)
  254. if err != nil {
  255. return "", err
  256. }
  257. err = p.db.Open()
  258. if err != nil {
  259. return "", fmt.Errorf("error when opening db: %v", err)
  260. }
  261. defer p.db.Close()
  262. err = p.db.Delete(name)
  263. if err != nil {
  264. return "", err
  265. } else {
  266. return fmt.Sprintf("%s %s is dropped.", strings.Title(xsql.StreamTypeMap[st]), name), nil
  267. }
  268. }
  269. type RuleProcessor struct {
  270. db kv.KeyValue
  271. rootDbDir string
  272. }
  273. func NewRuleProcessor(d string) *RuleProcessor {
  274. processor := &RuleProcessor{
  275. db: kv.GetDefaultKVStore(path.Join(d, "rule")),
  276. rootDbDir: d,
  277. }
  278. return processor
  279. }
  280. func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
  281. rule, err := p.getRuleByJson(name, ruleJson)
  282. if err != nil {
  283. return nil, err
  284. }
  285. err = p.db.Open()
  286. if err != nil {
  287. return nil, err
  288. }
  289. defer p.db.Close()
  290. err = p.db.Setnx(rule.Id, ruleJson)
  291. if err != nil {
  292. return nil, err
  293. } else {
  294. log.Infof("Rule %s is created.", rule.Id)
  295. }
  296. return rule, nil
  297. }
  298. func (p *RuleProcessor) ExecUpdate(name, ruleJson string) (*api.Rule, error) {
  299. rule, err := p.getRuleByJson(name, ruleJson)
  300. if err != nil {
  301. return nil, err
  302. }
  303. err = p.db.Open()
  304. if err != nil {
  305. return nil, err
  306. }
  307. defer p.db.Close()
  308. err = p.db.Set(rule.Id, ruleJson)
  309. if err != nil {
  310. return nil, err
  311. } else {
  312. log.Infof("Rule %s is update.", rule.Id)
  313. }
  314. return rule, nil
  315. }
  316. func (p *RuleProcessor) ExecReplaceRuleState(name string, triggered bool) (err error) {
  317. rule, err := p.GetRuleByName(name)
  318. if err != nil {
  319. return err
  320. }
  321. rule.Triggered = triggered
  322. ruleJson, err := json.Marshal(rule)
  323. if err != nil {
  324. return fmt.Errorf("Marshal rule %s error : %s.", name, err)
  325. }
  326. err = p.db.Open()
  327. if err != nil {
  328. return err
  329. }
  330. defer p.db.Close()
  331. err = p.db.Set(name, string(ruleJson))
  332. if err != nil {
  333. return err
  334. } else {
  335. log.Infof("Rule %s is replaced.", name)
  336. }
  337. return err
  338. }
  339. func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
  340. err := p.db.Open()
  341. if err != nil {
  342. return nil, err
  343. }
  344. defer p.db.Close()
  345. var s1 string
  346. f, _ := p.db.Get(name, &s1)
  347. if !f {
  348. return nil, common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("Rule %s is not found.", name))
  349. }
  350. return p.getRuleByJson(name, s1)
  351. }
  352. func (p *RuleProcessor) getDefaultRule(name, sql string) *api.Rule {
  353. return &api.Rule{
  354. Id: name,
  355. Sql: sql,
  356. Options: &api.RuleOption{
  357. IsEventTime: false,
  358. LateTol: 1000,
  359. Concurrency: 1,
  360. BufferLength: 1024,
  361. SendMetaToSink: false,
  362. SendError: true,
  363. Qos: api.AtMostOnce,
  364. CheckpointInterval: 300000,
  365. },
  366. }
  367. }
  368. func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error) {
  369. opt := common.Config.Rule
  370. //set default rule options
  371. rule := &api.Rule{
  372. Options: &opt,
  373. }
  374. if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
  375. return nil, fmt.Errorf("Parse rule %s error : %s.", ruleJson, err)
  376. }
  377. //validation
  378. if rule.Id == "" && name == "" {
  379. return nil, fmt.Errorf("Missing rule id.")
  380. }
  381. if name != "" && rule.Id != "" && name != rule.Id {
  382. return nil, fmt.Errorf("Name is not consistent with rule id.")
  383. }
  384. if rule.Id == "" {
  385. rule.Id = name
  386. }
  387. if rule.Sql == "" {
  388. return nil, fmt.Errorf("Missing rule SQL.")
  389. }
  390. if _, err := xsql.GetStatementFromSql(rule.Sql); err != nil {
  391. return nil, err
  392. }
  393. if rule.Actions == nil || len(rule.Actions) == 0 {
  394. return nil, fmt.Errorf("Missing rule actions.")
  395. }
  396. if rule.Options == nil {
  397. rule.Options = &api.RuleOption{}
  398. }
  399. //Setnx default options
  400. if rule.Options.CheckpointInterval < 0 {
  401. return nil, fmt.Errorf("rule option checkpointInterval %d is invalid, require a positive integer", rule.Options.CheckpointInterval)
  402. }
  403. if rule.Options.Concurrency < 0 {
  404. return nil, fmt.Errorf("rule option concurrency %d is invalid, require a positive integer", rule.Options.Concurrency)
  405. }
  406. if rule.Options.BufferLength < 0 {
  407. return nil, fmt.Errorf("rule option bufferLength %d is invalid, require a positive integer", rule.Options.BufferLength)
  408. }
  409. if rule.Options.LateTol < 0 {
  410. return nil, fmt.Errorf("rule option lateTolerance %d is invalid, require a positive integer", rule.Options.LateTol)
  411. }
  412. return rule, nil
  413. }
  414. func (p *RuleProcessor) ExecQuery(ruleid, sql string) (*xstream.TopologyNew, error) {
  415. if tp, err := planner.PlanWithSourcesAndSinks(p.getDefaultRule(ruleid, sql), p.rootDbDir, nil, []*nodes.SinkNode{nodes.NewSinkNode("sink_memory_log", "logToMemory", nil)}); err != nil {
  416. return nil, err
  417. } else {
  418. go func() {
  419. select {
  420. case err := <-tp.Open():
  421. if err != nil {
  422. log.Infof("closing query for error: %v", err)
  423. tp.GetContext().SetError(err)
  424. tp.Cancel()
  425. } else {
  426. log.Info("closing query")
  427. }
  428. }
  429. }()
  430. return tp, nil
  431. }
  432. }
  433. func (p *RuleProcessor) ExecDesc(name string) (string, error) {
  434. err := p.db.Open()
  435. if err != nil {
  436. return "", err
  437. }
  438. defer p.db.Close()
  439. var s1 string
  440. f, _ := p.db.Get(name, &s1)
  441. if !f {
  442. return "", fmt.Errorf("Rule %s is not found.", name)
  443. }
  444. dst := &bytes.Buffer{}
  445. if err := json.Indent(dst, []byte(s1), "", " "); err != nil {
  446. return "", err
  447. }
  448. return fmt.Sprintln(dst.String()), nil
  449. }
  450. func (p *RuleProcessor) GetAllRules() ([]string, error) {
  451. err := p.db.Open()
  452. if err != nil {
  453. return nil, err
  454. }
  455. defer p.db.Close()
  456. return p.db.Keys()
  457. }
  458. func (p *RuleProcessor) ExecDrop(name string) (string, error) {
  459. err := p.db.Open()
  460. if err != nil {
  461. return "", err
  462. }
  463. defer p.db.Close()
  464. result := fmt.Sprintf("Rule %s is dropped.", name)
  465. var ruleJson string
  466. if ok, _ := p.db.Get(name, &ruleJson); ok {
  467. rule, err := p.getRuleByJson(name, ruleJson)
  468. if err != nil {
  469. return "", err
  470. }
  471. if err := cleanSinkCache(rule); err != nil {
  472. result = fmt.Sprintf("%s. Clean sink cache faile: %s.", result, err)
  473. }
  474. if err := cleanCheckpoint(name); err != nil {
  475. result = fmt.Sprintf("%s. Clean checkpoint cache faile: %s.", result, err)
  476. }
  477. }
  478. err = p.db.Delete(name)
  479. if err != nil {
  480. return "", err
  481. } else {
  482. return result, nil
  483. }
  484. }
  485. func cleanCheckpoint(name string) error {
  486. dbDir, _ := common.GetDataLoc()
  487. c := path.Join(dbDir, "checkpoints", name)
  488. return os.RemoveAll(c)
  489. }
  490. func cleanSinkCache(rule *api.Rule) error {
  491. dbDir, err := common.GetDataLoc()
  492. if err != nil {
  493. return err
  494. }
  495. store := kv.GetDefaultKVStore(path.Join(dbDir, "sink"))
  496. err = store.Open()
  497. if err != nil {
  498. return err
  499. }
  500. defer store.Close()
  501. for d, m := range rule.Actions {
  502. con := 1
  503. for name, action := range m {
  504. props, _ := action.(map[string]interface{})
  505. if c, ok := props["concurrency"]; ok {
  506. if t, err := common.ToInt(c, common.STRICT); err == nil && t > 0 {
  507. con = t
  508. }
  509. }
  510. for i := 0; i < con; i++ {
  511. key := fmt.Sprintf("%s%s_%d%d", rule.Id, name, d, i)
  512. common.Log.Debugf("delete cache key %s", key)
  513. store.Delete(key)
  514. }
  515. }
  516. }
  517. return nil
  518. }