main.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623
  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/go-yaml/yaml"
  7. "github.com/urfave/cli"
  8. "io/ioutil"
  9. "net/rpc"
  10. "os"
  11. "sort"
  12. "strings"
  13. "time"
  14. )
  15. type clientConf struct {
  16. Host string `yaml:"host"`
  17. Port int `yaml:"port"`
  18. }
  19. var clientYaml = "client.yaml"
  20. func streamProcess(client *rpc.Client, args string) {
  21. var reply string
  22. if args == "" {
  23. args = strings.Join(os.Args[1:], " ")
  24. }
  25. err := client.Call("Server.Stream", args, &reply)
  26. if err != nil {
  27. fmt.Println(err)
  28. } else {
  29. fmt.Println(reply)
  30. }
  31. }
  32. var Version string = "unknown"
  33. func main() {
  34. app := cli.NewApp()
  35. app.Version = Version
  36. //nflag := []cli.Flag { cli.StringFlag{
  37. // Name: "name, n",
  38. // Usage: "the name of stream",
  39. // }}
  40. b, err := common.LoadConf(clientYaml)
  41. if err != nil {
  42. common.Log.Fatal(err)
  43. }
  44. var cfg map[string]clientConf
  45. var config *clientConf
  46. if err := yaml.Unmarshal(b, &cfg); err != nil {
  47. fmt.Printf("Failed to load config file with error %s.\n", err)
  48. } else {
  49. c, ok := cfg["basic"]
  50. if !ok {
  51. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  52. } else {
  53. config = &c
  54. }
  55. }
  56. if config == nil {
  57. config = &clientConf{
  58. Host: "127.0.0.1",
  59. Port: 20498,
  60. }
  61. }
  62. fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
  63. // Create a TCP connection to localhost on port 1234
  64. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  65. if err != nil {
  66. fmt.Printf("Failed to connect the server, please start the server.\n")
  67. return
  68. }
  69. app.Commands = []cli.Command{
  70. {
  71. Name: "query",
  72. Aliases: []string{"query"},
  73. Usage: "query command line",
  74. Action: func(c *cli.Context) error {
  75. reader := bufio.NewReader(os.Stdin)
  76. var inputs []string
  77. ticker := time.NewTicker(time.Millisecond * 300)
  78. defer ticker.Stop()
  79. for {
  80. fmt.Print("kuiper > ")
  81. text, _ := reader.ReadString('\n')
  82. inputs = append(inputs, text)
  83. // convert CRLF to LF
  84. text = strings.Replace(text, "\n", "", -1)
  85. if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
  86. break
  87. } else if strings.Trim(text, " ") == "" {
  88. continue
  89. } else {
  90. var reply string
  91. err := client.Call("Server.CreateQuery", text, &reply)
  92. if err != nil {
  93. fmt.Println(err)
  94. continue
  95. } else {
  96. fmt.Println(reply)
  97. go func() {
  98. for {
  99. <-ticker.C
  100. var result string
  101. e := client.Call("Server.GetQueryResult", "", &result)
  102. if e != nil {
  103. fmt.Println(e)
  104. fmt.Print("kuiper > ")
  105. return
  106. }
  107. if result != "" {
  108. fmt.Println(result)
  109. }
  110. }
  111. }()
  112. }
  113. }
  114. }
  115. return nil
  116. },
  117. },
  118. {
  119. Name: "create",
  120. Aliases: []string{"create"},
  121. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file",
  122. Subcommands: []cli.Command{
  123. {
  124. Name: "stream",
  125. Usage: "create stream $stream_name [-f stream_def_file]",
  126. Flags: []cli.Flag{
  127. cli.StringFlag{
  128. Name: "file, f",
  129. Usage: "the location of stream definition file",
  130. FilePath: "/home/mystream.txt",
  131. },
  132. },
  133. Action: func(c *cli.Context) error {
  134. sfile := c.String("file")
  135. if sfile != "" {
  136. if stream, err := readDef(sfile, "stream"); err != nil {
  137. fmt.Printf("%s", err)
  138. return nil
  139. } else {
  140. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  141. streamProcess(client, args)
  142. return nil
  143. }
  144. } else {
  145. streamProcess(client, "")
  146. return nil
  147. }
  148. },
  149. },
  150. {
  151. Name: "rule",
  152. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  153. Flags: []cli.Flag{
  154. cli.StringFlag{
  155. Name: "file, f",
  156. Usage: "the location of rule definition file",
  157. FilePath: "/home/myrule.txt",
  158. },
  159. },
  160. Action: func(c *cli.Context) error {
  161. sfile := c.String("file")
  162. if sfile != "" {
  163. if rule, err := readDef(sfile, "rule"); err != nil {
  164. fmt.Printf("%s", err)
  165. return nil
  166. } else {
  167. if len(c.Args()) != 1 {
  168. fmt.Printf("Expect rule name.\n")
  169. return nil
  170. }
  171. rname := c.Args()[0]
  172. var reply string
  173. args := &common.RuleDesc{rname, string(rule)}
  174. err = client.Call("Server.CreateRule", args, &reply)
  175. if err != nil {
  176. fmt.Println(err)
  177. } else {
  178. fmt.Println(reply)
  179. }
  180. }
  181. return nil
  182. } else {
  183. if len(c.Args()) != 2 {
  184. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  185. return nil
  186. }
  187. rname := c.Args()[0]
  188. rjson := c.Args()[1]
  189. var reply string
  190. args := &common.RuleDesc{rname, rjson}
  191. err = client.Call("Server.CreateRule", args, &reply)
  192. if err != nil {
  193. fmt.Println(err)
  194. } else {
  195. fmt.Println(reply)
  196. }
  197. return nil
  198. }
  199. },
  200. },
  201. {
  202. Name: "plugin",
  203. Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  204. Flags: []cli.Flag{
  205. cli.StringFlag{
  206. Name: "file, f",
  207. Usage: "the location of plugin definition file",
  208. FilePath: "/home/myplugin.txt",
  209. },
  210. },
  211. Action: func(c *cli.Context) error {
  212. if len(c.Args()) < 2 {
  213. fmt.Printf("Expect plugin type and name.\n")
  214. return nil
  215. }
  216. ptype, err := getPluginType(c.Args()[0])
  217. if err != nil {
  218. fmt.Printf("%s\n", err)
  219. return nil
  220. }
  221. pname := c.Args()[1]
  222. sfile := c.String("file")
  223. args := &common.PluginDesc{
  224. RuleDesc: common.RuleDesc{
  225. Name: pname,
  226. },
  227. Type: ptype,
  228. }
  229. if sfile != "" {
  230. if len(c.Args()) != 2 {
  231. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  232. return nil
  233. }
  234. if p, err := readDef(sfile, "plugin"); err != nil {
  235. fmt.Printf("%s", err)
  236. return nil
  237. } else {
  238. args.Json = string(p)
  239. }
  240. } else {
  241. if len(c.Args()) != 3 {
  242. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  243. return nil
  244. }
  245. args.Json = c.Args()[2]
  246. }
  247. var reply string
  248. err = client.Call("Server.CreatePlugin", args, &reply)
  249. if err != nil {
  250. fmt.Println(err)
  251. } else {
  252. fmt.Println(reply)
  253. }
  254. return nil
  255. },
  256. },
  257. },
  258. },
  259. {
  260. Name: "describe",
  261. Aliases: []string{"describe"},
  262. Usage: "describe stream $stream_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name",
  263. Subcommands: []cli.Command{
  264. {
  265. Name: "stream",
  266. Usage: "describe stream $stream_name",
  267. //Flags: nflag,
  268. Action: func(c *cli.Context) error {
  269. streamProcess(client, "")
  270. return nil
  271. },
  272. },
  273. {
  274. Name: "rule",
  275. Usage: "describe rule $rule_name",
  276. Action: func(c *cli.Context) error {
  277. if len(c.Args()) != 1 {
  278. fmt.Printf("Expect rule name.\n")
  279. return nil
  280. }
  281. rname := c.Args()[0]
  282. var reply string
  283. err = client.Call("Server.DescRule", rname, &reply)
  284. if err != nil {
  285. fmt.Println(err)
  286. } else {
  287. fmt.Println(reply)
  288. }
  289. return nil
  290. },
  291. },
  292. {
  293. Name: "plugin",
  294. Usage: "describe plugin $plugin_type $plugin_name",
  295. //Flags: nflag,
  296. Action: func(c *cli.Context) error {
  297. ptype, err := getPluginType(c.Args()[0])
  298. if err != nil {
  299. fmt.Printf("%s\n", err)
  300. return nil
  301. }
  302. pname := c.Args()[1]
  303. args := &common.PluginDesc{
  304. RuleDesc: common.RuleDesc{
  305. Name: pname,
  306. },
  307. Type: ptype,
  308. }
  309. var reply string
  310. err = client.Call("Server.DescPlugin", args, &reply)
  311. if err != nil {
  312. fmt.Println(err)
  313. } else {
  314. fmt.Println(reply)
  315. }
  316. return nil
  317. },
  318. },
  319. },
  320. },
  321. {
  322. Name: "drop",
  323. Aliases: []string{"drop"},
  324. Usage: "drop stream $stream_name | drop rule $rule_name | drop plugin $plugin_type $plugin_name -r $stop",
  325. Subcommands: []cli.Command{
  326. {
  327. Name: "stream",
  328. Usage: "drop stream $stream_name",
  329. //Flags: nflag,
  330. Action: func(c *cli.Context) error {
  331. streamProcess(client, "")
  332. return nil
  333. },
  334. },
  335. {
  336. Name: "rule",
  337. Usage: "drop rule $rule_name",
  338. //Flags: nflag,
  339. Action: func(c *cli.Context) error {
  340. if len(c.Args()) != 1 {
  341. fmt.Printf("Expect rule name.\n")
  342. return nil
  343. }
  344. rname := c.Args()[0]
  345. var reply string
  346. err = client.Call("Server.DropRule", rname, &reply)
  347. if err != nil {
  348. fmt.Println(err)
  349. } else {
  350. fmt.Println(reply)
  351. }
  352. return nil
  353. },
  354. },
  355. {
  356. Name: "plugin",
  357. Usage: "drop plugin $plugin_type $plugin_name -s stop",
  358. Flags: []cli.Flag{
  359. cli.StringFlag{
  360. Name: "stop, s",
  361. Usage: "stop kuiper after the action",
  362. },
  363. },
  364. Action: func(c *cli.Context) error {
  365. r := c.String("stop")
  366. if r != "true" && r != "false" {
  367. fmt.Printf("Expect r to be a boolean value.\n")
  368. return nil
  369. }
  370. if len(c.Args()) < 2 || len(c.Args()) > 3 {
  371. fmt.Printf("Expect plugin type and name.\n")
  372. return nil
  373. }
  374. ptype, err := getPluginType(c.Args()[0])
  375. if err != nil {
  376. fmt.Printf("%s\n", err)
  377. return nil
  378. }
  379. pname := c.Args()[1]
  380. args := &common.PluginDesc{
  381. RuleDesc: common.RuleDesc{
  382. Name: pname,
  383. },
  384. Type: ptype,
  385. Stop: r == "true",
  386. }
  387. var reply string
  388. err = client.Call("Server.DropPlugin", args, &reply)
  389. if err != nil {
  390. fmt.Println(err)
  391. } else {
  392. fmt.Println(reply)
  393. }
  394. return nil
  395. },
  396. },
  397. },
  398. },
  399. {
  400. Name: "show",
  401. Aliases: []string{"show"},
  402. Usage: "show streams | show rules | show plugins $plugin_type",
  403. Subcommands: []cli.Command{
  404. {
  405. Name: "streams",
  406. Usage: "show streams",
  407. Action: func(c *cli.Context) error {
  408. streamProcess(client, "")
  409. return nil
  410. },
  411. },
  412. {
  413. Name: "rules",
  414. Usage: "show rules",
  415. Action: func(c *cli.Context) error {
  416. var reply string
  417. err = client.Call("Server.ShowRules", 0, &reply)
  418. if err != nil {
  419. fmt.Println(err)
  420. } else {
  421. fmt.Println(reply)
  422. }
  423. return nil
  424. },
  425. },
  426. {
  427. Name: "plugins",
  428. Usage: "show plugins $plugin_type",
  429. Action: func(c *cli.Context) error {
  430. if len(c.Args()) != 1 {
  431. fmt.Printf("Expect plugin type.\n")
  432. return nil
  433. }
  434. ptype, err := getPluginType(c.Args()[0])
  435. if err != nil {
  436. fmt.Printf("%s\n", err)
  437. return nil
  438. }
  439. var reply string
  440. err = client.Call("Server.ShowPlugins", ptype, &reply)
  441. if err != nil {
  442. fmt.Println(err)
  443. } else {
  444. fmt.Println(reply)
  445. }
  446. return nil
  447. },
  448. },
  449. },
  450. },
  451. {
  452. Name: "getstatus",
  453. Aliases: []string{"getstatus"},
  454. Usage: "getstatus rule $rule_name",
  455. Subcommands: []cli.Command{
  456. {
  457. Name: "rule",
  458. Usage: "getstatus rule $rule_name",
  459. //Flags: nflag,
  460. Action: func(c *cli.Context) error {
  461. if len(c.Args()) != 1 {
  462. fmt.Printf("Expect rule name.\n")
  463. return nil
  464. }
  465. rname := c.Args()[0]
  466. var reply string
  467. err = client.Call("Server.GetStatusRule", rname, &reply)
  468. if err != nil {
  469. fmt.Println(err)
  470. } else {
  471. fmt.Println(reply)
  472. }
  473. return nil
  474. },
  475. },
  476. },
  477. },
  478. {
  479. Name: "start",
  480. Aliases: []string{"start"},
  481. Usage: "start rule $rule_name",
  482. Subcommands: []cli.Command{
  483. {
  484. Name: "rule",
  485. Usage: "start rule $rule_name",
  486. //Flags: nflag,
  487. Action: func(c *cli.Context) error {
  488. if len(c.Args()) != 1 {
  489. fmt.Printf("Expect rule name.\n")
  490. return nil
  491. }
  492. rname := c.Args()[0]
  493. var reply string
  494. err = client.Call("Server.StartRule", rname, &reply)
  495. if err != nil {
  496. fmt.Println(err)
  497. } else {
  498. fmt.Println(reply)
  499. }
  500. return nil
  501. },
  502. },
  503. },
  504. },
  505. {
  506. Name: "stop",
  507. Aliases: []string{"stop"},
  508. Usage: "stop rule $rule_name",
  509. Subcommands: []cli.Command{
  510. {
  511. Name: "rule",
  512. Usage: "stop rule $rule_name",
  513. //Flags: nflag,
  514. Action: func(c *cli.Context) error {
  515. if len(c.Args()) != 1 {
  516. fmt.Printf("Expect rule name.\n")
  517. return nil
  518. }
  519. rname := c.Args()[0]
  520. var reply string
  521. err = client.Call("Server.StopRule", rname, &reply)
  522. if err != nil {
  523. fmt.Println(err)
  524. } else {
  525. fmt.Println(reply)
  526. }
  527. return nil
  528. },
  529. },
  530. },
  531. },
  532. {
  533. Name: "restart",
  534. Aliases: []string{"restart"},
  535. Usage: "restart rule $rule_name",
  536. Subcommands: []cli.Command{
  537. {
  538. Name: "rule",
  539. Usage: "restart rule $rule_name",
  540. //Flags: nflag,
  541. Action: func(c *cli.Context) error {
  542. if len(c.Args()) != 1 {
  543. fmt.Printf("Expect rule name.\n")
  544. return nil
  545. }
  546. rname := c.Args()[0]
  547. var reply string
  548. err = client.Call("Server.RestartRule", rname, &reply)
  549. if err != nil {
  550. fmt.Println(err)
  551. } else {
  552. fmt.Println(reply)
  553. }
  554. return nil
  555. },
  556. },
  557. },
  558. },
  559. }
  560. app.Name = "Kuiper"
  561. app.Usage = "The command line tool for EMQ X Kuiper."
  562. app.Action = func(c *cli.Context) error {
  563. cli.ShowSubcommandHelp(c)
  564. //cli.ShowVersion(c)
  565. return nil
  566. }
  567. sort.Sort(cli.FlagsByName(app.Flags))
  568. sort.Sort(cli.CommandsByName(app.Commands))
  569. err = app.Run(os.Args)
  570. if err != nil {
  571. fmt.Printf("%v", err)
  572. }
  573. }
  574. func getPluginType(arg string) (ptype int, err error) {
  575. switch arg {
  576. case "source":
  577. ptype = 0
  578. case "sink":
  579. ptype = 1
  580. case "function":
  581. ptype = 2
  582. default:
  583. err = fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\" or \"function\".\n", arg)
  584. }
  585. return
  586. }
  587. func readDef(sfile string, t string) ([]byte, error) {
  588. if _, err := os.Stat(sfile); os.IsNotExist(err) {
  589. return nil, fmt.Errorf("The specified %s defenition file %s is not existed.\n", t, sfile)
  590. }
  591. fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
  592. if rule, err := ioutil.ReadFile(sfile); err != nil {
  593. return nil, fmt.Errorf("Failed to read from %s definition file %s.\n", t, sfile)
  594. } else {
  595. return rule, nil
  596. }
  597. }