main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/go-yaml/yaml"
  7. "github.com/urfave/cli"
  8. "io/ioutil"
  9. "net/rpc"
  10. "os"
  11. "sort"
  12. "strings"
  13. "time"
  14. )
  15. type clientConf struct {
  16. Host string `yaml:"host"`
  17. Port int `yaml:"port"`
  18. }
  19. var clientYaml = "client.yaml"
  20. func streamProcess(client *rpc.Client, args string) {
  21. var reply string
  22. if args == "" {
  23. args = strings.Join(os.Args[1:], " ")
  24. }
  25. err := client.Call("Server.Stream", args, &reply)
  26. if err != nil {
  27. fmt.Println(err)
  28. } else {
  29. fmt.Println(reply)
  30. }
  31. }
  32. var Version string = "unknown"
  33. func main() {
  34. app := cli.NewApp()
  35. app.Version = Version
  36. //nflag := []cli.Flag { cli.StringFlag{
  37. // Name: "name, n",
  38. // Usage: "the name of stream",
  39. // }}
  40. b, err := common.LoadConf(clientYaml)
  41. if err != nil {
  42. common.Log.Fatal(err)
  43. }
  44. var cfg map[string]clientConf
  45. var config *clientConf
  46. if err := yaml.Unmarshal(b, &cfg); err != nil {
  47. fmt.Printf("Failed to load config file with error %s.\n", err)
  48. } else {
  49. c, ok := cfg["basic"]
  50. if !ok {
  51. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  52. } else {
  53. config = &c
  54. }
  55. }
  56. if config == nil {
  57. config = &clientConf{
  58. Host: "127.0.0.1",
  59. Port: 20498,
  60. }
  61. }
  62. fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
  63. // Create a TCP connection to localhost on port 1234
  64. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  65. if err != nil {
  66. fmt.Printf("Failed to connect the server, please start the server.\n")
  67. return
  68. }
  69. app.Commands = []cli.Command{
  70. {
  71. Name: "query",
  72. Aliases: []string{"query"},
  73. Usage: "query command line",
  74. Action: func(c *cli.Context) error {
  75. reader := bufio.NewReader(os.Stdin)
  76. var inputs []string
  77. ticker := time.NewTicker(time.Millisecond * 300)
  78. defer ticker.Stop()
  79. for {
  80. fmt.Print("kuiper > ")
  81. text, _ := reader.ReadString('\n')
  82. inputs = append(inputs, text)
  83. // convert CRLF to LF
  84. text = strings.Replace(text, "\n", "", -1)
  85. if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
  86. break
  87. } else if strings.Trim(text, " ") == "" {
  88. continue
  89. } else {
  90. var reply string
  91. err := client.Call("Server.CreateQuery", text, &reply)
  92. if err != nil {
  93. fmt.Println(err)
  94. continue
  95. } else {
  96. fmt.Println(reply)
  97. go func() {
  98. for {
  99. <-ticker.C
  100. var result string
  101. e := client.Call("Server.GetQueryResult", "", &result)
  102. if e != nil {
  103. fmt.Println(e)
  104. fmt.Print("kuiper > ")
  105. return
  106. }
  107. if result != "" {
  108. fmt.Println(result)
  109. }
  110. }
  111. }()
  112. }
  113. }
  114. }
  115. return nil
  116. },
  117. },
  118. {
  119. Name: "create",
  120. Aliases: []string{"create"},
  121. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file",
  122. Subcommands: []cli.Command{
  123. {
  124. Name: "stream",
  125. Usage: "create stream $stream_name [-f stream_def_file]",
  126. Flags: []cli.Flag{
  127. cli.StringFlag{
  128. Name: "file, f",
  129. Usage: "the location of stream definition file",
  130. FilePath: "/home/mystream.txt",
  131. },
  132. },
  133. Action: func(c *cli.Context) error {
  134. sfile := c.String("file")
  135. if sfile != "" {
  136. if _, err := os.Stat(c.String("file")); os.IsNotExist(err) {
  137. fmt.Printf("The specified stream defintion file %s does not existed.\n", sfile)
  138. return nil
  139. }
  140. fmt.Printf("Creating a new stream from file %s.\n", sfile)
  141. if stream, err := ioutil.ReadFile(sfile); err != nil {
  142. fmt.Printf("Failed to read from stream definition file %s.\n", sfile)
  143. return nil
  144. } else {
  145. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  146. streamProcess(client, args)
  147. return nil
  148. }
  149. } else {
  150. streamProcess(client, "")
  151. return nil
  152. }
  153. },
  154. },
  155. {
  156. Name: "rule",
  157. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  158. Flags: []cli.Flag{
  159. cli.StringFlag{
  160. Name: "file, f",
  161. Usage: "the location of rule definition file",
  162. FilePath: "/home/myrule.txt",
  163. },
  164. },
  165. Action: func(c *cli.Context) error {
  166. sfile := c.String("file")
  167. if sfile != "" {
  168. if _, err := os.Stat(c.String("file")); os.IsNotExist(err) {
  169. fmt.Printf("The specified rule defenition file %s is not existed.\n", sfile)
  170. return nil
  171. }
  172. fmt.Printf("Creating a new rule from file %s.\n", sfile)
  173. if rule, err := ioutil.ReadFile(sfile); err != nil {
  174. fmt.Printf("Failed to read from rule definition file %s.\n", sfile)
  175. return nil
  176. } else {
  177. if len(c.Args()) != 1 {
  178. fmt.Printf("Expect rule name.\n")
  179. return nil
  180. }
  181. rname := c.Args()[0]
  182. var reply string
  183. args := &common.Rule{rname, string(rule)}
  184. err = client.Call("Server.CreateRule", args, &reply)
  185. if err != nil {
  186. fmt.Println(err)
  187. } else {
  188. fmt.Println(reply)
  189. }
  190. }
  191. return nil
  192. } else {
  193. if len(c.Args()) != 2 {
  194. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  195. return nil
  196. }
  197. rname := c.Args()[0]
  198. rjson := c.Args()[1]
  199. var reply string
  200. args := &common.Rule{rname, rjson}
  201. err = client.Call("Server.CreateRule", args, &reply)
  202. if err != nil {
  203. fmt.Println(err)
  204. } else {
  205. fmt.Println(reply)
  206. }
  207. return nil
  208. }
  209. },
  210. },
  211. },
  212. },
  213. {
  214. Name: "describe",
  215. Aliases: []string{"describe"},
  216. Usage: "describe stream $stream_name | describe rule $rule_name",
  217. Subcommands: []cli.Command{
  218. {
  219. Name: "stream",
  220. Usage: "describe stream $stream_name",
  221. //Flags: nflag,
  222. Action: func(c *cli.Context) error {
  223. streamProcess(client, "")
  224. return nil
  225. },
  226. },
  227. {
  228. Name: "rule",
  229. Usage: "describe rule $rule_name",
  230. Action: func(c *cli.Context) error {
  231. if len(c.Args()) != 1 {
  232. fmt.Printf("Expect rule name.\n")
  233. return nil
  234. }
  235. rname := c.Args()[0]
  236. var reply string
  237. err = client.Call("Server.DescRule", rname, &reply)
  238. if err != nil {
  239. fmt.Println(err)
  240. } else {
  241. fmt.Println(reply)
  242. }
  243. return nil
  244. },
  245. },
  246. },
  247. },
  248. {
  249. Name: "drop",
  250. Aliases: []string{"drop"},
  251. Usage: "drop stream $stream_name | drop rule $rule_name",
  252. Subcommands: []cli.Command{
  253. {
  254. Name: "stream",
  255. Usage: "drop stream $stream_name",
  256. //Flags: nflag,
  257. Action: func(c *cli.Context) error {
  258. streamProcess(client, "")
  259. return nil
  260. },
  261. },
  262. {
  263. Name: "rule",
  264. Usage: "drop rule $rule_name",
  265. //Flags: nflag,
  266. Action: func(c *cli.Context) error {
  267. if len(c.Args()) != 1 {
  268. fmt.Printf("Expect rule name.\n")
  269. return nil
  270. }
  271. rname := c.Args()[0]
  272. var reply string
  273. err = client.Call("Server.DropRule", rname, &reply)
  274. if err != nil {
  275. fmt.Println(err)
  276. } else {
  277. fmt.Println(reply)
  278. }
  279. return nil
  280. },
  281. },
  282. },
  283. },
  284. {
  285. Name: "show",
  286. Aliases: []string{"show"},
  287. Usage: "show streams | show rules",
  288. Subcommands: []cli.Command{
  289. {
  290. Name: "streams",
  291. Usage: "show streams",
  292. Action: func(c *cli.Context) error {
  293. streamProcess(client, "")
  294. return nil
  295. },
  296. },
  297. {
  298. Name: "rules",
  299. Usage: "show rules",
  300. Action: func(c *cli.Context) error {
  301. var reply string
  302. err = client.Call("Server.ShowRules", 0, &reply)
  303. if err != nil {
  304. fmt.Println(err)
  305. } else {
  306. fmt.Println(reply)
  307. }
  308. return nil
  309. },
  310. },
  311. },
  312. },
  313. {
  314. Name: "getstatus",
  315. Aliases: []string{"getstatus"},
  316. Usage: "getstatus rule $rule_name",
  317. Subcommands: []cli.Command{
  318. {
  319. Name: "rule",
  320. Usage: "getstatus rule $rule_name",
  321. //Flags: nflag,
  322. Action: func(c *cli.Context) error {
  323. if len(c.Args()) != 1 {
  324. fmt.Printf("Expect rule name.\n")
  325. return nil
  326. }
  327. rname := c.Args()[0]
  328. var reply string
  329. err = client.Call("Server.GetStatusRule", rname, &reply)
  330. if err != nil {
  331. fmt.Println(err)
  332. } else {
  333. fmt.Println(reply)
  334. }
  335. return nil
  336. },
  337. },
  338. },
  339. },
  340. {
  341. Name: "start",
  342. Aliases: []string{"start"},
  343. Usage: "start rule $rule_name",
  344. Subcommands: []cli.Command{
  345. {
  346. Name: "rule",
  347. Usage: "start rule $rule_name",
  348. //Flags: nflag,
  349. Action: func(c *cli.Context) error {
  350. if len(c.Args()) != 1 {
  351. fmt.Printf("Expect rule name.\n")
  352. return nil
  353. }
  354. rname := c.Args()[0]
  355. var reply string
  356. err = client.Call("Server.StartRule", rname, &reply)
  357. if err != nil {
  358. fmt.Println(err)
  359. } else {
  360. fmt.Println(reply)
  361. }
  362. return nil
  363. },
  364. },
  365. },
  366. },
  367. {
  368. Name: "stop",
  369. Aliases: []string{"stop"},
  370. Usage: "stop rule $rule_name",
  371. Subcommands: []cli.Command{
  372. {
  373. Name: "rule",
  374. Usage: "stop rule $rule_name",
  375. //Flags: nflag,
  376. Action: func(c *cli.Context) error {
  377. if len(c.Args()) != 1 {
  378. fmt.Printf("Expect rule name.\n")
  379. return nil
  380. }
  381. rname := c.Args()[0]
  382. var reply string
  383. err = client.Call("Server.StopRule", rname, &reply)
  384. if err != nil {
  385. fmt.Println(err)
  386. } else {
  387. fmt.Println(reply)
  388. }
  389. return nil
  390. },
  391. },
  392. },
  393. },
  394. {
  395. Name: "restart",
  396. Aliases: []string{"restart"},
  397. Usage: "restart rule $rule_name",
  398. Subcommands: []cli.Command{
  399. {
  400. Name: "rule",
  401. Usage: "restart rule $rule_name",
  402. //Flags: nflag,
  403. Action: func(c *cli.Context) error {
  404. if len(c.Args()) != 1 {
  405. fmt.Printf("Expect rule name.\n")
  406. return nil
  407. }
  408. rname := c.Args()[0]
  409. var reply string
  410. err = client.Call("Server.RestartRule", rname, &reply)
  411. if err != nil {
  412. fmt.Println(err)
  413. } else {
  414. fmt.Println(reply)
  415. }
  416. return nil
  417. },
  418. },
  419. },
  420. },
  421. }
  422. app.Name = "Kuiper"
  423. app.Usage = "The command line tool for EMQ X Kuiper."
  424. app.Action = func(c *cli.Context) error {
  425. cli.ShowSubcommandHelp(c)
  426. //cli.ShowVersion(c)
  427. return nil
  428. }
  429. sort.Sort(cli.FlagsByName(app.Flags))
  430. sort.Sort(cli.CommandsByName(app.Commands))
  431. err = app.Run(os.Args)
  432. if err != nil {
  433. fmt.Printf("%v", err)
  434. }
  435. }