main.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bufio"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/pkg/model"
  20. "github.com/lf-edge/ekuiper/pkg/infra"
  21. "github.com/urfave/cli"
  22. "io/ioutil"
  23. "net/rpc"
  24. "os"
  25. "sort"
  26. "strings"
  27. "time"
  28. )
  29. type clientConf struct {
  30. Host string `yaml:"host"`
  31. Port int `yaml:"port"`
  32. }
  33. const ClientYaml = "client.yaml"
  34. func streamProcess(client *rpc.Client, args string) {
  35. var reply string
  36. if args == "" {
  37. args = strings.Join(os.Args[1:], " ")
  38. }
  39. err := client.Call("Server.Stream", args, &reply)
  40. if err != nil {
  41. fmt.Println(err)
  42. } else {
  43. fmt.Println(reply)
  44. }
  45. }
  46. var (
  47. Version = "unknown"
  48. LoadFileType = "relative"
  49. )
  50. func main() {
  51. conf.LoadFileType = LoadFileType
  52. app := cli.NewApp()
  53. app.Version = Version
  54. //nflag := []cli.Flag { cli.StringFlag{
  55. // Name: "name, n",
  56. // Usage: "the name of stream",
  57. // }}
  58. var cfg map[string]clientConf
  59. err := conf.LoadConfigByName(ClientYaml, &cfg)
  60. if err != nil {
  61. conf.Log.Fatal(err)
  62. fmt.Printf("Failed to load config file with error %s.\n", err)
  63. }
  64. var config *clientConf
  65. c, ok := cfg["basic"]
  66. if !ok {
  67. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  68. } else {
  69. config = &c
  70. }
  71. if config == nil {
  72. config = &clientConf{
  73. Host: "127.0.0.1",
  74. Port: 20498,
  75. }
  76. }
  77. fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
  78. // Create a TCP connection to localhost on port 1234
  79. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  80. if err != nil {
  81. fmt.Printf("Failed to connect the server, please start the server.\n")
  82. return
  83. }
  84. app.Commands = []cli.Command{
  85. {
  86. Name: "query",
  87. Aliases: []string{"query"},
  88. Usage: "query command line",
  89. Action: func(c *cli.Context) error {
  90. reader := bufio.NewReader(os.Stdin)
  91. var inputs []string
  92. ticker := time.NewTicker(time.Millisecond * 300)
  93. defer ticker.Stop()
  94. for {
  95. fmt.Print("kuiper > ")
  96. text, _ := reader.ReadString('\n')
  97. inputs = append(inputs, text)
  98. // convert CRLF to LF
  99. text = strings.Replace(text, "\n", "", -1)
  100. if strings.EqualFold(text, "quit") || strings.EqualFold(text, "exit") {
  101. break
  102. } else if strings.Trim(text, " ") == "" {
  103. continue
  104. } else {
  105. var reply string
  106. err := client.Call("Server.CreateQuery", text, &reply)
  107. if err != nil {
  108. fmt.Println(err)
  109. continue
  110. } else {
  111. fmt.Println(reply)
  112. go func() {
  113. err := infra.SafeRun(func() error {
  114. for {
  115. <-ticker.C
  116. var result string
  117. e := client.Call("Server.GetQueryResult", "", &result)
  118. if e != nil {
  119. return e
  120. }
  121. if result != "" {
  122. fmt.Println(result)
  123. }
  124. }
  125. })
  126. if err != nil {
  127. fmt.Println(err)
  128. fmt.Print("kuiper > ")
  129. }
  130. }()
  131. }
  132. }
  133. }
  134. return nil
  135. },
  136. },
  137. {
  138. Name: "create",
  139. Aliases: []string{"create"},
  140. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create table $table_name | create table $table_name -f $table_def_file| create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file | create service $service_name $service_json | create schema $schema_type $schema_name $schema_json",
  141. Subcommands: []cli.Command{
  142. {
  143. Name: "stream",
  144. Usage: "create stream $stream_name [-f stream_def_file]",
  145. Flags: []cli.Flag{
  146. cli.StringFlag{
  147. Name: "file, f",
  148. Usage: "the location of stream definition file",
  149. FilePath: "/home/mystream.txt",
  150. },
  151. },
  152. Action: func(c *cli.Context) error {
  153. sfile := c.String("file")
  154. if sfile != "" {
  155. if stream, err := readDef(sfile, "stream"); err != nil {
  156. fmt.Printf("%s", err)
  157. return nil
  158. } else {
  159. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  160. streamProcess(client, args)
  161. return nil
  162. }
  163. } else {
  164. streamProcess(client, "")
  165. return nil
  166. }
  167. },
  168. },
  169. {
  170. Name: "table",
  171. Usage: "create table $table_name [-f table_def_file]",
  172. Flags: []cli.Flag{
  173. cli.StringFlag{
  174. Name: "file, f",
  175. Usage: "the location of table definition file",
  176. FilePath: "/home/mytable.txt",
  177. },
  178. },
  179. Action: func(c *cli.Context) error {
  180. sfile := c.String("file")
  181. if sfile != "" {
  182. if stream, err := readDef(sfile, "table"); err != nil {
  183. fmt.Printf("%s", err)
  184. return nil
  185. } else {
  186. args := strings.Join([]string{"CREATE TABLE ", string(stream)}, " ")
  187. streamProcess(client, args)
  188. return nil
  189. }
  190. } else {
  191. streamProcess(client, "")
  192. return nil
  193. }
  194. },
  195. },
  196. {
  197. Name: "rule",
  198. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  199. Flags: []cli.Flag{
  200. cli.StringFlag{
  201. Name: "file, f",
  202. Usage: "the location of rule definition file",
  203. FilePath: "/home/myrule.txt",
  204. },
  205. },
  206. Action: func(c *cli.Context) error {
  207. sfile := c.String("file")
  208. if sfile != "" {
  209. if rule, err := readDef(sfile, "rule"); err != nil {
  210. fmt.Printf("%s", err)
  211. return nil
  212. } else {
  213. if len(c.Args()) != 1 {
  214. fmt.Printf("Expect rule name.\n")
  215. return nil
  216. }
  217. rname := c.Args()[0]
  218. var reply string
  219. args := &model.RPCArgDesc{Name: rname, Json: string(rule)}
  220. err = client.Call("Server.CreateRule", args, &reply)
  221. if err != nil {
  222. fmt.Println(err)
  223. } else {
  224. fmt.Println(reply)
  225. }
  226. }
  227. return nil
  228. } else {
  229. if len(c.Args()) != 2 {
  230. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  231. return nil
  232. }
  233. rname := c.Args()[0]
  234. rjson := c.Args()[1]
  235. var reply string
  236. args := &model.RPCArgDesc{Name: rname, Json: rjson}
  237. err = client.Call("Server.CreateRule", args, &reply)
  238. if err != nil {
  239. fmt.Println(err)
  240. } else {
  241. fmt.Println(reply)
  242. }
  243. return nil
  244. }
  245. },
  246. },
  247. {
  248. Name: "plugin",
  249. Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  250. Flags: []cli.Flag{
  251. cli.StringFlag{
  252. Name: "file, f",
  253. Usage: "the location of plugin definition file",
  254. FilePath: "/home/myplugin.txt",
  255. },
  256. },
  257. Action: func(c *cli.Context) error {
  258. if len(c.Args()) < 2 {
  259. fmt.Printf("Expect plugin type and name.\n")
  260. return nil
  261. }
  262. ptype, err := getPluginType(c.Args()[0])
  263. if err != nil {
  264. fmt.Printf("%s\n", err)
  265. return nil
  266. }
  267. pname := c.Args()[1]
  268. sfile := c.String("file")
  269. args := &model.PluginDesc{
  270. RPCArgDesc: model.RPCArgDesc{
  271. Name: pname,
  272. },
  273. Type: ptype,
  274. }
  275. if sfile != "" {
  276. if len(c.Args()) != 2 {
  277. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  278. return nil
  279. }
  280. if p, err := readDef(sfile, "plugin"); err != nil {
  281. fmt.Printf("%s", err)
  282. return nil
  283. } else {
  284. args.Json = string(p)
  285. }
  286. } else {
  287. if len(c.Args()) != 3 {
  288. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  289. return nil
  290. }
  291. args.Json = c.Args()[2]
  292. }
  293. var reply string
  294. err = client.Call("Server.CreatePlugin", args, &reply)
  295. if err != nil {
  296. fmt.Println(err)
  297. } else {
  298. fmt.Println(reply)
  299. }
  300. return nil
  301. },
  302. },
  303. {
  304. Name: "service",
  305. Usage: "create service $service_name $service_json",
  306. Action: func(c *cli.Context) error {
  307. if len(c.Args()) < 2 {
  308. fmt.Printf("Expect service name and json.\n")
  309. return nil
  310. }
  311. var reply string
  312. err = client.Call("Server.CreateService", &model.RPCArgDesc{
  313. Name: c.Args()[0],
  314. Json: c.Args()[1],
  315. }, &reply)
  316. if err != nil {
  317. fmt.Println(err)
  318. } else {
  319. fmt.Println(reply)
  320. }
  321. return nil
  322. },
  323. },
  324. {
  325. Name: "schema",
  326. Usage: "create schema $schema_type $schema_name $schema_json",
  327. Action: func(c *cli.Context) error {
  328. if len(c.Args()) < 3 {
  329. fmt.Printf("Expect plugin type, name and json.\n")
  330. return nil
  331. }
  332. var reply string
  333. err = client.Call("Server.CreateSchema", &model.RPCTypedArgDesc{
  334. Type: c.Args()[0],
  335. Name: c.Args()[1],
  336. Json: c.Args()[2],
  337. }, &reply)
  338. if err != nil {
  339. fmt.Println(err)
  340. } else {
  341. fmt.Println(reply)
  342. }
  343. return nil
  344. },
  345. },
  346. },
  347. },
  348. {
  349. Name: "describe",
  350. Aliases: []string{"describe"},
  351. Usage: "describe stream $stream_name | describe table $table_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name | describe udf $udf_name | describe service $service_name | describe service_func $service_func_name | describe schema $schema_type $schema_name",
  352. Subcommands: []cli.Command{
  353. {
  354. Name: "stream",
  355. Usage: "describe stream $stream_name",
  356. //Flags: nflag,
  357. Action: func(c *cli.Context) error {
  358. streamProcess(client, "")
  359. return nil
  360. },
  361. },
  362. {
  363. Name: "table",
  364. Usage: "describe table $table_name",
  365. //Flags: nflag,
  366. Action: func(c *cli.Context) error {
  367. streamProcess(client, "")
  368. return nil
  369. },
  370. },
  371. {
  372. Name: "rule",
  373. Usage: "describe rule $rule_name",
  374. Action: func(c *cli.Context) error {
  375. if len(c.Args()) != 1 {
  376. fmt.Printf("Expect rule name.\n")
  377. return nil
  378. }
  379. rname := c.Args()[0]
  380. var reply string
  381. err = client.Call("Server.DescRule", rname, &reply)
  382. if err != nil {
  383. fmt.Println(err)
  384. } else {
  385. fmt.Println(reply)
  386. }
  387. return nil
  388. },
  389. },
  390. {
  391. Name: "plugin",
  392. Usage: "describe plugin $plugin_type $plugin_name",
  393. //Flags: nflag,
  394. Action: func(c *cli.Context) error {
  395. ptype, err := getPluginType(c.Args()[0])
  396. if err != nil {
  397. fmt.Printf("%s\n", err)
  398. return nil
  399. }
  400. if len(c.Args()) != 2 {
  401. fmt.Printf("Expect plugin name.\n")
  402. return nil
  403. }
  404. pname := c.Args()[1]
  405. args := &model.PluginDesc{
  406. RPCArgDesc: model.RPCArgDesc{
  407. Name: pname,
  408. },
  409. Type: ptype,
  410. }
  411. var reply string
  412. err = client.Call("Server.DescPlugin", args, &reply)
  413. if err != nil {
  414. fmt.Println(err)
  415. } else {
  416. fmt.Println(reply)
  417. }
  418. return nil
  419. },
  420. },
  421. {
  422. Name: "udf",
  423. Usage: "describe udf $udf_name",
  424. //Flags: nflag,
  425. Action: func(c *cli.Context) error {
  426. if len(c.Args()) != 1 {
  427. fmt.Printf("Expect udf name.\n")
  428. return nil
  429. }
  430. pname := c.Args()[0]
  431. var reply string
  432. err = client.Call("Server.DescUdf", pname, &reply)
  433. if err != nil {
  434. fmt.Println(err)
  435. } else {
  436. fmt.Println(reply)
  437. }
  438. return nil
  439. },
  440. },
  441. {
  442. Name: "service",
  443. Usage: "describe service $service_name",
  444. Action: func(c *cli.Context) error {
  445. if len(c.Args()) != 1 {
  446. fmt.Printf("Expect service name.\n")
  447. return nil
  448. }
  449. name := c.Args()[0]
  450. var reply string
  451. err = client.Call("Server.DescService", name, &reply)
  452. if err != nil {
  453. fmt.Println(err)
  454. } else {
  455. fmt.Println(reply)
  456. }
  457. return nil
  458. },
  459. },
  460. {
  461. Name: "service_func",
  462. Usage: "describe service_func $service_func_name",
  463. Action: func(c *cli.Context) error {
  464. if len(c.Args()) != 1 {
  465. fmt.Printf("Expect service func name.\n")
  466. return nil
  467. }
  468. name := c.Args()[0]
  469. var reply string
  470. err = client.Call("Server.DescServiceFunc", name, &reply)
  471. if err != nil {
  472. fmt.Println(err)
  473. } else {
  474. fmt.Println(reply)
  475. }
  476. return nil
  477. },
  478. },
  479. {
  480. Name: "schema",
  481. Usage: "describe schema $schema_type $schema_name",
  482. Action: func(c *cli.Context) error {
  483. if len(c.Args()) != 2 {
  484. fmt.Printf("Expect schema type and name.\n")
  485. return nil
  486. }
  487. args := &model.RPCTypedArgDesc{
  488. Type: c.Args()[0],
  489. Name: c.Args()[1],
  490. }
  491. var reply string
  492. err = client.Call("Server.DescSchema", args, &reply)
  493. if err != nil {
  494. fmt.Println(err)
  495. } else {
  496. fmt.Println(reply)
  497. }
  498. return nil
  499. },
  500. },
  501. },
  502. },
  503. {
  504. Name: "drop",
  505. Aliases: []string{"drop"},
  506. Usage: "drop stream $stream_name | drop table $table_name |drop rule $rule_name | drop plugin $plugin_type $plugin_name -s $stop | drop service $service_name | drop schema $schema_type $schema_name",
  507. Subcommands: []cli.Command{
  508. {
  509. Name: "stream",
  510. Usage: "drop stream $stream_name",
  511. //Flags: nflag,
  512. Action: func(c *cli.Context) error {
  513. streamProcess(client, "")
  514. return nil
  515. },
  516. },
  517. {
  518. Name: "table",
  519. Usage: "drop table $table_name",
  520. //Flags: nflag,
  521. Action: func(c *cli.Context) error {
  522. streamProcess(client, "")
  523. return nil
  524. },
  525. },
  526. {
  527. Name: "rule",
  528. Usage: "drop rule $rule_name",
  529. //Flags: nflag,
  530. Action: func(c *cli.Context) error {
  531. if len(c.Args()) != 1 {
  532. fmt.Printf("Expect rule name.\n")
  533. return nil
  534. }
  535. rname := c.Args()[0]
  536. var reply string
  537. err = client.Call("Server.DropRule", rname, &reply)
  538. if err != nil {
  539. fmt.Println(err)
  540. } else {
  541. fmt.Println(reply)
  542. }
  543. return nil
  544. },
  545. },
  546. {
  547. Name: "plugin",
  548. Usage: "drop plugin $plugin_type $plugin_name -s stop",
  549. Flags: []cli.Flag{
  550. cli.StringFlag{
  551. Name: "stop, s",
  552. Usage: "stop kuiper after the action",
  553. },
  554. },
  555. Action: func(c *cli.Context) error {
  556. r := c.String("stop")
  557. if r != "true" && r != "false" {
  558. fmt.Printf("Expect s flag to be a boolean value.\n")
  559. return nil
  560. }
  561. if len(c.Args()) < 2 || len(c.Args()) > 3 {
  562. fmt.Printf("Expect plugin type and name.\n")
  563. return nil
  564. }
  565. ptype, err := getPluginType(c.Args()[0])
  566. if err != nil {
  567. fmt.Printf("%s\n", err)
  568. return nil
  569. }
  570. pname := c.Args()[1]
  571. args := &model.PluginDesc{
  572. RPCArgDesc: model.RPCArgDesc{
  573. Name: pname,
  574. },
  575. Type: ptype,
  576. Stop: r == "true",
  577. }
  578. var reply string
  579. err = client.Call("Server.DropPlugin", args, &reply)
  580. if err != nil {
  581. fmt.Println(err)
  582. } else {
  583. fmt.Println(reply)
  584. }
  585. return nil
  586. },
  587. },
  588. {
  589. Name: "service",
  590. Usage: "drop service $service_name",
  591. Action: func(c *cli.Context) error {
  592. if len(c.Args()) != 1 {
  593. fmt.Printf("Expect service name.\n")
  594. return nil
  595. }
  596. name := c.Args()[0]
  597. var reply string
  598. err = client.Call("Server.DropService", name, &reply)
  599. if err != nil {
  600. fmt.Println(err)
  601. } else {
  602. fmt.Println(reply)
  603. }
  604. return nil
  605. },
  606. },
  607. {
  608. Name: "schema",
  609. Usage: "drop schema $schema_type $schema_name",
  610. Action: func(c *cli.Context) error {
  611. if len(c.Args()) != 2 {
  612. fmt.Printf("Expect schema type and name.\n")
  613. return nil
  614. }
  615. args := &model.RPCTypedArgDesc{
  616. Type: c.Args()[0],
  617. Name: c.Args()[1],
  618. }
  619. var reply string
  620. err = client.Call("Server.DropSchema", args, &reply)
  621. if err != nil {
  622. fmt.Println(err)
  623. } else {
  624. fmt.Println(reply)
  625. }
  626. return nil
  627. },
  628. },
  629. },
  630. },
  631. {
  632. Name: "show",
  633. Aliases: []string{"show"},
  634. Usage: "show streams | show tables | show rules | show plugins $plugin_type | show services | show service_funcs | show schemas $schema_type",
  635. Subcommands: []cli.Command{
  636. {
  637. Name: "streams",
  638. Usage: "show streams",
  639. Action: func(c *cli.Context) error {
  640. streamProcess(client, "")
  641. return nil
  642. },
  643. },
  644. {
  645. Name: "tables",
  646. Usage: "show tables",
  647. Action: func(c *cli.Context) error {
  648. streamProcess(client, "")
  649. return nil
  650. },
  651. },
  652. {
  653. Name: "rules",
  654. Usage: "show rules",
  655. Action: func(c *cli.Context) error {
  656. var reply string
  657. err = client.Call("Server.ShowRules", 0, &reply)
  658. if err != nil {
  659. fmt.Println(err)
  660. } else {
  661. fmt.Println(reply)
  662. }
  663. return nil
  664. },
  665. },
  666. {
  667. Name: "plugins",
  668. Usage: "show plugins $plugin_type",
  669. Action: func(c *cli.Context) error {
  670. if len(c.Args()) != 1 {
  671. fmt.Printf("Expect plugin type.\n")
  672. return nil
  673. }
  674. ptype, err := getPluginType(c.Args()[0])
  675. if err != nil {
  676. fmt.Printf("%s\n", err)
  677. return nil
  678. }
  679. var reply string
  680. err = client.Call("Server.ShowPlugins", ptype, &reply)
  681. if err != nil {
  682. fmt.Println(err)
  683. } else {
  684. fmt.Println(reply)
  685. }
  686. return nil
  687. },
  688. },
  689. {
  690. Name: "udfs",
  691. Usage: "show udfs",
  692. Action: func(c *cli.Context) error {
  693. var reply string
  694. err = client.Call("Server.ShowUdfs", 0, &reply)
  695. if err != nil {
  696. fmt.Println(err)
  697. } else {
  698. fmt.Println(reply)
  699. }
  700. return nil
  701. },
  702. }, {
  703. Name: "services",
  704. Usage: "show services",
  705. Action: func(c *cli.Context) error {
  706. var reply string
  707. err = client.Call("Server.ShowServices", 0, &reply)
  708. if err != nil {
  709. fmt.Println(err)
  710. } else {
  711. fmt.Println(reply)
  712. }
  713. return nil
  714. },
  715. }, {
  716. Name: "service_funcs",
  717. Usage: "show service_funcs",
  718. Action: func(c *cli.Context) error {
  719. var reply string
  720. err = client.Call("Server.ShowServiceFuncs", 0, &reply)
  721. if err != nil {
  722. fmt.Println(err)
  723. } else {
  724. fmt.Println(reply)
  725. }
  726. return nil
  727. },
  728. }, {
  729. Name: "schemas",
  730. Usage: "show schemas $schema_type",
  731. Action: func(c *cli.Context) error {
  732. if len(c.Args()) != 1 {
  733. fmt.Printf("Expect schema type.\n")
  734. return nil
  735. }
  736. var reply string
  737. err = client.Call("Server.ShowSchemas", c.Args()[0], &reply)
  738. if err != nil {
  739. fmt.Println(err)
  740. } else {
  741. fmt.Println(reply)
  742. }
  743. return nil
  744. },
  745. },
  746. },
  747. },
  748. {
  749. Name: "getstatus",
  750. Aliases: []string{"getstatus"},
  751. Usage: "getstatus rule $rule_name",
  752. Subcommands: []cli.Command{
  753. {
  754. Name: "rule",
  755. Usage: "getstatus rule $rule_name",
  756. //Flags: nflag,
  757. Action: func(c *cli.Context) error {
  758. if len(c.Args()) != 1 {
  759. fmt.Printf("Expect rule name.\n")
  760. return nil
  761. }
  762. rname := c.Args()[0]
  763. var reply string
  764. err = client.Call("Server.GetStatusRule", rname, &reply)
  765. if err != nil {
  766. fmt.Println(err)
  767. } else {
  768. fmt.Println(reply)
  769. }
  770. return nil
  771. },
  772. },
  773. },
  774. },
  775. {
  776. Name: "gettopo",
  777. Aliases: []string{"gettopo"},
  778. Usage: "gettopo rule $rule_name",
  779. Subcommands: []cli.Command{
  780. {
  781. Name: "rule",
  782. Usage: "getstopo rule $rule_name",
  783. //Flags: nflag,
  784. Action: func(c *cli.Context) error {
  785. if len(c.Args()) != 1 {
  786. fmt.Printf("Expect rule name.\n")
  787. return nil
  788. }
  789. rname := c.Args()[0]
  790. var reply string
  791. err = client.Call("Server.GetTopoRule", rname, &reply)
  792. if err != nil {
  793. fmt.Println(err)
  794. } else {
  795. fmt.Println(reply)
  796. }
  797. return nil
  798. },
  799. },
  800. },
  801. },
  802. {
  803. Name: "start",
  804. Aliases: []string{"start"},
  805. Usage: "start rule $rule_name",
  806. Subcommands: []cli.Command{
  807. {
  808. Name: "rule",
  809. Usage: "start rule $rule_name",
  810. //Flags: nflag,
  811. Action: func(c *cli.Context) error {
  812. if len(c.Args()) != 1 {
  813. fmt.Printf("Expect rule name.\n")
  814. return nil
  815. }
  816. rname := c.Args()[0]
  817. var reply string
  818. err = client.Call("Server.StartRule", rname, &reply)
  819. if err != nil {
  820. fmt.Println(err)
  821. } else {
  822. fmt.Println(reply)
  823. }
  824. return nil
  825. },
  826. },
  827. },
  828. },
  829. {
  830. Name: "stop",
  831. Aliases: []string{"stop"},
  832. Usage: "stop rule $rule_name",
  833. Subcommands: []cli.Command{
  834. {
  835. Name: "rule",
  836. Usage: "stop rule $rule_name",
  837. //Flags: nflag,
  838. Action: func(c *cli.Context) error {
  839. if len(c.Args()) != 1 {
  840. fmt.Printf("Expect rule name.\n")
  841. return nil
  842. }
  843. rname := c.Args()[0]
  844. var reply string
  845. err = client.Call("Server.StopRule", rname, &reply)
  846. if err != nil {
  847. fmt.Println(err)
  848. } else {
  849. fmt.Println(reply)
  850. }
  851. return nil
  852. },
  853. },
  854. },
  855. },
  856. {
  857. Name: "restart",
  858. Aliases: []string{"restart"},
  859. Usage: "restart rule $rule_name",
  860. Subcommands: []cli.Command{
  861. {
  862. Name: "rule",
  863. Usage: "restart rule $rule_name",
  864. //Flags: nflag,
  865. Action: func(c *cli.Context) error {
  866. if len(c.Args()) != 1 {
  867. fmt.Printf("Expect rule name.\n")
  868. return nil
  869. }
  870. rname := c.Args()[0]
  871. var reply string
  872. err = client.Call("Server.RestartRule", rname, &reply)
  873. if err != nil {
  874. fmt.Println(err)
  875. } else {
  876. fmt.Println(reply)
  877. }
  878. return nil
  879. },
  880. },
  881. },
  882. },
  883. {
  884. Name: "register",
  885. Aliases: []string{"register"},
  886. Usage: "register plugin function $plugin_name [$plugin_json | -f plugin_def_file]",
  887. Subcommands: []cli.Command{
  888. {
  889. Name: "plugin",
  890. Usage: "register plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  891. Flags: []cli.Flag{
  892. cli.StringFlag{
  893. Name: "file, f",
  894. Usage: "the location of plugin functions definition file",
  895. FilePath: "/home/myplugin.txt",
  896. },
  897. },
  898. Action: func(c *cli.Context) error {
  899. if len(c.Args()) < 2 {
  900. fmt.Printf("Expect plugin type and name.\n")
  901. return nil
  902. }
  903. ptype := c.Args()[0]
  904. if !strings.EqualFold(ptype, "function") {
  905. fmt.Printf("Plugin type must be function.\n")
  906. return nil
  907. }
  908. pname := c.Args()[1]
  909. sfile := c.String("file")
  910. args := &model.PluginDesc{
  911. RPCArgDesc: model.RPCArgDesc{
  912. Name: pname,
  913. },
  914. }
  915. if sfile != "" {
  916. if len(c.Args()) != 2 {
  917. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  918. return nil
  919. }
  920. if p, err := readDef(sfile, "plugin"); err != nil {
  921. fmt.Printf("%s", err)
  922. return nil
  923. } else {
  924. args.Json = string(p)
  925. }
  926. } else {
  927. if len(c.Args()) != 3 {
  928. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  929. return nil
  930. }
  931. args.Json = c.Args()[2]
  932. }
  933. var reply string
  934. err = client.Call("Server.RegisterPlugin", args, &reply)
  935. if err != nil {
  936. fmt.Println(err)
  937. } else {
  938. fmt.Println(reply)
  939. }
  940. return nil
  941. },
  942. },
  943. },
  944. },
  945. }
  946. app.Name = "Kuiper"
  947. app.Usage = "The command line tool for EMQ X Kuiper."
  948. app.Action = func(c *cli.Context) error {
  949. cli.ShowSubcommandHelp(c)
  950. //cli.ShowVersion(c)
  951. return nil
  952. }
  953. sort.Sort(cli.FlagsByName(app.Flags))
  954. sort.Sort(cli.CommandsByName(app.Commands))
  955. err = app.Run(os.Args)
  956. if err != nil {
  957. fmt.Printf("%v", err)
  958. }
  959. }
  960. func getPluginType(arg string) (ptype int, err error) {
  961. switch arg {
  962. case "source":
  963. ptype = 0
  964. case "sink":
  965. ptype = 1
  966. case "function":
  967. ptype = 2
  968. case "portable":
  969. ptype = 3
  970. default:
  971. err = fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\", \"function\" or \"portable\".\n", arg)
  972. }
  973. return
  974. }
  975. func readDef(sfile string, t string) ([]byte, error) {
  976. if _, err := os.Stat(sfile); os.IsNotExist(err) {
  977. return nil, fmt.Errorf("The specified %s defenition file %s is not existed.\n", t, sfile)
  978. }
  979. fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
  980. if rule, err := ioutil.ReadFile(sfile); err != nil {
  981. return nil, fmt.Errorf("Failed to read from %s definition file %s.\n", t, sfile)
  982. } else {
  983. return rule, nil
  984. }
  985. }