main.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package main
  2. import (
  3. "bufio"
  4. "engine/common"
  5. "engine/xsql/processors"
  6. "fmt"
  7. "github.com/urfave/cli"
  8. "os"
  9. "sort"
  10. "strings"
  11. )
  // log is the logger used throughout this file; it is taken from common.Log.
  12. var log = common.Log
  13. func main() {
  14. app := cli.NewApp()
  15. app.Version = "0.1"
  16. //nflag := []cli.Flag { cli.StringFlag{
  17. // Name: "name, n",
  18. // Usage: "the name of stream",
  19. // }}
  20. dataDir, err := common.GetDataLoc()
  21. if err != nil {
  22. log.Panic(err)
  23. }
  24. app.Commands = []cli.Command{
  25. {
  26. Name: "stream",
  27. Aliases: []string{"s"},
  28. Usage: "manage streams",
  29. Action: func(c *cli.Context) error {
  30. reader := bufio.NewReader(os.Stdin)
  31. var inputs []string
  32. for {
  33. fmt.Print("xstream > ")
  34. text, _ := reader.ReadString('\n')
  35. inputs = append(inputs, text)
  36. // convert CRLF to LF
  37. text = strings.Replace(text, "\n", "", -1)
  38. if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
  39. break
  40. } else {
  41. content, err := processors.NewStreamProcessor(text, dataDir).Exec()
  42. if err != nil {
  43. fmt.Printf("stream command error: %s\n", err)
  44. }else{
  45. for _, c := range content{
  46. fmt.Println(c)
  47. }
  48. }
  49. }
  50. }
  51. return nil
  52. },
  53. },
  54. {
  55. Name: "query",
  56. Aliases: []string{"q"},
  57. Usage: "query against stream",
  58. Action: func(c *cli.Context) error {
  59. reader := bufio.NewReader(os.Stdin)
  60. var inputs []string
  61. for {
  62. fmt.Print("xstream > ")
  63. text, _ := reader.ReadString('\n')
  64. inputs = append(inputs, text)
  65. // convert CRLF to LF
  66. text = strings.Replace(text, "\n", "", -1)
  67. if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
  68. break
  69. } else {
  70. fmt.Println(text)
  71. err = processors.NewRuleProcessor(text, dataDir).Exec()
  72. if err != nil {
  73. fmt.Printf("create topology error : %s\n", err)
  74. }else{
  75. fmt.Println("topology running")
  76. }
  77. }
  78. }
  79. return nil
  80. },
  81. },
  82. }
  83. app.Name = "xstream"
  84. app.Usage = "The command line tool for EMQ X stream."
  85. app.Action = func(c *cli.Context) error {
  86. cli.ShowSubcommandHelp(c)
  87. //cli.ShowVersion(c)
  88. return nil
  89. }
  90. sort.Sort(cli.FlagsByName(app.Flags))
  91. sort.Sort(cli.CommandsByName(app.Commands))
  92. err = app.Run(os.Args)
  93. if err != nil {
  94. log.Fatal(err)
  95. }
  96. }