main.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911
  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/go-yaml/yaml"
  7. "github.com/urfave/cli"
  8. "io/ioutil"
  9. "net/rpc"
  10. "os"
  11. "sort"
  12. "strings"
  13. "time"
  14. )
  // clientConf holds the address of the server this command line client
  // connects to. It is decoded from one section of the YAML client
  // configuration file.
  type clientConf struct {
  	// Host is the server host name or IP address (YAML key "host").
  	Host string `yaml:"host"`
  	// Port is the server TCP port (YAML key "port").
  	Port int `yaml:"port"`
  }
  // clientYaml is the name of the client configuration file that main hands
  // to common.LoadConf.
  var clientYaml = "client.yaml"
  // streamProcess sends a statement to the server's "Server.Stream" RPC method
  // and prints the reply, or the error when the call fails.
  //
  // When args is empty the statement is rebuilt from the process arguments:
  // everything after the program name, joined with single spaces.
  func streamProcess(client *rpc.Client, args string) {
  	statement := args
  	if statement == "" {
  		statement = strings.Join(os.Args[1:], " ")
  	}

  	var reply string
  	if callErr := client.Call("Server.Stream", statement, &reply); callErr != nil {
  		fmt.Println(callErr)
  		return
  	}
  	fmt.Println(reply)
  }
  // Version is the version string reported by the command line application;
  // main assigns it to the cli app.
  // NOTE(review): the "unknown" default is presumably overridden at build
  // time — confirm against the build scripts.
  var Version = "unknown"

  // LoadFileType is copied into common.LoadFileType when main starts.
  var LoadFileType = "relative"
  36. func main() {
  37. common.LoadFileType = LoadFileType
  38. app := cli.NewApp()
  39. app.Version = Version
  40. //nflag := []cli.Flag { cli.StringFlag{
  41. // Name: "name, n",
  42. // Usage: "the name of stream",
  43. // }}
  44. b, err := common.LoadConf(clientYaml)
  45. if err != nil {
  46. common.Log.Fatal(err)
  47. }
  48. var cfg map[string]clientConf
  49. var config *clientConf
  50. if err := yaml.Unmarshal(b, &cfg); err != nil {
  51. fmt.Printf("Failed to load config file with error %s.\n", err)
  52. } else {
  53. c, ok := cfg["basic"]
  54. if !ok {
  55. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  56. } else {
  57. config = &c
  58. }
  59. }
  60. if config == nil {
  61. config = &clientConf{
  62. Host: "127.0.0.1",
  63. Port: 20498,
  64. }
  65. }
  66. fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
  67. // Create a TCP connection to localhost on port 1234
  68. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  69. if err != nil {
  70. fmt.Printf("Failed to connect the server, please start the server.\n")
  71. return
  72. }
  73. app.Commands = []cli.Command{
  74. {
  75. Name: "query",
  76. Aliases: []string{"query"},
  77. Usage: "query command line",
  78. Action: func(c *cli.Context) error {
  79. reader := bufio.NewReader(os.Stdin)
  80. var inputs []string
  81. ticker := time.NewTicker(time.Millisecond * 300)
  82. defer ticker.Stop()
  83. for {
  84. fmt.Print("kuiper > ")
  85. text, _ := reader.ReadString('\n')
  86. inputs = append(inputs, text)
  87. // convert CRLF to LF
  88. text = strings.Replace(text, "\n", "", -1)
  89. if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
  90. break
  91. } else if strings.Trim(text, " ") == "" {
  92. continue
  93. } else {
  94. var reply string
  95. err := client.Call("Server.CreateQuery", text, &reply)
  96. if err != nil {
  97. fmt.Println(err)
  98. continue
  99. } else {
  100. fmt.Println(reply)
  101. go func() {
  102. for {
  103. <-ticker.C
  104. var result string
  105. e := client.Call("Server.GetQueryResult", "", &result)
  106. if e != nil {
  107. fmt.Println(e)
  108. fmt.Print("kuiper > ")
  109. return
  110. }
  111. if result != "" {
  112. fmt.Println(result)
  113. }
  114. }
  115. }()
  116. }
  117. }
  118. }
  119. return nil
  120. },
  121. },
  122. {
  123. Name: "create",
  124. Aliases: []string{"create"},
  125. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create table $table_name | create table $table_name -f $table_def_file| create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file | create service $service_name $service_json ",
  126. Subcommands: []cli.Command{
  127. {
  128. Name: "stream",
  129. Usage: "create stream $stream_name [-f stream_def_file]",
  130. Flags: []cli.Flag{
  131. cli.StringFlag{
  132. Name: "file, f",
  133. Usage: "the location of stream definition file",
  134. FilePath: "/home/mystream.txt",
  135. },
  136. },
  137. Action: func(c *cli.Context) error {
  138. sfile := c.String("file")
  139. if sfile != "" {
  140. if stream, err := readDef(sfile, "stream"); err != nil {
  141. fmt.Printf("%s", err)
  142. return nil
  143. } else {
  144. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  145. streamProcess(client, args)
  146. return nil
  147. }
  148. } else {
  149. streamProcess(client, "")
  150. return nil
  151. }
  152. },
  153. },
  154. {
  155. Name: "table",
  156. Usage: "create table $table_name [-f table_def_file]",
  157. Flags: []cli.Flag{
  158. cli.StringFlag{
  159. Name: "file, f",
  160. Usage: "the location of table definition file",
  161. FilePath: "/home/mytable.txt",
  162. },
  163. },
  164. Action: func(c *cli.Context) error {
  165. sfile := c.String("file")
  166. if sfile != "" {
  167. if stream, err := readDef(sfile, "table"); err != nil {
  168. fmt.Printf("%s", err)
  169. return nil
  170. } else {
  171. args := strings.Join([]string{"CREATE TABLE ", string(stream)}, " ")
  172. streamProcess(client, args)
  173. return nil
  174. }
  175. } else {
  176. streamProcess(client, "")
  177. return nil
  178. }
  179. },
  180. },
  181. {
  182. Name: "rule",
  183. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  184. Flags: []cli.Flag{
  185. cli.StringFlag{
  186. Name: "file, f",
  187. Usage: "the location of rule definition file",
  188. FilePath: "/home/myrule.txt",
  189. },
  190. },
  191. Action: func(c *cli.Context) error {
  192. sfile := c.String("file")
  193. if sfile != "" {
  194. if rule, err := readDef(sfile, "rule"); err != nil {
  195. fmt.Printf("%s", err)
  196. return nil
  197. } else {
  198. if len(c.Args()) != 1 {
  199. fmt.Printf("Expect rule name.\n")
  200. return nil
  201. }
  202. rname := c.Args()[0]
  203. var reply string
  204. args := &common.RPCArgDesc{rname, string(rule)}
  205. err = client.Call("Server.CreateRule", args, &reply)
  206. if err != nil {
  207. fmt.Println(err)
  208. } else {
  209. fmt.Println(reply)
  210. }
  211. }
  212. return nil
  213. } else {
  214. if len(c.Args()) != 2 {
  215. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  216. return nil
  217. }
  218. rname := c.Args()[0]
  219. rjson := c.Args()[1]
  220. var reply string
  221. args := &common.RPCArgDesc{rname, rjson}
  222. err = client.Call("Server.CreateRule", args, &reply)
  223. if err != nil {
  224. fmt.Println(err)
  225. } else {
  226. fmt.Println(reply)
  227. }
  228. return nil
  229. }
  230. },
  231. },
  232. {
  233. Name: "plugin",
  234. Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  235. Flags: []cli.Flag{
  236. cli.StringFlag{
  237. Name: "file, f",
  238. Usage: "the location of plugin definition file",
  239. FilePath: "/home/myplugin.txt",
  240. },
  241. },
  242. Action: func(c *cli.Context) error {
  243. if len(c.Args()) < 2 {
  244. fmt.Printf("Expect plugin type and name.\n")
  245. return nil
  246. }
  247. ptype, err := getPluginType(c.Args()[0])
  248. if err != nil {
  249. fmt.Printf("%s\n", err)
  250. return nil
  251. }
  252. pname := c.Args()[1]
  253. sfile := c.String("file")
  254. args := &common.PluginDesc{
  255. RPCArgDesc: common.RPCArgDesc{
  256. Name: pname,
  257. },
  258. Type: ptype,
  259. }
  260. if sfile != "" {
  261. if len(c.Args()) != 2 {
  262. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  263. return nil
  264. }
  265. if p, err := readDef(sfile, "plugin"); err != nil {
  266. fmt.Printf("%s", err)
  267. return nil
  268. } else {
  269. args.Json = string(p)
  270. }
  271. } else {
  272. if len(c.Args()) != 3 {
  273. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  274. return nil
  275. }
  276. args.Json = c.Args()[2]
  277. }
  278. var reply string
  279. err = client.Call("Server.CreatePlugin", args, &reply)
  280. if err != nil {
  281. fmt.Println(err)
  282. } else {
  283. fmt.Println(reply)
  284. }
  285. return nil
  286. },
  287. },
  288. {
  289. Name: "service",
  290. Usage: "create service $service_name $service_json",
  291. Action: func(c *cli.Context) error {
  292. if len(c.Args()) < 2 {
  293. fmt.Printf("Expect service name and json.\n")
  294. return nil
  295. }
  296. var reply string
  297. err = client.Call("Server.CreateService", &common.RPCArgDesc{
  298. Name: c.Args()[0],
  299. Json: c.Args()[1],
  300. }, &reply)
  301. if err != nil {
  302. fmt.Println(err)
  303. } else {
  304. fmt.Println(reply)
  305. }
  306. return nil
  307. },
  308. },
  309. },
  310. },
  311. {
  312. Name: "describe",
  313. Aliases: []string{"describe"},
  314. Usage: "describe stream $stream_name | describe table $table_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name | describe udf $udf_name | describe service $service_name | describe service_func $service_func_name",
  315. Subcommands: []cli.Command{
  316. {
  317. Name: "stream",
  318. Usage: "describe stream $stream_name",
  319. //Flags: nflag,
  320. Action: func(c *cli.Context) error {
  321. streamProcess(client, "")
  322. return nil
  323. },
  324. },
  325. {
  326. Name: "table",
  327. Usage: "describe table $table_name",
  328. //Flags: nflag,
  329. Action: func(c *cli.Context) error {
  330. streamProcess(client, "")
  331. return nil
  332. },
  333. },
  334. {
  335. Name: "rule",
  336. Usage: "describe rule $rule_name",
  337. Action: func(c *cli.Context) error {
  338. if len(c.Args()) != 1 {
  339. fmt.Printf("Expect rule name.\n")
  340. return nil
  341. }
  342. rname := c.Args()[0]
  343. var reply string
  344. err = client.Call("Server.DescRule", rname, &reply)
  345. if err != nil {
  346. fmt.Println(err)
  347. } else {
  348. fmt.Println(reply)
  349. }
  350. return nil
  351. },
  352. },
  353. {
  354. Name: "plugin",
  355. Usage: "describe plugin $plugin_type $plugin_name",
  356. //Flags: nflag,
  357. Action: func(c *cli.Context) error {
  358. ptype, err := getPluginType(c.Args()[0])
  359. if err != nil {
  360. fmt.Printf("%s\n", err)
  361. return nil
  362. }
  363. if len(c.Args()) != 2 {
  364. fmt.Printf("Expect plugin name.\n")
  365. return nil
  366. }
  367. pname := c.Args()[1]
  368. args := &common.PluginDesc{
  369. RPCArgDesc: common.RPCArgDesc{
  370. Name: pname,
  371. },
  372. Type: ptype,
  373. }
  374. var reply string
  375. err = client.Call("Server.DescPlugin", args, &reply)
  376. if err != nil {
  377. fmt.Println(err)
  378. } else {
  379. fmt.Println(reply)
  380. }
  381. return nil
  382. },
  383. },
  384. {
  385. Name: "udf",
  386. Usage: "describe udf $udf_name",
  387. //Flags: nflag,
  388. Action: func(c *cli.Context) error {
  389. if len(c.Args()) != 1 {
  390. fmt.Printf("Expect udf name.\n")
  391. return nil
  392. }
  393. pname := c.Args()[0]
  394. var reply string
  395. err = client.Call("Server.DescUdf", pname, &reply)
  396. if err != nil {
  397. fmt.Println(err)
  398. } else {
  399. fmt.Println(reply)
  400. }
  401. return nil
  402. },
  403. },
  404. {
  405. Name: "service",
  406. Usage: "describe service $service_name",
  407. Action: func(c *cli.Context) error {
  408. if len(c.Args()) != 1 {
  409. fmt.Printf("Expect service name.\n")
  410. return nil
  411. }
  412. name := c.Args()[0]
  413. var reply string
  414. err = client.Call("Server.DescService", name, &reply)
  415. if err != nil {
  416. fmt.Println(err)
  417. } else {
  418. fmt.Println(reply)
  419. }
  420. return nil
  421. },
  422. },
  423. {
  424. Name: "service_func",
  425. Usage: "describe service_func $service_func_name",
  426. Action: func(c *cli.Context) error {
  427. if len(c.Args()) != 1 {
  428. fmt.Printf("Expect service func name.\n")
  429. return nil
  430. }
  431. name := c.Args()[0]
  432. var reply string
  433. err = client.Call("Server.DescServiceFunc", name, &reply)
  434. if err != nil {
  435. fmt.Println(err)
  436. } else {
  437. fmt.Println(reply)
  438. }
  439. return nil
  440. },
  441. },
  442. },
  443. },
  444. {
  445. Name: "drop",
  446. Aliases: []string{"drop"},
  447. Usage: "drop stream $stream_name | drop table $table_name |drop rule $rule_name | drop plugin $plugin_type $plugin_name -r $stop | drop service $service_name",
  448. Subcommands: []cli.Command{
  449. {
  450. Name: "stream",
  451. Usage: "drop stream $stream_name",
  452. //Flags: nflag,
  453. Action: func(c *cli.Context) error {
  454. streamProcess(client, "")
  455. return nil
  456. },
  457. },
  458. {
  459. Name: "table",
  460. Usage: "drop table $table_name",
  461. //Flags: nflag,
  462. Action: func(c *cli.Context) error {
  463. streamProcess(client, "")
  464. return nil
  465. },
  466. },
  467. {
  468. Name: "rule",
  469. Usage: "drop rule $rule_name",
  470. //Flags: nflag,
  471. Action: func(c *cli.Context) error {
  472. if len(c.Args()) != 1 {
  473. fmt.Printf("Expect rule name.\n")
  474. return nil
  475. }
  476. rname := c.Args()[0]
  477. var reply string
  478. err = client.Call("Server.DropRule", rname, &reply)
  479. if err != nil {
  480. fmt.Println(err)
  481. } else {
  482. fmt.Println(reply)
  483. }
  484. return nil
  485. },
  486. },
  487. {
  488. Name: "plugin",
  489. Usage: "drop plugin $plugin_type $plugin_name -s stop",
  490. Flags: []cli.Flag{
  491. cli.StringFlag{
  492. Name: "stop, s",
  493. Usage: "stop kuiper after the action",
  494. },
  495. },
  496. Action: func(c *cli.Context) error {
  497. r := c.String("stop")
  498. if r != "true" && r != "false" {
  499. fmt.Printf("Expect s flag to be a boolean value.\n")
  500. return nil
  501. }
  502. if len(c.Args()) < 2 || len(c.Args()) > 3 {
  503. fmt.Printf("Expect plugin type and name.\n")
  504. return nil
  505. }
  506. ptype, err := getPluginType(c.Args()[0])
  507. if err != nil {
  508. fmt.Printf("%s\n", err)
  509. return nil
  510. }
  511. pname := c.Args()[1]
  512. args := &common.PluginDesc{
  513. RPCArgDesc: common.RPCArgDesc{
  514. Name: pname,
  515. },
  516. Type: ptype,
  517. Stop: r == "true",
  518. }
  519. var reply string
  520. err = client.Call("Server.DropPlugin", args, &reply)
  521. if err != nil {
  522. fmt.Println(err)
  523. } else {
  524. fmt.Println(reply)
  525. }
  526. return nil
  527. },
  528. },
  529. {
  530. Name: "service",
  531. Usage: "drop service $service_name",
  532. Action: func(c *cli.Context) error {
  533. if len(c.Args()) != 1 {
  534. fmt.Printf("Expect service name.\n")
  535. return nil
  536. }
  537. name := c.Args()[0]
  538. var reply string
  539. err = client.Call("Server.DropService", name, &reply)
  540. if err != nil {
  541. fmt.Println(err)
  542. } else {
  543. fmt.Println(reply)
  544. }
  545. return nil
  546. },
  547. },
  548. },
  549. },
  550. {
  551. Name: "show",
  552. Aliases: []string{"show"},
  553. Usage: "show streams | show tables | show rules | show plugins $plugin_type | show services | show service_funcs",
  554. Subcommands: []cli.Command{
  555. {
  556. Name: "streams",
  557. Usage: "show streams",
  558. Action: func(c *cli.Context) error {
  559. streamProcess(client, "")
  560. return nil
  561. },
  562. },
  563. {
  564. Name: "tables",
  565. Usage: "show tables",
  566. Action: func(c *cli.Context) error {
  567. streamProcess(client, "")
  568. return nil
  569. },
  570. },
  571. {
  572. Name: "rules",
  573. Usage: "show rules",
  574. Action: func(c *cli.Context) error {
  575. var reply string
  576. err = client.Call("Server.ShowRules", 0, &reply)
  577. if err != nil {
  578. fmt.Println(err)
  579. } else {
  580. fmt.Println(reply)
  581. }
  582. return nil
  583. },
  584. },
  585. {
  586. Name: "plugins",
  587. Usage: "show plugins $plugin_type",
  588. Action: func(c *cli.Context) error {
  589. if len(c.Args()) != 1 {
  590. fmt.Printf("Expect plugin type.\n")
  591. return nil
  592. }
  593. ptype, err := getPluginType(c.Args()[0])
  594. if err != nil {
  595. fmt.Printf("%s\n", err)
  596. return nil
  597. }
  598. var reply string
  599. err = client.Call("Server.ShowPlugins", ptype, &reply)
  600. if err != nil {
  601. fmt.Println(err)
  602. } else {
  603. fmt.Println(reply)
  604. }
  605. return nil
  606. },
  607. },
  608. {
  609. Name: "udfs",
  610. Usage: "show udfs",
  611. Action: func(c *cli.Context) error {
  612. var reply string
  613. err = client.Call("Server.ShowUdfs", 0, &reply)
  614. if err != nil {
  615. fmt.Println(err)
  616. } else {
  617. fmt.Println(reply)
  618. }
  619. return nil
  620. },
  621. }, {
  622. Name: "services",
  623. Usage: "show services",
  624. Action: func(c *cli.Context) error {
  625. var reply string
  626. err = client.Call("Server.ShowServices", 0, &reply)
  627. if err != nil {
  628. fmt.Println(err)
  629. } else {
  630. fmt.Println(reply)
  631. }
  632. return nil
  633. },
  634. }, {
  635. Name: "service_funcs",
  636. Usage: "show service_funcs",
  637. Action: func(c *cli.Context) error {
  638. var reply string
  639. err = client.Call("Server.ShowServiceFuncs", 0, &reply)
  640. if err != nil {
  641. fmt.Println(err)
  642. } else {
  643. fmt.Println(reply)
  644. }
  645. return nil
  646. },
  647. },
  648. },
  649. },
  650. {
  651. Name: "getstatus",
  652. Aliases: []string{"getstatus"},
  653. Usage: "getstatus rule $rule_name",
  654. Subcommands: []cli.Command{
  655. {
  656. Name: "rule",
  657. Usage: "getstatus rule $rule_name",
  658. //Flags: nflag,
  659. Action: func(c *cli.Context) error {
  660. if len(c.Args()) != 1 {
  661. fmt.Printf("Expect rule name.\n")
  662. return nil
  663. }
  664. rname := c.Args()[0]
  665. var reply string
  666. err = client.Call("Server.GetStatusRule", rname, &reply)
  667. if err != nil {
  668. fmt.Println(err)
  669. } else {
  670. fmt.Println(reply)
  671. }
  672. return nil
  673. },
  674. },
  675. },
  676. },
  677. {
  678. Name: "gettopo",
  679. Aliases: []string{"gettopo"},
  680. Usage: "gettopo rule $rule_name",
  681. Subcommands: []cli.Command{
  682. {
  683. Name: "rule",
  684. Usage: "getstopo rule $rule_name",
  685. //Flags: nflag,
  686. Action: func(c *cli.Context) error {
  687. if len(c.Args()) != 1 {
  688. fmt.Printf("Expect rule name.\n")
  689. return nil
  690. }
  691. rname := c.Args()[0]
  692. var reply string
  693. err = client.Call("Server.GetTopoRule", rname, &reply)
  694. if err != nil {
  695. fmt.Println(err)
  696. } else {
  697. fmt.Println(reply)
  698. }
  699. return nil
  700. },
  701. },
  702. },
  703. },
  704. {
  705. Name: "start",
  706. Aliases: []string{"start"},
  707. Usage: "start rule $rule_name",
  708. Subcommands: []cli.Command{
  709. {
  710. Name: "rule",
  711. Usage: "start rule $rule_name",
  712. //Flags: nflag,
  713. Action: func(c *cli.Context) error {
  714. if len(c.Args()) != 1 {
  715. fmt.Printf("Expect rule name.\n")
  716. return nil
  717. }
  718. rname := c.Args()[0]
  719. var reply string
  720. err = client.Call("Server.StartRule", rname, &reply)
  721. if err != nil {
  722. fmt.Println(err)
  723. } else {
  724. fmt.Println(reply)
  725. }
  726. return nil
  727. },
  728. },
  729. },
  730. },
  731. {
  732. Name: "stop",
  733. Aliases: []string{"stop"},
  734. Usage: "stop rule $rule_name",
  735. Subcommands: []cli.Command{
  736. {
  737. Name: "rule",
  738. Usage: "stop rule $rule_name",
  739. //Flags: nflag,
  740. Action: func(c *cli.Context) error {
  741. if len(c.Args()) != 1 {
  742. fmt.Printf("Expect rule name.\n")
  743. return nil
  744. }
  745. rname := c.Args()[0]
  746. var reply string
  747. err = client.Call("Server.StopRule", rname, &reply)
  748. if err != nil {
  749. fmt.Println(err)
  750. } else {
  751. fmt.Println(reply)
  752. }
  753. return nil
  754. },
  755. },
  756. },
  757. },
  758. {
  759. Name: "restart",
  760. Aliases: []string{"restart"},
  761. Usage: "restart rule $rule_name",
  762. Subcommands: []cli.Command{
  763. {
  764. Name: "rule",
  765. Usage: "restart rule $rule_name",
  766. //Flags: nflag,
  767. Action: func(c *cli.Context) error {
  768. if len(c.Args()) != 1 {
  769. fmt.Printf("Expect rule name.\n")
  770. return nil
  771. }
  772. rname := c.Args()[0]
  773. var reply string
  774. err = client.Call("Server.RestartRule", rname, &reply)
  775. if err != nil {
  776. fmt.Println(err)
  777. } else {
  778. fmt.Println(reply)
  779. }
  780. return nil
  781. },
  782. },
  783. },
  784. },
  785. {
  786. Name: "register",
  787. Aliases: []string{"register"},
  788. Usage: "register plugin function $plugin_name [$plugin_json | -f plugin_def_file]",
  789. Subcommands: []cli.Command{
  790. {
  791. Name: "plugin",
  792. Usage: "register plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  793. Flags: []cli.Flag{
  794. cli.StringFlag{
  795. Name: "file, f",
  796. Usage: "the location of plugin functions definition file",
  797. FilePath: "/home/myplugin.txt",
  798. },
  799. },
  800. Action: func(c *cli.Context) error {
  801. if len(c.Args()) < 2 {
  802. fmt.Printf("Expect plugin type and name.\n")
  803. return nil
  804. }
  805. ptype := c.Args()[0]
  806. if strings.ToLower(ptype) != "function" {
  807. fmt.Printf("Plugin type must be function.\n")
  808. return nil
  809. }
  810. pname := c.Args()[1]
  811. sfile := c.String("file")
  812. args := &common.PluginDesc{
  813. RPCArgDesc: common.RPCArgDesc{
  814. Name: pname,
  815. },
  816. }
  817. if sfile != "" {
  818. if len(c.Args()) != 2 {
  819. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  820. return nil
  821. }
  822. if p, err := readDef(sfile, "plugin"); err != nil {
  823. fmt.Printf("%s", err)
  824. return nil
  825. } else {
  826. args.Json = string(p)
  827. }
  828. } else {
  829. if len(c.Args()) != 3 {
  830. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  831. return nil
  832. }
  833. args.Json = c.Args()[2]
  834. }
  835. var reply string
  836. err = client.Call("Server.RegisterPlugin", args, &reply)
  837. if err != nil {
  838. fmt.Println(err)
  839. } else {
  840. fmt.Println(reply)
  841. }
  842. return nil
  843. },
  844. },
  845. },
  846. },
  847. }
  848. app.Name = "Kuiper"
  849. app.Usage = "The command line tool for EMQ X Kuiper."
  850. app.Action = func(c *cli.Context) error {
  851. cli.ShowSubcommandHelp(c)
  852. //cli.ShowVersion(c)
  853. return nil
  854. }
  855. sort.Sort(cli.FlagsByName(app.Flags))
  856. sort.Sort(cli.CommandsByName(app.Commands))
  857. err = app.Run(os.Args)
  858. if err != nil {
  859. fmt.Printf("%v", err)
  860. }
  861. }
  // getPluginType maps a plugin type name given on the command line to the
  // numeric code sent to the server: "source" is 0, "sink" is 1 and "function"
  // is 2. The comparison is case-sensitive.
  //
  // Any other name yields a zero ptype and a non-nil error. The error text has
  // no trailing newline because the callers in this file print it with "%s\n".
  func getPluginType(arg string) (ptype int, err error) {
  	switch arg {
  	case "source":
  		return 0, nil
  	case "sink":
  		return 1, nil
  	case "function":
  		return 2, nil
  	}
  	return 0, fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\" or \"function\".", arg)
  }
  // readDef reads the definition file sfile for an object of kind t ("stream",
  // "table", "rule", "plugin", ...) and returns its raw content.
  //
  // On success it prints a "Creating a new ..." notice. A missing file and any
  // other read failure are reported as distinct errors, the latter including
  // the underlying cause. The error texts deliberately end with a newline:
  // the callers in this file print them with "%s" and rely on it.
  func readDef(sfile string, t string) ([]byte, error) {
  	// Read directly instead of Stat-then-read, so the file cannot vanish
  	// between the check and the read.
  	def, err := ioutil.ReadFile(sfile)
  	if os.IsNotExist(err) {
  		return nil, fmt.Errorf("The specified %s definition file %s does not exist.\n", t, sfile)
  	}
  	if err != nil {
  		return nil, fmt.Errorf("Failed to read from %s definition file %s: %v.\n", t, sfile, err)
  	}
  	fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
  	return def, nil
  }