123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454 |
- package main
- import (
- "bufio"
- "engine/common"
- "fmt"
- "github.com/go-yaml/yaml"
- "github.com/urfave/cli"
- "io/ioutil"
- "net/rpc"
- "os"
- "sort"
- "strings"
- "time"
- )
- type clientConf struct {
- Host string `yaml:"host"`
- Port int `yaml:"port"`
- }
- var clientYaml = "client.yaml"
- func streamProcess(client *rpc.Client, args string) {
- var reply string
- if args == ""{
- args = strings.Join(os.Args[1:], " ")
- }
- err := client.Call("Server.Stream", args, &reply)
- if err != nil{
- fmt.Println(err)
- }else{
- fmt.Println(reply)
- }
- }
- var Version string = "unknown"
- func main() {
- app := cli.NewApp()
- app.Version = Version
- //nflag := []cli.Flag { cli.StringFlag{
- // Name: "name, n",
- // Usage: "the name of stream",
- // }}
- b, err := common.LoadConf(clientYaml)
- if err != nil {
- common.Log.Fatal(err)
- }
- var cfg map[string]clientConf
- var config *clientConf
- if err := yaml.Unmarshal(b, &cfg); err != nil {
- fmt.Printf("Failed to load config file with error %s.\n", err)
- }else{
- c, ok := cfg["basic"]
- if !ok{
- fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
- }else{
- config = &c
- }
- }
- if config == nil {
- config = &clientConf{
- Host: "127.0.0.1",
- Port: 20498,
- }
- }
- fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
- // Create a TCP connection to localhost on port 1234
- client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
- if err != nil {
- fmt.Printf("Failed to connect the server, please start the server.\n")
- return
- }
- app.Commands = []cli.Command{
- {
- Name: "query",
- Aliases: []string{"query"},
- Usage: "query command line",
- Action: func(c *cli.Context) error {
- reader := bufio.NewReader(os.Stdin)
- var inputs []string
- ticker := time.NewTicker(time.Millisecond * 300)
- defer ticker.Stop()
- for {
- fmt.Print("kuiper > ")
- text, _ := reader.ReadString('\n')
- inputs = append(inputs, text)
- // convert CRLF to LF
- text = strings.Replace(text, "\n", "", -1)
- if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
- break
- } else if strings.Trim(text, " ") == "" {
- continue
- } else {
- var reply string
- err := client.Call("Server.CreateQuery", text, &reply)
- if err != nil{
- fmt.Println(err)
- return err
- } else {
- fmt.Println(reply)
- go func() {
- for {
- <-ticker.C
- var result string
- _ = client.Call("Server.GetQueryResult", "", &result)
- if result != "" {
- fmt.Println(result)
- }
- }
- }()
- }
- }
- }
- return nil
- },
- },
- {
- Name: "create",
- Aliases: []string{"create"},
- Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file",
- Subcommands: []cli.Command {
- {
- Name: "stream",
- Usage: "create stream $stream_name [-f stream_def_file]",
- Flags: []cli.Flag {
- cli.StringFlag{
- Name: "file, f",
- Usage: "the location of stream definition file",
- FilePath: "/home/mystream.txt",
- },
- },
- Action: func(c *cli.Context) error {
- sfile := c.String("file")
- if sfile != "" {
- if _, err := os.Stat(c.String("file")); os.IsNotExist(err) {
- fmt.Printf("The specified stream defintion file %s does not existed.\n", sfile)
- return nil
- }
- fmt.Printf("Creating a new stream from file %s.\n", sfile)
- if stream, err := ioutil.ReadFile(sfile); err != nil {
- fmt.Printf("Failed to read from stream definition file %s.\n", sfile)
- return nil
- } else {
- args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
- streamProcess(client, args)
- return nil
- }
- } else {
- streamProcess(client, "")
- return nil
- }
- },
- },
- {
- Name: "rule",
- Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
- Flags: []cli.Flag {
- cli.StringFlag{
- Name: "file, f",
- Usage: "the location of rule definition file",
- FilePath: "/home/myrule.txt",
- },
- },
- Action: func(c *cli.Context) error {
- sfile := c.String("file")
- if sfile != "" {
- if _, err := os.Stat(c.String("file")); os.IsNotExist(err) {
- fmt.Printf("The specified rule defenition file %s is not existed.\n", sfile)
- return nil
- }
- fmt.Printf("Creating a new rule from file %s.\n", sfile)
- if rule, err := ioutil.ReadFile(sfile); err != nil {
- fmt.Printf("Failed to read from rule definition file %s.\n", sfile)
- return nil
- } else {
- if len(c.Args()) != 1 {
- fmt.Printf("Expect rule name.\n")
- return nil
- }
- rname := c.Args()[0]
- var reply string
- args := &common.Rule{rname, string(rule)}
- err = client.Call("Server.CreateRule", args, &reply)
- if err != nil {
- fmt.Println(err)
- } else {
- fmt.Println(reply)
- }
- }
- return nil
- } else {
- if len(c.Args()) != 2 {
- fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
- return nil
- }
- rname := c.Args()[0]
- rjson := c.Args()[1]
- var reply string
- args := &common.Rule{rname, rjson}
- err = client.Call("Server.CreateRule", args, &reply)
- if err != nil {
- fmt.Println(err)
- } else {
- fmt.Println(reply)
- }
- return nil
- }
- },
- },
- },
- },
- {
- Name: "describe",
- Aliases: []string{"describe"},
- Usage: "describe stream $stream_name | describe rule $rule_name",
- Subcommands: []cli.Command{
- {
- Name: "stream",
- Usage: "describe stream $stream_name",
- //Flags: nflag,
- Action: func(c *cli.Context) error {
- streamProcess(client, "")
- return nil
- },
- },
- {
- Name: "rule",
- Usage: "describe rule $rule_name",
- Action: func(c *cli.Context) error {
- if len(c.Args()) != 1 {
- fmt.Printf("Expect rule name.\n")
- return nil
- }
- rname := c.Args()[0]
- var reply string
- err = client.Call("Server.DescRule", rname, &reply)
- if err != nil {
- fmt.Println(err)
- } else {
- fmt.Println(reply)
- }
- return nil
- },
- },
- },
- },
- {
- Name: "drop",
- Aliases: []string{"drop"},
- Usage: "drop stream $stream_name | drop rule $rule_name",
- Subcommands: []cli.Command{
- {
- Name: "stream",
- Usage: "drop stream $stream_name",
- //Flags: nflag,
- Action: func(c *cli.Context) error {
- streamProcess(client, "")
- return nil
- },
- },
- {
- Name: "rule",
- Usage: "drop rule $rule_name",
- //Flags: nflag,
- Action: func(c *cli.Context) error {
- if len(c.Args()) != 1 {
- fmt.Printf("Expect rule name.\n")
- return nil
- }
- rname := c.Args()[0]
- var reply string
- err = client.Call("Server.DropRule", rname, &reply)
- if err != nil {
- fmt.Println(err)
- } else {
- fmt.Println(reply)
- }
- return nil
- },
- },
- },
- },
- {
- Name: "show",
- Aliases: []string{"show"},
- Usage: "show streams | show rules",
- Subcommands: []cli.Command{
- {
- Name: "streams",
- Usage: "show streams",
- Action: func(c *cli.Context) error {
- streamProcess(client, "")
- return nil
- },
- },
- {
- Name: "rules",
- Usage: "show rules",
- Action: func(c *cli.Context) error {
- var reply string
- err = client.Call("Server.ShowRules", 0, &reply)
- if err != nil {
- fmt.Println(err)
- } else {
- fmt.Println(reply)
- }
- return nil
- },
- },
- },
- },
- {
- Name: "getstatus",
- Aliases: []string{"getstatus"},
- Usage: "getstatus rule $rule_name",
- Subcommands: []cli.Command{
- {
- Name: "rule",
- Usage: "getstatus rule $rule_name",
- //Flags: nflag,
- Action: func(c *cli.Context) error {
- if len(c.Args()) != 1 {
- fmt.Printf("Expect rule name.\n")
- return nil
- }
- rname := c.Args()[0]
- var reply string
- err = client.Call("Server.GetStatusRule", rname, &reply)
- if err != nil {
- fmt.Println(err)
- } else {
- fmt.Println(reply)
- }
- return nil
- },
- },
- },
- },
- {
- Name: "start",
- Aliases: []string{"start"},
- Usage: "start rule $rule_name",
- Subcommands: []cli.Command{
- {
- Name: "rule",
- Usage: "start rule $rule_name",
- //Flags: nflag,
- Action: func(c *cli.Context) error {
- if len(c.Args()) != 1 {
- fmt.Printf("Expect rule name.\n")
- return nil
- }
- rname := c.Args()[0]
- var reply string
- err = client.Call("Server.StartRule", rname, &reply)
- if err != nil {
- fmt.Println(err)
- } else {
- fmt.Println(reply)
- }
- return nil
- },
- },
- },
- },
- {
- Name: "stop",
- Aliases: []string{"stop"},
- Usage: "stop rule $rule_name",
- Subcommands: []cli.Command{
- {
- Name: "rule",
- Usage: "stop rule $rule_name",
- //Flags: nflag,
- Action: func(c *cli.Context) error {
- if len(c.Args()) != 1 {
- fmt.Printf("Expect rule name.\n")
- return nil
- }
- rname := c.Args()[0]
- var reply string
- err = client.Call("Server.StopRule", rname, &reply)
- if err != nil {
- fmt.Println(err)
- } else {
- fmt.Println(reply)
- }
- return nil
- },
- },
- },
- },
- {
- Name: "restart",
- Aliases: []string{"restart"},
- Usage: "restart rule $rule_name",
- Subcommands: []cli.Command{
- {
- Name: "rule",
- Usage: "restart rule $rule_name",
- //Flags: nflag,
- Action: func(c *cli.Context) error {
- if len(c.Args()) != 1 {
- fmt.Printf("Expect rule name.\n")
- return nil
- }
- rname := c.Args()[0]
- var reply string
- err = client.Call("Server.RestartRule", rname, &reply)
- if err != nil {
- fmt.Println(err)
- } else {
- fmt.Println(reply)
- }
- return nil
- },
- },
- },
- },
- }
- app.Name = "Kuiper"
- app.Usage = "The command line tool for EMQ X Kuiper."
- app.Action = func(c *cli.Context) error {
- cli.ShowSubcommandHelp(c)
- //cli.ShowVersion(c)
- return nil
- }
- sort.Sort(cli.FlagsByName(app.Flags))
- sort.Sort(cli.CommandsByName(app.Commands))
- err = app.Run(os.Args)
- if err != nil {
- fmt.Printf("%v", err)
- }
- }
|