xsql_processor.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. package processors
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/common/kv"
  8. "github.com/emqx/kuiper/xsql"
  9. "github.com/emqx/kuiper/xstream"
  10. "github.com/emqx/kuiper/xstream/api"
  11. "github.com/emqx/kuiper/xstream/nodes"
  12. "github.com/emqx/kuiper/xstream/planner"
  13. "os"
  14. "path"
  15. "strings"
  16. )
  17. var (
  18. log = common.Log
  19. )
  20. type StreamProcessor struct {
  21. db kv.KeyValue
  22. }
  23. //@params d : the directory of the DB to save the stream info
  24. func NewStreamProcessor(d string) *StreamProcessor {
  25. processor := &StreamProcessor{
  26. db: kv.GetDefaultKVStore(d),
  27. }
  28. return processor
  29. }
  30. func (p *StreamProcessor) ExecStmt(statement string) (result []string, err error) {
  31. parser := xsql.NewParser(strings.NewReader(statement))
  32. stmt, err := xsql.Language.Parse(parser)
  33. if err != nil {
  34. return nil, err
  35. }
  36. switch s := stmt.(type) {
  37. case *xsql.StreamStmt: //Table is also StreamStmt
  38. var r string
  39. err = p.execSave(s, statement, false)
  40. stt := xsql.StreamTypeMap[s.StreamType]
  41. if err != nil {
  42. err = fmt.Errorf("Create %s fails: %v.", stt, err)
  43. } else {
  44. r = fmt.Sprintf("%s %s is created.", strings.Title(stt), s.Name)
  45. log.Printf("%s", r)
  46. }
  47. result = append(result, r)
  48. case *xsql.ShowStreamsStatement:
  49. result, err = p.execShow(xsql.TypeStream)
  50. case *xsql.ShowTablesStatement:
  51. result, err = p.execShow(xsql.TypeTable)
  52. case *xsql.DescribeStreamStatement:
  53. var r string
  54. r, err = p.execDescribe(s, xsql.TypeStream)
  55. result = append(result, r)
  56. case *xsql.DescribeTableStatement:
  57. var r string
  58. r, err = p.execDescribe(s, xsql.TypeTable)
  59. result = append(result, r)
  60. case *xsql.ExplainStreamStatement:
  61. var r string
  62. r, err = p.execExplain(s, xsql.TypeStream)
  63. result = append(result, r)
  64. case *xsql.ExplainTableStatement:
  65. var r string
  66. r, err = p.execExplain(s, xsql.TypeTable)
  67. result = append(result, r)
  68. case *xsql.DropStreamStatement:
  69. var r string
  70. r, err = p.execDrop(s, xsql.TypeStream)
  71. result = append(result, r)
  72. case *xsql.DropTableStatement:
  73. var r string
  74. r, err = p.execDrop(s, xsql.TypeTable)
  75. result = append(result, r)
  76. default:
  77. return nil, fmt.Errorf("Invalid stream statement: %s", statement)
  78. }
  79. return
  80. }
  81. func (p *StreamProcessor) execSave(stmt *xsql.StreamStmt, statement string, replace bool) error {
  82. err := p.db.Open()
  83. if err != nil {
  84. return fmt.Errorf("error when opening db: %v.", err)
  85. }
  86. defer p.db.Close()
  87. s, err := json.Marshal(xsql.StreamInfo{
  88. StreamType: stmt.StreamType,
  89. Statement: statement,
  90. })
  91. if err != nil {
  92. return fmt.Errorf("error when saving to db: %v.", err)
  93. }
  94. if replace {
  95. err = p.db.Set(string(stmt.Name), string(s))
  96. } else {
  97. err = p.db.Setnx(string(stmt.Name), string(s))
  98. }
  99. return err
  100. }
  101. func (p *StreamProcessor) ExecReplaceStream(statement string, st xsql.StreamType) (string, error) {
  102. parser := xsql.NewParser(strings.NewReader(statement))
  103. stmt, err := xsql.Language.Parse(parser)
  104. if err != nil {
  105. return "", err
  106. }
  107. stt := xsql.StreamTypeMap[st]
  108. switch s := stmt.(type) {
  109. case *xsql.StreamStmt:
  110. if s.StreamType != st {
  111. return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("%s %s is not found", xsql.StreamTypeMap[st], s.Name))
  112. }
  113. err = p.execSave(s, statement, true)
  114. if err != nil {
  115. return "", fmt.Errorf("Replace %s fails: %v.", stt, err)
  116. } else {
  117. info := fmt.Sprintf("%s %s is replaced.", strings.Title(stt), s.Name)
  118. log.Printf("%s", info)
  119. return info, nil
  120. }
  121. default:
  122. return "", fmt.Errorf("Invalid %s statement: %s", stt, statement)
  123. }
  124. }
  125. func (p *StreamProcessor) ExecStreamSql(statement string) (string, error) {
  126. r, err := p.ExecStmt(statement)
  127. if err != nil {
  128. return "", err
  129. } else {
  130. return strings.Join(r, "\n"), err
  131. }
  132. }
  133. func (p *StreamProcessor) execShow(st xsql.StreamType) ([]string, error) {
  134. keys, err := p.ShowStream(st)
  135. if len(keys) == 0 {
  136. keys = append(keys, fmt.Sprintf("No %s definitions are found.", xsql.StreamTypeMap[st]))
  137. }
  138. return keys, err
  139. }
  140. func (p *StreamProcessor) ShowStream(st xsql.StreamType) ([]string, error) {
  141. stt := xsql.StreamTypeMap[st]
  142. err := p.db.Open()
  143. if err != nil {
  144. return nil, fmt.Errorf("Show %ss fails, error when opening db: %v.", stt, err)
  145. }
  146. defer p.db.Close()
  147. keys, err := p.db.Keys()
  148. if err != nil {
  149. return nil, fmt.Errorf("Show %ss fails, error when loading data from db: %v.", stt, err)
  150. }
  151. var (
  152. v string
  153. vs = &xsql.StreamInfo{}
  154. result = make([]string, 0)
  155. )
  156. for _, k := range keys {
  157. if ok, _ := p.db.Get(k, &v); ok {
  158. if err := json.Unmarshal([]byte(v), vs); err == nil && vs.StreamType == st {
  159. result = append(result, k)
  160. }
  161. }
  162. }
  163. return result, nil
  164. }
  165. func (p *StreamProcessor) getStream(name string, st xsql.StreamType) (string, error) {
  166. vs, err := xsql.GetDataSourceStatement(p.db, name)
  167. if vs != nil && vs.StreamType == st {
  168. return vs.Statement, nil
  169. }
  170. if err != nil {
  171. return "", err
  172. }
  173. return "", common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("%s %s is not found", xsql.StreamTypeMap[st], name))
  174. }
  175. func (p *StreamProcessor) execDescribe(stmt xsql.NameNode, st xsql.StreamType) (string, error) {
  176. streamStmt, err := p.DescStream(stmt.GetName(), st)
  177. if err != nil {
  178. return "", err
  179. }
  180. switch s := streamStmt.(type) {
  181. case *xsql.StreamStmt:
  182. var buff bytes.Buffer
  183. buff.WriteString("Fields\n--------------------------------------------------------------------------------\n")
  184. for _, f := range s.StreamFields {
  185. buff.WriteString(f.Name + "\t")
  186. buff.WriteString(xsql.PrintFieldType(f.FieldType))
  187. buff.WriteString("\n")
  188. }
  189. buff.WriteString("\n")
  190. common.PrintMap(s.Options, &buff)
  191. return buff.String(), err
  192. default:
  193. return "%s", fmt.Errorf("Error resolving the %s %s, the data in db may be corrupted.", xsql.StreamTypeMap[st], stmt.GetName())
  194. }
  195. }
  196. func (p *StreamProcessor) DescStream(name string, st xsql.StreamType) (xsql.Statement, error) {
  197. statement, err := p.getStream(name, st)
  198. if err != nil {
  199. return nil, fmt.Errorf("Describe %s fails, %s.", xsql.StreamTypeMap[st], err)
  200. }
  201. parser := xsql.NewParser(strings.NewReader(statement))
  202. stream, err := xsql.Language.Parse(parser)
  203. if err != nil {
  204. return nil, err
  205. }
  206. return stream, nil
  207. }
  208. func (p *StreamProcessor) execExplain(stmt xsql.NameNode, st xsql.StreamType) (string, error) {
  209. _, err := p.getStream(stmt.GetName(), st)
  210. if err != nil {
  211. return "", fmt.Errorf("Explain %s fails, %s.", xsql.StreamTypeMap[st], err)
  212. }
  213. return "TO BE SUPPORTED", nil
  214. }
  215. func (p *StreamProcessor) execDrop(stmt xsql.NameNode, st xsql.StreamType) (string, error) {
  216. s, err := p.DropStream(stmt.GetName(), st)
  217. if err != nil {
  218. return s, fmt.Errorf("Drop %s fails: %s.", xsql.StreamTypeMap[st], err)
  219. }
  220. return s, nil
  221. }
  222. func (p *StreamProcessor) DropStream(name string, st xsql.StreamType) (string, error) {
  223. defer p.db.Close()
  224. _, err := p.getStream(name, st)
  225. if err != nil {
  226. return "", err
  227. }
  228. err = p.db.Open()
  229. if err != nil {
  230. return "", fmt.Errorf("error when opening db: %v", err)
  231. }
  232. defer p.db.Close()
  233. err = p.db.Delete(name)
  234. if err != nil {
  235. return "", err
  236. } else {
  237. return fmt.Sprintf("%s %s is dropped.", strings.Title(xsql.StreamTypeMap[st]), name), nil
  238. }
  239. }
  240. type RuleProcessor struct {
  241. db kv.KeyValue
  242. rootDbDir string
  243. }
  244. func NewRuleProcessor(d string) *RuleProcessor {
  245. processor := &RuleProcessor{
  246. db: kv.GetDefaultKVStore(path.Join(d, "rule")),
  247. rootDbDir: d,
  248. }
  249. return processor
  250. }
  251. func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
  252. rule, err := p.getRuleByJson(name, ruleJson)
  253. if err != nil {
  254. return nil, err
  255. }
  256. err = p.db.Open()
  257. if err != nil {
  258. return nil, err
  259. }
  260. defer p.db.Close()
  261. err = p.db.Setnx(rule.Id, ruleJson)
  262. if err != nil {
  263. return nil, err
  264. } else {
  265. log.Infof("Rule %s is created.", rule.Id)
  266. }
  267. return rule, nil
  268. }
  269. func (p *RuleProcessor) ExecUpdate(name, ruleJson string) (*api.Rule, error) {
  270. rule, err := p.getRuleByJson(name, ruleJson)
  271. if err != nil {
  272. return nil, err
  273. }
  274. err = p.db.Open()
  275. if err != nil {
  276. return nil, err
  277. }
  278. defer p.db.Close()
  279. err = p.db.Set(rule.Id, ruleJson)
  280. if err != nil {
  281. return nil, err
  282. } else {
  283. log.Infof("Rule %s is update.", rule.Id)
  284. }
  285. return rule, nil
  286. }
  287. func (p *RuleProcessor) ExecReplaceRuleState(name string, triggered bool) (err error) {
  288. rule, err := p.GetRuleByName(name)
  289. if err != nil {
  290. return err
  291. }
  292. rule.Triggered = triggered
  293. ruleJson, err := json.Marshal(rule)
  294. if err != nil {
  295. return fmt.Errorf("Marshal rule %s error : %s.", name, err)
  296. }
  297. err = p.db.Open()
  298. if err != nil {
  299. return err
  300. }
  301. defer p.db.Close()
  302. err = p.db.Set(name, string(ruleJson))
  303. if err != nil {
  304. return err
  305. } else {
  306. log.Infof("Rule %s is replaced.", name)
  307. }
  308. return err
  309. }
  310. func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
  311. err := p.db.Open()
  312. if err != nil {
  313. return nil, err
  314. }
  315. defer p.db.Close()
  316. var s1 string
  317. f, _ := p.db.Get(name, &s1)
  318. if !f {
  319. return nil, common.NewErrorWithCode(common.NOT_FOUND, fmt.Sprintf("Rule %s is not found.", name))
  320. }
  321. return p.getRuleByJson(name, s1)
  322. }
  323. func (p *RuleProcessor) getDefaultRule(name, sql string) *api.Rule {
  324. return &api.Rule{
  325. Id: name,
  326. Sql: sql,
  327. Options: &api.RuleOption{
  328. IsEventTime: false,
  329. LateTol: 1000,
  330. Concurrency: 1,
  331. BufferLength: 1024,
  332. SendMetaToSink: false,
  333. SendError: true,
  334. Qos: api.AtMostOnce,
  335. CheckpointInterval: 300000,
  336. },
  337. }
  338. }
  339. func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error) {
  340. opt := common.Config.Rule
  341. //set default rule options
  342. rule := &api.Rule{
  343. Options: &opt,
  344. }
  345. if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
  346. return nil, fmt.Errorf("Parse rule %s error : %s.", ruleJson, err)
  347. }
  348. //validation
  349. if rule.Id == "" && name == "" {
  350. return nil, fmt.Errorf("Missing rule id.")
  351. }
  352. if name != "" && rule.Id != "" && name != rule.Id {
  353. return nil, fmt.Errorf("Name is not consistent with rule id.")
  354. }
  355. if rule.Id == "" {
  356. rule.Id = name
  357. }
  358. if rule.Sql == "" {
  359. return nil, fmt.Errorf("Missing rule SQL.")
  360. }
  361. if _, err := xsql.GetStatementFromSql(rule.Sql); err != nil {
  362. return nil, err
  363. }
  364. if rule.Actions == nil || len(rule.Actions) == 0 {
  365. return nil, fmt.Errorf("Missing rule actions.")
  366. }
  367. if rule.Options == nil {
  368. rule.Options = &api.RuleOption{}
  369. }
  370. //Setnx default options
  371. if rule.Options.CheckpointInterval < 0 {
  372. return nil, fmt.Errorf("rule option checkpointInterval %d is invalid, require a positive integer", rule.Options.CheckpointInterval)
  373. }
  374. if rule.Options.Concurrency < 0 {
  375. return nil, fmt.Errorf("rule option concurrency %d is invalid, require a positive integer", rule.Options.Concurrency)
  376. }
  377. if rule.Options.BufferLength < 0 {
  378. return nil, fmt.Errorf("rule option bufferLength %d is invalid, require a positive integer", rule.Options.BufferLength)
  379. }
  380. if rule.Options.LateTol < 0 {
  381. return nil, fmt.Errorf("rule option lateTolerance %d is invalid, require a positive integer", rule.Options.LateTol)
  382. }
  383. return rule, nil
  384. }
  385. func (p *RuleProcessor) ExecQuery(ruleid, sql string) (*xstream.TopologyNew, error) {
  386. if tp, err := planner.PlanWithSourcesAndSinks(p.getDefaultRule(ruleid, sql), p.rootDbDir, nil, []*nodes.SinkNode{nodes.NewSinkNode("sink_memory_log", "logToMemory", nil)}); err != nil {
  387. return nil, err
  388. } else {
  389. go func() {
  390. select {
  391. case err := <-tp.Open():
  392. if err != nil {
  393. log.Infof("closing query for error: %v", err)
  394. tp.GetContext().SetError(err)
  395. tp.Cancel()
  396. } else {
  397. log.Info("closing query")
  398. }
  399. }
  400. }()
  401. return tp, nil
  402. }
  403. }
  404. func (p *RuleProcessor) ExecDesc(name string) (string, error) {
  405. err := p.db.Open()
  406. if err != nil {
  407. return "", err
  408. }
  409. defer p.db.Close()
  410. var s1 string
  411. f, _ := p.db.Get(name, &s1)
  412. if !f {
  413. return "", fmt.Errorf("Rule %s is not found.", name)
  414. }
  415. dst := &bytes.Buffer{}
  416. if err := json.Indent(dst, []byte(s1), "", " "); err != nil {
  417. return "", err
  418. }
  419. return fmt.Sprintln(dst.String()), nil
  420. }
  421. func (p *RuleProcessor) GetAllRules() ([]string, error) {
  422. err := p.db.Open()
  423. if err != nil {
  424. return nil, err
  425. }
  426. defer p.db.Close()
  427. return p.db.Keys()
  428. }
  429. func (p *RuleProcessor) ExecDrop(name string) (string, error) {
  430. err := p.db.Open()
  431. if err != nil {
  432. return "", err
  433. }
  434. defer p.db.Close()
  435. result := fmt.Sprintf("Rule %s is dropped.", name)
  436. var ruleJson string
  437. if ok, _ := p.db.Get(name, &ruleJson); ok {
  438. rule, err := p.getRuleByJson(name, ruleJson)
  439. if err != nil {
  440. return "", err
  441. }
  442. if err := cleanSinkCache(rule); err != nil {
  443. result = fmt.Sprintf("%s. Clean sink cache faile: %s.", result, err)
  444. }
  445. if err := cleanCheckpoint(name); err != nil {
  446. result = fmt.Sprintf("%s. Clean checkpoint cache faile: %s.", result, err)
  447. }
  448. }
  449. err = p.db.Delete(name)
  450. if err != nil {
  451. return "", err
  452. } else {
  453. return result, nil
  454. }
  455. }
  456. func cleanCheckpoint(name string) error {
  457. dbDir, _ := common.GetDataLoc()
  458. c := path.Join(dbDir, "checkpoints", name)
  459. return os.RemoveAll(c)
  460. }
  461. func cleanSinkCache(rule *api.Rule) error {
  462. dbDir, err := common.GetDataLoc()
  463. if err != nil {
  464. return err
  465. }
  466. store := kv.GetDefaultKVStore(path.Join(dbDir, "sink"))
  467. err = store.Open()
  468. if err != nil {
  469. return err
  470. }
  471. defer store.Close()
  472. for d, m := range rule.Actions {
  473. con := 1
  474. for name, action := range m {
  475. props, _ := action.(map[string]interface{})
  476. if c, ok := props["concurrency"]; ok {
  477. if t, err := common.ToInt(c); err == nil && t > 0 {
  478. con = t
  479. }
  480. }
  481. for i := 0; i < con; i++ {
  482. key := fmt.Sprintf("%s%s_%d%d", rule.Id, name, d, i)
  483. common.Log.Debugf("delete cache key %s", key)
  484. store.Delete(key)
  485. }
  486. }
  487. }
  488. return nil
  489. }