main.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658
  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/go-yaml/yaml"
  7. "github.com/urfave/cli"
  8. "io/ioutil"
  9. "net/rpc"
  10. "os"
  11. "sort"
  12. "strings"
  13. "time"
  14. )
  15. type clientConf struct {
  16. Host string `yaml:"host"`
  17. Port int `yaml:"port"`
  18. }
  19. var clientYaml = "client.yaml"
  20. func streamProcess(client *rpc.Client, args string) {
  21. var reply string
  22. if args == "" {
  23. args = strings.Join(os.Args[1:], " ")
  24. }
  25. err := client.Call("Server.Stream", args, &reply)
  26. if err != nil {
  27. fmt.Println(err)
  28. } else {
  29. fmt.Println(reply)
  30. }
  31. }
  32. var (
  33. Version = "unknown"
  34. LoadFileType = "relative"
  35. )
  36. func main() {
  37. common.LoadFileType = LoadFileType
  38. app := cli.NewApp()
  39. app.Version = Version
  40. //nflag := []cli.Flag { cli.StringFlag{
  41. // Name: "name, n",
  42. // Usage: "the name of stream",
  43. // }}
  44. b, err := common.LoadConf(clientYaml)
  45. if err != nil {
  46. common.Log.Fatal(err)
  47. }
  48. var cfg map[string]clientConf
  49. var config *clientConf
  50. if err := yaml.Unmarshal(b, &cfg); err != nil {
  51. fmt.Printf("Failed to load config file with error %s.\n", err)
  52. } else {
  53. c, ok := cfg["basic"]
  54. if !ok {
  55. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  56. } else {
  57. config = &c
  58. }
  59. }
  60. if config == nil {
  61. config = &clientConf{
  62. Host: "127.0.0.1",
  63. Port: 20498,
  64. }
  65. }
  66. fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
  67. // Create a TCP connection to localhost on port 1234
  68. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  69. if err != nil {
  70. fmt.Printf("Failed to connect the server, please start the server.\n")
  71. return
  72. }
  73. app.Commands = []cli.Command{
  74. {
  75. Name: "query",
  76. Aliases: []string{"query"},
  77. Usage: "query command line",
  78. Action: func(c *cli.Context) error {
  79. reader := bufio.NewReader(os.Stdin)
  80. var inputs []string
  81. ticker := time.NewTicker(time.Millisecond * 300)
  82. defer ticker.Stop()
  83. for {
  84. fmt.Print("kuiper > ")
  85. text, _ := reader.ReadString('\n')
  86. inputs = append(inputs, text)
  87. // convert CRLF to LF
  88. text = strings.Replace(text, "\n", "", -1)
  89. if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
  90. break
  91. } else if strings.Trim(text, " ") == "" {
  92. continue
  93. } else {
  94. var reply string
  95. err := client.Call("Server.CreateQuery", text, &reply)
  96. if err != nil {
  97. fmt.Println(err)
  98. continue
  99. } else {
  100. fmt.Println(reply)
  101. go func() {
  102. for {
  103. <-ticker.C
  104. var result string
  105. e := client.Call("Server.GetQueryResult", "", &result)
  106. if e != nil {
  107. fmt.Println(e)
  108. fmt.Print("kuiper > ")
  109. return
  110. }
  111. if result != "" {
  112. fmt.Println(result)
  113. }
  114. }
  115. }()
  116. }
  117. }
  118. }
  119. return nil
  120. },
  121. },
  122. {
  123. Name: "create",
  124. Aliases: []string{"create"},
  125. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file",
  126. Subcommands: []cli.Command{
  127. {
  128. Name: "stream",
  129. Usage: "create stream $stream_name [-f stream_def_file]",
  130. Flags: []cli.Flag{
  131. cli.StringFlag{
  132. Name: "file, f",
  133. Usage: "the location of stream definition file",
  134. FilePath: "/home/mystream.txt",
  135. },
  136. },
  137. Action: func(c *cli.Context) error {
  138. sfile := c.String("file")
  139. if sfile != "" {
  140. if stream, err := readDef(sfile, "stream"); err != nil {
  141. fmt.Printf("%s", err)
  142. return nil
  143. } else {
  144. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  145. streamProcess(client, args)
  146. return nil
  147. }
  148. } else {
  149. streamProcess(client, "")
  150. return nil
  151. }
  152. },
  153. },
  154. {
  155. Name: "rule",
  156. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  157. Flags: []cli.Flag{
  158. cli.StringFlag{
  159. Name: "file, f",
  160. Usage: "the location of rule definition file",
  161. FilePath: "/home/myrule.txt",
  162. },
  163. },
  164. Action: func(c *cli.Context) error {
  165. sfile := c.String("file")
  166. if sfile != "" {
  167. if rule, err := readDef(sfile, "rule"); err != nil {
  168. fmt.Printf("%s", err)
  169. return nil
  170. } else {
  171. if len(c.Args()) != 1 {
  172. fmt.Printf("Expect rule name.\n")
  173. return nil
  174. }
  175. rname := c.Args()[0]
  176. var reply string
  177. args := &common.RuleDesc{rname, string(rule)}
  178. err = client.Call("Server.CreateRule", args, &reply)
  179. if err != nil {
  180. fmt.Println(err)
  181. } else {
  182. fmt.Println(reply)
  183. }
  184. }
  185. return nil
  186. } else {
  187. if len(c.Args()) != 2 {
  188. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  189. return nil
  190. }
  191. rname := c.Args()[0]
  192. rjson := c.Args()[1]
  193. var reply string
  194. args := &common.RuleDesc{rname, rjson}
  195. err = client.Call("Server.CreateRule", args, &reply)
  196. if err != nil {
  197. fmt.Println(err)
  198. } else {
  199. fmt.Println(reply)
  200. }
  201. return nil
  202. }
  203. },
  204. },
  205. {
  206. Name: "plugin",
  207. Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  208. Flags: []cli.Flag{
  209. cli.StringFlag{
  210. Name: "file, f",
  211. Usage: "the location of plugin definition file",
  212. FilePath: "/home/myplugin.txt",
  213. },
  214. },
  215. Action: func(c *cli.Context) error {
  216. if len(c.Args()) < 2 {
  217. fmt.Printf("Expect plugin type and name.\n")
  218. return nil
  219. }
  220. ptype, err := getPluginType(c.Args()[0])
  221. if err != nil {
  222. fmt.Printf("%s\n", err)
  223. return nil
  224. }
  225. pname := c.Args()[1]
  226. sfile := c.String("file")
  227. args := &common.PluginDesc{
  228. RuleDesc: common.RuleDesc{
  229. Name: pname,
  230. },
  231. Type: ptype,
  232. }
  233. if sfile != "" {
  234. if len(c.Args()) != 2 {
  235. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  236. return nil
  237. }
  238. if p, err := readDef(sfile, "plugin"); err != nil {
  239. fmt.Printf("%s", err)
  240. return nil
  241. } else {
  242. args.Json = string(p)
  243. }
  244. } else {
  245. if len(c.Args()) != 3 {
  246. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  247. return nil
  248. }
  249. args.Json = c.Args()[2]
  250. }
  251. var reply string
  252. err = client.Call("Server.CreatePlugin", args, &reply)
  253. if err != nil {
  254. fmt.Println(err)
  255. } else {
  256. fmt.Println(reply)
  257. }
  258. return nil
  259. },
  260. },
  261. },
  262. },
  263. {
  264. Name: "describe",
  265. Aliases: []string{"describe"},
  266. Usage: "describe stream $stream_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name",
  267. Subcommands: []cli.Command{
  268. {
  269. Name: "stream",
  270. Usage: "describe stream $stream_name",
  271. //Flags: nflag,
  272. Action: func(c *cli.Context) error {
  273. streamProcess(client, "")
  274. return nil
  275. },
  276. },
  277. {
  278. Name: "rule",
  279. Usage: "describe rule $rule_name",
  280. Action: func(c *cli.Context) error {
  281. if len(c.Args()) != 1 {
  282. fmt.Printf("Expect rule name.\n")
  283. return nil
  284. }
  285. rname := c.Args()[0]
  286. var reply string
  287. err = client.Call("Server.DescRule", rname, &reply)
  288. if err != nil {
  289. fmt.Println(err)
  290. } else {
  291. fmt.Println(reply)
  292. }
  293. return nil
  294. },
  295. },
  296. {
  297. Name: "plugin",
  298. Usage: "describe plugin $plugin_type $plugin_name",
  299. //Flags: nflag,
  300. Action: func(c *cli.Context) error {
  301. ptype, err := getPluginType(c.Args()[0])
  302. if err != nil {
  303. fmt.Printf("%s\n", err)
  304. return nil
  305. }
  306. if len(c.Args()) != 2 {
  307. fmt.Printf("Expect plugin name.\n")
  308. return nil
  309. }
  310. pname := c.Args()[1]
  311. args := &common.PluginDesc{
  312. RuleDesc: common.RuleDesc{
  313. Name: pname,
  314. },
  315. Type: ptype,
  316. }
  317. var reply string
  318. err = client.Call("Server.DescPlugin", args, &reply)
  319. if err != nil {
  320. fmt.Println(err)
  321. } else {
  322. fmt.Println(reply)
  323. }
  324. return nil
  325. },
  326. },
  327. },
  328. },
  329. {
  330. Name: "drop",
  331. Aliases: []string{"drop"},
  332. Usage: "drop stream $stream_name | drop rule $rule_name | drop plugin $plugin_type $plugin_name -r $stop",
  333. Subcommands: []cli.Command{
  334. {
  335. Name: "stream",
  336. Usage: "drop stream $stream_name",
  337. //Flags: nflag,
  338. Action: func(c *cli.Context) error {
  339. streamProcess(client, "")
  340. return nil
  341. },
  342. },
  343. {
  344. Name: "rule",
  345. Usage: "drop rule $rule_name",
  346. //Flags: nflag,
  347. Action: func(c *cli.Context) error {
  348. if len(c.Args()) != 1 {
  349. fmt.Printf("Expect rule name.\n")
  350. return nil
  351. }
  352. rname := c.Args()[0]
  353. var reply string
  354. err = client.Call("Server.DropRule", rname, &reply)
  355. if err != nil {
  356. fmt.Println(err)
  357. } else {
  358. fmt.Println(reply)
  359. }
  360. return nil
  361. },
  362. },
  363. {
  364. Name: "plugin",
  365. Usage: "drop plugin $plugin_type $plugin_name -s stop",
  366. Flags: []cli.Flag{
  367. cli.StringFlag{
  368. Name: "stop, s",
  369. Usage: "stop kuiper after the action",
  370. },
  371. },
  372. Action: func(c *cli.Context) error {
  373. r := c.String("stop")
  374. if r != "true" && r != "false" {
  375. fmt.Printf("Expect r to be a boolean value.\n")
  376. return nil
  377. }
  378. if len(c.Args()) < 2 || len(c.Args()) > 3 {
  379. fmt.Printf("Expect plugin type and name.\n")
  380. return nil
  381. }
  382. ptype, err := getPluginType(c.Args()[0])
  383. if err != nil {
  384. fmt.Printf("%s\n", err)
  385. return nil
  386. }
  387. pname := c.Args()[1]
  388. args := &common.PluginDesc{
  389. RuleDesc: common.RuleDesc{
  390. Name: pname,
  391. },
  392. Type: ptype,
  393. Stop: r == "true",
  394. }
  395. var reply string
  396. err = client.Call("Server.DropPlugin", args, &reply)
  397. if err != nil {
  398. fmt.Println(err)
  399. } else {
  400. fmt.Println(reply)
  401. }
  402. return nil
  403. },
  404. },
  405. },
  406. },
  407. {
  408. Name: "show",
  409. Aliases: []string{"show"},
  410. Usage: "show streams | show rules | show plugins $plugin_type",
  411. Subcommands: []cli.Command{
  412. {
  413. Name: "streams",
  414. Usage: "show streams",
  415. Action: func(c *cli.Context) error {
  416. streamProcess(client, "")
  417. return nil
  418. },
  419. },
  420. {
  421. Name: "rules",
  422. Usage: "show rules",
  423. Action: func(c *cli.Context) error {
  424. var reply string
  425. err = client.Call("Server.ShowRules", 0, &reply)
  426. if err != nil {
  427. fmt.Println(err)
  428. } else {
  429. fmt.Println(reply)
  430. }
  431. return nil
  432. },
  433. },
  434. {
  435. Name: "plugins",
  436. Usage: "show plugins $plugin_type",
  437. Action: func(c *cli.Context) error {
  438. if len(c.Args()) != 1 {
  439. fmt.Printf("Expect plugin type.\n")
  440. return nil
  441. }
  442. ptype, err := getPluginType(c.Args()[0])
  443. if err != nil {
  444. fmt.Printf("%s\n", err)
  445. return nil
  446. }
  447. var reply string
  448. err = client.Call("Server.ShowPlugins", ptype, &reply)
  449. if err != nil {
  450. fmt.Println(err)
  451. } else {
  452. fmt.Println(reply)
  453. }
  454. return nil
  455. },
  456. },
  457. },
  458. },
  459. {
  460. Name: "getstatus",
  461. Aliases: []string{"getstatus"},
  462. Usage: "getstatus rule $rule_name",
  463. Subcommands: []cli.Command{
  464. {
  465. Name: "rule",
  466. Usage: "getstatus rule $rule_name",
  467. //Flags: nflag,
  468. Action: func(c *cli.Context) error {
  469. if len(c.Args()) != 1 {
  470. fmt.Printf("Expect rule name.\n")
  471. return nil
  472. }
  473. rname := c.Args()[0]
  474. var reply string
  475. err = client.Call("Server.GetStatusRule", rname, &reply)
  476. if err != nil {
  477. fmt.Println(err)
  478. } else {
  479. fmt.Println(reply)
  480. }
  481. return nil
  482. },
  483. },
  484. },
  485. },
  486. {
  487. Name: "gettopo",
  488. Aliases: []string{"gettopo"},
  489. Usage: "gettopo rule $rule_name",
  490. Subcommands: []cli.Command{
  491. {
  492. Name: "rule",
  493. Usage: "getstopo rule $rule_name",
  494. //Flags: nflag,
  495. Action: func(c *cli.Context) error {
  496. if len(c.Args()) != 1 {
  497. fmt.Printf("Expect rule name.\n")
  498. return nil
  499. }
  500. rname := c.Args()[0]
  501. var reply string
  502. err = client.Call("Server.GetTopoRule", rname, &reply)
  503. if err != nil {
  504. fmt.Println(err)
  505. } else {
  506. fmt.Println(reply)
  507. }
  508. return nil
  509. },
  510. },
  511. },
  512. },
  513. {
  514. Name: "start",
  515. Aliases: []string{"start"},
  516. Usage: "start rule $rule_name",
  517. Subcommands: []cli.Command{
  518. {
  519. Name: "rule",
  520. Usage: "start rule $rule_name",
  521. //Flags: nflag,
  522. Action: func(c *cli.Context) error {
  523. if len(c.Args()) != 1 {
  524. fmt.Printf("Expect rule name.\n")
  525. return nil
  526. }
  527. rname := c.Args()[0]
  528. var reply string
  529. err = client.Call("Server.StartRule", rname, &reply)
  530. if err != nil {
  531. fmt.Println(err)
  532. } else {
  533. fmt.Println(reply)
  534. }
  535. return nil
  536. },
  537. },
  538. },
  539. },
  540. {
  541. Name: "stop",
  542. Aliases: []string{"stop"},
  543. Usage: "stop rule $rule_name",
  544. Subcommands: []cli.Command{
  545. {
  546. Name: "rule",
  547. Usage: "stop rule $rule_name",
  548. //Flags: nflag,
  549. Action: func(c *cli.Context) error {
  550. if len(c.Args()) != 1 {
  551. fmt.Printf("Expect rule name.\n")
  552. return nil
  553. }
  554. rname := c.Args()[0]
  555. var reply string
  556. err = client.Call("Server.StopRule", rname, &reply)
  557. if err != nil {
  558. fmt.Println(err)
  559. } else {
  560. fmt.Println(reply)
  561. }
  562. return nil
  563. },
  564. },
  565. },
  566. },
  567. {
  568. Name: "restart",
  569. Aliases: []string{"restart"},
  570. Usage: "restart rule $rule_name",
  571. Subcommands: []cli.Command{
  572. {
  573. Name: "rule",
  574. Usage: "restart rule $rule_name",
  575. //Flags: nflag,
  576. Action: func(c *cli.Context) error {
  577. if len(c.Args()) != 1 {
  578. fmt.Printf("Expect rule name.\n")
  579. return nil
  580. }
  581. rname := c.Args()[0]
  582. var reply string
  583. err = client.Call("Server.RestartRule", rname, &reply)
  584. if err != nil {
  585. fmt.Println(err)
  586. } else {
  587. fmt.Println(reply)
  588. }
  589. return nil
  590. },
  591. },
  592. },
  593. },
  594. }
  595. app.Name = "Kuiper"
  596. app.Usage = "The command line tool for EMQ X Kuiper."
  597. app.Action = func(c *cli.Context) error {
  598. cli.ShowSubcommandHelp(c)
  599. //cli.ShowVersion(c)
  600. return nil
  601. }
  602. sort.Sort(cli.FlagsByName(app.Flags))
  603. sort.Sort(cli.CommandsByName(app.Commands))
  604. err = app.Run(os.Args)
  605. if err != nil {
  606. fmt.Printf("%v", err)
  607. }
  608. }
  609. func getPluginType(arg string) (ptype int, err error) {
  610. switch arg {
  611. case "source":
  612. ptype = 0
  613. case "sink":
  614. ptype = 1
  615. case "function":
  616. ptype = 2
  617. default:
  618. err = fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\" or \"function\".\n", arg)
  619. }
  620. return
  621. }
  622. func readDef(sfile string, t string) ([]byte, error) {
  623. if _, err := os.Stat(sfile); os.IsNotExist(err) {
  624. return nil, fmt.Errorf("The specified %s defenition file %s is not existed.\n", t, sfile)
  625. }
  626. fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
  627. if rule, err := ioutil.ReadFile(sfile); err != nil {
  628. return nil, fmt.Errorf("Failed to read from %s definition file %s.\n", t, sfile)
  629. } else {
  630. return rule, nil
  631. }
  632. }