main.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627
  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/go-yaml/yaml"
  7. "github.com/urfave/cli"
  8. "io/ioutil"
  9. "net/rpc"
  10. "os"
  11. "sort"
  12. "strings"
  13. "time"
  14. )
  15. type clientConf struct {
  16. Host string `yaml:"host"`
  17. Port int `yaml:"port"`
  18. }
  19. var clientYaml = "client.yaml"
  20. func streamProcess(client *rpc.Client, args string) {
  21. var reply string
  22. if args == "" {
  23. args = strings.Join(os.Args[1:], " ")
  24. }
  25. err := client.Call("Server.Stream", args, &reply)
  26. if err != nil {
  27. fmt.Println(err)
  28. } else {
  29. fmt.Println(reply)
  30. }
  31. }
  32. var (
  33. Version = "unknown"
  34. LoadFileType = "relative"
  35. )
  36. func main() {
  37. common.LoadFileType = LoadFileType
  38. app := cli.NewApp()
  39. app.Version = Version
  40. //nflag := []cli.Flag { cli.StringFlag{
  41. // Name: "name, n",
  42. // Usage: "the name of stream",
  43. // }}
  44. b, err := common.LoadConf(clientYaml)
  45. if err != nil {
  46. common.Log.Fatal(err)
  47. }
  48. var cfg map[string]clientConf
  49. var config *clientConf
  50. if err := yaml.Unmarshal(b, &cfg); err != nil {
  51. fmt.Printf("Failed to load config file with error %s.\n", err)
  52. } else {
  53. c, ok := cfg["basic"]
  54. if !ok {
  55. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  56. } else {
  57. config = &c
  58. }
  59. }
  60. if config == nil {
  61. config = &clientConf{
  62. Host: "127.0.0.1",
  63. Port: 20498,
  64. }
  65. }
  66. fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
  67. // Create a TCP connection to localhost on port 1234
  68. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  69. if err != nil {
  70. fmt.Printf("Failed to connect the server, please start the server.\n")
  71. return
  72. }
  73. app.Commands = []cli.Command{
  74. {
  75. Name: "query",
  76. Aliases: []string{"query"},
  77. Usage: "query command line",
  78. Action: func(c *cli.Context) error {
  79. reader := bufio.NewReader(os.Stdin)
  80. var inputs []string
  81. ticker := time.NewTicker(time.Millisecond * 300)
  82. defer ticker.Stop()
  83. for {
  84. fmt.Print("kuiper > ")
  85. text, _ := reader.ReadString('\n')
  86. inputs = append(inputs, text)
  87. // convert CRLF to LF
  88. text = strings.Replace(text, "\n", "", -1)
  89. if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
  90. break
  91. } else if strings.Trim(text, " ") == "" {
  92. continue
  93. } else {
  94. var reply string
  95. err := client.Call("Server.CreateQuery", text, &reply)
  96. if err != nil {
  97. fmt.Println(err)
  98. continue
  99. } else {
  100. fmt.Println(reply)
  101. go func() {
  102. for {
  103. <-ticker.C
  104. var result string
  105. e := client.Call("Server.GetQueryResult", "", &result)
  106. if e != nil {
  107. fmt.Println(e)
  108. fmt.Print("kuiper > ")
  109. return
  110. }
  111. if result != "" {
  112. fmt.Println(result)
  113. }
  114. }
  115. }()
  116. }
  117. }
  118. }
  119. return nil
  120. },
  121. },
  122. {
  123. Name: "create",
  124. Aliases: []string{"create"},
  125. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file",
  126. Subcommands: []cli.Command{
  127. {
  128. Name: "stream",
  129. Usage: "create stream $stream_name [-f stream_def_file]",
  130. Flags: []cli.Flag{
  131. cli.StringFlag{
  132. Name: "file, f",
  133. Usage: "the location of stream definition file",
  134. FilePath: "/home/mystream.txt",
  135. },
  136. },
  137. Action: func(c *cli.Context) error {
  138. sfile := c.String("file")
  139. if sfile != "" {
  140. if stream, err := readDef(sfile, "stream"); err != nil {
  141. fmt.Printf("%s", err)
  142. return nil
  143. } else {
  144. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  145. streamProcess(client, args)
  146. return nil
  147. }
  148. } else {
  149. streamProcess(client, "")
  150. return nil
  151. }
  152. },
  153. },
  154. {
  155. Name: "rule",
  156. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  157. Flags: []cli.Flag{
  158. cli.StringFlag{
  159. Name: "file, f",
  160. Usage: "the location of rule definition file",
  161. FilePath: "/home/myrule.txt",
  162. },
  163. },
  164. Action: func(c *cli.Context) error {
  165. sfile := c.String("file")
  166. if sfile != "" {
  167. if rule, err := readDef(sfile, "rule"); err != nil {
  168. fmt.Printf("%s", err)
  169. return nil
  170. } else {
  171. if len(c.Args()) != 1 {
  172. fmt.Printf("Expect rule name.\n")
  173. return nil
  174. }
  175. rname := c.Args()[0]
  176. var reply string
  177. args := &common.RuleDesc{rname, string(rule)}
  178. err = client.Call("Server.CreateRule", args, &reply)
  179. if err != nil {
  180. fmt.Println(err)
  181. } else {
  182. fmt.Println(reply)
  183. }
  184. }
  185. return nil
  186. } else {
  187. if len(c.Args()) != 2 {
  188. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  189. return nil
  190. }
  191. rname := c.Args()[0]
  192. rjson := c.Args()[1]
  193. var reply string
  194. args := &common.RuleDesc{rname, rjson}
  195. err = client.Call("Server.CreateRule", args, &reply)
  196. if err != nil {
  197. fmt.Println(err)
  198. } else {
  199. fmt.Println(reply)
  200. }
  201. return nil
  202. }
  203. },
  204. },
  205. {
  206. Name: "plugin",
  207. Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  208. Flags: []cli.Flag{
  209. cli.StringFlag{
  210. Name: "file, f",
  211. Usage: "the location of plugin definition file",
  212. FilePath: "/home/myplugin.txt",
  213. },
  214. },
  215. Action: func(c *cli.Context) error {
  216. if len(c.Args()) < 2 {
  217. fmt.Printf("Expect plugin type and name.\n")
  218. return nil
  219. }
  220. ptype, err := getPluginType(c.Args()[0])
  221. if err != nil {
  222. fmt.Printf("%s\n", err)
  223. return nil
  224. }
  225. pname := c.Args()[1]
  226. sfile := c.String("file")
  227. args := &common.PluginDesc{
  228. RuleDesc: common.RuleDesc{
  229. Name: pname,
  230. },
  231. Type: ptype,
  232. }
  233. if sfile != "" {
  234. if len(c.Args()) != 2 {
  235. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  236. return nil
  237. }
  238. if p, err := readDef(sfile, "plugin"); err != nil {
  239. fmt.Printf("%s", err)
  240. return nil
  241. } else {
  242. args.Json = string(p)
  243. }
  244. } else {
  245. if len(c.Args()) != 3 {
  246. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  247. return nil
  248. }
  249. args.Json = c.Args()[2]
  250. }
  251. var reply string
  252. err = client.Call("Server.CreatePlugin", args, &reply)
  253. if err != nil {
  254. fmt.Println(err)
  255. } else {
  256. fmt.Println(reply)
  257. }
  258. return nil
  259. },
  260. },
  261. },
  262. },
  263. {
  264. Name: "describe",
  265. Aliases: []string{"describe"},
  266. Usage: "describe stream $stream_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name",
  267. Subcommands: []cli.Command{
  268. {
  269. Name: "stream",
  270. Usage: "describe stream $stream_name",
  271. //Flags: nflag,
  272. Action: func(c *cli.Context) error {
  273. streamProcess(client, "")
  274. return nil
  275. },
  276. },
  277. {
  278. Name: "rule",
  279. Usage: "describe rule $rule_name",
  280. Action: func(c *cli.Context) error {
  281. if len(c.Args()) != 1 {
  282. fmt.Printf("Expect rule name.\n")
  283. return nil
  284. }
  285. rname := c.Args()[0]
  286. var reply string
  287. err = client.Call("Server.DescRule", rname, &reply)
  288. if err != nil {
  289. fmt.Println(err)
  290. } else {
  291. fmt.Println(reply)
  292. }
  293. return nil
  294. },
  295. },
  296. {
  297. Name: "plugin",
  298. Usage: "describe plugin $plugin_type $plugin_name",
  299. //Flags: nflag,
  300. Action: func(c *cli.Context) error {
  301. ptype, err := getPluginType(c.Args()[0])
  302. if err != nil {
  303. fmt.Printf("%s\n", err)
  304. return nil
  305. }
  306. pname := c.Args()[1]
  307. args := &common.PluginDesc{
  308. RuleDesc: common.RuleDesc{
  309. Name: pname,
  310. },
  311. Type: ptype,
  312. }
  313. var reply string
  314. err = client.Call("Server.DescPlugin", args, &reply)
  315. if err != nil {
  316. fmt.Println(err)
  317. } else {
  318. fmt.Println(reply)
  319. }
  320. return nil
  321. },
  322. },
  323. },
  324. },
  325. {
  326. Name: "drop",
  327. Aliases: []string{"drop"},
  328. Usage: "drop stream $stream_name | drop rule $rule_name | drop plugin $plugin_type $plugin_name -r $stop",
  329. Subcommands: []cli.Command{
  330. {
  331. Name: "stream",
  332. Usage: "drop stream $stream_name",
  333. //Flags: nflag,
  334. Action: func(c *cli.Context) error {
  335. streamProcess(client, "")
  336. return nil
  337. },
  338. },
  339. {
  340. Name: "rule",
  341. Usage: "drop rule $rule_name",
  342. //Flags: nflag,
  343. Action: func(c *cli.Context) error {
  344. if len(c.Args()) != 1 {
  345. fmt.Printf("Expect rule name.\n")
  346. return nil
  347. }
  348. rname := c.Args()[0]
  349. var reply string
  350. err = client.Call("Server.DropRule", rname, &reply)
  351. if err != nil {
  352. fmt.Println(err)
  353. } else {
  354. fmt.Println(reply)
  355. }
  356. return nil
  357. },
  358. },
  359. {
  360. Name: "plugin",
  361. Usage: "drop plugin $plugin_type $plugin_name -s stop",
  362. Flags: []cli.Flag{
  363. cli.StringFlag{
  364. Name: "stop, s",
  365. Usage: "stop kuiper after the action",
  366. },
  367. },
  368. Action: func(c *cli.Context) error {
  369. r := c.String("stop")
  370. if r != "true" && r != "false" {
  371. fmt.Printf("Expect r to be a boolean value.\n")
  372. return nil
  373. }
  374. if len(c.Args()) < 2 || len(c.Args()) > 3 {
  375. fmt.Printf("Expect plugin type and name.\n")
  376. return nil
  377. }
  378. ptype, err := getPluginType(c.Args()[0])
  379. if err != nil {
  380. fmt.Printf("%s\n", err)
  381. return nil
  382. }
  383. pname := c.Args()[1]
  384. args := &common.PluginDesc{
  385. RuleDesc: common.RuleDesc{
  386. Name: pname,
  387. },
  388. Type: ptype,
  389. Stop: r == "true",
  390. }
  391. var reply string
  392. err = client.Call("Server.DropPlugin", args, &reply)
  393. if err != nil {
  394. fmt.Println(err)
  395. } else {
  396. fmt.Println(reply)
  397. }
  398. return nil
  399. },
  400. },
  401. },
  402. },
  403. {
  404. Name: "show",
  405. Aliases: []string{"show"},
  406. Usage: "show streams | show rules | show plugins $plugin_type",
  407. Subcommands: []cli.Command{
  408. {
  409. Name: "streams",
  410. Usage: "show streams",
  411. Action: func(c *cli.Context) error {
  412. streamProcess(client, "")
  413. return nil
  414. },
  415. },
  416. {
  417. Name: "rules",
  418. Usage: "show rules",
  419. Action: func(c *cli.Context) error {
  420. var reply string
  421. err = client.Call("Server.ShowRules", 0, &reply)
  422. if err != nil {
  423. fmt.Println(err)
  424. } else {
  425. fmt.Println(reply)
  426. }
  427. return nil
  428. },
  429. },
  430. {
  431. Name: "plugins",
  432. Usage: "show plugins $plugin_type",
  433. Action: func(c *cli.Context) error {
  434. if len(c.Args()) != 1 {
  435. fmt.Printf("Expect plugin type.\n")
  436. return nil
  437. }
  438. ptype, err := getPluginType(c.Args()[0])
  439. if err != nil {
  440. fmt.Printf("%s\n", err)
  441. return nil
  442. }
  443. var reply string
  444. err = client.Call("Server.ShowPlugins", ptype, &reply)
  445. if err != nil {
  446. fmt.Println(err)
  447. } else {
  448. fmt.Println(reply)
  449. }
  450. return nil
  451. },
  452. },
  453. },
  454. },
  455. {
  456. Name: "getstatus",
  457. Aliases: []string{"getstatus"},
  458. Usage: "getstatus rule $rule_name",
  459. Subcommands: []cli.Command{
  460. {
  461. Name: "rule",
  462. Usage: "getstatus rule $rule_name",
  463. //Flags: nflag,
  464. Action: func(c *cli.Context) error {
  465. if len(c.Args()) != 1 {
  466. fmt.Printf("Expect rule name.\n")
  467. return nil
  468. }
  469. rname := c.Args()[0]
  470. var reply string
  471. err = client.Call("Server.GetStatusRule", rname, &reply)
  472. if err != nil {
  473. fmt.Println(err)
  474. } else {
  475. fmt.Println(reply)
  476. }
  477. return nil
  478. },
  479. },
  480. },
  481. },
  482. {
  483. Name: "start",
  484. Aliases: []string{"start"},
  485. Usage: "start rule $rule_name",
  486. Subcommands: []cli.Command{
  487. {
  488. Name: "rule",
  489. Usage: "start rule $rule_name",
  490. //Flags: nflag,
  491. Action: func(c *cli.Context) error {
  492. if len(c.Args()) != 1 {
  493. fmt.Printf("Expect rule name.\n")
  494. return nil
  495. }
  496. rname := c.Args()[0]
  497. var reply string
  498. err = client.Call("Server.StartRule", rname, &reply)
  499. if err != nil {
  500. fmt.Println(err)
  501. } else {
  502. fmt.Println(reply)
  503. }
  504. return nil
  505. },
  506. },
  507. },
  508. },
  509. {
  510. Name: "stop",
  511. Aliases: []string{"stop"},
  512. Usage: "stop rule $rule_name",
  513. Subcommands: []cli.Command{
  514. {
  515. Name: "rule",
  516. Usage: "stop rule $rule_name",
  517. //Flags: nflag,
  518. Action: func(c *cli.Context) error {
  519. if len(c.Args()) != 1 {
  520. fmt.Printf("Expect rule name.\n")
  521. return nil
  522. }
  523. rname := c.Args()[0]
  524. var reply string
  525. err = client.Call("Server.StopRule", rname, &reply)
  526. if err != nil {
  527. fmt.Println(err)
  528. } else {
  529. fmt.Println(reply)
  530. }
  531. return nil
  532. },
  533. },
  534. },
  535. },
  536. {
  537. Name: "restart",
  538. Aliases: []string{"restart"},
  539. Usage: "restart rule $rule_name",
  540. Subcommands: []cli.Command{
  541. {
  542. Name: "rule",
  543. Usage: "restart rule $rule_name",
  544. //Flags: nflag,
  545. Action: func(c *cli.Context) error {
  546. if len(c.Args()) != 1 {
  547. fmt.Printf("Expect rule name.\n")
  548. return nil
  549. }
  550. rname := c.Args()[0]
  551. var reply string
  552. err = client.Call("Server.RestartRule", rname, &reply)
  553. if err != nil {
  554. fmt.Println(err)
  555. } else {
  556. fmt.Println(reply)
  557. }
  558. return nil
  559. },
  560. },
  561. },
  562. },
  563. }
  564. app.Name = "Kuiper"
  565. app.Usage = "The command line tool for EMQ X Kuiper."
  566. app.Action = func(c *cli.Context) error {
  567. cli.ShowSubcommandHelp(c)
  568. //cli.ShowVersion(c)
  569. return nil
  570. }
  571. sort.Sort(cli.FlagsByName(app.Flags))
  572. sort.Sort(cli.CommandsByName(app.Commands))
  573. err = app.Run(os.Args)
  574. if err != nil {
  575. fmt.Printf("%v", err)
  576. }
  577. }
  578. func getPluginType(arg string) (ptype int, err error) {
  579. switch arg {
  580. case "source":
  581. ptype = 0
  582. case "sink":
  583. ptype = 1
  584. case "function":
  585. ptype = 2
  586. default:
  587. err = fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\" or \"function\".\n", arg)
  588. }
  589. return
  590. }
  591. func readDef(sfile string, t string) ([]byte, error) {
  592. if _, err := os.Stat(sfile); os.IsNotExist(err) {
  593. return nil, fmt.Errorf("The specified %s defenition file %s is not existed.\n", t, sfile)
  594. }
  595. fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
  596. if rule, err := ioutil.ReadFile(sfile); err != nil {
  597. return nil, fmt.Errorf("Failed to read from %s definition file %s.\n", t, sfile)
  598. } else {
  599. return rule, nil
  600. }
  601. }