main.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586
  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/go-yaml/yaml"
  7. "github.com/urfave/cli"
  8. "io/ioutil"
  9. "net/rpc"
  10. "os"
  11. "sort"
  12. "strings"
  13. "time"
  14. )
  15. type clientConf struct {
  16. Host string `yaml:"host"`
  17. Port int `yaml:"port"`
  18. }
  19. var clientYaml = "client.yaml"
  20. func streamProcess(client *rpc.Client, args string) {
  21. var reply string
  22. if args == "" {
  23. args = strings.Join(os.Args[1:], " ")
  24. }
  25. err := client.Call("Server.Stream", args, &reply)
  26. if err != nil {
  27. fmt.Println(err)
  28. } else {
  29. fmt.Println(reply)
  30. }
  31. }
  32. var Version string = "unknown"
  33. func main() {
  34. app := cli.NewApp()
  35. app.Version = Version
  36. //nflag := []cli.Flag { cli.StringFlag{
  37. // Name: "name, n",
  38. // Usage: "the name of stream",
  39. // }}
  40. b, err := common.LoadConf(clientYaml)
  41. if err != nil {
  42. common.Log.Fatal(err)
  43. }
  44. var cfg map[string]clientConf
  45. var config *clientConf
  46. if err := yaml.Unmarshal(b, &cfg); err != nil {
  47. fmt.Printf("Failed to load config file with error %s.\n", err)
  48. } else {
  49. c, ok := cfg["basic"]
  50. if !ok {
  51. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  52. } else {
  53. config = &c
  54. }
  55. }
  56. if config == nil {
  57. config = &clientConf{
  58. Host: "127.0.0.1",
  59. Port: 20498,
  60. }
  61. }
  62. fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
  63. // Create a TCP connection to localhost on port 1234
  64. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  65. if err != nil {
  66. fmt.Printf("Failed to connect the server, please start the server.\n")
  67. return
  68. }
  69. app.Commands = []cli.Command{
  70. {
  71. Name: "query",
  72. Aliases: []string{"query"},
  73. Usage: "query command line",
  74. Action: func(c *cli.Context) error {
  75. reader := bufio.NewReader(os.Stdin)
  76. var inputs []string
  77. ticker := time.NewTicker(time.Millisecond * 300)
  78. defer ticker.Stop()
  79. for {
  80. fmt.Print("kuiper > ")
  81. text, _ := reader.ReadString('\n')
  82. inputs = append(inputs, text)
  83. // convert CRLF to LF
  84. text = strings.Replace(text, "\n", "", -1)
  85. if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
  86. break
  87. } else if strings.Trim(text, " ") == "" {
  88. continue
  89. } else {
  90. var reply string
  91. err := client.Call("Server.CreateQuery", text, &reply)
  92. if err != nil {
  93. fmt.Println(err)
  94. continue
  95. } else {
  96. fmt.Println(reply)
  97. go func() {
  98. for {
  99. <-ticker.C
  100. var result string
  101. e := client.Call("Server.GetQueryResult", "", &result)
  102. if e != nil {
  103. fmt.Println(e)
  104. fmt.Print("kuiper > ")
  105. return
  106. }
  107. if result != "" {
  108. fmt.Println(result)
  109. }
  110. }
  111. }()
  112. }
  113. }
  114. }
  115. return nil
  116. },
  117. },
  118. {
  119. Name: "create",
  120. Aliases: []string{"create"},
  121. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file",
  122. Subcommands: []cli.Command{
  123. {
  124. Name: "stream",
  125. Usage: "create stream $stream_name [-f stream_def_file]",
  126. Flags: []cli.Flag{
  127. cli.StringFlag{
  128. Name: "file, f",
  129. Usage: "the location of stream definition file",
  130. FilePath: "/home/mystream.txt",
  131. },
  132. },
  133. Action: func(c *cli.Context) error {
  134. sfile := c.String("file")
  135. if sfile != "" {
  136. if stream, err := readDef(sfile, "stream"); err != nil {
  137. fmt.Printf("%s", err)
  138. return nil
  139. } else {
  140. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  141. streamProcess(client, args)
  142. return nil
  143. }
  144. } else {
  145. streamProcess(client, "")
  146. return nil
  147. }
  148. },
  149. },
  150. {
  151. Name: "rule",
  152. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  153. Flags: []cli.Flag{
  154. cli.StringFlag{
  155. Name: "file, f",
  156. Usage: "the location of rule definition file",
  157. FilePath: "/home/myrule.txt",
  158. },
  159. },
  160. Action: func(c *cli.Context) error {
  161. sfile := c.String("file")
  162. if sfile != "" {
  163. if rule, err := readDef(sfile, "rule"); err != nil {
  164. fmt.Printf("%s", err)
  165. return nil
  166. } else {
  167. if len(c.Args()) != 1 {
  168. fmt.Printf("Expect rule name.\n")
  169. return nil
  170. }
  171. rname := c.Args()[0]
  172. var reply string
  173. args := &common.RuleDesc{rname, string(rule)}
  174. err = client.Call("Server.CreateRule", args, &reply)
  175. if err != nil {
  176. fmt.Println(err)
  177. } else {
  178. fmt.Println(reply)
  179. }
  180. }
  181. return nil
  182. } else {
  183. if len(c.Args()) != 2 {
  184. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  185. return nil
  186. }
  187. rname := c.Args()[0]
  188. rjson := c.Args()[1]
  189. var reply string
  190. args := &common.RuleDesc{rname, rjson}
  191. err = client.Call("Server.CreateRule", args, &reply)
  192. if err != nil {
  193. fmt.Println(err)
  194. } else {
  195. fmt.Println(reply)
  196. }
  197. return nil
  198. }
  199. },
  200. },
  201. {
  202. Name: "plugin",
  203. Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f rule_def_file]",
  204. Flags: []cli.Flag{
  205. cli.StringFlag{
  206. Name: "file, f",
  207. Usage: "the location of plugin definition file",
  208. FilePath: "/home/myplugin.txt",
  209. },
  210. },
  211. Action: func(c *cli.Context) error {
  212. if len(c.Args()) < 2 {
  213. fmt.Printf("Expect plugin type and name.\n")
  214. return nil
  215. }
  216. ptype, err := getPluginType(c.Args()[0])
  217. if err != nil {
  218. fmt.Printf("%s\n", err)
  219. return nil
  220. }
  221. pname := c.Args()[1]
  222. sfile := c.String("file")
  223. args := &common.PluginDesc{
  224. RuleDesc: common.RuleDesc{
  225. Name: pname,
  226. },
  227. Type: ptype,
  228. }
  229. if sfile != "" {
  230. if len(c.Args()) != 2 {
  231. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  232. return nil
  233. }
  234. if p, err := readDef(sfile, "plugin"); err != nil {
  235. fmt.Printf("%s", err)
  236. return nil
  237. } else {
  238. args.Json = string(p)
  239. }
  240. } else {
  241. if len(c.Args()) != 3 {
  242. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  243. return nil
  244. }
  245. args.Json = c.Args()[2]
  246. }
  247. var reply string
  248. err = client.Call("Server.CreatePlugin", args, &reply)
  249. if err != nil {
  250. fmt.Println(err)
  251. } else {
  252. fmt.Println(reply)
  253. }
  254. return nil
  255. },
  256. },
  257. },
  258. },
  259. {
  260. Name: "describe",
  261. Aliases: []string{"describe"},
  262. Usage: "describe stream $stream_name | describe rule $rule_name",
  263. Subcommands: []cli.Command{
  264. {
  265. Name: "stream",
  266. Usage: "describe stream $stream_name",
  267. //Flags: nflag,
  268. Action: func(c *cli.Context) error {
  269. streamProcess(client, "")
  270. return nil
  271. },
  272. },
  273. {
  274. Name: "rule",
  275. Usage: "describe rule $rule_name",
  276. Action: func(c *cli.Context) error {
  277. if len(c.Args()) != 1 {
  278. fmt.Printf("Expect rule name.\n")
  279. return nil
  280. }
  281. rname := c.Args()[0]
  282. var reply string
  283. err = client.Call("Server.DescRule", rname, &reply)
  284. if err != nil {
  285. fmt.Println(err)
  286. } else {
  287. fmt.Println(reply)
  288. }
  289. return nil
  290. },
  291. },
  292. },
  293. },
  294. {
  295. Name: "drop",
  296. Aliases: []string{"drop"},
  297. Usage: "drop stream $stream_name | drop rule $rule_name",
  298. Subcommands: []cli.Command{
  299. {
  300. Name: "stream",
  301. Usage: "drop stream $stream_name",
  302. //Flags: nflag,
  303. Action: func(c *cli.Context) error {
  304. streamProcess(client, "")
  305. return nil
  306. },
  307. },
  308. {
  309. Name: "rule",
  310. Usage: "drop rule $rule_name",
  311. //Flags: nflag,
  312. Action: func(c *cli.Context) error {
  313. if len(c.Args()) != 1 {
  314. fmt.Printf("Expect rule name.\n")
  315. return nil
  316. }
  317. rname := c.Args()[0]
  318. var reply string
  319. err = client.Call("Server.DropRule", rname, &reply)
  320. if err != nil {
  321. fmt.Println(err)
  322. } else {
  323. fmt.Println(reply)
  324. }
  325. return nil
  326. },
  327. },
  328. {
  329. Name: "plugin",
  330. Usage: "drop plugin $plugin_type $plugin_name $plugin_json",
  331. //Flags: nflag,
  332. Action: func(c *cli.Context) error {
  333. if len(c.Args()) < 2 || len(c.Args()) > 3 {
  334. fmt.Printf("Expect plugin type and name.\n")
  335. return nil
  336. }
  337. ptype, err := getPluginType(c.Args()[0])
  338. if err != nil {
  339. fmt.Printf("%s\n", err)
  340. return nil
  341. }
  342. pname := c.Args()[1]
  343. args := &common.PluginDesc{
  344. RuleDesc: common.RuleDesc{
  345. Name: pname,
  346. },
  347. Type: ptype,
  348. }
  349. if len(c.Args()) == 3 {
  350. args.Json = c.Args()[2]
  351. }
  352. var reply string
  353. err = client.Call("Server.DropPlugin", args, &reply)
  354. if err != nil {
  355. fmt.Println(err)
  356. } else {
  357. fmt.Println(reply)
  358. }
  359. return nil
  360. },
  361. },
  362. },
  363. },
  364. {
  365. Name: "show",
  366. Aliases: []string{"show"},
  367. Usage: "show streams | show rules | show plugins $plugin_type",
  368. Subcommands: []cli.Command{
  369. {
  370. Name: "streams",
  371. Usage: "show streams",
  372. Action: func(c *cli.Context) error {
  373. streamProcess(client, "")
  374. return nil
  375. },
  376. },
  377. {
  378. Name: "rules",
  379. Usage: "show rules",
  380. Action: func(c *cli.Context) error {
  381. var reply string
  382. err = client.Call("Server.ShowRules", 0, &reply)
  383. if err != nil {
  384. fmt.Println(err)
  385. } else {
  386. fmt.Println(reply)
  387. }
  388. return nil
  389. },
  390. },
  391. {
  392. Name: "plugins",
  393. Usage: "show plugins $plugin_type",
  394. Action: func(c *cli.Context) error {
  395. if len(c.Args()) != 1 {
  396. fmt.Printf("Expect plugin type.\n")
  397. return nil
  398. }
  399. ptype, err := getPluginType(c.Args()[0])
  400. if err != nil {
  401. fmt.Printf("%s\n", err)
  402. return nil
  403. }
  404. var reply string
  405. err = client.Call("Server.ShowPlugins", ptype, &reply)
  406. if err != nil {
  407. fmt.Println(err)
  408. } else {
  409. fmt.Println(reply)
  410. }
  411. return nil
  412. },
  413. },
  414. },
  415. },
  416. {
  417. Name: "getstatus",
  418. Aliases: []string{"getstatus"},
  419. Usage: "getstatus rule $rule_name",
  420. Subcommands: []cli.Command{
  421. {
  422. Name: "rule",
  423. Usage: "getstatus rule $rule_name",
  424. //Flags: nflag,
  425. Action: func(c *cli.Context) error {
  426. if len(c.Args()) != 1 {
  427. fmt.Printf("Expect rule name.\n")
  428. return nil
  429. }
  430. rname := c.Args()[0]
  431. var reply string
  432. err = client.Call("Server.GetStatusRule", rname, &reply)
  433. if err != nil {
  434. fmt.Println(err)
  435. } else {
  436. fmt.Println(reply)
  437. }
  438. return nil
  439. },
  440. },
  441. },
  442. },
  443. {
  444. Name: "start",
  445. Aliases: []string{"start"},
  446. Usage: "start rule $rule_name",
  447. Subcommands: []cli.Command{
  448. {
  449. Name: "rule",
  450. Usage: "start rule $rule_name",
  451. //Flags: nflag,
  452. Action: func(c *cli.Context) error {
  453. if len(c.Args()) != 1 {
  454. fmt.Printf("Expect rule name.\n")
  455. return nil
  456. }
  457. rname := c.Args()[0]
  458. var reply string
  459. err = client.Call("Server.StartRule", rname, &reply)
  460. if err != nil {
  461. fmt.Println(err)
  462. } else {
  463. fmt.Println(reply)
  464. }
  465. return nil
  466. },
  467. },
  468. },
  469. },
  470. {
  471. Name: "stop",
  472. Aliases: []string{"stop"},
  473. Usage: "stop rule $rule_name",
  474. Subcommands: []cli.Command{
  475. {
  476. Name: "rule",
  477. Usage: "stop rule $rule_name",
  478. //Flags: nflag,
  479. Action: func(c *cli.Context) error {
  480. if len(c.Args()) != 1 {
  481. fmt.Printf("Expect rule name.\n")
  482. return nil
  483. }
  484. rname := c.Args()[0]
  485. var reply string
  486. err = client.Call("Server.StopRule", rname, &reply)
  487. if err != nil {
  488. fmt.Println(err)
  489. } else {
  490. fmt.Println(reply)
  491. }
  492. return nil
  493. },
  494. },
  495. },
  496. },
  497. {
  498. Name: "restart",
  499. Aliases: []string{"restart"},
  500. Usage: "restart rule $rule_name",
  501. Subcommands: []cli.Command{
  502. {
  503. Name: "rule",
  504. Usage: "restart rule $rule_name",
  505. //Flags: nflag,
  506. Action: func(c *cli.Context) error {
  507. if len(c.Args()) != 1 {
  508. fmt.Printf("Expect rule name.\n")
  509. return nil
  510. }
  511. rname := c.Args()[0]
  512. var reply string
  513. err = client.Call("Server.RestartRule", rname, &reply)
  514. if err != nil {
  515. fmt.Println(err)
  516. } else {
  517. fmt.Println(reply)
  518. }
  519. return nil
  520. },
  521. },
  522. },
  523. },
  524. }
  525. app.Name = "Kuiper"
  526. app.Usage = "The command line tool for EMQ X Kuiper."
  527. app.Action = func(c *cli.Context) error {
  528. cli.ShowSubcommandHelp(c)
  529. //cli.ShowVersion(c)
  530. return nil
  531. }
  532. sort.Sort(cli.FlagsByName(app.Flags))
  533. sort.Sort(cli.CommandsByName(app.Commands))
  534. err = app.Run(os.Args)
  535. if err != nil {
  536. fmt.Printf("%v", err)
  537. }
  538. }
  539. func getPluginType(arg string) (ptype int, err error) {
  540. switch arg {
  541. case "source":
  542. ptype = 0
  543. case "sink":
  544. ptype = 1
  545. case "function":
  546. ptype = 2
  547. default:
  548. err = fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\" or \"function\".\n", arg)
  549. }
  550. return
  551. }
  552. func readDef(sfile string, t string) ([]byte, error) {
  553. if _, err := os.Stat(sfile); os.IsNotExist(err) {
  554. return nil, fmt.Errorf("The specified %s defenition file %s is not existed.\n", t, sfile)
  555. }
  556. fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
  557. if rule, err := ioutil.ReadFile(sfile); err != nil {
  558. return nil, fmt.Errorf("Failed to read from %s definition file %s.\n", t, sfile)
  559. } else {
  560. return rule, nil
  561. }
  562. }