main.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807
  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/go-yaml/yaml"
  7. "github.com/urfave/cli"
  8. "io/ioutil"
  9. "net/rpc"
  10. "os"
  11. "sort"
  12. "strings"
  13. "time"
  14. )
  15. type clientConf struct {
  16. Host string `yaml:"host"`
  17. Port int `yaml:"port"`
  18. }
  19. var clientYaml = "client.yaml"
  20. func streamProcess(client *rpc.Client, args string) {
  21. var reply string
  22. if args == "" {
  23. args = strings.Join(os.Args[1:], " ")
  24. }
  25. err := client.Call("Server.Stream", args, &reply)
  26. if err != nil {
  27. fmt.Println(err)
  28. } else {
  29. fmt.Println(reply)
  30. }
  31. }
  32. var (
  33. Version = "unknown"
  34. LoadFileType = "relative"
  35. )
  36. func main() {
  37. common.LoadFileType = LoadFileType
  38. app := cli.NewApp()
  39. app.Version = Version
  40. //nflag := []cli.Flag { cli.StringFlag{
  41. // Name: "name, n",
  42. // Usage: "the name of stream",
  43. // }}
  44. b, err := common.LoadConf(clientYaml)
  45. if err != nil {
  46. common.Log.Fatal(err)
  47. }
  48. var cfg map[string]clientConf
  49. var config *clientConf
  50. if err := yaml.Unmarshal(b, &cfg); err != nil {
  51. fmt.Printf("Failed to load config file with error %s.\n", err)
  52. } else {
  53. c, ok := cfg["basic"]
  54. if !ok {
  55. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  56. } else {
  57. config = &c
  58. }
  59. }
  60. if config == nil {
  61. config = &clientConf{
  62. Host: "127.0.0.1",
  63. Port: 20498,
  64. }
  65. }
  66. fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
  67. // Create a TCP connection to localhost on port 1234
  68. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  69. if err != nil {
  70. fmt.Printf("Failed to connect the server, please start the server.\n")
  71. return
  72. }
  73. app.Commands = []cli.Command{
  74. {
  75. Name: "query",
  76. Aliases: []string{"query"},
  77. Usage: "query command line",
  78. Action: func(c *cli.Context) error {
  79. reader := bufio.NewReader(os.Stdin)
  80. var inputs []string
  81. ticker := time.NewTicker(time.Millisecond * 300)
  82. defer ticker.Stop()
  83. for {
  84. fmt.Print("kuiper > ")
  85. text, _ := reader.ReadString('\n')
  86. inputs = append(inputs, text)
  87. // convert CRLF to LF
  88. text = strings.Replace(text, "\n", "", -1)
  89. if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
  90. break
  91. } else if strings.Trim(text, " ") == "" {
  92. continue
  93. } else {
  94. var reply string
  95. err := client.Call("Server.CreateQuery", text, &reply)
  96. if err != nil {
  97. fmt.Println(err)
  98. continue
  99. } else {
  100. fmt.Println(reply)
  101. go func() {
  102. for {
  103. <-ticker.C
  104. var result string
  105. e := client.Call("Server.GetQueryResult", "", &result)
  106. if e != nil {
  107. fmt.Println(e)
  108. fmt.Print("kuiper > ")
  109. return
  110. }
  111. if result != "" {
  112. fmt.Println(result)
  113. }
  114. }
  115. }()
  116. }
  117. }
  118. }
  119. return nil
  120. },
  121. },
  122. {
  123. Name: "create",
  124. Aliases: []string{"create"},
  125. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create table $table_name | create table $table_name -f $table_def_file| create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file",
  126. Subcommands: []cli.Command{
  127. {
  128. Name: "stream",
  129. Usage: "create stream $stream_name [-f stream_def_file]",
  130. Flags: []cli.Flag{
  131. cli.StringFlag{
  132. Name: "file, f",
  133. Usage: "the location of stream definition file",
  134. FilePath: "/home/mystream.txt",
  135. },
  136. },
  137. Action: func(c *cli.Context) error {
  138. sfile := c.String("file")
  139. if sfile != "" {
  140. if stream, err := readDef(sfile, "stream"); err != nil {
  141. fmt.Printf("%s", err)
  142. return nil
  143. } else {
  144. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  145. streamProcess(client, args)
  146. return nil
  147. }
  148. } else {
  149. streamProcess(client, "")
  150. return nil
  151. }
  152. },
  153. },
  154. {
  155. Name: "table",
  156. Usage: "create table $table_name [-f table_def_file]",
  157. Flags: []cli.Flag{
  158. cli.StringFlag{
  159. Name: "file, f",
  160. Usage: "the location of table definition file",
  161. FilePath: "/home/mytable.txt",
  162. },
  163. },
  164. Action: func(c *cli.Context) error {
  165. sfile := c.String("file")
  166. if sfile != "" {
  167. if stream, err := readDef(sfile, "table"); err != nil {
  168. fmt.Printf("%s", err)
  169. return nil
  170. } else {
  171. args := strings.Join([]string{"CREATE TABLE ", string(stream)}, " ")
  172. streamProcess(client, args)
  173. return nil
  174. }
  175. } else {
  176. streamProcess(client, "")
  177. return nil
  178. }
  179. },
  180. },
  181. {
  182. Name: "rule",
  183. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  184. Flags: []cli.Flag{
  185. cli.StringFlag{
  186. Name: "file, f",
  187. Usage: "the location of rule definition file",
  188. FilePath: "/home/myrule.txt",
  189. },
  190. },
  191. Action: func(c *cli.Context) error {
  192. sfile := c.String("file")
  193. if sfile != "" {
  194. if rule, err := readDef(sfile, "rule"); err != nil {
  195. fmt.Printf("%s", err)
  196. return nil
  197. } else {
  198. if len(c.Args()) != 1 {
  199. fmt.Printf("Expect rule name.\n")
  200. return nil
  201. }
  202. rname := c.Args()[0]
  203. var reply string
  204. args := &common.RuleDesc{rname, string(rule)}
  205. err = client.Call("Server.CreateRule", args, &reply)
  206. if err != nil {
  207. fmt.Println(err)
  208. } else {
  209. fmt.Println(reply)
  210. }
  211. }
  212. return nil
  213. } else {
  214. if len(c.Args()) != 2 {
  215. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  216. return nil
  217. }
  218. rname := c.Args()[0]
  219. rjson := c.Args()[1]
  220. var reply string
  221. args := &common.RuleDesc{rname, rjson}
  222. err = client.Call("Server.CreateRule", args, &reply)
  223. if err != nil {
  224. fmt.Println(err)
  225. } else {
  226. fmt.Println(reply)
  227. }
  228. return nil
  229. }
  230. },
  231. },
  232. {
  233. Name: "plugin",
  234. Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  235. Flags: []cli.Flag{
  236. cli.StringFlag{
  237. Name: "file, f",
  238. Usage: "the location of plugin definition file",
  239. FilePath: "/home/myplugin.txt",
  240. },
  241. },
  242. Action: func(c *cli.Context) error {
  243. if len(c.Args()) < 2 {
  244. fmt.Printf("Expect plugin type and name.\n")
  245. return nil
  246. }
  247. ptype, err := getPluginType(c.Args()[0])
  248. if err != nil {
  249. fmt.Printf("%s\n", err)
  250. return nil
  251. }
  252. pname := c.Args()[1]
  253. sfile := c.String("file")
  254. args := &common.PluginDesc{
  255. RuleDesc: common.RuleDesc{
  256. Name: pname,
  257. },
  258. Type: ptype,
  259. }
  260. if sfile != "" {
  261. if len(c.Args()) != 2 {
  262. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  263. return nil
  264. }
  265. if p, err := readDef(sfile, "plugin"); err != nil {
  266. fmt.Printf("%s", err)
  267. return nil
  268. } else {
  269. args.Json = string(p)
  270. }
  271. } else {
  272. if len(c.Args()) != 3 {
  273. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  274. return nil
  275. }
  276. args.Json = c.Args()[2]
  277. }
  278. var reply string
  279. err = client.Call("Server.CreatePlugin", args, &reply)
  280. if err != nil {
  281. fmt.Println(err)
  282. } else {
  283. fmt.Println(reply)
  284. }
  285. return nil
  286. },
  287. },
  288. },
  289. },
  290. {
  291. Name: "describe",
  292. Aliases: []string{"describe"},
  293. Usage: "describe stream $stream_name | describe table $table_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name",
  294. Subcommands: []cli.Command{
  295. {
  296. Name: "stream",
  297. Usage: "describe stream $stream_name",
  298. //Flags: nflag,
  299. Action: func(c *cli.Context) error {
  300. streamProcess(client, "")
  301. return nil
  302. },
  303. },
  304. {
  305. Name: "table",
  306. Usage: "describe table $table_name",
  307. //Flags: nflag,
  308. Action: func(c *cli.Context) error {
  309. streamProcess(client, "")
  310. return nil
  311. },
  312. },
  313. {
  314. Name: "rule",
  315. Usage: "describe rule $rule_name",
  316. Action: func(c *cli.Context) error {
  317. if len(c.Args()) != 1 {
  318. fmt.Printf("Expect rule name.\n")
  319. return nil
  320. }
  321. rname := c.Args()[0]
  322. var reply string
  323. err = client.Call("Server.DescRule", rname, &reply)
  324. if err != nil {
  325. fmt.Println(err)
  326. } else {
  327. fmt.Println(reply)
  328. }
  329. return nil
  330. },
  331. },
  332. {
  333. Name: "plugin",
  334. Usage: "describe plugin $plugin_type $plugin_name",
  335. //Flags: nflag,
  336. Action: func(c *cli.Context) error {
  337. ptype, err := getPluginType(c.Args()[0])
  338. if err != nil {
  339. fmt.Printf("%s\n", err)
  340. return nil
  341. }
  342. if len(c.Args()) != 2 {
  343. fmt.Printf("Expect plugin name.\n")
  344. return nil
  345. }
  346. pname := c.Args()[1]
  347. args := &common.PluginDesc{
  348. RuleDesc: common.RuleDesc{
  349. Name: pname,
  350. },
  351. Type: ptype,
  352. }
  353. var reply string
  354. err = client.Call("Server.DescPlugin", args, &reply)
  355. if err != nil {
  356. fmt.Println(err)
  357. } else {
  358. fmt.Println(reply)
  359. }
  360. return nil
  361. },
  362. },
  363. {
  364. Name: "udf",
  365. Usage: "describe udf $udf_name",
  366. //Flags: nflag,
  367. Action: func(c *cli.Context) error {
  368. if len(c.Args()) != 1 {
  369. fmt.Printf("Expect udf name.\n")
  370. return nil
  371. }
  372. pname := c.Args()[0]
  373. var reply string
  374. err = client.Call("Server.DescUdf", pname, &reply)
  375. if err != nil {
  376. fmt.Println(err)
  377. } else {
  378. fmt.Println(reply)
  379. }
  380. return nil
  381. },
  382. },
  383. },
  384. },
  385. {
  386. Name: "drop",
  387. Aliases: []string{"drop"},
  388. Usage: "drop stream $stream_name | drop table $table_name |drop rule $rule_name | drop plugin $plugin_type $plugin_name -r $stop",
  389. Subcommands: []cli.Command{
  390. {
  391. Name: "stream",
  392. Usage: "drop stream $stream_name",
  393. //Flags: nflag,
  394. Action: func(c *cli.Context) error {
  395. streamProcess(client, "")
  396. return nil
  397. },
  398. },
  399. {
  400. Name: "table",
  401. Usage: "drop table $table_name",
  402. //Flags: nflag,
  403. Action: func(c *cli.Context) error {
  404. streamProcess(client, "")
  405. return nil
  406. },
  407. },
  408. {
  409. Name: "rule",
  410. Usage: "drop rule $rule_name",
  411. //Flags: nflag,
  412. Action: func(c *cli.Context) error {
  413. if len(c.Args()) != 1 {
  414. fmt.Printf("Expect rule name.\n")
  415. return nil
  416. }
  417. rname := c.Args()[0]
  418. var reply string
  419. err = client.Call("Server.DropRule", rname, &reply)
  420. if err != nil {
  421. fmt.Println(err)
  422. } else {
  423. fmt.Println(reply)
  424. }
  425. return nil
  426. },
  427. },
  428. {
  429. Name: "plugin",
  430. Usage: "drop plugin $plugin_type $plugin_name -s stop",
  431. Flags: []cli.Flag{
  432. cli.StringFlag{
  433. Name: "stop, s",
  434. Usage: "stop kuiper after the action",
  435. },
  436. },
  437. Action: func(c *cli.Context) error {
  438. r := c.String("stop")
  439. if r != "true" && r != "false" {
  440. fmt.Printf("Expect s flag to be a boolean value.\n")
  441. return nil
  442. }
  443. if len(c.Args()) < 2 || len(c.Args()) > 3 {
  444. fmt.Printf("Expect plugin type and name.\n")
  445. return nil
  446. }
  447. ptype, err := getPluginType(c.Args()[0])
  448. if err != nil {
  449. fmt.Printf("%s\n", err)
  450. return nil
  451. }
  452. pname := c.Args()[1]
  453. args := &common.PluginDesc{
  454. RuleDesc: common.RuleDesc{
  455. Name: pname,
  456. },
  457. Type: ptype,
  458. Stop: r == "true",
  459. }
  460. var reply string
  461. err = client.Call("Server.DropPlugin", args, &reply)
  462. if err != nil {
  463. fmt.Println(err)
  464. } else {
  465. fmt.Println(reply)
  466. }
  467. return nil
  468. },
  469. },
  470. },
  471. },
  472. {
  473. Name: "show",
  474. Aliases: []string{"show"},
  475. Usage: "show streams | show tables | show rules | show plugins $plugin_type",
  476. Subcommands: []cli.Command{
  477. {
  478. Name: "streams",
  479. Usage: "show streams",
  480. Action: func(c *cli.Context) error {
  481. streamProcess(client, "")
  482. return nil
  483. },
  484. },
  485. {
  486. Name: "tables",
  487. Usage: "show tables",
  488. Action: func(c *cli.Context) error {
  489. streamProcess(client, "")
  490. return nil
  491. },
  492. },
  493. {
  494. Name: "rules",
  495. Usage: "show rules",
  496. Action: func(c *cli.Context) error {
  497. var reply string
  498. err = client.Call("Server.ShowRules", 0, &reply)
  499. if err != nil {
  500. fmt.Println(err)
  501. } else {
  502. fmt.Println(reply)
  503. }
  504. return nil
  505. },
  506. },
  507. {
  508. Name: "plugins",
  509. Usage: "show plugins $plugin_type",
  510. Action: func(c *cli.Context) error {
  511. if len(c.Args()) != 1 {
  512. fmt.Printf("Expect plugin type.\n")
  513. return nil
  514. }
  515. ptype, err := getPluginType(c.Args()[0])
  516. if err != nil {
  517. fmt.Printf("%s\n", err)
  518. return nil
  519. }
  520. var reply string
  521. err = client.Call("Server.ShowPlugins", ptype, &reply)
  522. if err != nil {
  523. fmt.Println(err)
  524. } else {
  525. fmt.Println(reply)
  526. }
  527. return nil
  528. },
  529. },
  530. {
  531. Name: "udfs",
  532. Usage: "show udfs",
  533. Action: func(c *cli.Context) error {
  534. var reply string
  535. err = client.Call("Server.ShowUdfs", 0, &reply)
  536. if err != nil {
  537. fmt.Println(err)
  538. } else {
  539. fmt.Println(reply)
  540. }
  541. return nil
  542. },
  543. },
  544. },
  545. },
  546. {
  547. Name: "getstatus",
  548. Aliases: []string{"getstatus"},
  549. Usage: "getstatus rule $rule_name",
  550. Subcommands: []cli.Command{
  551. {
  552. Name: "rule",
  553. Usage: "getstatus rule $rule_name",
  554. //Flags: nflag,
  555. Action: func(c *cli.Context) error {
  556. if len(c.Args()) != 1 {
  557. fmt.Printf("Expect rule name.\n")
  558. return nil
  559. }
  560. rname := c.Args()[0]
  561. var reply string
  562. err = client.Call("Server.GetStatusRule", rname, &reply)
  563. if err != nil {
  564. fmt.Println(err)
  565. } else {
  566. fmt.Println(reply)
  567. }
  568. return nil
  569. },
  570. },
  571. },
  572. },
  573. {
  574. Name: "gettopo",
  575. Aliases: []string{"gettopo"},
  576. Usage: "gettopo rule $rule_name",
  577. Subcommands: []cli.Command{
  578. {
  579. Name: "rule",
  580. Usage: "getstopo rule $rule_name",
  581. //Flags: nflag,
  582. Action: func(c *cli.Context) error {
  583. if len(c.Args()) != 1 {
  584. fmt.Printf("Expect rule name.\n")
  585. return nil
  586. }
  587. rname := c.Args()[0]
  588. var reply string
  589. err = client.Call("Server.GetTopoRule", rname, &reply)
  590. if err != nil {
  591. fmt.Println(err)
  592. } else {
  593. fmt.Println(reply)
  594. }
  595. return nil
  596. },
  597. },
  598. },
  599. },
  600. {
  601. Name: "start",
  602. Aliases: []string{"start"},
  603. Usage: "start rule $rule_name",
  604. Subcommands: []cli.Command{
  605. {
  606. Name: "rule",
  607. Usage: "start rule $rule_name",
  608. //Flags: nflag,
  609. Action: func(c *cli.Context) error {
  610. if len(c.Args()) != 1 {
  611. fmt.Printf("Expect rule name.\n")
  612. return nil
  613. }
  614. rname := c.Args()[0]
  615. var reply string
  616. err = client.Call("Server.StartRule", rname, &reply)
  617. if err != nil {
  618. fmt.Println(err)
  619. } else {
  620. fmt.Println(reply)
  621. }
  622. return nil
  623. },
  624. },
  625. },
  626. },
  627. {
  628. Name: "stop",
  629. Aliases: []string{"stop"},
  630. Usage: "stop rule $rule_name",
  631. Subcommands: []cli.Command{
  632. {
  633. Name: "rule",
  634. Usage: "stop rule $rule_name",
  635. //Flags: nflag,
  636. Action: func(c *cli.Context) error {
  637. if len(c.Args()) != 1 {
  638. fmt.Printf("Expect rule name.\n")
  639. return nil
  640. }
  641. rname := c.Args()[0]
  642. var reply string
  643. err = client.Call("Server.StopRule", rname, &reply)
  644. if err != nil {
  645. fmt.Println(err)
  646. } else {
  647. fmt.Println(reply)
  648. }
  649. return nil
  650. },
  651. },
  652. },
  653. },
  654. {
  655. Name: "restart",
  656. Aliases: []string{"restart"},
  657. Usage: "restart rule $rule_name",
  658. Subcommands: []cli.Command{
  659. {
  660. Name: "rule",
  661. Usage: "restart rule $rule_name",
  662. //Flags: nflag,
  663. Action: func(c *cli.Context) error {
  664. if len(c.Args()) != 1 {
  665. fmt.Printf("Expect rule name.\n")
  666. return nil
  667. }
  668. rname := c.Args()[0]
  669. var reply string
  670. err = client.Call("Server.RestartRule", rname, &reply)
  671. if err != nil {
  672. fmt.Println(err)
  673. } else {
  674. fmt.Println(reply)
  675. }
  676. return nil
  677. },
  678. },
  679. },
  680. },
  681. {
  682. Name: "register",
  683. Aliases: []string{"register"},
  684. Usage: "register plugin function $plugin_name [$plugin_json | -f plugin_def_file]",
  685. Subcommands: []cli.Command{
  686. {
  687. Name: "plugin",
  688. Usage: "register plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  689. Flags: []cli.Flag{
  690. cli.StringFlag{
  691. Name: "file, f",
  692. Usage: "the location of plugin functions definition file",
  693. FilePath: "/home/myplugin.txt",
  694. },
  695. },
  696. Action: func(c *cli.Context) error {
  697. if len(c.Args()) < 2 {
  698. fmt.Printf("Expect plugin type and name.\n")
  699. return nil
  700. }
  701. ptype := c.Args()[0]
  702. if strings.ToLower(ptype) != "function" {
  703. fmt.Printf("Plugin type must be function.\n")
  704. return nil
  705. }
  706. pname := c.Args()[1]
  707. sfile := c.String("file")
  708. args := &common.PluginDesc{
  709. RuleDesc: common.RuleDesc{
  710. Name: pname,
  711. },
  712. }
  713. if sfile != "" {
  714. if len(c.Args()) != 2 {
  715. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  716. return nil
  717. }
  718. if p, err := readDef(sfile, "plugin"); err != nil {
  719. fmt.Printf("%s", err)
  720. return nil
  721. } else {
  722. args.Json = string(p)
  723. }
  724. } else {
  725. if len(c.Args()) != 3 {
  726. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  727. return nil
  728. }
  729. args.Json = c.Args()[2]
  730. }
  731. var reply string
  732. err = client.Call("Server.RegisterPlugin", args, &reply)
  733. if err != nil {
  734. fmt.Println(err)
  735. } else {
  736. fmt.Println(reply)
  737. }
  738. return nil
  739. },
  740. },
  741. },
  742. },
  743. }
  744. app.Name = "Kuiper"
  745. app.Usage = "The command line tool for EMQ X Kuiper."
  746. app.Action = func(c *cli.Context) error {
  747. cli.ShowSubcommandHelp(c)
  748. //cli.ShowVersion(c)
  749. return nil
  750. }
  751. sort.Sort(cli.FlagsByName(app.Flags))
  752. sort.Sort(cli.CommandsByName(app.Commands))
  753. err = app.Run(os.Args)
  754. if err != nil {
  755. fmt.Printf("%v", err)
  756. }
  757. }
  758. func getPluginType(arg string) (ptype int, err error) {
  759. switch arg {
  760. case "source":
  761. ptype = 0
  762. case "sink":
  763. ptype = 1
  764. case "function":
  765. ptype = 2
  766. default:
  767. err = fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\" or \"function\".\n", arg)
  768. }
  769. return
  770. }
  771. func readDef(sfile string, t string) ([]byte, error) {
  772. if _, err := os.Stat(sfile); os.IsNotExist(err) {
  773. return nil, fmt.Errorf("The specified %s defenition file %s is not existed.\n", t, sfile)
  774. }
  775. fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
  776. if rule, err := ioutil.ReadFile(sfile); err != nil {
  777. return nil, fmt.Errorf("Failed to read from %s definition file %s.\n", t, sfile)
  778. } else {
  779. return rule, nil
  780. }
  781. }