main.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. package main
  2. import (
  3. "bufio"
  4. "engine/common"
  5. "fmt"
  6. "github.com/go-yaml/yaml"
  7. "github.com/urfave/cli"
  8. "io/ioutil"
  9. "net/rpc"
  10. "os"
  11. "sort"
  12. "strings"
  13. "time"
  14. )
  15. type clientConf struct {
  16. Host string `yaml:"host"`
  17. Port int `yaml:"port"`
  18. }
  19. var clientYaml = "client.yaml"
  20. func streamProcess(client *rpc.Client, args string) {
  21. var reply string
  22. if args == ""{
  23. args = strings.Join(os.Args[1:], " ")
  24. }
  25. err := client.Call("Server.Stream", args, &reply)
  26. if err != nil{
  27. fmt.Println(err)
  28. }else{
  29. fmt.Println(reply)
  30. }
  31. }
  32. var Version string = "unknown"
  33. func main() {
  34. app := cli.NewApp()
  35. app.Version = Version
  36. //nflag := []cli.Flag { cli.StringFlag{
  37. // Name: "name, n",
  38. // Usage: "the name of stream",
  39. // }}
  40. b, err := common.LoadConf(clientYaml)
  41. if err != nil {
  42. common.Log.Fatal(err)
  43. }
  44. var cfg map[string]clientConf
  45. var config *clientConf
  46. if err := yaml.Unmarshal(b, &cfg); err != nil {
  47. fmt.Printf("Failed to load config file with error %s.\n", err)
  48. }else{
  49. c, ok := cfg["basic"]
  50. if !ok{
  51. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  52. }else{
  53. config = &c
  54. }
  55. }
  56. if config == nil {
  57. config = &clientConf{
  58. Host: "127.0.0.1",
  59. Port: 20498,
  60. }
  61. }
  62. fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
  63. // Create a TCP connection to localhost on port 1234
  64. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  65. if err != nil {
  66. fmt.Printf("Failed to connect the server, please start the server.\n")
  67. return
  68. }
  69. app.Commands = []cli.Command{
  70. {
  71. Name: "query",
  72. Aliases: []string{"query"},
  73. Usage: "query command line",
  74. Action: func(c *cli.Context) error {
  75. reader := bufio.NewReader(os.Stdin)
  76. var inputs []string
  77. ticker := time.NewTicker(time.Millisecond * 300)
  78. defer ticker.Stop()
  79. for {
  80. fmt.Print("kuiper > ")
  81. text, _ := reader.ReadString('\n')
  82. inputs = append(inputs, text)
  83. // convert CRLF to LF
  84. text = strings.Replace(text, "\n", "", -1)
  85. if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
  86. break
  87. } else if strings.Trim(text, " ") == "" {
  88. continue
  89. } else {
  90. var reply string
  91. err := client.Call("Server.CreateQuery", text, &reply)
  92. if err != nil{
  93. fmt.Println(err)
  94. return err
  95. } else {
  96. fmt.Println(reply)
  97. go func() {
  98. for {
  99. <-ticker.C
  100. var result string
  101. _ = client.Call("Server.GetQueryResult", "", &result)
  102. if result != "" {
  103. fmt.Println(result)
  104. }
  105. }
  106. }()
  107. }
  108. }
  109. }
  110. return nil
  111. },
  112. },
  113. {
  114. Name: "create",
  115. Aliases: []string{"create"},
  116. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file",
  117. Subcommands: []cli.Command {
  118. {
  119. Name: "stream",
  120. Usage: "create stream $stream_name [-f stream_def_file]",
  121. Flags: []cli.Flag {
  122. cli.StringFlag{
  123. Name: "file, f",
  124. Usage: "the location of stream definition file",
  125. FilePath: "/home/mystream.txt",
  126. },
  127. },
  128. Action: func(c *cli.Context) error {
  129. sfile := c.String("file")
  130. if sfile != "" {
  131. if _, err := os.Stat(c.String("file")); os.IsNotExist(err) {
  132. fmt.Printf("The specified stream defintion file %s does not existed.\n", sfile)
  133. return nil
  134. }
  135. fmt.Printf("Creating a new stream from file %s.\n", sfile)
  136. if stream, err := ioutil.ReadFile(sfile); err != nil {
  137. fmt.Printf("Failed to read from stream definition file %s.\n", sfile)
  138. return nil
  139. } else {
  140. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  141. streamProcess(client, args)
  142. return nil
  143. }
  144. } else {
  145. streamProcess(client, "")
  146. return nil
  147. }
  148. },
  149. },
  150. {
  151. Name: "rule",
  152. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  153. Flags: []cli.Flag {
  154. cli.StringFlag{
  155. Name: "file, f",
  156. Usage: "the location of rule definition file",
  157. FilePath: "/home/myrule.txt",
  158. },
  159. },
  160. Action: func(c *cli.Context) error {
  161. sfile := c.String("file")
  162. if sfile != "" {
  163. if _, err := os.Stat(c.String("file")); os.IsNotExist(err) {
  164. fmt.Printf("The specified rule defenition file %s is not existed.\n", sfile)
  165. return nil
  166. }
  167. fmt.Printf("Creating a new rule from file %s.\n", sfile)
  168. if rule, err := ioutil.ReadFile(sfile); err != nil {
  169. fmt.Printf("Failed to read from rule definition file %s.\n", sfile)
  170. return nil
  171. } else {
  172. if len(c.Args()) != 1 {
  173. fmt.Printf("Expect rule name.\n")
  174. return nil
  175. }
  176. rname := c.Args()[0]
  177. var reply string
  178. args := &common.Rule{rname, string(rule)}
  179. err = client.Call("Server.CreateRule", args, &reply)
  180. if err != nil {
  181. fmt.Println(err)
  182. } else {
  183. fmt.Println(reply)
  184. }
  185. }
  186. return nil
  187. } else {
  188. if len(c.Args()) != 2 {
  189. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  190. return nil
  191. }
  192. rname := c.Args()[0]
  193. rjson := c.Args()[1]
  194. var reply string
  195. args := &common.Rule{rname, rjson}
  196. err = client.Call("Server.CreateRule", args, &reply)
  197. if err != nil {
  198. fmt.Println(err)
  199. } else {
  200. fmt.Println(reply)
  201. }
  202. return nil
  203. }
  204. },
  205. },
  206. },
  207. },
  208. {
  209. Name: "describe",
  210. Aliases: []string{"describe"},
  211. Usage: "describe stream $stream_name | describe rule $rule_name",
  212. Subcommands: []cli.Command{
  213. {
  214. Name: "stream",
  215. Usage: "describe stream $stream_name",
  216. //Flags: nflag,
  217. Action: func(c *cli.Context) error {
  218. streamProcess(client, "")
  219. return nil
  220. },
  221. },
  222. {
  223. Name: "rule",
  224. Usage: "describe rule $rule_name",
  225. Action: func(c *cli.Context) error {
  226. if len(c.Args()) != 1 {
  227. fmt.Printf("Expect rule name.\n")
  228. return nil
  229. }
  230. rname := c.Args()[0]
  231. var reply string
  232. err = client.Call("Server.DescRule", rname, &reply)
  233. if err != nil {
  234. fmt.Println(err)
  235. } else {
  236. fmt.Println(reply)
  237. }
  238. return nil
  239. },
  240. },
  241. },
  242. },
  243. {
  244. Name: "drop",
  245. Aliases: []string{"drop"},
  246. Usage: "drop stream $stream_name | drop rule $rule_name",
  247. Subcommands: []cli.Command{
  248. {
  249. Name: "stream",
  250. Usage: "drop stream $stream_name",
  251. //Flags: nflag,
  252. Action: func(c *cli.Context) error {
  253. streamProcess(client, "")
  254. return nil
  255. },
  256. },
  257. {
  258. Name: "rule",
  259. Usage: "drop rule $rule_name",
  260. //Flags: nflag,
  261. Action: func(c *cli.Context) error {
  262. if len(c.Args()) != 1 {
  263. fmt.Printf("Expect rule name.\n")
  264. return nil
  265. }
  266. rname := c.Args()[0]
  267. var reply string
  268. err = client.Call("Server.DropRule", rname, &reply)
  269. if err != nil {
  270. fmt.Println(err)
  271. } else {
  272. fmt.Println(reply)
  273. }
  274. return nil
  275. },
  276. },
  277. },
  278. },
  279. {
  280. Name: "show",
  281. Aliases: []string{"show"},
  282. Usage: "show streams | show rules",
  283. Subcommands: []cli.Command{
  284. {
  285. Name: "streams",
  286. Usage: "show streams",
  287. Action: func(c *cli.Context) error {
  288. streamProcess(client, "")
  289. return nil
  290. },
  291. },
  292. {
  293. Name: "rules",
  294. Usage: "show rules",
  295. Action: func(c *cli.Context) error {
  296. var reply string
  297. err = client.Call("Server.ShowRules", 0, &reply)
  298. if err != nil {
  299. fmt.Println(err)
  300. } else {
  301. fmt.Println(reply)
  302. }
  303. return nil
  304. },
  305. },
  306. },
  307. },
  308. {
  309. Name: "getstatus",
  310. Aliases: []string{"getstatus"},
  311. Usage: "getstatus rule $rule_name",
  312. Subcommands: []cli.Command{
  313. {
  314. Name: "rule",
  315. Usage: "getstatus rule $rule_name",
  316. //Flags: nflag,
  317. Action: func(c *cli.Context) error {
  318. if len(c.Args()) != 1 {
  319. fmt.Printf("Expect rule name.\n")
  320. return nil
  321. }
  322. rname := c.Args()[0]
  323. var reply string
  324. err = client.Call("Server.GetStatusRule", rname, &reply)
  325. if err != nil {
  326. fmt.Println(err)
  327. } else {
  328. fmt.Println(reply)
  329. }
  330. return nil
  331. },
  332. },
  333. },
  334. },
  335. {
  336. Name: "start",
  337. Aliases: []string{"start"},
  338. Usage: "start rule $rule_name",
  339. Subcommands: []cli.Command{
  340. {
  341. Name: "rule",
  342. Usage: "start rule $rule_name",
  343. //Flags: nflag,
  344. Action: func(c *cli.Context) error {
  345. if len(c.Args()) != 1 {
  346. fmt.Printf("Expect rule name.\n")
  347. return nil
  348. }
  349. rname := c.Args()[0]
  350. var reply string
  351. err = client.Call("Server.StartRule", rname, &reply)
  352. if err != nil {
  353. fmt.Println(err)
  354. } else {
  355. fmt.Println(reply)
  356. }
  357. return nil
  358. },
  359. },
  360. },
  361. },
  362. {
  363. Name: "stop",
  364. Aliases: []string{"stop"},
  365. Usage: "stop rule $rule_name",
  366. Subcommands: []cli.Command{
  367. {
  368. Name: "rule",
  369. Usage: "stop rule $rule_name",
  370. //Flags: nflag,
  371. Action: func(c *cli.Context) error {
  372. if len(c.Args()) != 1 {
  373. fmt.Printf("Expect rule name.\n")
  374. return nil
  375. }
  376. rname := c.Args()[0]
  377. var reply string
  378. err = client.Call("Server.StopRule", rname, &reply)
  379. if err != nil {
  380. fmt.Println(err)
  381. } else {
  382. fmt.Println(reply)
  383. }
  384. return nil
  385. },
  386. },
  387. },
  388. },
  389. {
  390. Name: "restart",
  391. Aliases: []string{"restart"},
  392. Usage: "restart rule $rule_name",
  393. Subcommands: []cli.Command{
  394. {
  395. Name: "rule",
  396. Usage: "restart rule $rule_name",
  397. //Flags: nflag,
  398. Action: func(c *cli.Context) error {
  399. if len(c.Args()) != 1 {
  400. fmt.Printf("Expect rule name.\n")
  401. return nil
  402. }
  403. rname := c.Args()[0]
  404. var reply string
  405. err = client.Call("Server.RestartRule", rname, &reply)
  406. if err != nil {
  407. fmt.Println(err)
  408. } else {
  409. fmt.Println(reply)
  410. }
  411. return nil
  412. },
  413. },
  414. },
  415. },
  416. }
  417. app.Name = "Kuiper"
  418. app.Usage = "The command line tool for EMQ X Kuiper."
  419. app.Action = func(c *cli.Context) error {
  420. cli.ShowSubcommandHelp(c)
  421. //cli.ShowVersion(c)
  422. return nil
  423. }
  424. sort.Sort(cli.FlagsByName(app.Flags))
  425. sort.Sort(cli.CommandsByName(app.Commands))
  426. err = app.Run(os.Args)
  427. if err != nil {
  428. fmt.Printf("%v", err)
  429. }
  430. }