main.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bufio"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/pkg/model"
  20. "github.com/lf-edge/ekuiper/pkg/infra"
  21. "github.com/urfave/cli"
  22. "io/ioutil"
  23. "net/rpc"
  24. "os"
  25. "sort"
  26. "strings"
  27. "time"
  28. )
  29. type clientConf struct {
  30. Host string `yaml:"host"`
  31. Port int `yaml:"port"`
  32. }
  33. const ClientYaml = "client.yaml"
  34. func streamProcess(client *rpc.Client, args string) {
  35. var reply string
  36. if args == "" {
  37. args = strings.Join(os.Args[1:], " ")
  38. }
  39. err := client.Call("Server.Stream", args, &reply)
  40. if err != nil {
  41. fmt.Println(err)
  42. } else {
  43. fmt.Println(reply)
  44. }
  45. }
  46. var (
  47. Version = "unknown"
  48. LoadFileType = "relative"
  49. )
  50. func main() {
  51. conf.LoadFileType = LoadFileType
  52. app := cli.NewApp()
  53. app.Version = Version
  54. //nflag := []cli.Flag { cli.StringFlag{
  55. // Name: "name, n",
  56. // Usage: "the name of stream",
  57. // }}
  58. var cfg map[string]clientConf
  59. err := conf.LoadConfigByName(ClientYaml, &cfg)
  60. if err != nil {
  61. conf.Log.Fatal(err)
  62. fmt.Printf("Failed to load config file with error %s.\n", err)
  63. }
  64. var config *clientConf
  65. c, ok := cfg["basic"]
  66. if !ok {
  67. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  68. } else {
  69. config = &c
  70. }
  71. if config == nil {
  72. config = &clientConf{
  73. Host: "127.0.0.1",
  74. Port: 20498,
  75. }
  76. }
  77. fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
  78. // Create a TCP connection to localhost on port 1234
  79. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  80. if err != nil {
  81. fmt.Printf("Failed to connect the server, please start the server.\n")
  82. return
  83. }
  84. app.Commands = []cli.Command{
  85. {
  86. Name: "query",
  87. Aliases: []string{"query"},
  88. Usage: "query command line",
  89. Action: func(c *cli.Context) error {
  90. reader := bufio.NewReader(os.Stdin)
  91. var inputs []string
  92. ticker := time.NewTicker(time.Millisecond * 300)
  93. defer ticker.Stop()
  94. for {
  95. fmt.Print("kuiper > ")
  96. text, _ := reader.ReadString('\n')
  97. inputs = append(inputs, text)
  98. // convert CRLF to LF
  99. text = strings.Replace(text, "\n", "", -1)
  100. if strings.EqualFold(text, "quit") || strings.EqualFold(text, "exit") {
  101. break
  102. } else if strings.Trim(text, " ") == "" {
  103. continue
  104. } else {
  105. var reply string
  106. err := client.Call("Server.CreateQuery", text, &reply)
  107. if err != nil {
  108. fmt.Println(err)
  109. continue
  110. } else {
  111. fmt.Println(reply)
  112. go func() {
  113. err := infra.SafeRun(func() error {
  114. for {
  115. <-ticker.C
  116. var result string
  117. e := client.Call("Server.GetQueryResult", "", &result)
  118. if e != nil {
  119. return e
  120. }
  121. if result != "" {
  122. fmt.Println(result)
  123. }
  124. }
  125. })
  126. if err != nil {
  127. fmt.Println(err)
  128. fmt.Print("kuiper > ")
  129. }
  130. }()
  131. }
  132. }
  133. }
  134. return nil
  135. },
  136. },
  137. {
  138. Name: "create",
  139. Aliases: []string{"create"},
  140. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create table $table_name | create table $table_name -f $table_def_file| create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file | create service $service_name $service_json ",
  141. Subcommands: []cli.Command{
  142. {
  143. Name: "stream",
  144. Usage: "create stream $stream_name [-f stream_def_file]",
  145. Flags: []cli.Flag{
  146. cli.StringFlag{
  147. Name: "file, f",
  148. Usage: "the location of stream definition file",
  149. FilePath: "/home/mystream.txt",
  150. },
  151. },
  152. Action: func(c *cli.Context) error {
  153. sfile := c.String("file")
  154. if sfile != "" {
  155. if stream, err := readDef(sfile, "stream"); err != nil {
  156. fmt.Printf("%s", err)
  157. return nil
  158. } else {
  159. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  160. streamProcess(client, args)
  161. return nil
  162. }
  163. } else {
  164. streamProcess(client, "")
  165. return nil
  166. }
  167. },
  168. },
  169. {
  170. Name: "table",
  171. Usage: "create table $table_name [-f table_def_file]",
  172. Flags: []cli.Flag{
  173. cli.StringFlag{
  174. Name: "file, f",
  175. Usage: "the location of table definition file",
  176. FilePath: "/home/mytable.txt",
  177. },
  178. },
  179. Action: func(c *cli.Context) error {
  180. sfile := c.String("file")
  181. if sfile != "" {
  182. if stream, err := readDef(sfile, "table"); err != nil {
  183. fmt.Printf("%s", err)
  184. return nil
  185. } else {
  186. args := strings.Join([]string{"CREATE TABLE ", string(stream)}, " ")
  187. streamProcess(client, args)
  188. return nil
  189. }
  190. } else {
  191. streamProcess(client, "")
  192. return nil
  193. }
  194. },
  195. },
  196. {
  197. Name: "rule",
  198. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  199. Flags: []cli.Flag{
  200. cli.StringFlag{
  201. Name: "file, f",
  202. Usage: "the location of rule definition file",
  203. FilePath: "/home/myrule.txt",
  204. },
  205. },
  206. Action: func(c *cli.Context) error {
  207. sfile := c.String("file")
  208. if sfile != "" {
  209. if rule, err := readDef(sfile, "rule"); err != nil {
  210. fmt.Printf("%s", err)
  211. return nil
  212. } else {
  213. if len(c.Args()) != 1 {
  214. fmt.Printf("Expect rule name.\n")
  215. return nil
  216. }
  217. rname := c.Args()[0]
  218. var reply string
  219. args := &model.RPCArgDesc{Name: rname, Json: string(rule)}
  220. err = client.Call("Server.CreateRule", args, &reply)
  221. if err != nil {
  222. fmt.Println(err)
  223. } else {
  224. fmt.Println(reply)
  225. }
  226. }
  227. return nil
  228. } else {
  229. if len(c.Args()) != 2 {
  230. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  231. return nil
  232. }
  233. rname := c.Args()[0]
  234. rjson := c.Args()[1]
  235. var reply string
  236. args := &model.RPCArgDesc{Name: rname, Json: rjson}
  237. err = client.Call("Server.CreateRule", args, &reply)
  238. if err != nil {
  239. fmt.Println(err)
  240. } else {
  241. fmt.Println(reply)
  242. }
  243. return nil
  244. }
  245. },
  246. },
  247. {
  248. Name: "plugin",
  249. Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  250. Flags: []cli.Flag{
  251. cli.StringFlag{
  252. Name: "file, f",
  253. Usage: "the location of plugin definition file",
  254. FilePath: "/home/myplugin.txt",
  255. },
  256. },
  257. Action: func(c *cli.Context) error {
  258. if len(c.Args()) < 2 {
  259. fmt.Printf("Expect plugin type and name.\n")
  260. return nil
  261. }
  262. ptype, err := getPluginType(c.Args()[0])
  263. if err != nil {
  264. fmt.Printf("%s\n", err)
  265. return nil
  266. }
  267. pname := c.Args()[1]
  268. sfile := c.String("file")
  269. args := &model.PluginDesc{
  270. RPCArgDesc: model.RPCArgDesc{
  271. Name: pname,
  272. },
  273. Type: ptype,
  274. }
  275. if sfile != "" {
  276. if len(c.Args()) != 2 {
  277. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  278. return nil
  279. }
  280. if p, err := readDef(sfile, "plugin"); err != nil {
  281. fmt.Printf("%s", err)
  282. return nil
  283. } else {
  284. args.Json = string(p)
  285. }
  286. } else {
  287. if len(c.Args()) != 3 {
  288. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  289. return nil
  290. }
  291. args.Json = c.Args()[2]
  292. }
  293. var reply string
  294. err = client.Call("Server.CreatePlugin", args, &reply)
  295. if err != nil {
  296. fmt.Println(err)
  297. } else {
  298. fmt.Println(reply)
  299. }
  300. return nil
  301. },
  302. },
  303. {
  304. Name: "service",
  305. Usage: "create service $service_name $service_json",
  306. Action: func(c *cli.Context) error {
  307. if len(c.Args()) < 2 {
  308. fmt.Printf("Expect service name and json.\n")
  309. return nil
  310. }
  311. var reply string
  312. err = client.Call("Server.CreateService", &model.RPCArgDesc{
  313. Name: c.Args()[0],
  314. Json: c.Args()[1],
  315. }, &reply)
  316. if err != nil {
  317. fmt.Println(err)
  318. } else {
  319. fmt.Println(reply)
  320. }
  321. return nil
  322. },
  323. },
  324. },
  325. },
  326. {
  327. Name: "describe",
  328. Aliases: []string{"describe"},
  329. Usage: "describe stream $stream_name | describe table $table_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name | describe udf $udf_name | describe service $service_name | describe service_func $service_func_name",
  330. Subcommands: []cli.Command{
  331. {
  332. Name: "stream",
  333. Usage: "describe stream $stream_name",
  334. //Flags: nflag,
  335. Action: func(c *cli.Context) error {
  336. streamProcess(client, "")
  337. return nil
  338. },
  339. },
  340. {
  341. Name: "table",
  342. Usage: "describe table $table_name",
  343. //Flags: nflag,
  344. Action: func(c *cli.Context) error {
  345. streamProcess(client, "")
  346. return nil
  347. },
  348. },
  349. {
  350. Name: "rule",
  351. Usage: "describe rule $rule_name",
  352. Action: func(c *cli.Context) error {
  353. if len(c.Args()) != 1 {
  354. fmt.Printf("Expect rule name.\n")
  355. return nil
  356. }
  357. rname := c.Args()[0]
  358. var reply string
  359. err = client.Call("Server.DescRule", rname, &reply)
  360. if err != nil {
  361. fmt.Println(err)
  362. } else {
  363. fmt.Println(reply)
  364. }
  365. return nil
  366. },
  367. },
  368. {
  369. Name: "plugin",
  370. Usage: "describe plugin $plugin_type $plugin_name",
  371. //Flags: nflag,
  372. Action: func(c *cli.Context) error {
  373. ptype, err := getPluginType(c.Args()[0])
  374. if err != nil {
  375. fmt.Printf("%s\n", err)
  376. return nil
  377. }
  378. if len(c.Args()) != 2 {
  379. fmt.Printf("Expect plugin name.\n")
  380. return nil
  381. }
  382. pname := c.Args()[1]
  383. args := &model.PluginDesc{
  384. RPCArgDesc: model.RPCArgDesc{
  385. Name: pname,
  386. },
  387. Type: ptype,
  388. }
  389. var reply string
  390. err = client.Call("Server.DescPlugin", args, &reply)
  391. if err != nil {
  392. fmt.Println(err)
  393. } else {
  394. fmt.Println(reply)
  395. }
  396. return nil
  397. },
  398. },
  399. {
  400. Name: "udf",
  401. Usage: "describe udf $udf_name",
  402. //Flags: nflag,
  403. Action: func(c *cli.Context) error {
  404. if len(c.Args()) != 1 {
  405. fmt.Printf("Expect udf name.\n")
  406. return nil
  407. }
  408. pname := c.Args()[0]
  409. var reply string
  410. err = client.Call("Server.DescUdf", pname, &reply)
  411. if err != nil {
  412. fmt.Println(err)
  413. } else {
  414. fmt.Println(reply)
  415. }
  416. return nil
  417. },
  418. },
  419. {
  420. Name: "service",
  421. Usage: "describe service $service_name",
  422. Action: func(c *cli.Context) error {
  423. if len(c.Args()) != 1 {
  424. fmt.Printf("Expect service name.\n")
  425. return nil
  426. }
  427. name := c.Args()[0]
  428. var reply string
  429. err = client.Call("Server.DescService", name, &reply)
  430. if err != nil {
  431. fmt.Println(err)
  432. } else {
  433. fmt.Println(reply)
  434. }
  435. return nil
  436. },
  437. },
  438. {
  439. Name: "service_func",
  440. Usage: "describe service_func $service_func_name",
  441. Action: func(c *cli.Context) error {
  442. if len(c.Args()) != 1 {
  443. fmt.Printf("Expect service func name.\n")
  444. return nil
  445. }
  446. name := c.Args()[0]
  447. var reply string
  448. err = client.Call("Server.DescServiceFunc", name, &reply)
  449. if err != nil {
  450. fmt.Println(err)
  451. } else {
  452. fmt.Println(reply)
  453. }
  454. return nil
  455. },
  456. },
  457. },
  458. },
  459. {
  460. Name: "drop",
  461. Aliases: []string{"drop"},
  462. Usage: "drop stream $stream_name | drop table $table_name |drop rule $rule_name | drop plugin $plugin_type $plugin_name -s $stop | drop service $service_name",
  463. Subcommands: []cli.Command{
  464. {
  465. Name: "stream",
  466. Usage: "drop stream $stream_name",
  467. //Flags: nflag,
  468. Action: func(c *cli.Context) error {
  469. streamProcess(client, "")
  470. return nil
  471. },
  472. },
  473. {
  474. Name: "table",
  475. Usage: "drop table $table_name",
  476. //Flags: nflag,
  477. Action: func(c *cli.Context) error {
  478. streamProcess(client, "")
  479. return nil
  480. },
  481. },
  482. {
  483. Name: "rule",
  484. Usage: "drop rule $rule_name",
  485. //Flags: nflag,
  486. Action: func(c *cli.Context) error {
  487. if len(c.Args()) != 1 {
  488. fmt.Printf("Expect rule name.\n")
  489. return nil
  490. }
  491. rname := c.Args()[0]
  492. var reply string
  493. err = client.Call("Server.DropRule", rname, &reply)
  494. if err != nil {
  495. fmt.Println(err)
  496. } else {
  497. fmt.Println(reply)
  498. }
  499. return nil
  500. },
  501. },
  502. {
  503. Name: "plugin",
  504. Usage: "drop plugin $plugin_type $plugin_name -s stop",
  505. Flags: []cli.Flag{
  506. cli.StringFlag{
  507. Name: "stop, s",
  508. Usage: "stop kuiper after the action",
  509. },
  510. },
  511. Action: func(c *cli.Context) error {
  512. r := c.String("stop")
  513. if r != "true" && r != "false" {
  514. fmt.Printf("Expect s flag to be a boolean value.\n")
  515. return nil
  516. }
  517. if len(c.Args()) < 2 || len(c.Args()) > 3 {
  518. fmt.Printf("Expect plugin type and name.\n")
  519. return nil
  520. }
  521. ptype, err := getPluginType(c.Args()[0])
  522. if err != nil {
  523. fmt.Printf("%s\n", err)
  524. return nil
  525. }
  526. pname := c.Args()[1]
  527. args := &model.PluginDesc{
  528. RPCArgDesc: model.RPCArgDesc{
  529. Name: pname,
  530. },
  531. Type: ptype,
  532. Stop: r == "true",
  533. }
  534. var reply string
  535. err = client.Call("Server.DropPlugin", args, &reply)
  536. if err != nil {
  537. fmt.Println(err)
  538. } else {
  539. fmt.Println(reply)
  540. }
  541. return nil
  542. },
  543. },
  544. {
  545. Name: "service",
  546. Usage: "drop service $service_name",
  547. Action: func(c *cli.Context) error {
  548. if len(c.Args()) != 1 {
  549. fmt.Printf("Expect service name.\n")
  550. return nil
  551. }
  552. name := c.Args()[0]
  553. var reply string
  554. err = client.Call("Server.DropService", name, &reply)
  555. if err != nil {
  556. fmt.Println(err)
  557. } else {
  558. fmt.Println(reply)
  559. }
  560. return nil
  561. },
  562. },
  563. },
  564. },
  565. {
  566. Name: "show",
  567. Aliases: []string{"show"},
  568. Usage: "show streams | show tables | show rules | show plugins $plugin_type | show services | show service_funcs",
  569. Subcommands: []cli.Command{
  570. {
  571. Name: "streams",
  572. Usage: "show streams",
  573. Action: func(c *cli.Context) error {
  574. streamProcess(client, "")
  575. return nil
  576. },
  577. },
  578. {
  579. Name: "tables",
  580. Usage: "show tables",
  581. Action: func(c *cli.Context) error {
  582. streamProcess(client, "")
  583. return nil
  584. },
  585. },
  586. {
  587. Name: "rules",
  588. Usage: "show rules",
  589. Action: func(c *cli.Context) error {
  590. var reply string
  591. err = client.Call("Server.ShowRules", 0, &reply)
  592. if err != nil {
  593. fmt.Println(err)
  594. } else {
  595. fmt.Println(reply)
  596. }
  597. return nil
  598. },
  599. },
  600. {
  601. Name: "plugins",
  602. Usage: "show plugins $plugin_type",
  603. Action: func(c *cli.Context) error {
  604. if len(c.Args()) != 1 {
  605. fmt.Printf("Expect plugin type.\n")
  606. return nil
  607. }
  608. ptype, err := getPluginType(c.Args()[0])
  609. if err != nil {
  610. fmt.Printf("%s\n", err)
  611. return nil
  612. }
  613. var reply string
  614. err = client.Call("Server.ShowPlugins", ptype, &reply)
  615. if err != nil {
  616. fmt.Println(err)
  617. } else {
  618. fmt.Println(reply)
  619. }
  620. return nil
  621. },
  622. },
  623. {
  624. Name: "udfs",
  625. Usage: "show udfs",
  626. Action: func(c *cli.Context) error {
  627. var reply string
  628. err = client.Call("Server.ShowUdfs", 0, &reply)
  629. if err != nil {
  630. fmt.Println(err)
  631. } else {
  632. fmt.Println(reply)
  633. }
  634. return nil
  635. },
  636. }, {
  637. Name: "services",
  638. Usage: "show services",
  639. Action: func(c *cli.Context) error {
  640. var reply string
  641. err = client.Call("Server.ShowServices", 0, &reply)
  642. if err != nil {
  643. fmt.Println(err)
  644. } else {
  645. fmt.Println(reply)
  646. }
  647. return nil
  648. },
  649. }, {
  650. Name: "service_funcs",
  651. Usage: "show service_funcs",
  652. Action: func(c *cli.Context) error {
  653. var reply string
  654. err = client.Call("Server.ShowServiceFuncs", 0, &reply)
  655. if err != nil {
  656. fmt.Println(err)
  657. } else {
  658. fmt.Println(reply)
  659. }
  660. return nil
  661. },
  662. },
  663. },
  664. },
  665. {
  666. Name: "getstatus",
  667. Aliases: []string{"getstatus"},
  668. Usage: "getstatus rule $rule_name",
  669. Subcommands: []cli.Command{
  670. {
  671. Name: "rule",
  672. Usage: "getstatus rule $rule_name",
  673. //Flags: nflag,
  674. Action: func(c *cli.Context) error {
  675. if len(c.Args()) != 1 {
  676. fmt.Printf("Expect rule name.\n")
  677. return nil
  678. }
  679. rname := c.Args()[0]
  680. var reply string
  681. err = client.Call("Server.GetStatusRule", rname, &reply)
  682. if err != nil {
  683. fmt.Println(err)
  684. } else {
  685. fmt.Println(reply)
  686. }
  687. return nil
  688. },
  689. },
  690. },
  691. },
  692. {
  693. Name: "gettopo",
  694. Aliases: []string{"gettopo"},
  695. Usage: "gettopo rule $rule_name",
  696. Subcommands: []cli.Command{
  697. {
  698. Name: "rule",
  699. Usage: "getstopo rule $rule_name",
  700. //Flags: nflag,
  701. Action: func(c *cli.Context) error {
  702. if len(c.Args()) != 1 {
  703. fmt.Printf("Expect rule name.\n")
  704. return nil
  705. }
  706. rname := c.Args()[0]
  707. var reply string
  708. err = client.Call("Server.GetTopoRule", rname, &reply)
  709. if err != nil {
  710. fmt.Println(err)
  711. } else {
  712. fmt.Println(reply)
  713. }
  714. return nil
  715. },
  716. },
  717. },
  718. },
  719. {
  720. Name: "start",
  721. Aliases: []string{"start"},
  722. Usage: "start rule $rule_name",
  723. Subcommands: []cli.Command{
  724. {
  725. Name: "rule",
  726. Usage: "start rule $rule_name",
  727. //Flags: nflag,
  728. Action: func(c *cli.Context) error {
  729. if len(c.Args()) != 1 {
  730. fmt.Printf("Expect rule name.\n")
  731. return nil
  732. }
  733. rname := c.Args()[0]
  734. var reply string
  735. err = client.Call("Server.StartRule", rname, &reply)
  736. if err != nil {
  737. fmt.Println(err)
  738. } else {
  739. fmt.Println(reply)
  740. }
  741. return nil
  742. },
  743. },
  744. },
  745. },
  746. {
  747. Name: "stop",
  748. Aliases: []string{"stop"},
  749. Usage: "stop rule $rule_name",
  750. Subcommands: []cli.Command{
  751. {
  752. Name: "rule",
  753. Usage: "stop rule $rule_name",
  754. //Flags: nflag,
  755. Action: func(c *cli.Context) error {
  756. if len(c.Args()) != 1 {
  757. fmt.Printf("Expect rule name.\n")
  758. return nil
  759. }
  760. rname := c.Args()[0]
  761. var reply string
  762. err = client.Call("Server.StopRule", rname, &reply)
  763. if err != nil {
  764. fmt.Println(err)
  765. } else {
  766. fmt.Println(reply)
  767. }
  768. return nil
  769. },
  770. },
  771. },
  772. },
  773. {
  774. Name: "restart",
  775. Aliases: []string{"restart"},
  776. Usage: "restart rule $rule_name",
  777. Subcommands: []cli.Command{
  778. {
  779. Name: "rule",
  780. Usage: "restart rule $rule_name",
  781. //Flags: nflag,
  782. Action: func(c *cli.Context) error {
  783. if len(c.Args()) != 1 {
  784. fmt.Printf("Expect rule name.\n")
  785. return nil
  786. }
  787. rname := c.Args()[0]
  788. var reply string
  789. err = client.Call("Server.RestartRule", rname, &reply)
  790. if err != nil {
  791. fmt.Println(err)
  792. } else {
  793. fmt.Println(reply)
  794. }
  795. return nil
  796. },
  797. },
  798. },
  799. },
  800. {
  801. Name: "register",
  802. Aliases: []string{"register"},
  803. Usage: "register plugin function $plugin_name [$plugin_json | -f plugin_def_file]",
  804. Subcommands: []cli.Command{
  805. {
  806. Name: "plugin",
  807. Usage: "register plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  808. Flags: []cli.Flag{
  809. cli.StringFlag{
  810. Name: "file, f",
  811. Usage: "the location of plugin functions definition file",
  812. FilePath: "/home/myplugin.txt",
  813. },
  814. },
  815. Action: func(c *cli.Context) error {
  816. if len(c.Args()) < 2 {
  817. fmt.Printf("Expect plugin type and name.\n")
  818. return nil
  819. }
  820. ptype := c.Args()[0]
  821. if !strings.EqualFold(ptype, "function") {
  822. fmt.Printf("Plugin type must be function.\n")
  823. return nil
  824. }
  825. pname := c.Args()[1]
  826. sfile := c.String("file")
  827. args := &model.PluginDesc{
  828. RPCArgDesc: model.RPCArgDesc{
  829. Name: pname,
  830. },
  831. }
  832. if sfile != "" {
  833. if len(c.Args()) != 2 {
  834. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  835. return nil
  836. }
  837. if p, err := readDef(sfile, "plugin"); err != nil {
  838. fmt.Printf("%s", err)
  839. return nil
  840. } else {
  841. args.Json = string(p)
  842. }
  843. } else {
  844. if len(c.Args()) != 3 {
  845. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  846. return nil
  847. }
  848. args.Json = c.Args()[2]
  849. }
  850. var reply string
  851. err = client.Call("Server.RegisterPlugin", args, &reply)
  852. if err != nil {
  853. fmt.Println(err)
  854. } else {
  855. fmt.Println(reply)
  856. }
  857. return nil
  858. },
  859. },
  860. },
  861. },
  862. }
  863. app.Name = "Kuiper"
  864. app.Usage = "The command line tool for EMQ X Kuiper."
  865. app.Action = func(c *cli.Context) error {
  866. cli.ShowSubcommandHelp(c)
  867. //cli.ShowVersion(c)
  868. return nil
  869. }
  870. sort.Sort(cli.FlagsByName(app.Flags))
  871. sort.Sort(cli.CommandsByName(app.Commands))
  872. err = app.Run(os.Args)
  873. if err != nil {
  874. fmt.Printf("%v", err)
  875. }
  876. }
  877. func getPluginType(arg string) (ptype int, err error) {
  878. switch arg {
  879. case "source":
  880. ptype = 0
  881. case "sink":
  882. ptype = 1
  883. case "function":
  884. ptype = 2
  885. case "portable":
  886. ptype = 3
  887. default:
  888. err = fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\", \"function\" or \"portable\".\n", arg)
  889. }
  890. return
  891. }
  892. func readDef(sfile string, t string) ([]byte, error) {
  893. if _, err := os.Stat(sfile); os.IsNotExist(err) {
  894. return nil, fmt.Errorf("The specified %s defenition file %s is not existed.\n", t, sfile)
  895. }
  896. fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
  897. if rule, err := ioutil.ReadFile(sfile); err != nil {
  898. return nil, fmt.Errorf("Failed to read from %s definition file %s.\n", t, sfile)
  899. } else {
  900. return rule, nil
  901. }
  902. }