main.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912
  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "github.com/lf-edge/ekuiper/internal/conf"
  6. "github.com/lf-edge/ekuiper/internal/server"
  7. "github.com/urfave/cli"
  8. "gopkg.in/yaml.v3"
  9. "io/ioutil"
  10. "net/rpc"
  11. "os"
  12. "sort"
  13. "strings"
  14. "time"
  15. )
  16. type clientConf struct {
  17. Host string `yaml:"host"`
  18. Port int `yaml:"port"`
  19. }
  20. var clientYaml = "client.yaml"
  21. func streamProcess(client *rpc.Client, args string) {
  22. var reply string
  23. if args == "" {
  24. args = strings.Join(os.Args[1:], " ")
  25. }
  26. err := client.Call("Server.Stream", args, &reply)
  27. if err != nil {
  28. fmt.Println(err)
  29. } else {
  30. fmt.Println(reply)
  31. }
  32. }
  33. var (
  34. Version = "unknown"
  35. LoadFileType = "relative"
  36. )
  37. func main() {
  38. conf.LoadFileType = LoadFileType
  39. app := cli.NewApp()
  40. app.Version = Version
  41. //nflag := []cli.Flag { cli.StringFlag{
  42. // Name: "name, n",
  43. // Usage: "the name of stream",
  44. // }}
  45. b, err := conf.LoadConf(clientYaml)
  46. if err != nil {
  47. conf.Log.Fatal(err)
  48. }
  49. var cfg map[string]clientConf
  50. var config *clientConf
  51. if err := yaml.Unmarshal(b, &cfg); err != nil {
  52. fmt.Printf("Failed to load config file with error %s.\n", err)
  53. } else {
  54. c, ok := cfg["basic"]
  55. if !ok {
  56. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  57. } else {
  58. config = &c
  59. }
  60. }
  61. if config == nil {
  62. config = &clientConf{
  63. Host: "127.0.0.1",
  64. Port: 20498,
  65. }
  66. }
  67. fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
  68. // Create a TCP connection to localhost on port 1234
  69. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  70. if err != nil {
  71. fmt.Printf("Failed to connect the server, please start the server.\n")
  72. return
  73. }
  74. app.Commands = []cli.Command{
  75. {
  76. Name: "query",
  77. Aliases: []string{"query"},
  78. Usage: "query command line",
  79. Action: func(c *cli.Context) error {
  80. reader := bufio.NewReader(os.Stdin)
  81. var inputs []string
  82. ticker := time.NewTicker(time.Millisecond * 300)
  83. defer ticker.Stop()
  84. for {
  85. fmt.Print("kuiper > ")
  86. text, _ := reader.ReadString('\n')
  87. inputs = append(inputs, text)
  88. // convert CRLF to LF
  89. text = strings.Replace(text, "\n", "", -1)
  90. if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
  91. break
  92. } else if strings.Trim(text, " ") == "" {
  93. continue
  94. } else {
  95. var reply string
  96. err := client.Call("Server.CreateQuery", text, &reply)
  97. if err != nil {
  98. fmt.Println(err)
  99. continue
  100. } else {
  101. fmt.Println(reply)
  102. go func() {
  103. for {
  104. <-ticker.C
  105. var result string
  106. e := client.Call("Server.GetQueryResult", "", &result)
  107. if e != nil {
  108. fmt.Println(e)
  109. fmt.Print("kuiper > ")
  110. return
  111. }
  112. if result != "" {
  113. fmt.Println(result)
  114. }
  115. }
  116. }()
  117. }
  118. }
  119. }
  120. return nil
  121. },
  122. },
  123. {
  124. Name: "create",
  125. Aliases: []string{"create"},
  126. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create table $table_name | create table $table_name -f $table_def_file| create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file | create service $service_name $service_json ",
  127. Subcommands: []cli.Command{
  128. {
  129. Name: "stream",
  130. Usage: "create stream $stream_name [-f stream_def_file]",
  131. Flags: []cli.Flag{
  132. cli.StringFlag{
  133. Name: "file, f",
  134. Usage: "the location of stream definition file",
  135. FilePath: "/home/mystream.txt",
  136. },
  137. },
  138. Action: func(c *cli.Context) error {
  139. sfile := c.String("file")
  140. if sfile != "" {
  141. if stream, err := readDef(sfile, "stream"); err != nil {
  142. fmt.Printf("%s", err)
  143. return nil
  144. } else {
  145. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  146. streamProcess(client, args)
  147. return nil
  148. }
  149. } else {
  150. streamProcess(client, "")
  151. return nil
  152. }
  153. },
  154. },
  155. {
  156. Name: "table",
  157. Usage: "create table $table_name [-f table_def_file]",
  158. Flags: []cli.Flag{
  159. cli.StringFlag{
  160. Name: "file, f",
  161. Usage: "the location of table definition file",
  162. FilePath: "/home/mytable.txt",
  163. },
  164. },
  165. Action: func(c *cli.Context) error {
  166. sfile := c.String("file")
  167. if sfile != "" {
  168. if stream, err := readDef(sfile, "table"); err != nil {
  169. fmt.Printf("%s", err)
  170. return nil
  171. } else {
  172. args := strings.Join([]string{"CREATE TABLE ", string(stream)}, " ")
  173. streamProcess(client, args)
  174. return nil
  175. }
  176. } else {
  177. streamProcess(client, "")
  178. return nil
  179. }
  180. },
  181. },
  182. {
  183. Name: "rule",
  184. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  185. Flags: []cli.Flag{
  186. cli.StringFlag{
  187. Name: "file, f",
  188. Usage: "the location of rule definition file",
  189. FilePath: "/home/myrule.txt",
  190. },
  191. },
  192. Action: func(c *cli.Context) error {
  193. sfile := c.String("file")
  194. if sfile != "" {
  195. if rule, err := readDef(sfile, "rule"); err != nil {
  196. fmt.Printf("%s", err)
  197. return nil
  198. } else {
  199. if len(c.Args()) != 1 {
  200. fmt.Printf("Expect rule name.\n")
  201. return nil
  202. }
  203. rname := c.Args()[0]
  204. var reply string
  205. args := &server.RPCArgDesc{Name: rname, Json: string(rule)}
  206. err = client.Call("Server.CreateRule", args, &reply)
  207. if err != nil {
  208. fmt.Println(err)
  209. } else {
  210. fmt.Println(reply)
  211. }
  212. }
  213. return nil
  214. } else {
  215. if len(c.Args()) != 2 {
  216. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  217. return nil
  218. }
  219. rname := c.Args()[0]
  220. rjson := c.Args()[1]
  221. var reply string
  222. args := &server.RPCArgDesc{Name: rname, Json: rjson}
  223. err = client.Call("Server.CreateRule", args, &reply)
  224. if err != nil {
  225. fmt.Println(err)
  226. } else {
  227. fmt.Println(reply)
  228. }
  229. return nil
  230. }
  231. },
  232. },
  233. {
  234. Name: "plugin",
  235. Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  236. Flags: []cli.Flag{
  237. cli.StringFlag{
  238. Name: "file, f",
  239. Usage: "the location of plugin definition file",
  240. FilePath: "/home/myplugin.txt",
  241. },
  242. },
  243. Action: func(c *cli.Context) error {
  244. if len(c.Args()) < 2 {
  245. fmt.Printf("Expect plugin type and name.\n")
  246. return nil
  247. }
  248. ptype, err := getPluginType(c.Args()[0])
  249. if err != nil {
  250. fmt.Printf("%s\n", err)
  251. return nil
  252. }
  253. pname := c.Args()[1]
  254. sfile := c.String("file")
  255. args := &server.PluginDesc{
  256. RPCArgDesc: server.RPCArgDesc{
  257. Name: pname,
  258. },
  259. Type: ptype,
  260. }
  261. if sfile != "" {
  262. if len(c.Args()) != 2 {
  263. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  264. return nil
  265. }
  266. if p, err := readDef(sfile, "plugin"); err != nil {
  267. fmt.Printf("%s", err)
  268. return nil
  269. } else {
  270. args.Json = string(p)
  271. }
  272. } else {
  273. if len(c.Args()) != 3 {
  274. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  275. return nil
  276. }
  277. args.Json = c.Args()[2]
  278. }
  279. var reply string
  280. err = client.Call("Server.CreatePlugin", args, &reply)
  281. if err != nil {
  282. fmt.Println(err)
  283. } else {
  284. fmt.Println(reply)
  285. }
  286. return nil
  287. },
  288. },
  289. {
  290. Name: "service",
  291. Usage: "create service $service_name $service_json",
  292. Action: func(c *cli.Context) error {
  293. if len(c.Args()) < 2 {
  294. fmt.Printf("Expect service name and json.\n")
  295. return nil
  296. }
  297. var reply string
  298. err = client.Call("Server.CreateService", &server.RPCArgDesc{
  299. Name: c.Args()[0],
  300. Json: c.Args()[1],
  301. }, &reply)
  302. if err != nil {
  303. fmt.Println(err)
  304. } else {
  305. fmt.Println(reply)
  306. }
  307. return nil
  308. },
  309. },
  310. },
  311. },
  312. {
  313. Name: "describe",
  314. Aliases: []string{"describe"},
  315. Usage: "describe stream $stream_name | describe table $table_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name | describe udf $udf_name | describe service $service_name | describe service_func $service_func_name",
  316. Subcommands: []cli.Command{
  317. {
  318. Name: "stream",
  319. Usage: "describe stream $stream_name",
  320. //Flags: nflag,
  321. Action: func(c *cli.Context) error {
  322. streamProcess(client, "")
  323. return nil
  324. },
  325. },
  326. {
  327. Name: "table",
  328. Usage: "describe table $table_name",
  329. //Flags: nflag,
  330. Action: func(c *cli.Context) error {
  331. streamProcess(client, "")
  332. return nil
  333. },
  334. },
  335. {
  336. Name: "rule",
  337. Usage: "describe rule $rule_name",
  338. Action: func(c *cli.Context) error {
  339. if len(c.Args()) != 1 {
  340. fmt.Printf("Expect rule name.\n")
  341. return nil
  342. }
  343. rname := c.Args()[0]
  344. var reply string
  345. err = client.Call("Server.DescRule", rname, &reply)
  346. if err != nil {
  347. fmt.Println(err)
  348. } else {
  349. fmt.Println(reply)
  350. }
  351. return nil
  352. },
  353. },
  354. {
  355. Name: "plugin",
  356. Usage: "describe plugin $plugin_type $plugin_name",
  357. //Flags: nflag,
  358. Action: func(c *cli.Context) error {
  359. ptype, err := getPluginType(c.Args()[0])
  360. if err != nil {
  361. fmt.Printf("%s\n", err)
  362. return nil
  363. }
  364. if len(c.Args()) != 2 {
  365. fmt.Printf("Expect plugin name.\n")
  366. return nil
  367. }
  368. pname := c.Args()[1]
  369. args := &server.PluginDesc{
  370. RPCArgDesc: server.RPCArgDesc{
  371. Name: pname,
  372. },
  373. Type: ptype,
  374. }
  375. var reply string
  376. err = client.Call("Server.DescPlugin", args, &reply)
  377. if err != nil {
  378. fmt.Println(err)
  379. } else {
  380. fmt.Println(reply)
  381. }
  382. return nil
  383. },
  384. },
  385. {
  386. Name: "udf",
  387. Usage: "describe udf $udf_name",
  388. //Flags: nflag,
  389. Action: func(c *cli.Context) error {
  390. if len(c.Args()) != 1 {
  391. fmt.Printf("Expect udf name.\n")
  392. return nil
  393. }
  394. pname := c.Args()[0]
  395. var reply string
  396. err = client.Call("Server.DescUdf", pname, &reply)
  397. if err != nil {
  398. fmt.Println(err)
  399. } else {
  400. fmt.Println(reply)
  401. }
  402. return nil
  403. },
  404. },
  405. {
  406. Name: "service",
  407. Usage: "describe service $service_name",
  408. Action: func(c *cli.Context) error {
  409. if len(c.Args()) != 1 {
  410. fmt.Printf("Expect service name.\n")
  411. return nil
  412. }
  413. name := c.Args()[0]
  414. var reply string
  415. err = client.Call("Server.DescService", name, &reply)
  416. if err != nil {
  417. fmt.Println(err)
  418. } else {
  419. fmt.Println(reply)
  420. }
  421. return nil
  422. },
  423. },
  424. {
  425. Name: "service_func",
  426. Usage: "describe service_func $service_func_name",
  427. Action: func(c *cli.Context) error {
  428. if len(c.Args()) != 1 {
  429. fmt.Printf("Expect service func name.\n")
  430. return nil
  431. }
  432. name := c.Args()[0]
  433. var reply string
  434. err = client.Call("Server.DescServiceFunc", name, &reply)
  435. if err != nil {
  436. fmt.Println(err)
  437. } else {
  438. fmt.Println(reply)
  439. }
  440. return nil
  441. },
  442. },
  443. },
  444. },
  445. {
  446. Name: "drop",
  447. Aliases: []string{"drop"},
  448. Usage: "drop stream $stream_name | drop table $table_name |drop rule $rule_name | drop plugin $plugin_type $plugin_name -r $stop | drop service $service_name",
  449. Subcommands: []cli.Command{
  450. {
  451. Name: "stream",
  452. Usage: "drop stream $stream_name",
  453. //Flags: nflag,
  454. Action: func(c *cli.Context) error {
  455. streamProcess(client, "")
  456. return nil
  457. },
  458. },
  459. {
  460. Name: "table",
  461. Usage: "drop table $table_name",
  462. //Flags: nflag,
  463. Action: func(c *cli.Context) error {
  464. streamProcess(client, "")
  465. return nil
  466. },
  467. },
  468. {
  469. Name: "rule",
  470. Usage: "drop rule $rule_name",
  471. //Flags: nflag,
  472. Action: func(c *cli.Context) error {
  473. if len(c.Args()) != 1 {
  474. fmt.Printf("Expect rule name.\n")
  475. return nil
  476. }
  477. rname := c.Args()[0]
  478. var reply string
  479. err = client.Call("Server.DropRule", rname, &reply)
  480. if err != nil {
  481. fmt.Println(err)
  482. } else {
  483. fmt.Println(reply)
  484. }
  485. return nil
  486. },
  487. },
  488. {
  489. Name: "plugin",
  490. Usage: "drop plugin $plugin_type $plugin_name -s stop",
  491. Flags: []cli.Flag{
  492. cli.StringFlag{
  493. Name: "stop, s",
  494. Usage: "stop kuiper after the action",
  495. },
  496. },
  497. Action: func(c *cli.Context) error {
  498. r := c.String("stop")
  499. if r != "true" && r != "false" {
  500. fmt.Printf("Expect s flag to be a boolean value.\n")
  501. return nil
  502. }
  503. if len(c.Args()) < 2 || len(c.Args()) > 3 {
  504. fmt.Printf("Expect plugin type and name.\n")
  505. return nil
  506. }
  507. ptype, err := getPluginType(c.Args()[0])
  508. if err != nil {
  509. fmt.Printf("%s\n", err)
  510. return nil
  511. }
  512. pname := c.Args()[1]
  513. args := &server.PluginDesc{
  514. RPCArgDesc: server.RPCArgDesc{
  515. Name: pname,
  516. },
  517. Type: ptype,
  518. Stop: r == "true",
  519. }
  520. var reply string
  521. err = client.Call("Server.DropPlugin", args, &reply)
  522. if err != nil {
  523. fmt.Println(err)
  524. } else {
  525. fmt.Println(reply)
  526. }
  527. return nil
  528. },
  529. },
  530. {
  531. Name: "service",
  532. Usage: "drop service $service_name",
  533. Action: func(c *cli.Context) error {
  534. if len(c.Args()) != 1 {
  535. fmt.Printf("Expect service name.\n")
  536. return nil
  537. }
  538. name := c.Args()[0]
  539. var reply string
  540. err = client.Call("Server.DropService", name, &reply)
  541. if err != nil {
  542. fmt.Println(err)
  543. } else {
  544. fmt.Println(reply)
  545. }
  546. return nil
  547. },
  548. },
  549. },
  550. },
  551. {
  552. Name: "show",
  553. Aliases: []string{"show"},
  554. Usage: "show streams | show tables | show rules | show plugins $plugin_type | show services | show service_funcs",
  555. Subcommands: []cli.Command{
  556. {
  557. Name: "streams",
  558. Usage: "show streams",
  559. Action: func(c *cli.Context) error {
  560. streamProcess(client, "")
  561. return nil
  562. },
  563. },
  564. {
  565. Name: "tables",
  566. Usage: "show tables",
  567. Action: func(c *cli.Context) error {
  568. streamProcess(client, "")
  569. return nil
  570. },
  571. },
  572. {
  573. Name: "rules",
  574. Usage: "show rules",
  575. Action: func(c *cli.Context) error {
  576. var reply string
  577. err = client.Call("Server.ShowRules", 0, &reply)
  578. if err != nil {
  579. fmt.Println(err)
  580. } else {
  581. fmt.Println(reply)
  582. }
  583. return nil
  584. },
  585. },
  586. {
  587. Name: "plugins",
  588. Usage: "show plugins $plugin_type",
  589. Action: func(c *cli.Context) error {
  590. if len(c.Args()) != 1 {
  591. fmt.Printf("Expect plugin type.\n")
  592. return nil
  593. }
  594. ptype, err := getPluginType(c.Args()[0])
  595. if err != nil {
  596. fmt.Printf("%s\n", err)
  597. return nil
  598. }
  599. var reply string
  600. err = client.Call("Server.ShowPlugins", ptype, &reply)
  601. if err != nil {
  602. fmt.Println(err)
  603. } else {
  604. fmt.Println(reply)
  605. }
  606. return nil
  607. },
  608. },
  609. {
  610. Name: "udfs",
  611. Usage: "show udfs",
  612. Action: func(c *cli.Context) error {
  613. var reply string
  614. err = client.Call("Server.ShowUdfs", 0, &reply)
  615. if err != nil {
  616. fmt.Println(err)
  617. } else {
  618. fmt.Println(reply)
  619. }
  620. return nil
  621. },
  622. }, {
  623. Name: "services",
  624. Usage: "show services",
  625. Action: func(c *cli.Context) error {
  626. var reply string
  627. err = client.Call("Server.ShowServices", 0, &reply)
  628. if err != nil {
  629. fmt.Println(err)
  630. } else {
  631. fmt.Println(reply)
  632. }
  633. return nil
  634. },
  635. }, {
  636. Name: "service_funcs",
  637. Usage: "show service_funcs",
  638. Action: func(c *cli.Context) error {
  639. var reply string
  640. err = client.Call("Server.ShowServiceFuncs", 0, &reply)
  641. if err != nil {
  642. fmt.Println(err)
  643. } else {
  644. fmt.Println(reply)
  645. }
  646. return nil
  647. },
  648. },
  649. },
  650. },
  651. {
  652. Name: "getstatus",
  653. Aliases: []string{"getstatus"},
  654. Usage: "getstatus rule $rule_name",
  655. Subcommands: []cli.Command{
  656. {
  657. Name: "rule",
  658. Usage: "getstatus rule $rule_name",
  659. //Flags: nflag,
  660. Action: func(c *cli.Context) error {
  661. if len(c.Args()) != 1 {
  662. fmt.Printf("Expect rule name.\n")
  663. return nil
  664. }
  665. rname := c.Args()[0]
  666. var reply string
  667. err = client.Call("Server.GetStatusRule", rname, &reply)
  668. if err != nil {
  669. fmt.Println(err)
  670. } else {
  671. fmt.Println(reply)
  672. }
  673. return nil
  674. },
  675. },
  676. },
  677. },
  678. {
  679. Name: "gettopo",
  680. Aliases: []string{"gettopo"},
  681. Usage: "gettopo rule $rule_name",
  682. Subcommands: []cli.Command{
  683. {
  684. Name: "rule",
  685. Usage: "getstopo rule $rule_name",
  686. //Flags: nflag,
  687. Action: func(c *cli.Context) error {
  688. if len(c.Args()) != 1 {
  689. fmt.Printf("Expect rule name.\n")
  690. return nil
  691. }
  692. rname := c.Args()[0]
  693. var reply string
  694. err = client.Call("Server.GetTopoRule", rname, &reply)
  695. if err != nil {
  696. fmt.Println(err)
  697. } else {
  698. fmt.Println(reply)
  699. }
  700. return nil
  701. },
  702. },
  703. },
  704. },
  705. {
  706. Name: "start",
  707. Aliases: []string{"start"},
  708. Usage: "start rule $rule_name",
  709. Subcommands: []cli.Command{
  710. {
  711. Name: "rule",
  712. Usage: "start rule $rule_name",
  713. //Flags: nflag,
  714. Action: func(c *cli.Context) error {
  715. if len(c.Args()) != 1 {
  716. fmt.Printf("Expect rule name.\n")
  717. return nil
  718. }
  719. rname := c.Args()[0]
  720. var reply string
  721. err = client.Call("Server.StartRule", rname, &reply)
  722. if err != nil {
  723. fmt.Println(err)
  724. } else {
  725. fmt.Println(reply)
  726. }
  727. return nil
  728. },
  729. },
  730. },
  731. },
  732. {
  733. Name: "stop",
  734. Aliases: []string{"stop"},
  735. Usage: "stop rule $rule_name",
  736. Subcommands: []cli.Command{
  737. {
  738. Name: "rule",
  739. Usage: "stop rule $rule_name",
  740. //Flags: nflag,
  741. Action: func(c *cli.Context) error {
  742. if len(c.Args()) != 1 {
  743. fmt.Printf("Expect rule name.\n")
  744. return nil
  745. }
  746. rname := c.Args()[0]
  747. var reply string
  748. err = client.Call("Server.StopRule", rname, &reply)
  749. if err != nil {
  750. fmt.Println(err)
  751. } else {
  752. fmt.Println(reply)
  753. }
  754. return nil
  755. },
  756. },
  757. },
  758. },
  759. {
  760. Name: "restart",
  761. Aliases: []string{"restart"},
  762. Usage: "restart rule $rule_name",
  763. Subcommands: []cli.Command{
  764. {
  765. Name: "rule",
  766. Usage: "restart rule $rule_name",
  767. //Flags: nflag,
  768. Action: func(c *cli.Context) error {
  769. if len(c.Args()) != 1 {
  770. fmt.Printf("Expect rule name.\n")
  771. return nil
  772. }
  773. rname := c.Args()[0]
  774. var reply string
  775. err = client.Call("Server.RestartRule", rname, &reply)
  776. if err != nil {
  777. fmt.Println(err)
  778. } else {
  779. fmt.Println(reply)
  780. }
  781. return nil
  782. },
  783. },
  784. },
  785. },
  786. {
  787. Name: "register",
  788. Aliases: []string{"register"},
  789. Usage: "register plugin function $plugin_name [$plugin_json | -f plugin_def_file]",
  790. Subcommands: []cli.Command{
  791. {
  792. Name: "plugin",
  793. Usage: "register plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  794. Flags: []cli.Flag{
  795. cli.StringFlag{
  796. Name: "file, f",
  797. Usage: "the location of plugin functions definition file",
  798. FilePath: "/home/myplugin.txt",
  799. },
  800. },
  801. Action: func(c *cli.Context) error {
  802. if len(c.Args()) < 2 {
  803. fmt.Printf("Expect plugin type and name.\n")
  804. return nil
  805. }
  806. ptype := c.Args()[0]
  807. if strings.ToLower(ptype) != "function" {
  808. fmt.Printf("Plugin type must be function.\n")
  809. return nil
  810. }
  811. pname := c.Args()[1]
  812. sfile := c.String("file")
  813. args := &server.PluginDesc{
  814. RPCArgDesc: server.RPCArgDesc{
  815. Name: pname,
  816. },
  817. }
  818. if sfile != "" {
  819. if len(c.Args()) != 2 {
  820. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  821. return nil
  822. }
  823. if p, err := readDef(sfile, "plugin"); err != nil {
  824. fmt.Printf("%s", err)
  825. return nil
  826. } else {
  827. args.Json = string(p)
  828. }
  829. } else {
  830. if len(c.Args()) != 3 {
  831. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  832. return nil
  833. }
  834. args.Json = c.Args()[2]
  835. }
  836. var reply string
  837. err = client.Call("Server.RegisterPlugin", args, &reply)
  838. if err != nil {
  839. fmt.Println(err)
  840. } else {
  841. fmt.Println(reply)
  842. }
  843. return nil
  844. },
  845. },
  846. },
  847. },
  848. }
  849. app.Name = "Kuiper"
  850. app.Usage = "The command line tool for EMQ X Kuiper."
  851. app.Action = func(c *cli.Context) error {
  852. cli.ShowSubcommandHelp(c)
  853. //cli.ShowVersion(c)
  854. return nil
  855. }
  856. sort.Sort(cli.FlagsByName(app.Flags))
  857. sort.Sort(cli.CommandsByName(app.Commands))
  858. err = app.Run(os.Args)
  859. if err != nil {
  860. fmt.Printf("%v", err)
  861. }
  862. }
  863. func getPluginType(arg string) (ptype int, err error) {
  864. switch arg {
  865. case "source":
  866. ptype = 0
  867. case "sink":
  868. ptype = 1
  869. case "function":
  870. ptype = 2
  871. default:
  872. err = fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\" or \"function\".\n", arg)
  873. }
  874. return
  875. }
  876. func readDef(sfile string, t string) ([]byte, error) {
  877. if _, err := os.Stat(sfile); os.IsNotExist(err) {
  878. return nil, fmt.Errorf("The specified %s defenition file %s is not existed.\n", t, sfile)
  879. }
  880. fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
  881. if rule, err := ioutil.ReadFile(sfile); err != nil {
  882. return nil, fmt.Errorf("Failed to read from %s definition file %s.\n", t, sfile)
  883. } else {
  884. return rule, nil
  885. }
  886. }