main.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bufio"
  17. "encoding/json"
  18. "flag"
  19. "fmt"
  20. "net/rpc"
  21. "os"
  22. "sort"
  23. "strings"
  24. "time"
  25. "github.com/urfave/cli"
  26. "github.com/lf-edge/ekuiper/internal/conf"
  27. "github.com/lf-edge/ekuiper/internal/pkg/model"
  28. "github.com/lf-edge/ekuiper/pkg/cast"
  29. "github.com/lf-edge/ekuiper/pkg/infra"
  30. )
  // clientConf is one connection entry decoded from the client YAML file.
  // main uses Host and Port to build the address of the RPC server it dials.
  type clientConf struct {
  	// Host is read from the "host" YAML key.
  	Host string `yaml:"host"`
  	// Port is read from the "port" YAML key.
  	Port int `yaml:"port"`
  }
  // ClientYaml is the file name of the client configuration that main passes
  // to conf.LoadConfigByName.
  const (
  	ClientYaml = "client.yaml"
  )
  // streamProcess sends a statement to the server's "Server.Stream" RPC method
  // and prints the outcome to standard output.
  //
  // When args is empty, the statement is rebuilt from the process command line
  // by joining os.Args[1:] with single spaces. If the call fails, the error is
  // printed instead of the reply. Nothing is returned to the caller.
  func streamProcess(client *rpc.Client, args string) {
  	stmt := args
  	if len(stmt) == 0 {
  		// No explicit statement: forward whatever the user typed after the
  		// program name.
  		stmt = strings.Join(os.Args[1:], " ")
  	}

  	var out string
  	if callErr := client.Call("Server.Stream", stmt, &out); callErr != nil {
  		fmt.Println(callErr)
  		return
  	}
  	fmt.Println(out)
  }
  // Version is the string main assigns to the CLI app's Version field.
  // NOTE(review): presumably overridden at link time (-ldflags -X) — confirm
  // against the build scripts.
  var Version = "unknown"

  // LoadFileType is the fallback path-loading mode that init applies when the
  // -loadFileType flag is not supplied.
  var LoadFileType = "relative"
  // loadFileType receives the value of the -loadFileType command-line flag.
  var loadFileType string

  // etcPath receives the value of the -etc command-line flag.
  var etcPath string
  56. func init() {
  57. flag.StringVar(&loadFileType, "loadFileType", "", "loadFileType indicates the how to load path")
  58. flag.StringVar(&etcPath, "etc", "", "etc indicates the path of etc dir")
  59. flag.Parse()
  60. if len(loadFileType) > 0 {
  61. conf.PathConfig.LoadFileType = loadFileType
  62. } else {
  63. conf.PathConfig.LoadFileType = LoadFileType
  64. }
  65. if len(etcPath) > 0 {
  66. conf.PathConfig.Dirs["etc"] = etcPath
  67. }
  68. }
  69. func main() {
  70. app := cli.NewApp()
  71. app.Version = Version
  72. // nflag := []cli.Flag { cli.StringFlag{
  73. // Name: "name, n",
  74. // Usage: "the name of stream",
  75. // }}
  76. var cfg map[string]clientConf
  77. err := conf.LoadConfigByName(ClientYaml, &cfg)
  78. if err != nil {
  79. conf.Log.Fatal(err)
  80. fmt.Printf("Failed to load config file with error %s.\n", err)
  81. }
  82. var config *clientConf
  83. c, ok := cfg["basic"]
  84. if !ok {
  85. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  86. } else {
  87. config = &c
  88. }
  89. if config == nil {
  90. config = &clientConf{
  91. Host: "127.0.0.1",
  92. Port: 20498,
  93. }
  94. }
  95. fmt.Printf("Connecting to %s... \n", cast.JoinHostPortInt(config.Host, config.Port))
  96. // Dial the RPC server over HTTP at the configured host and port
  97. client, err := rpc.DialHTTP("tcp", cast.JoinHostPortInt(config.Host, config.Port))
  98. if err != nil {
  99. fmt.Printf("Failed to connect the server, please start the server.\n")
  100. return
  101. }
  102. app.Commands = []cli.Command{
  103. {
  104. Name: "query",
  105. Aliases: []string{"query"},
  106. Usage: "query command line",
  107. Action: func(c *cli.Context) error {
  108. reader := bufio.NewReader(os.Stdin)
  109. ticker := time.NewTicker(time.Millisecond * 300)
  110. defer ticker.Stop()
  111. for {
  112. fmt.Print("kuiper > ")
  113. text, _ := reader.ReadString('\n')
  114. // drop the newline read from stdin (a carriage return from CRLF input is not removed)
  115. text = strings.Replace(text, "\n", "", -1)
  116. if strings.EqualFold(text, "quit") || strings.EqualFold(text, "exit") {
  117. break
  118. } else if strings.Trim(text, " ") == "" {
  119. continue
  120. } else {
  121. var reply string
  122. err := client.Call("Server.CreateQuery", text, &reply)
  123. if err != nil {
  124. fmt.Println(err)
  125. continue
  126. }
  127. fmt.Println(reply)
  128. go func() {
  129. err := infra.SafeRun(func() error {
  130. for {
  131. <-ticker.C
  132. var result string
  133. e := client.Call("Server.GetQueryResult", "", &result)
  134. if e != nil {
  135. return e
  136. }
  137. if result != "" {
  138. fmt.Println(result)
  139. }
  140. }
  141. })
  142. if err != nil {
  143. fmt.Println(err)
  144. fmt.Print("kuiper > ")
  145. }
  146. }()
  147. }
  148. }
  149. return nil
  150. },
  151. },
  152. {
  153. Name: "create",
  154. Aliases: []string{"create"},
  155. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create table $table_name | create table $table_name -f $table_def_file| create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file | create service $service_name $service_json | create schema $schema_type $schema_name $schema_json",
  156. Subcommands: []cli.Command{
  157. {
  158. Name: "stream",
  159. Usage: "create stream $stream_name [-f stream_def_file]",
  160. Flags: []cli.Flag{
  161. cli.StringFlag{
  162. Name: "file, f",
  163. Usage: "the location of stream definition file",
  164. FilePath: "/home/mystream.txt",
  165. },
  166. },
  167. Action: func(c *cli.Context) error {
  168. sfile := c.String("file")
  169. if sfile != "" {
  170. if stream, err := readDef(sfile, "stream"); err != nil {
  171. fmt.Printf("%s", err)
  172. return nil
  173. } else {
  174. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  175. streamProcess(client, args)
  176. return nil
  177. }
  178. } else {
  179. streamProcess(client, "")
  180. return nil
  181. }
  182. },
  183. },
  184. {
  185. Name: "table",
  186. Usage: "create table $table_name [-f table_def_file]",
  187. Flags: []cli.Flag{
  188. cli.StringFlag{
  189. Name: "file, f",
  190. Usage: "the location of table definition file",
  191. FilePath: "/home/mytable.txt",
  192. },
  193. },
  194. Action: func(c *cli.Context) error {
  195. sfile := c.String("file")
  196. if sfile != "" {
  197. if stream, err := readDef(sfile, "table"); err != nil {
  198. fmt.Printf("%s", err)
  199. return nil
  200. } else {
  201. args := strings.Join([]string{"CREATE TABLE ", string(stream)}, " ")
  202. streamProcess(client, args)
  203. return nil
  204. }
  205. } else {
  206. streamProcess(client, "")
  207. return nil
  208. }
  209. },
  210. },
  211. {
  212. Name: "rule",
  213. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  214. Flags: []cli.Flag{
  215. cli.StringFlag{
  216. Name: "file, f",
  217. Usage: "the location of rule definition file",
  218. FilePath: "/home/myrule.txt",
  219. },
  220. },
  221. Action: func(c *cli.Context) error {
  222. sfile := c.String("file")
  223. if sfile != "" {
  224. if rule, err := readDef(sfile, "rule"); err != nil {
  225. fmt.Printf("%s", err)
  226. return nil
  227. } else {
  228. if len(c.Args()) != 1 {
  229. fmt.Printf("Expect rule name.\n")
  230. return nil
  231. }
  232. rname := c.Args()[0]
  233. var reply string
  234. args := &model.RPCArgDesc{Name: rname, Json: string(rule)}
  235. err = client.Call("Server.CreateRule", args, &reply)
  236. if err != nil {
  237. fmt.Println(err)
  238. } else {
  239. fmt.Println(reply)
  240. }
  241. }
  242. return nil
  243. } else {
  244. if len(c.Args()) != 2 {
  245. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  246. return nil
  247. }
  248. rname := c.Args()[0]
  249. rjson := c.Args()[1]
  250. var reply string
  251. args := &model.RPCArgDesc{Name: rname, Json: rjson}
  252. err = client.Call("Server.CreateRule", args, &reply)
  253. if err != nil {
  254. fmt.Println(err)
  255. } else {
  256. fmt.Println(reply)
  257. }
  258. return nil
  259. }
  260. },
  261. },
  262. {
  263. Name: "plugin",
  264. Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  265. Flags: []cli.Flag{
  266. cli.StringFlag{
  267. Name: "file, f",
  268. Usage: "the location of plugin definition file",
  269. FilePath: "/home/myplugin.txt",
  270. },
  271. },
  272. Action: func(c *cli.Context) error {
  273. if len(c.Args()) < 2 {
  274. fmt.Printf("Expect plugin type and name.\n")
  275. return nil
  276. }
  277. ptype, err := getPluginType(c.Args()[0])
  278. if err != nil {
  279. fmt.Printf("%s\n", err)
  280. return nil
  281. }
  282. pname := c.Args()[1]
  283. sfile := c.String("file")
  284. args := &model.PluginDesc{
  285. RPCArgDesc: model.RPCArgDesc{
  286. Name: pname,
  287. },
  288. Type: ptype,
  289. }
  290. if sfile != "" {
  291. if len(c.Args()) != 2 {
  292. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  293. return nil
  294. }
  295. if p, err := readDef(sfile, "plugin"); err != nil {
  296. fmt.Printf("%s", err)
  297. return nil
  298. } else {
  299. args.Json = string(p)
  300. }
  301. } else {
  302. if len(c.Args()) != 3 {
  303. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  304. return nil
  305. }
  306. args.Json = c.Args()[2]
  307. }
  308. var reply string
  309. err = client.Call("Server.CreatePlugin", args, &reply)
  310. if err != nil {
  311. fmt.Println(err)
  312. } else {
  313. fmt.Println(reply)
  314. }
  315. return nil
  316. },
  317. },
  318. {
  319. Name: "service",
  320. Usage: "create service $service_name $service_json",
  321. Action: func(c *cli.Context) error {
  322. if len(c.Args()) < 2 {
  323. fmt.Printf("Expect service name and json.\n")
  324. return nil
  325. }
  326. var reply string
  327. err = client.Call("Server.CreateService", &model.RPCArgDesc{
  328. Name: c.Args()[0],
  329. Json: c.Args()[1],
  330. }, &reply)
  331. if err != nil {
  332. fmt.Println(err)
  333. } else {
  334. fmt.Println(reply)
  335. }
  336. return nil
  337. },
  338. },
  339. {
  340. Name: "schema",
  341. Usage: "create schema $schema_type $schema_name $schema_json",
  342. Action: func(c *cli.Context) error {
  343. if len(c.Args()) < 3 {
  344. fmt.Printf("Expect plugin type, name and json.\n")
  345. return nil
  346. }
  347. var reply string
  348. err = client.Call("Server.CreateSchema", &model.RPCTypedArgDesc{
  349. Type: c.Args()[0],
  350. Name: c.Args()[1],
  351. Json: c.Args()[2],
  352. }, &reply)
  353. if err != nil {
  354. fmt.Println(err)
  355. } else {
  356. fmt.Println(reply)
  357. }
  358. return nil
  359. },
  360. },
  361. },
  362. },
  363. {
  364. Name: "describe",
  365. Aliases: []string{"describe"},
  366. Usage: "describe stream $stream_name | describe table $table_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name | describe udf $udf_name | describe service $service_name | describe service_func $service_func_name | describe schema $schema_type $schema_name",
  367. Subcommands: []cli.Command{
  368. {
  369. Name: "stream",
  370. Usage: "describe stream $stream_name",
  371. // Flags: nflag,
  372. Action: func(c *cli.Context) error {
  373. streamProcess(client, "")
  374. return nil
  375. },
  376. },
  377. {
  378. Name: "table",
  379. Usage: "describe table $table_name",
  380. // Flags: nflag,
  381. Action: func(c *cli.Context) error {
  382. streamProcess(client, "")
  383. return nil
  384. },
  385. },
  386. {
  387. Name: "rule",
  388. Usage: "describe rule $rule_name",
  389. Action: func(c *cli.Context) error {
  390. if len(c.Args()) != 1 {
  391. fmt.Printf("Expect rule name.\n")
  392. return nil
  393. }
  394. rname := c.Args()[0]
  395. var reply string
  396. err = client.Call("Server.DescRule", rname, &reply)
  397. if err != nil {
  398. fmt.Println(err)
  399. } else {
  400. fmt.Println(reply)
  401. }
  402. return nil
  403. },
  404. },
  405. {
  406. Name: "plugin",
  407. Usage: "describe plugin $plugin_type $plugin_name",
  408. // Flags: nflag,
  409. Action: func(c *cli.Context) error {
  410. ptype, err := getPluginType(c.Args()[0])
  411. if err != nil {
  412. fmt.Printf("%s\n", err)
  413. return nil
  414. }
  415. if len(c.Args()) != 2 {
  416. fmt.Printf("Expect plugin name.\n")
  417. return nil
  418. }
  419. pname := c.Args()[1]
  420. args := &model.PluginDesc{
  421. RPCArgDesc: model.RPCArgDesc{
  422. Name: pname,
  423. },
  424. Type: ptype,
  425. }
  426. var reply string
  427. err = client.Call("Server.DescPlugin", args, &reply)
  428. if err != nil {
  429. fmt.Println(err)
  430. } else {
  431. fmt.Println(reply)
  432. }
  433. return nil
  434. },
  435. },
  436. {
  437. Name: "udf",
  438. Usage: "describe udf $udf_name",
  439. // Flags: nflag,
  440. Action: func(c *cli.Context) error {
  441. if len(c.Args()) != 1 {
  442. fmt.Printf("Expect udf name.\n")
  443. return nil
  444. }
  445. pname := c.Args()[0]
  446. var reply string
  447. err = client.Call("Server.DescUdf", pname, &reply)
  448. if err != nil {
  449. fmt.Println(err)
  450. } else {
  451. fmt.Println(reply)
  452. }
  453. return nil
  454. },
  455. },
  456. {
  457. Name: "service",
  458. Usage: "describe service $service_name",
  459. Action: func(c *cli.Context) error {
  460. if len(c.Args()) != 1 {
  461. fmt.Printf("Expect service name.\n")
  462. return nil
  463. }
  464. name := c.Args()[0]
  465. var reply string
  466. err = client.Call("Server.DescService", name, &reply)
  467. if err != nil {
  468. fmt.Println(err)
  469. } else {
  470. fmt.Println(reply)
  471. }
  472. return nil
  473. },
  474. },
  475. {
  476. Name: "service_func",
  477. Usage: "describe service_func $service_func_name",
  478. Action: func(c *cli.Context) error {
  479. if len(c.Args()) != 1 {
  480. fmt.Printf("Expect service func name.\n")
  481. return nil
  482. }
  483. name := c.Args()[0]
  484. var reply string
  485. err = client.Call("Server.DescServiceFunc", name, &reply)
  486. if err != nil {
  487. fmt.Println(err)
  488. } else {
  489. fmt.Println(reply)
  490. }
  491. return nil
  492. },
  493. },
  494. {
  495. Name: "schema",
  496. Usage: "describe schema $schema_type $schema_name",
  497. Action: func(c *cli.Context) error {
  498. if len(c.Args()) != 2 {
  499. fmt.Printf("Expect schema type and name.\n")
  500. return nil
  501. }
  502. args := &model.RPCTypedArgDesc{
  503. Type: c.Args()[0],
  504. Name: c.Args()[1],
  505. }
  506. var reply string
  507. err = client.Call("Server.DescSchema", args, &reply)
  508. if err != nil {
  509. fmt.Println(err)
  510. } else {
  511. fmt.Println(reply)
  512. }
  513. return nil
  514. },
  515. },
  516. },
  517. },
  518. {
  519. Name: "drop",
  520. Aliases: []string{"drop"},
  521. Usage: "drop stream $stream_name | drop table $table_name |drop rule $rule_name | drop plugin $plugin_type $plugin_name -s $stop | drop service $service_name | drop schema $schema_type $schema_name",
  522. Subcommands: []cli.Command{
  523. {
  524. Name: "stream",
  525. Usage: "drop stream $stream_name",
  526. // Flags: nflag,
  527. Action: func(c *cli.Context) error {
  528. streamProcess(client, "")
  529. return nil
  530. },
  531. },
  532. {
  533. Name: "table",
  534. Usage: "drop table $table_name",
  535. // Flags: nflag,
  536. Action: func(c *cli.Context) error {
  537. streamProcess(client, "")
  538. return nil
  539. },
  540. },
  541. {
  542. Name: "rule",
  543. Usage: "drop rule $rule_name",
  544. // Flags: nflag,
  545. Action: func(c *cli.Context) error {
  546. if len(c.Args()) != 1 {
  547. fmt.Printf("Expect rule name.\n")
  548. return nil
  549. }
  550. rname := c.Args()[0]
  551. var reply string
  552. err = client.Call("Server.DropRule", rname, &reply)
  553. if err != nil {
  554. fmt.Println(err)
  555. } else {
  556. fmt.Println(reply)
  557. }
  558. return nil
  559. },
  560. },
  561. {
  562. Name: "plugin",
  563. Usage: "drop plugin $plugin_type $plugin_name -s stop",
  564. Flags: []cli.Flag{
  565. cli.StringFlag{
  566. Name: "stop, s",
  567. Usage: "stop kuiper after the action",
  568. },
  569. },
  570. Action: func(c *cli.Context) error {
  571. r := c.String("stop")
  572. if r != "true" && r != "false" {
  573. fmt.Printf("Expect s flag to be a boolean value.\n")
  574. return nil
  575. }
  576. if len(c.Args()) < 2 || len(c.Args()) > 3 {
  577. fmt.Printf("Expect plugin type and name.\n")
  578. return nil
  579. }
  580. ptype, err := getPluginType(c.Args()[0])
  581. if err != nil {
  582. fmt.Printf("%s\n", err)
  583. return nil
  584. }
  585. pname := c.Args()[1]
  586. args := &model.PluginDesc{
  587. RPCArgDesc: model.RPCArgDesc{
  588. Name: pname,
  589. },
  590. Type: ptype,
  591. Stop: r == "true",
  592. }
  593. var reply string
  594. err = client.Call("Server.DropPlugin", args, &reply)
  595. if err != nil {
  596. fmt.Println(err)
  597. } else {
  598. fmt.Println(reply)
  599. }
  600. return nil
  601. },
  602. },
  603. {
  604. Name: "service",
  605. Usage: "drop service $service_name",
  606. Action: func(c *cli.Context) error {
  607. if len(c.Args()) != 1 {
  608. fmt.Printf("Expect service name.\n")
  609. return nil
  610. }
  611. name := c.Args()[0]
  612. var reply string
  613. err = client.Call("Server.DropService", name, &reply)
  614. if err != nil {
  615. fmt.Println(err)
  616. } else {
  617. fmt.Println(reply)
  618. }
  619. return nil
  620. },
  621. },
  622. {
  623. Name: "schema",
  624. Usage: "drop schema $schema_type $schema_name",
  625. Action: func(c *cli.Context) error {
  626. if len(c.Args()) != 2 {
  627. fmt.Printf("Expect schema type and name.\n")
  628. return nil
  629. }
  630. args := &model.RPCTypedArgDesc{
  631. Type: c.Args()[0],
  632. Name: c.Args()[1],
  633. }
  634. var reply string
  635. err = client.Call("Server.DropSchema", args, &reply)
  636. if err != nil {
  637. fmt.Println(err)
  638. } else {
  639. fmt.Println(reply)
  640. }
  641. return nil
  642. },
  643. },
  644. },
  645. },
  646. {
  647. Name: "show",
  648. Aliases: []string{"show"},
  649. Usage: "show streams | show tables | show rules | show plugins $plugin_type | show services | show service_funcs | show schemas $schema_type",
  650. Subcommands: []cli.Command{
  651. {
  652. Name: "streams",
  653. Usage: "show streams",
  654. Action: func(c *cli.Context) error {
  655. streamProcess(client, "")
  656. return nil
  657. },
  658. },
  659. {
  660. Name: "tables",
  661. Usage: "show tables",
  662. Action: func(c *cli.Context) error {
  663. streamProcess(client, "")
  664. return nil
  665. },
  666. },
  667. {
  668. Name: "rules",
  669. Usage: "show rules",
  670. Action: func(c *cli.Context) error {
  671. var reply string
  672. err = client.Call("Server.ShowRules", 0, &reply)
  673. if err != nil {
  674. fmt.Println(err)
  675. } else {
  676. fmt.Println(reply)
  677. }
  678. return nil
  679. },
  680. },
  681. {
  682. Name: "plugins",
  683. Usage: "show plugins $plugin_type",
  684. Action: func(c *cli.Context) error {
  685. if len(c.Args()) != 1 {
  686. fmt.Printf("Expect plugin type.\n")
  687. return nil
  688. }
  689. ptype, err := getPluginType(c.Args()[0])
  690. if err != nil {
  691. fmt.Printf("%s\n", err)
  692. return nil
  693. }
  694. var reply string
  695. err = client.Call("Server.ShowPlugins", ptype, &reply)
  696. if err != nil {
  697. fmt.Println(err)
  698. } else {
  699. fmt.Println(reply)
  700. }
  701. return nil
  702. },
  703. },
  704. {
  705. Name: "udfs",
  706. Usage: "show udfs",
  707. Action: func(c *cli.Context) error {
  708. var reply string
  709. err = client.Call("Server.ShowUdfs", 0, &reply)
  710. if err != nil {
  711. fmt.Println(err)
  712. } else {
  713. fmt.Println(reply)
  714. }
  715. return nil
  716. },
  717. },
  718. {
  719. Name: "services",
  720. Usage: "show services",
  721. Action: func(c *cli.Context) error {
  722. var reply string
  723. err = client.Call("Server.ShowServices", 0, &reply)
  724. if err != nil {
  725. fmt.Println(err)
  726. } else {
  727. fmt.Println(reply)
  728. }
  729. return nil
  730. },
  731. },
  732. {
  733. Name: "service_funcs",
  734. Usage: "show service_funcs",
  735. Action: func(c *cli.Context) error {
  736. var reply string
  737. err = client.Call("Server.ShowServiceFuncs", 0, &reply)
  738. if err != nil {
  739. fmt.Println(err)
  740. } else {
  741. fmt.Println(reply)
  742. }
  743. return nil
  744. },
  745. },
  746. {
  747. Name: "schemas",
  748. Usage: "show schemas $schema_type",
  749. Action: func(c *cli.Context) error {
  750. if len(c.Args()) != 1 {
  751. fmt.Printf("Expect schema type.\n")
  752. return nil
  753. }
  754. var reply string
  755. err = client.Call("Server.ShowSchemas", c.Args()[0], &reply)
  756. if err != nil {
  757. fmt.Println(err)
  758. } else {
  759. fmt.Println(reply)
  760. }
  761. return nil
  762. },
  763. },
  764. },
  765. },
  766. {
  767. Name: "getstatus",
  768. Aliases: []string{"getstatus"},
  769. Usage: "getstatus rule $rule_name | import",
  770. Subcommands: []cli.Command{
  771. {
  772. Name: "rule",
  773. Usage: "getstatus rule $rule_name",
  774. // Flags: nflag,
  775. Action: func(c *cli.Context) error {
  776. if len(c.Args()) != 1 {
  777. fmt.Printf("Expect rule name.\n")
  778. return nil
  779. }
  780. rname := c.Args()[0]
  781. var reply string
  782. err = client.Call("Server.GetStatusRule", rname, &reply)
  783. if err != nil {
  784. fmt.Println(err)
  785. } else {
  786. fmt.Println(reply)
  787. }
  788. return nil
  789. },
  790. },
  791. {
  792. Name: "import",
  793. Usage: "getstatus import",
  794. // Flags: nflag,
  795. Action: func(c *cli.Context) error {
  796. var reply string
  797. err = client.Call("Server.GetStatusImport", 0, &reply)
  798. if err != nil {
  799. fmt.Println(err)
  800. } else {
  801. fmt.Println(reply)
  802. }
  803. return nil
  804. },
  805. },
  806. },
  807. },
  808. {
  809. Name: "gettopo",
  810. Aliases: []string{"gettopo"},
  811. Usage: "gettopo rule $rule_name",
  812. Subcommands: []cli.Command{
  813. {
  814. Name: "rule",
  815. Usage: "getstopo rule $rule_name",
  816. // Flags: nflag,
  817. Action: func(c *cli.Context) error {
  818. if len(c.Args()) != 1 {
  819. fmt.Printf("Expect rule name.\n")
  820. return nil
  821. }
  822. rname := c.Args()[0]
  823. var reply string
  824. err = client.Call("Server.GetTopoRule", rname, &reply)
  825. if err != nil {
  826. fmt.Println(err)
  827. } else {
  828. fmt.Println(reply)
  829. }
  830. return nil
  831. },
  832. },
  833. },
  834. },
  835. {
  836. Name: "start",
  837. Aliases: []string{"start"},
  838. Usage: "start rule $rule_name",
  839. Subcommands: []cli.Command{
  840. {
  841. Name: "rule",
  842. Usage: "start rule $rule_name",
  843. // Flags: nflag,
  844. Action: func(c *cli.Context) error {
  845. if len(c.Args()) != 1 {
  846. fmt.Printf("Expect rule name.\n")
  847. return nil
  848. }
  849. rname := c.Args()[0]
  850. var reply string
  851. err = client.Call("Server.StartRule", rname, &reply)
  852. if err != nil {
  853. fmt.Println(err)
  854. } else {
  855. fmt.Println(reply)
  856. }
  857. return nil
  858. },
  859. },
  860. },
  861. },
  862. {
  863. Name: "stop",
  864. Aliases: []string{"stop"},
  865. Usage: "stop rule $rule_name",
  866. Subcommands: []cli.Command{
  867. {
  868. Name: "rule",
  869. Usage: "stop rule $rule_name",
  870. // Flags: nflag,
  871. Action: func(c *cli.Context) error {
  872. if len(c.Args()) != 1 {
  873. fmt.Printf("Expect rule name.\n")
  874. return nil
  875. }
  876. rname := c.Args()[0]
  877. var reply string
  878. err = client.Call("Server.StopRule", rname, &reply)
  879. if err != nil {
  880. fmt.Println(err)
  881. } else {
  882. fmt.Println(reply)
  883. }
  884. return nil
  885. },
  886. },
  887. },
  888. },
  889. {
  890. Name: "restart",
  891. Aliases: []string{"restart"},
  892. Usage: "restart rule $rule_name",
  893. Subcommands: []cli.Command{
  894. {
  895. Name: "rule",
  896. Usage: "restart rule $rule_name",
  897. // Flags: nflag,
  898. Action: func(c *cli.Context) error {
  899. if len(c.Args()) != 1 {
  900. fmt.Printf("Expect rule name.\n")
  901. return nil
  902. }
  903. rname := c.Args()[0]
  904. var reply string
  905. err = client.Call("Server.RestartRule", rname, &reply)
  906. if err != nil {
  907. fmt.Println(err)
  908. } else {
  909. fmt.Println(reply)
  910. }
  911. return nil
  912. },
  913. },
  914. },
  915. },
  916. {
  917. Name: "validate",
  918. Aliases: []string{"validate"},
  919. Usage: "validate rule $rule_name [$rule_json | -f $rule_def_file]",
  920. Subcommands: []cli.Command{
  921. {
  922. Name: "rule",
  923. Usage: "validate rule $rule_name [$rule_json | -f $rule_def_file]",
  924. Flags: []cli.Flag{
  925. cli.StringFlag{
  926. Name: "file, f",
  927. Usage: "the location of rule definition file",
  928. FilePath: "/home/myrule.txt",
  929. },
  930. },
  931. Action: func(c *cli.Context) error {
  932. sfile := c.String("file")
  933. if sfile != "" {
  934. if rule, err := readDef(sfile, "rule"); err != nil {
  935. fmt.Printf("%s", err)
  936. return nil
  937. } else {
  938. if len(c.Args()) != 1 {
  939. fmt.Printf("Expect rule name.\n")
  940. return nil
  941. }
  942. rname := c.Args()[0]
  943. var reply string
  944. args := &model.RPCArgDesc{Name: rname, Json: string(rule)}
  945. err = client.Call("Server.ValidateRule", args, &reply)
  946. if err != nil {
  947. fmt.Println(err)
  948. } else {
  949. fmt.Println(reply)
  950. }
  951. }
  952. return nil
  953. } else {
  954. if len(c.Args()) != 2 {
  955. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  956. return nil
  957. }
  958. rname := c.Args()[0]
  959. rjson := c.Args()[1]
  960. var reply string
  961. args := &model.RPCArgDesc{Name: rname, Json: rjson}
  962. err = client.Call("Server.ValidateRule", args, &reply)
  963. if err != nil {
  964. fmt.Println(err)
  965. } else {
  966. fmt.Println(reply)
  967. }
  968. return nil
  969. }
  970. },
  971. },
  972. },
  973. },
  974. {
  975. Name: "register",
  976. Aliases: []string{"register"},
  977. Usage: "register plugin function $plugin_name [$plugin_json | -f plugin_def_file]",
  978. Subcommands: []cli.Command{
  979. {
  980. Name: "plugin",
  981. Usage: "register plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  982. Flags: []cli.Flag{
  983. cli.StringFlag{
  984. Name: "file, f",
  985. Usage: "the location of plugin functions definition file",
  986. FilePath: "/home/myplugin.txt",
  987. },
  988. },
  989. Action: func(c *cli.Context) error {
  990. if len(c.Args()) < 2 {
  991. fmt.Printf("Expect plugin type and name.\n")
  992. return nil
  993. }
  994. ptype := c.Args()[0]
  995. if !strings.EqualFold(ptype, "function") {
  996. fmt.Printf("Plugin type must be function.\n")
  997. return nil
  998. }
  999. pname := c.Args()[1]
  1000. sfile := c.String("file")
  1001. args := &model.PluginDesc{
  1002. RPCArgDesc: model.RPCArgDesc{
  1003. Name: pname,
  1004. },
  1005. }
  1006. if sfile != "" {
  1007. if len(c.Args()) != 2 {
  1008. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  1009. return nil
  1010. }
  1011. if p, err := readDef(sfile, "plugin"); err != nil {
  1012. fmt.Printf("%s", err)
  1013. return nil
  1014. } else {
  1015. args.Json = string(p)
  1016. }
  1017. } else {
  1018. if len(c.Args()) != 3 {
  1019. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  1020. return nil
  1021. }
  1022. args.Json = c.Args()[2]
  1023. }
  1024. var reply string
  1025. err = client.Call("Server.RegisterPlugin", args, &reply)
  1026. if err != nil {
  1027. fmt.Println(err)
  1028. } else {
  1029. fmt.Println(reply)
  1030. }
  1031. return nil
  1032. },
  1033. },
  1034. },
  1035. },
  1036. {
  1037. Name: "import",
  1038. Aliases: []string{"import"},
  1039. Usage: "import ruleset | data -f file -p partial -s stop",
  1040. Subcommands: []cli.Command{
  1041. {
  1042. Name: "ruleset",
  1043. Usage: "\"import ruleset -f ruleset_file",
  1044. Flags: []cli.Flag{
  1045. cli.StringFlag{
  1046. Name: "file, f",
  1047. Usage: "the location of the ruleset json file",
  1048. FilePath: "/home/ekuiper_ruleset.json",
  1049. },
  1050. },
  1051. Action: func(c *cli.Context) error {
  1052. sfile := c.String("file")
  1053. if sfile == "" {
  1054. fmt.Print("Required ruleset json file to import")
  1055. return nil
  1056. }
  1057. var reply string
  1058. err = client.Call("Server.Import", sfile, &reply)
  1059. if err != nil {
  1060. fmt.Println(err)
  1061. } else {
  1062. fmt.Println(reply)
  1063. }
  1064. return nil
  1065. },
  1066. },
  1067. {
  1068. Name: "data",
  1069. Usage: "\"import data -f configuration_file -p partial -s stop",
  1070. Flags: []cli.Flag{
  1071. cli.StringFlag{
  1072. Name: "file, f",
  1073. Usage: "the location of the configuration json file",
  1074. FilePath: "/home/ekuiper_configuration.json",
  1075. },
  1076. cli.StringFlag{
  1077. Name: "stop, s",
  1078. Usage: "stop kuiper after the action",
  1079. },
  1080. cli.StringFlag{
  1081. Name: "partial, p",
  1082. Usage: "import partial configuration",
  1083. },
  1084. },
  1085. Action: func(c *cli.Context) error {
  1086. sfile := c.String("file")
  1087. if sfile == "" {
  1088. fmt.Print("Required configuration json file to import")
  1089. return nil
  1090. }
  1091. r := c.String("stop")
  1092. if r != "true" && r != "false" {
  1093. fmt.Printf("Expect s flag to be a boolean value.\n")
  1094. return nil
  1095. }
  1096. p := c.String("partial")
  1097. if p != "true" && p != "false" {
  1098. fmt.Printf("Expect p flag to be a boolean value.\n")
  1099. return nil
  1100. }
  1101. args := &model.ImportDataDesc{
  1102. FileName: sfile,
  1103. Stop: r == "true",
  1104. Partial: p == "true",
  1105. }
  1106. var reply string
  1107. err = client.Call("Server.ImportConfiguration", args, &reply)
  1108. if err != nil {
  1109. fmt.Println(err)
  1110. } else {
  1111. fmt.Println(reply)
  1112. }
  1113. return nil
  1114. },
  1115. },
  1116. },
  1117. },
  1118. {
  1119. Name: "export",
  1120. Aliases: []string{"export"},
  1121. Usage: "export ruleset | data $ruleset_file [ -r rules ]",
  1122. Subcommands: []cli.Command{
  1123. {
  1124. Name: "ruleset",
  1125. Usage: "\"export ruleset $ruleset_file",
  1126. Action: func(c *cli.Context) error {
  1127. if len(c.Args()) < 1 {
  1128. fmt.Printf("Require exported file name.\n")
  1129. return nil
  1130. }
  1131. var reply string
  1132. err = client.Call("Server.Export", c.Args()[0], &reply)
  1133. if err != nil {
  1134. fmt.Println(err)
  1135. } else {
  1136. fmt.Println(reply)
  1137. }
  1138. return nil
  1139. },
  1140. },
  1141. {
  1142. Name: "data",
  1143. Usage: "export data $configuration_file [ -r rules ]",
  1144. Flags: []cli.Flag{
  1145. cli.StringFlag{
  1146. Name: "rules, r",
  1147. Usage: "the rules id in json array format",
  1148. },
  1149. },
  1150. Action: func(c *cli.Context) error {
  1151. args := model.ExportDataDesc{
  1152. Rules: []string{},
  1153. FileName: "",
  1154. }
  1155. rulesArray := c.String("rules")
  1156. if rulesArray != "" {
  1157. var rules []string
  1158. err := json.Unmarshal(cast.StringToBytes(rulesArray), &rules)
  1159. if err != nil {
  1160. fmt.Printf("rules %s unmarshal error %s", rules, err)
  1161. return nil
  1162. }
  1163. args.Rules = rules
  1164. if len(c.Args()) != 1 {
  1165. fmt.Printf("Expect configuration file.\n")
  1166. return nil
  1167. }
  1168. args.FileName = c.Args()[0]
  1169. var reply string
  1170. err = client.Call("Server.ExportConfiguration", args, &reply)
  1171. if err != nil {
  1172. fmt.Println(err)
  1173. } else {
  1174. fmt.Println(reply)
  1175. }
  1176. } else {
  1177. if len(c.Args()) != 1 {
  1178. fmt.Printf("Expect configuration file.\n")
  1179. return nil
  1180. }
  1181. args.FileName = c.Args()[0]
  1182. var reply string
  1183. err = client.Call("Server.ExportConfiguration", args, &reply)
  1184. if err != nil {
  1185. fmt.Println(err)
  1186. } else {
  1187. fmt.Println(reply)
  1188. }
  1189. }
  1190. return nil
  1191. },
  1192. },
  1193. },
  1194. },
  1195. }
  1196. app.Name = "Kuiper"
  1197. app.Usage = "The command line tool for EMQ X Kuiper."
  1198. app.Action = func(c *cli.Context) error {
  1199. cli.ShowSubcommandHelp(c)
  1200. // cli.ShowVersion(c)
  1201. return nil
  1202. }
  1203. sort.Sort(cli.FlagsByName(app.Flags))
  1204. sort.Sort(cli.CommandsByName(app.Commands))
  1205. err = app.Run(os.Args)
  1206. if err != nil {
  1207. fmt.Printf("%v", err)
  1208. }
  1209. }
  1210. func getPluginType(arg string) (ptype int, err error) {
  1211. switch arg {
  1212. case "source":
  1213. ptype = 0
  1214. case "sink":
  1215. ptype = 1
  1216. case "function":
  1217. ptype = 2
  1218. case "portable":
  1219. ptype = 3
  1220. case "wasm":
  1221. ptype = 4
  1222. default:
  1223. err = fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\", \"function\" or \"portable\" or \"wasm\".\n", arg)
  1224. }
  1225. return
  1226. }
  1227. func readDef(sfile string, t string) ([]byte, error) {
  1228. if _, err := os.Stat(sfile); os.IsNotExist(err) {
  1229. return nil, fmt.Errorf("The specified %s defenition file %s is not existed.\n", t, sfile)
  1230. }
  1231. fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
  1232. if rule, err := os.ReadFile(sfile); err != nil {
  1233. return nil, fmt.Errorf("Failed to read from %s definition file %s.\n", t, sfile)
  1234. } else {
  1235. return rule, nil
  1236. }
  1237. }