main.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bufio"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/server"
  20. "github.com/urfave/cli"
  21. "gopkg.in/yaml.v3"
  22. "io/ioutil"
  23. "net/rpc"
  24. "os"
  25. "sort"
  26. "strings"
  27. "time"
  28. )
  29. type clientConf struct {
  30. Host string `yaml:"host"`
  31. Port int `yaml:"port"`
  32. }
  33. var clientYaml = "client.yaml"
  34. func streamProcess(client *rpc.Client, args string) {
  35. var reply string
  36. if args == "" {
  37. args = strings.Join(os.Args[1:], " ")
  38. }
  39. err := client.Call("Server.Stream", args, &reply)
  40. if err != nil {
  41. fmt.Println(err)
  42. } else {
  43. fmt.Println(reply)
  44. }
  45. }
  46. var (
  47. Version = "unknown"
  48. LoadFileType = "relative"
  49. )
  50. func main() {
  51. conf.LoadFileType = LoadFileType
  52. app := cli.NewApp()
  53. app.Version = Version
  54. //nflag := []cli.Flag { cli.StringFlag{
  55. // Name: "name, n",
  56. // Usage: "the name of stream",
  57. // }}
  58. b, err := conf.LoadConf(clientYaml)
  59. if err != nil {
  60. conf.Log.Fatal(err)
  61. }
  62. var cfg map[string]clientConf
  63. var config *clientConf
  64. if err := yaml.Unmarshal(b, &cfg); err != nil {
  65. fmt.Printf("Failed to load config file with error %s.\n", err)
  66. } else {
  67. c, ok := cfg["basic"]
  68. if !ok {
  69. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  70. } else {
  71. config = &c
  72. }
  73. }
  74. if config == nil {
  75. config = &clientConf{
  76. Host: "127.0.0.1",
  77. Port: 20498,
  78. }
  79. }
  80. fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
  81. // Create a TCP connection to localhost on port 1234
  82. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  83. if err != nil {
  84. fmt.Printf("Failed to connect the server, please start the server.\n")
  85. return
  86. }
  87. app.Commands = []cli.Command{
  88. {
  89. Name: "query",
  90. Aliases: []string{"query"},
  91. Usage: "query command line",
  92. Action: func(c *cli.Context) error {
  93. reader := bufio.NewReader(os.Stdin)
  94. var inputs []string
  95. ticker := time.NewTicker(time.Millisecond * 300)
  96. defer ticker.Stop()
  97. for {
  98. fmt.Print("kuiper > ")
  99. text, _ := reader.ReadString('\n')
  100. inputs = append(inputs, text)
  101. // convert CRLF to LF
  102. text = strings.Replace(text, "\n", "", -1)
  103. if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
  104. break
  105. } else if strings.Trim(text, " ") == "" {
  106. continue
  107. } else {
  108. var reply string
  109. err := client.Call("Server.CreateQuery", text, &reply)
  110. if err != nil {
  111. fmt.Println(err)
  112. continue
  113. } else {
  114. fmt.Println(reply)
  115. go func() {
  116. for {
  117. <-ticker.C
  118. var result string
  119. e := client.Call("Server.GetQueryResult", "", &result)
  120. if e != nil {
  121. fmt.Println(e)
  122. fmt.Print("kuiper > ")
  123. return
  124. }
  125. if result != "" {
  126. fmt.Println(result)
  127. }
  128. }
  129. }()
  130. }
  131. }
  132. }
  133. return nil
  134. },
  135. },
  136. {
  137. Name: "create",
  138. Aliases: []string{"create"},
  139. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create table $table_name | create table $table_name -f $table_def_file| create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file | create service $service_name $service_json ",
  140. Subcommands: []cli.Command{
  141. {
  142. Name: "stream",
  143. Usage: "create stream $stream_name [-f stream_def_file]",
  144. Flags: []cli.Flag{
  145. cli.StringFlag{
  146. Name: "file, f",
  147. Usage: "the location of stream definition file",
  148. FilePath: "/home/mystream.txt",
  149. },
  150. },
  151. Action: func(c *cli.Context) error {
  152. sfile := c.String("file")
  153. if sfile != "" {
  154. if stream, err := readDef(sfile, "stream"); err != nil {
  155. fmt.Printf("%s", err)
  156. return nil
  157. } else {
  158. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  159. streamProcess(client, args)
  160. return nil
  161. }
  162. } else {
  163. streamProcess(client, "")
  164. return nil
  165. }
  166. },
  167. },
  168. {
  169. Name: "table",
  170. Usage: "create table $table_name [-f table_def_file]",
  171. Flags: []cli.Flag{
  172. cli.StringFlag{
  173. Name: "file, f",
  174. Usage: "the location of table definition file",
  175. FilePath: "/home/mytable.txt",
  176. },
  177. },
  178. Action: func(c *cli.Context) error {
  179. sfile := c.String("file")
  180. if sfile != "" {
  181. if stream, err := readDef(sfile, "table"); err != nil {
  182. fmt.Printf("%s", err)
  183. return nil
  184. } else {
  185. args := strings.Join([]string{"CREATE TABLE ", string(stream)}, " ")
  186. streamProcess(client, args)
  187. return nil
  188. }
  189. } else {
  190. streamProcess(client, "")
  191. return nil
  192. }
  193. },
  194. },
  195. {
  196. Name: "rule",
  197. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  198. Flags: []cli.Flag{
  199. cli.StringFlag{
  200. Name: "file, f",
  201. Usage: "the location of rule definition file",
  202. FilePath: "/home/myrule.txt",
  203. },
  204. },
  205. Action: func(c *cli.Context) error {
  206. sfile := c.String("file")
  207. if sfile != "" {
  208. if rule, err := readDef(sfile, "rule"); err != nil {
  209. fmt.Printf("%s", err)
  210. return nil
  211. } else {
  212. if len(c.Args()) != 1 {
  213. fmt.Printf("Expect rule name.\n")
  214. return nil
  215. }
  216. rname := c.Args()[0]
  217. var reply string
  218. args := &server.RPCArgDesc{Name: rname, Json: string(rule)}
  219. err = client.Call("Server.CreateRule", args, &reply)
  220. if err != nil {
  221. fmt.Println(err)
  222. } else {
  223. fmt.Println(reply)
  224. }
  225. }
  226. return nil
  227. } else {
  228. if len(c.Args()) != 2 {
  229. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  230. return nil
  231. }
  232. rname := c.Args()[0]
  233. rjson := c.Args()[1]
  234. var reply string
  235. args := &server.RPCArgDesc{Name: rname, Json: rjson}
  236. err = client.Call("Server.CreateRule", args, &reply)
  237. if err != nil {
  238. fmt.Println(err)
  239. } else {
  240. fmt.Println(reply)
  241. }
  242. return nil
  243. }
  244. },
  245. },
  246. {
  247. Name: "plugin",
  248. Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  249. Flags: []cli.Flag{
  250. cli.StringFlag{
  251. Name: "file, f",
  252. Usage: "the location of plugin definition file",
  253. FilePath: "/home/myplugin.txt",
  254. },
  255. },
  256. Action: func(c *cli.Context) error {
  257. if len(c.Args()) < 2 {
  258. fmt.Printf("Expect plugin type and name.\n")
  259. return nil
  260. }
  261. ptype, err := getPluginType(c.Args()[0])
  262. if err != nil {
  263. fmt.Printf("%s\n", err)
  264. return nil
  265. }
  266. pname := c.Args()[1]
  267. sfile := c.String("file")
  268. args := &server.PluginDesc{
  269. RPCArgDesc: server.RPCArgDesc{
  270. Name: pname,
  271. },
  272. Type: ptype,
  273. }
  274. if sfile != "" {
  275. if len(c.Args()) != 2 {
  276. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  277. return nil
  278. }
  279. if p, err := readDef(sfile, "plugin"); err != nil {
  280. fmt.Printf("%s", err)
  281. return nil
  282. } else {
  283. args.Json = string(p)
  284. }
  285. } else {
  286. if len(c.Args()) != 3 {
  287. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  288. return nil
  289. }
  290. args.Json = c.Args()[2]
  291. }
  292. var reply string
  293. err = client.Call("Server.CreatePlugin", args, &reply)
  294. if err != nil {
  295. fmt.Println(err)
  296. } else {
  297. fmt.Println(reply)
  298. }
  299. return nil
  300. },
  301. },
  302. {
  303. Name: "service",
  304. Usage: "create service $service_name $service_json",
  305. Action: func(c *cli.Context) error {
  306. if len(c.Args()) < 2 {
  307. fmt.Printf("Expect service name and json.\n")
  308. return nil
  309. }
  310. var reply string
  311. err = client.Call("Server.CreateService", &server.RPCArgDesc{
  312. Name: c.Args()[0],
  313. Json: c.Args()[1],
  314. }, &reply)
  315. if err != nil {
  316. fmt.Println(err)
  317. } else {
  318. fmt.Println(reply)
  319. }
  320. return nil
  321. },
  322. },
  323. },
  324. },
  325. {
  326. Name: "describe",
  327. Aliases: []string{"describe"},
  328. Usage: "describe stream $stream_name | describe table $table_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name | describe udf $udf_name | describe service $service_name | describe service_func $service_func_name",
  329. Subcommands: []cli.Command{
  330. {
  331. Name: "stream",
  332. Usage: "describe stream $stream_name",
  333. //Flags: nflag,
  334. Action: func(c *cli.Context) error {
  335. streamProcess(client, "")
  336. return nil
  337. },
  338. },
  339. {
  340. Name: "table",
  341. Usage: "describe table $table_name",
  342. //Flags: nflag,
  343. Action: func(c *cli.Context) error {
  344. streamProcess(client, "")
  345. return nil
  346. },
  347. },
  348. {
  349. Name: "rule",
  350. Usage: "describe rule $rule_name",
  351. Action: func(c *cli.Context) error {
  352. if len(c.Args()) != 1 {
  353. fmt.Printf("Expect rule name.\n")
  354. return nil
  355. }
  356. rname := c.Args()[0]
  357. var reply string
  358. err = client.Call("Server.DescRule", rname, &reply)
  359. if err != nil {
  360. fmt.Println(err)
  361. } else {
  362. fmt.Println(reply)
  363. }
  364. return nil
  365. },
  366. },
  367. {
  368. Name: "plugin",
  369. Usage: "describe plugin $plugin_type $plugin_name",
  370. //Flags: nflag,
  371. Action: func(c *cli.Context) error {
  372. ptype, err := getPluginType(c.Args()[0])
  373. if err != nil {
  374. fmt.Printf("%s\n", err)
  375. return nil
  376. }
  377. if len(c.Args()) != 2 {
  378. fmt.Printf("Expect plugin name.\n")
  379. return nil
  380. }
  381. pname := c.Args()[1]
  382. args := &server.PluginDesc{
  383. RPCArgDesc: server.RPCArgDesc{
  384. Name: pname,
  385. },
  386. Type: ptype,
  387. }
  388. var reply string
  389. err = client.Call("Server.DescPlugin", args, &reply)
  390. if err != nil {
  391. fmt.Println(err)
  392. } else {
  393. fmt.Println(reply)
  394. }
  395. return nil
  396. },
  397. },
  398. {
  399. Name: "udf",
  400. Usage: "describe udf $udf_name",
  401. //Flags: nflag,
  402. Action: func(c *cli.Context) error {
  403. if len(c.Args()) != 1 {
  404. fmt.Printf("Expect udf name.\n")
  405. return nil
  406. }
  407. pname := c.Args()[0]
  408. var reply string
  409. err = client.Call("Server.DescUdf", pname, &reply)
  410. if err != nil {
  411. fmt.Println(err)
  412. } else {
  413. fmt.Println(reply)
  414. }
  415. return nil
  416. },
  417. },
  418. {
  419. Name: "service",
  420. Usage: "describe service $service_name",
  421. Action: func(c *cli.Context) error {
  422. if len(c.Args()) != 1 {
  423. fmt.Printf("Expect service name.\n")
  424. return nil
  425. }
  426. name := c.Args()[0]
  427. var reply string
  428. err = client.Call("Server.DescService", name, &reply)
  429. if err != nil {
  430. fmt.Println(err)
  431. } else {
  432. fmt.Println(reply)
  433. }
  434. return nil
  435. },
  436. },
  437. {
  438. Name: "service_func",
  439. Usage: "describe service_func $service_func_name",
  440. Action: func(c *cli.Context) error {
  441. if len(c.Args()) != 1 {
  442. fmt.Printf("Expect service func name.\n")
  443. return nil
  444. }
  445. name := c.Args()[0]
  446. var reply string
  447. err = client.Call("Server.DescServiceFunc", name, &reply)
  448. if err != nil {
  449. fmt.Println(err)
  450. } else {
  451. fmt.Println(reply)
  452. }
  453. return nil
  454. },
  455. },
  456. },
  457. },
  458. {
  459. Name: "drop",
  460. Aliases: []string{"drop"},
  461. Usage: "drop stream $stream_name | drop table $table_name |drop rule $rule_name | drop plugin $plugin_type $plugin_name -r $stop | drop service $service_name",
  462. Subcommands: []cli.Command{
  463. {
  464. Name: "stream",
  465. Usage: "drop stream $stream_name",
  466. //Flags: nflag,
  467. Action: func(c *cli.Context) error {
  468. streamProcess(client, "")
  469. return nil
  470. },
  471. },
  472. {
  473. Name: "table",
  474. Usage: "drop table $table_name",
  475. //Flags: nflag,
  476. Action: func(c *cli.Context) error {
  477. streamProcess(client, "")
  478. return nil
  479. },
  480. },
  481. {
  482. Name: "rule",
  483. Usage: "drop rule $rule_name",
  484. //Flags: nflag,
  485. Action: func(c *cli.Context) error {
  486. if len(c.Args()) != 1 {
  487. fmt.Printf("Expect rule name.\n")
  488. return nil
  489. }
  490. rname := c.Args()[0]
  491. var reply string
  492. err = client.Call("Server.DropRule", rname, &reply)
  493. if err != nil {
  494. fmt.Println(err)
  495. } else {
  496. fmt.Println(reply)
  497. }
  498. return nil
  499. },
  500. },
  501. {
  502. Name: "plugin",
  503. Usage: "drop plugin $plugin_type $plugin_name -s stop",
  504. Flags: []cli.Flag{
  505. cli.StringFlag{
  506. Name: "stop, s",
  507. Usage: "stop kuiper after the action",
  508. },
  509. },
  510. Action: func(c *cli.Context) error {
  511. r := c.String("stop")
  512. if r != "true" && r != "false" {
  513. fmt.Printf("Expect s flag to be a boolean value.\n")
  514. return nil
  515. }
  516. if len(c.Args()) < 2 || len(c.Args()) > 3 {
  517. fmt.Printf("Expect plugin type and name.\n")
  518. return nil
  519. }
  520. ptype, err := getPluginType(c.Args()[0])
  521. if err != nil {
  522. fmt.Printf("%s\n", err)
  523. return nil
  524. }
  525. pname := c.Args()[1]
  526. args := &server.PluginDesc{
  527. RPCArgDesc: server.RPCArgDesc{
  528. Name: pname,
  529. },
  530. Type: ptype,
  531. Stop: r == "true",
  532. }
  533. var reply string
  534. err = client.Call("Server.DropPlugin", args, &reply)
  535. if err != nil {
  536. fmt.Println(err)
  537. } else {
  538. fmt.Println(reply)
  539. }
  540. return nil
  541. },
  542. },
  543. {
  544. Name: "service",
  545. Usage: "drop service $service_name",
  546. Action: func(c *cli.Context) error {
  547. if len(c.Args()) != 1 {
  548. fmt.Printf("Expect service name.\n")
  549. return nil
  550. }
  551. name := c.Args()[0]
  552. var reply string
  553. err = client.Call("Server.DropService", name, &reply)
  554. if err != nil {
  555. fmt.Println(err)
  556. } else {
  557. fmt.Println(reply)
  558. }
  559. return nil
  560. },
  561. },
  562. },
  563. },
  564. {
  565. Name: "show",
  566. Aliases: []string{"show"},
  567. Usage: "show streams | show tables | show rules | show plugins $plugin_type | show services | show service_funcs",
  568. Subcommands: []cli.Command{
  569. {
  570. Name: "streams",
  571. Usage: "show streams",
  572. Action: func(c *cli.Context) error {
  573. streamProcess(client, "")
  574. return nil
  575. },
  576. },
  577. {
  578. Name: "tables",
  579. Usage: "show tables",
  580. Action: func(c *cli.Context) error {
  581. streamProcess(client, "")
  582. return nil
  583. },
  584. },
  585. {
  586. Name: "rules",
  587. Usage: "show rules",
  588. Action: func(c *cli.Context) error {
  589. var reply string
  590. err = client.Call("Server.ShowRules", 0, &reply)
  591. if err != nil {
  592. fmt.Println(err)
  593. } else {
  594. fmt.Println(reply)
  595. }
  596. return nil
  597. },
  598. },
  599. {
  600. Name: "plugins",
  601. Usage: "show plugins $plugin_type",
  602. Action: func(c *cli.Context) error {
  603. if len(c.Args()) != 1 {
  604. fmt.Printf("Expect plugin type.\n")
  605. return nil
  606. }
  607. ptype, err := getPluginType(c.Args()[0])
  608. if err != nil {
  609. fmt.Printf("%s\n", err)
  610. return nil
  611. }
  612. var reply string
  613. err = client.Call("Server.ShowPlugins", ptype, &reply)
  614. if err != nil {
  615. fmt.Println(err)
  616. } else {
  617. fmt.Println(reply)
  618. }
  619. return nil
  620. },
  621. },
  622. {
  623. Name: "udfs",
  624. Usage: "show udfs",
  625. Action: func(c *cli.Context) error {
  626. var reply string
  627. err = client.Call("Server.ShowUdfs", 0, &reply)
  628. if err != nil {
  629. fmt.Println(err)
  630. } else {
  631. fmt.Println(reply)
  632. }
  633. return nil
  634. },
  635. }, {
  636. Name: "services",
  637. Usage: "show services",
  638. Action: func(c *cli.Context) error {
  639. var reply string
  640. err = client.Call("Server.ShowServices", 0, &reply)
  641. if err != nil {
  642. fmt.Println(err)
  643. } else {
  644. fmt.Println(reply)
  645. }
  646. return nil
  647. },
  648. }, {
  649. Name: "service_funcs",
  650. Usage: "show service_funcs",
  651. Action: func(c *cli.Context) error {
  652. var reply string
  653. err = client.Call("Server.ShowServiceFuncs", 0, &reply)
  654. if err != nil {
  655. fmt.Println(err)
  656. } else {
  657. fmt.Println(reply)
  658. }
  659. return nil
  660. },
  661. },
  662. },
  663. },
  664. {
  665. Name: "getstatus",
  666. Aliases: []string{"getstatus"},
  667. Usage: "getstatus rule $rule_name",
  668. Subcommands: []cli.Command{
  669. {
  670. Name: "rule",
  671. Usage: "getstatus rule $rule_name",
  672. //Flags: nflag,
  673. Action: func(c *cli.Context) error {
  674. if len(c.Args()) != 1 {
  675. fmt.Printf("Expect rule name.\n")
  676. return nil
  677. }
  678. rname := c.Args()[0]
  679. var reply string
  680. err = client.Call("Server.GetStatusRule", rname, &reply)
  681. if err != nil {
  682. fmt.Println(err)
  683. } else {
  684. fmt.Println(reply)
  685. }
  686. return nil
  687. },
  688. },
  689. },
  690. },
  691. {
  692. Name: "gettopo",
  693. Aliases: []string{"gettopo"},
  694. Usage: "gettopo rule $rule_name",
  695. Subcommands: []cli.Command{
  696. {
  697. Name: "rule",
  698. Usage: "getstopo rule $rule_name",
  699. //Flags: nflag,
  700. Action: func(c *cli.Context) error {
  701. if len(c.Args()) != 1 {
  702. fmt.Printf("Expect rule name.\n")
  703. return nil
  704. }
  705. rname := c.Args()[0]
  706. var reply string
  707. err = client.Call("Server.GetTopoRule", rname, &reply)
  708. if err != nil {
  709. fmt.Println(err)
  710. } else {
  711. fmt.Println(reply)
  712. }
  713. return nil
  714. },
  715. },
  716. },
  717. },
  718. {
  719. Name: "start",
  720. Aliases: []string{"start"},
  721. Usage: "start rule $rule_name",
  722. Subcommands: []cli.Command{
  723. {
  724. Name: "rule",
  725. Usage: "start rule $rule_name",
  726. //Flags: nflag,
  727. Action: func(c *cli.Context) error {
  728. if len(c.Args()) != 1 {
  729. fmt.Printf("Expect rule name.\n")
  730. return nil
  731. }
  732. rname := c.Args()[0]
  733. var reply string
  734. err = client.Call("Server.StartRule", rname, &reply)
  735. if err != nil {
  736. fmt.Println(err)
  737. } else {
  738. fmt.Println(reply)
  739. }
  740. return nil
  741. },
  742. },
  743. },
  744. },
  745. {
  746. Name: "stop",
  747. Aliases: []string{"stop"},
  748. Usage: "stop rule $rule_name",
  749. Subcommands: []cli.Command{
  750. {
  751. Name: "rule",
  752. Usage: "stop rule $rule_name",
  753. //Flags: nflag,
  754. Action: func(c *cli.Context) error {
  755. if len(c.Args()) != 1 {
  756. fmt.Printf("Expect rule name.\n")
  757. return nil
  758. }
  759. rname := c.Args()[0]
  760. var reply string
  761. err = client.Call("Server.StopRule", rname, &reply)
  762. if err != nil {
  763. fmt.Println(err)
  764. } else {
  765. fmt.Println(reply)
  766. }
  767. return nil
  768. },
  769. },
  770. },
  771. },
  772. {
  773. Name: "restart",
  774. Aliases: []string{"restart"},
  775. Usage: "restart rule $rule_name",
  776. Subcommands: []cli.Command{
  777. {
  778. Name: "rule",
  779. Usage: "restart rule $rule_name",
  780. //Flags: nflag,
  781. Action: func(c *cli.Context) error {
  782. if len(c.Args()) != 1 {
  783. fmt.Printf("Expect rule name.\n")
  784. return nil
  785. }
  786. rname := c.Args()[0]
  787. var reply string
  788. err = client.Call("Server.RestartRule", rname, &reply)
  789. if err != nil {
  790. fmt.Println(err)
  791. } else {
  792. fmt.Println(reply)
  793. }
  794. return nil
  795. },
  796. },
  797. },
  798. },
  799. {
  800. Name: "register",
  801. Aliases: []string{"register"},
  802. Usage: "register plugin function $plugin_name [$plugin_json | -f plugin_def_file]",
  803. Subcommands: []cli.Command{
  804. {
  805. Name: "plugin",
  806. Usage: "register plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  807. Flags: []cli.Flag{
  808. cli.StringFlag{
  809. Name: "file, f",
  810. Usage: "the location of plugin functions definition file",
  811. FilePath: "/home/myplugin.txt",
  812. },
  813. },
  814. Action: func(c *cli.Context) error {
  815. if len(c.Args()) < 2 {
  816. fmt.Printf("Expect plugin type and name.\n")
  817. return nil
  818. }
  819. ptype := c.Args()[0]
  820. if strings.ToLower(ptype) != "function" {
  821. fmt.Printf("Plugin type must be function.\n")
  822. return nil
  823. }
  824. pname := c.Args()[1]
  825. sfile := c.String("file")
  826. args := &server.PluginDesc{
  827. RPCArgDesc: server.RPCArgDesc{
  828. Name: pname,
  829. },
  830. }
  831. if sfile != "" {
  832. if len(c.Args()) != 2 {
  833. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  834. return nil
  835. }
  836. if p, err := readDef(sfile, "plugin"); err != nil {
  837. fmt.Printf("%s", err)
  838. return nil
  839. } else {
  840. args.Json = string(p)
  841. }
  842. } else {
  843. if len(c.Args()) != 3 {
  844. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  845. return nil
  846. }
  847. args.Json = c.Args()[2]
  848. }
  849. var reply string
  850. err = client.Call("Server.RegisterPlugin", args, &reply)
  851. if err != nil {
  852. fmt.Println(err)
  853. } else {
  854. fmt.Println(reply)
  855. }
  856. return nil
  857. },
  858. },
  859. },
  860. },
  861. }
  862. app.Name = "Kuiper"
  863. app.Usage = "The command line tool for EMQ X Kuiper."
  864. app.Action = func(c *cli.Context) error {
  865. cli.ShowSubcommandHelp(c)
  866. //cli.ShowVersion(c)
  867. return nil
  868. }
  869. sort.Sort(cli.FlagsByName(app.Flags))
  870. sort.Sort(cli.CommandsByName(app.Commands))
  871. err = app.Run(os.Args)
  872. if err != nil {
  873. fmt.Printf("%v", err)
  874. }
  875. }
  876. func getPluginType(arg string) (ptype int, err error) {
  877. switch arg {
  878. case "source":
  879. ptype = 0
  880. case "sink":
  881. ptype = 1
  882. case "function":
  883. ptype = 2
  884. default:
  885. err = fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\" or \"function\".\n", arg)
  886. }
  887. return
  888. }
  889. func readDef(sfile string, t string) ([]byte, error) {
  890. if _, err := os.Stat(sfile); os.IsNotExist(err) {
  891. return nil, fmt.Errorf("The specified %s defenition file %s is not existed.\n", t, sfile)
  892. }
  893. fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
  894. if rule, err := ioutil.ReadFile(sfile); err != nil {
  895. return nil, fmt.Errorf("Failed to read from %s definition file %s.\n", t, sfile)
  896. } else {
  897. return rule, nil
  898. }
  899. }