main.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. package main
  2. import (
  3. "bufio"
  4. "engine/common"
  5. "fmt"
  6. "github.com/go-yaml/yaml"
  7. "github.com/urfave/cli"
  8. "io/ioutil"
  9. "net/rpc"
  10. "os"
  11. "sort"
  12. "strings"
  13. "time"
  14. )
  15. type clientConf struct {
  16. Host string `yaml:"host"`
  17. Port int `yaml:"port"`
  18. }
  19. var clientYaml = "client.yaml"
  20. func streamProcess(client *rpc.Client, args string) error {
  21. var reply string
  22. if args == ""{
  23. args = strings.Join(os.Args[1:], " ")
  24. }
  25. err := client.Call("Server.Stream", args, &reply)
  26. if err != nil{
  27. fmt.Println(err)
  28. return err
  29. }else{
  30. fmt.Println(reply)
  31. }
  32. return nil
  33. }
  34. func main() {
  35. app := cli.NewApp()
  36. app.Version = "0.0.3"
  37. //nflag := []cli.Flag { cli.StringFlag{
  38. // Name: "name, n",
  39. // Usage: "the name of stream",
  40. // }}
  41. b := common.LoadConf(clientYaml)
  42. var cfg map[string]clientConf
  43. var config *clientConf
  44. if err := yaml.Unmarshal(b, &cfg); err != nil {
  45. fmt.Printf("Failed to load config file with error %s.\n", err)
  46. }else{
  47. c, ok := cfg["basic"]
  48. if !ok{
  49. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  50. }else{
  51. config = &c
  52. }
  53. }
  54. if config == nil {
  55. config = &clientConf{
  56. Host: "127.0.0.1",
  57. Port: 20498,
  58. }
  59. }
  60. fmt.Printf("Connecting to %s:%d \n", config.Host, config.Port)
  61. // Create a TCP connection to localhost on port 1234
  62. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  63. if err != nil {
  64. fmt.Printf("Failed to connect the server, please start the server.\n")
  65. return
  66. }
  67. app.Commands = []cli.Command{
  68. {
  69. Name: "query",
  70. Aliases: []string{"query"},
  71. Usage: "query command line",
  72. Action: func(c *cli.Context) error {
  73. reader := bufio.NewReader(os.Stdin)
  74. var inputs []string
  75. ticker := time.NewTicker(time.Millisecond * 300)
  76. defer ticker.Stop()
  77. for {
  78. fmt.Print("kuiper > ")
  79. text, _ := reader.ReadString('\n')
  80. inputs = append(inputs, text)
  81. // convert CRLF to LF
  82. text = strings.Replace(text, "\n", "", -1)
  83. if strings.ToLower(text) == "quit" || strings.ToLower(text) == "exit" {
  84. break
  85. } else if strings.Trim(text, " ") == "" {
  86. continue
  87. } else {
  88. var reply string
  89. err := client.Call("Server.CreateQuery", text, &reply)
  90. if err != nil{
  91. fmt.Println(err)
  92. return err
  93. } else {
  94. fmt.Println(reply)
  95. go func() {
  96. for {
  97. <-ticker.C
  98. var result string
  99. _ = client.Call("Server.GetQueryResult", "", &result)
  100. if result != "" {
  101. fmt.Println(result)
  102. }
  103. }
  104. }()
  105. }
  106. }
  107. }
  108. return nil
  109. },
  110. },
  111. {
  112. Name: "create",
  113. Aliases: []string{"create"},
  114. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file",
  115. Subcommands: []cli.Command {
  116. {
  117. Name: "stream",
  118. Usage: "create stream $stream_name [-f stream_def_file]",
  119. Flags: []cli.Flag {
  120. cli.StringFlag{
  121. Name: "file, f",
  122. Usage: "the location of stream definition file",
  123. FilePath: "/home/mystream.txt",
  124. },
  125. },
  126. Action: func(c *cli.Context) error {
  127. sfile := c.String("file")
  128. if sfile != "" {
  129. if _, err := os.Stat(c.String("file")); os.IsNotExist(err) {
  130. fmt.Printf("The specified stream defintion file %s does not existed.", sfile)
  131. return nil
  132. }
  133. fmt.Printf("Creating a new stream from file %s", sfile)
  134. if stream, err := ioutil.ReadFile(sfile); err != nil {
  135. fmt.Printf("Failed to read from stream definition file %s", sfile)
  136. return nil
  137. } else {
  138. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  139. return streamProcess(client, args)
  140. }
  141. } else {
  142. return streamProcess(client, "")
  143. }
  144. },
  145. },
  146. {
  147. Name: "rule",
  148. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  149. Flags: []cli.Flag {
  150. cli.StringFlag{
  151. Name: "file, f",
  152. Usage: "the location of rule definition file",
  153. FilePath: "/home/myrule.txt",
  154. },
  155. },
  156. Action: func(c *cli.Context) error {
  157. sfile := c.String("file")
  158. if sfile != "" {
  159. if _, err := os.Stat(c.String("file")); os.IsNotExist(err) {
  160. fmt.Printf("The specified rule defenition file %s does not existed.", sfile)
  161. return nil
  162. }
  163. fmt.Printf("Creating a new rule from file %s", sfile)
  164. if rule, err := ioutil.ReadFile(sfile); err != nil {
  165. fmt.Printf("Failed to read from rule definition file %s", sfile)
  166. return nil
  167. } else {
  168. if len(c.Args()) != 1 {
  169. fmt.Printf("Expect rule name.\n")
  170. return nil
  171. }
  172. rname := c.Args()[0]
  173. var reply string
  174. args := &common.Rule{rname, string(rule)}
  175. err = client.Call("Server.CreateRule", args, &reply)
  176. if err != nil {
  177. fmt.Println(err)
  178. } else {
  179. fmt.Println(reply)
  180. }
  181. }
  182. return nil
  183. } else {
  184. if len(c.Args()) != 2 {
  185. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  186. return nil
  187. }
  188. rname := c.Args()[0]
  189. rjson := c.Args()[1]
  190. var reply string
  191. args := &common.Rule{rname, rjson}
  192. err = client.Call("Server.CreateRule", args, &reply)
  193. if err != nil {
  194. fmt.Println(err)
  195. } else {
  196. fmt.Println(reply)
  197. }
  198. return nil
  199. }
  200. },
  201. },
  202. },
  203. },
  204. {
  205. Name: "describe",
  206. Aliases: []string{"describe"},
  207. Usage: "describe stream $stream_name | describe rule $rule_name",
  208. Subcommands: []cli.Command{
  209. {
  210. Name: "stream",
  211. Usage: "describe stream $stream_name",
  212. //Flags: nflag,
  213. Action: func(c *cli.Context) error {
  214. return streamProcess(client, "")
  215. },
  216. },
  217. {
  218. Name: "rule",
  219. Usage: "describe rule $rule_name",
  220. Action: func(c *cli.Context) error {
  221. if len(c.Args()) != 1 {
  222. fmt.Printf("Expect rule name.\n")
  223. return nil
  224. }
  225. rname := c.Args()[0]
  226. var reply string
  227. err = client.Call("Server.DescRule", rname, &reply)
  228. if err != nil {
  229. fmt.Println(err)
  230. } else {
  231. fmt.Println(reply)
  232. }
  233. return nil
  234. },
  235. },
  236. },
  237. },
  238. {
  239. Name: "drop",
  240. Aliases: []string{"drop"},
  241. Usage: "drop stream $stream_name | drop rule $rule_name",
  242. Subcommands: []cli.Command{
  243. {
  244. Name: "stream",
  245. Usage: "drop stream $stream_name",
  246. //Flags: nflag,
  247. Action: func(c *cli.Context) error {
  248. return streamProcess(client, "")
  249. },
  250. },
  251. {
  252. Name: "rule",
  253. Usage: "drop rule $rule_name",
  254. //Flags: nflag,
  255. Action: func(c *cli.Context) error {
  256. if len(c.Args()) != 1 {
  257. fmt.Printf("Expect rule name.\n")
  258. return nil
  259. }
  260. rname := c.Args()[0]
  261. var reply string
  262. err = client.Call("Server.DropRule", rname, &reply)
  263. if err != nil {
  264. fmt.Println(err)
  265. } else {
  266. fmt.Println(reply)
  267. }
  268. return nil
  269. },
  270. },
  271. },
  272. },
  273. {
  274. Name: "show",
  275. Aliases: []string{"show"},
  276. Usage: "show streams | show rules",
  277. Subcommands: []cli.Command{
  278. {
  279. Name: "streams",
  280. Usage: "show streams",
  281. Action: func(c *cli.Context) error {
  282. return streamProcess(client, "")
  283. },
  284. },
  285. {
  286. Name: "rules",
  287. Usage: "show rules",
  288. Action: func(c *cli.Context) error {
  289. var reply string
  290. err = client.Call("Server.ShowRules", 0, &reply)
  291. if err != nil {
  292. fmt.Println(err)
  293. } else {
  294. fmt.Println(reply)
  295. }
  296. return nil
  297. },
  298. },
  299. },
  300. },
  301. {
  302. Name: "getstatus",
  303. Aliases: []string{"getstatus"},
  304. Usage: "getstatus rule $rule_name",
  305. Subcommands: []cli.Command{
  306. {
  307. Name: "rule",
  308. Usage: "getstatus rule $rule_name",
  309. //Flags: nflag,
  310. Action: func(c *cli.Context) error {
  311. if len(c.Args()) != 1 {
  312. fmt.Printf("Expect rule name.\n")
  313. return nil
  314. }
  315. rname := c.Args()[0]
  316. var reply string
  317. err = client.Call("Server.GetStatusRule", rname, &reply)
  318. if err != nil {
  319. fmt.Println(err)
  320. } else {
  321. fmt.Println(reply)
  322. }
  323. return nil
  324. },
  325. },
  326. },
  327. },
  328. {
  329. Name: "start",
  330. Aliases: []string{"start"},
  331. Usage: "start rule $rule_name",
  332. Subcommands: []cli.Command{
  333. {
  334. Name: "rule",
  335. Usage: "start rule $rule_name",
  336. //Flags: nflag,
  337. Action: func(c *cli.Context) error {
  338. if len(c.Args()) != 1 {
  339. fmt.Printf("Expect rule name.\n")
  340. return nil
  341. }
  342. rname := c.Args()[0]
  343. var reply string
  344. err = client.Call("Server.StartRule", rname, &reply)
  345. if err != nil {
  346. fmt.Println(err)
  347. } else {
  348. fmt.Println(reply)
  349. }
  350. return nil
  351. },
  352. },
  353. },
  354. },
  355. {
  356. Name: "stop",
  357. Aliases: []string{"stop"},
  358. Usage: "stop rule $rule_name",
  359. Subcommands: []cli.Command{
  360. {
  361. Name: "rule",
  362. Usage: "stop rule $rule_name",
  363. //Flags: nflag,
  364. Action: func(c *cli.Context) error {
  365. if len(c.Args()) != 1 {
  366. fmt.Printf("Expect rule name.\n")
  367. return nil
  368. }
  369. rname := c.Args()[0]
  370. var reply string
  371. err = client.Call("Server.StopRule", rname, &reply)
  372. if err != nil {
  373. fmt.Println(err)
  374. } else {
  375. fmt.Println(reply)
  376. }
  377. return nil
  378. },
  379. },
  380. },
  381. },
  382. {
  383. Name: "restart",
  384. Aliases: []string{"restart"},
  385. Usage: "restart rule $rule_name",
  386. Subcommands: []cli.Command{
  387. {
  388. Name: "rule",
  389. Usage: "restart rule $rule_name",
  390. //Flags: nflag,
  391. Action: func(c *cli.Context) error {
  392. if len(c.Args()) != 1 {
  393. fmt.Printf("Expect rule name.\n")
  394. return nil
  395. }
  396. rname := c.Args()[0]
  397. var reply string
  398. err = client.Call("Server.RestartRule", rname, &reply)
  399. if err != nil {
  400. fmt.Println(err)
  401. } else {
  402. fmt.Println(reply)
  403. }
  404. return nil
  405. },
  406. },
  407. },
  408. },
  409. }
  410. app.Name = "Kuiper"
  411. app.Usage = "The command line tool for EMQ X Kuiper."
  412. app.Action = func(c *cli.Context) error {
  413. cli.ShowSubcommandHelp(c)
  414. //cli.ShowVersion(c)
  415. return nil
  416. }
  417. sort.Sort(cli.FlagsByName(app.Flags))
  418. sort.Sort(cli.CommandsByName(app.Commands))
  419. err = app.Run(os.Args)
  420. if err != nil {
  421. fmt.Printf("%v", err)
  422. }
  423. }