main.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bufio"
  17. "encoding/json"
  18. "fmt"
  19. "net/rpc"
  20. "os"
  21. "sort"
  22. "strings"
  23. "time"
  24. "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/pkg/model"
  26. "github.com/lf-edge/ekuiper/pkg/infra"
  27. "github.com/urfave/cli"
  28. )
// clientConf holds the address of the server the command line client
// connects to. main looks it up under the "basic" key of the configuration
// loaded from ClientYaml.
type clientConf struct {
	// Host is the server host name or IP address (yaml key "host").
	Host string `yaml:"host"`
	// Port is the server TCP port (yaml key "port").
	Port int `yaml:"port"`
}
// ClientYaml is the name of the configuration file that main passes to
// conf.LoadConfigByName.
const ClientYaml = "client.yaml"
  34. func streamProcess(client *rpc.Client, args string) {
  35. var reply string
  36. if args == "" {
  37. args = strings.Join(os.Args[1:], " ")
  38. }
  39. err := client.Call("Server.Stream", args, &reply)
  40. if err != nil {
  41. fmt.Println(err)
  42. } else {
  43. fmt.Println(reply)
  44. }
  45. }
var (
	// Version is reported as the CLI application version (assigned to
	// app.Version in main). NOTE(review): the "unknown" default is presumably
	// overridden at build time — confirm against the build scripts.
	Version = "unknown"
	// LoadFileType is copied into conf.LoadFileType at the start of main.
	LoadFileType = "relative"
)
  50. func main() {
  51. conf.LoadFileType = LoadFileType
  52. app := cli.NewApp()
  53. app.Version = Version
  54. //nflag := []cli.Flag { cli.StringFlag{
  55. // Name: "name, n",
  56. // Usage: "the name of stream",
  57. // }}
  58. var cfg map[string]clientConf
  59. err := conf.LoadConfigByName(ClientYaml, &cfg)
  60. if err != nil {
  61. conf.Log.Fatal(err)
  62. fmt.Printf("Failed to load config file with error %s.\n", err)
  63. }
  64. var config *clientConf
  65. c, ok := cfg["basic"]
  66. if !ok {
  67. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  68. } else {
  69. config = &c
  70. }
  71. if config == nil {
  72. config = &clientConf{
  73. Host: "127.0.0.1",
  74. Port: 20498,
  75. }
  76. }
  77. fmt.Printf("Connecting to %s:%d... \n", config.Host, config.Port)
  78. // Create a TCP connection to localhost on port 1234
  79. client, err := rpc.DialHTTP("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))
  80. if err != nil {
  81. fmt.Printf("Failed to connect the server, please start the server.\n")
  82. return
  83. }
  84. app.Commands = []cli.Command{
  85. {
  86. Name: "query",
  87. Aliases: []string{"query"},
  88. Usage: "query command line",
  89. Action: func(c *cli.Context) error {
  90. reader := bufio.NewReader(os.Stdin)
  91. var inputs []string
  92. ticker := time.NewTicker(time.Millisecond * 300)
  93. defer ticker.Stop()
  94. for {
  95. fmt.Print("kuiper > ")
  96. text, _ := reader.ReadString('\n')
  97. inputs = append(inputs, text)
  98. // convert CRLF to LF
  99. text = strings.Replace(text, "\n", "", -1)
  100. if strings.EqualFold(text, "quit") || strings.EqualFold(text, "exit") {
  101. break
  102. } else if strings.Trim(text, " ") == "" {
  103. continue
  104. } else {
  105. var reply string
  106. err := client.Call("Server.CreateQuery", text, &reply)
  107. if err != nil {
  108. fmt.Println(err)
  109. continue
  110. } else {
  111. fmt.Println(reply)
  112. go func() {
  113. err := infra.SafeRun(func() error {
  114. for {
  115. <-ticker.C
  116. var result string
  117. e := client.Call("Server.GetQueryResult", "", &result)
  118. if e != nil {
  119. return e
  120. }
  121. if result != "" {
  122. fmt.Println(result)
  123. }
  124. }
  125. })
  126. if err != nil {
  127. fmt.Println(err)
  128. fmt.Print("kuiper > ")
  129. }
  130. }()
  131. }
  132. }
  133. }
  134. return nil
  135. },
  136. },
  137. {
  138. Name: "create",
  139. Aliases: []string{"create"},
  140. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create table $table_name | create table $table_name -f $table_def_file| create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file | create service $service_name $service_json | create schema $schema_type $schema_name $schema_json",
  141. Subcommands: []cli.Command{
  142. {
  143. Name: "stream",
  144. Usage: "create stream $stream_name [-f stream_def_file]",
  145. Flags: []cli.Flag{
  146. cli.StringFlag{
  147. Name: "file, f",
  148. Usage: "the location of stream definition file",
  149. FilePath: "/home/mystream.txt",
  150. },
  151. },
  152. Action: func(c *cli.Context) error {
  153. sfile := c.String("file")
  154. if sfile != "" {
  155. if stream, err := readDef(sfile, "stream"); err != nil {
  156. fmt.Printf("%s", err)
  157. return nil
  158. } else {
  159. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  160. streamProcess(client, args)
  161. return nil
  162. }
  163. } else {
  164. streamProcess(client, "")
  165. return nil
  166. }
  167. },
  168. },
  169. {
  170. Name: "table",
  171. Usage: "create table $table_name [-f table_def_file]",
  172. Flags: []cli.Flag{
  173. cli.StringFlag{
  174. Name: "file, f",
  175. Usage: "the location of table definition file",
  176. FilePath: "/home/mytable.txt",
  177. },
  178. },
  179. Action: func(c *cli.Context) error {
  180. sfile := c.String("file")
  181. if sfile != "" {
  182. if stream, err := readDef(sfile, "table"); err != nil {
  183. fmt.Printf("%s", err)
  184. return nil
  185. } else {
  186. args := strings.Join([]string{"CREATE TABLE ", string(stream)}, " ")
  187. streamProcess(client, args)
  188. return nil
  189. }
  190. } else {
  191. streamProcess(client, "")
  192. return nil
  193. }
  194. },
  195. },
  196. {
  197. Name: "rule",
  198. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  199. Flags: []cli.Flag{
  200. cli.StringFlag{
  201. Name: "file, f",
  202. Usage: "the location of rule definition file",
  203. FilePath: "/home/myrule.txt",
  204. },
  205. },
  206. Action: func(c *cli.Context) error {
  207. sfile := c.String("file")
  208. if sfile != "" {
  209. if rule, err := readDef(sfile, "rule"); err != nil {
  210. fmt.Printf("%s", err)
  211. return nil
  212. } else {
  213. if len(c.Args()) != 1 {
  214. fmt.Printf("Expect rule name.\n")
  215. return nil
  216. }
  217. rname := c.Args()[0]
  218. var reply string
  219. args := &model.RPCArgDesc{Name: rname, Json: string(rule)}
  220. err = client.Call("Server.CreateRule", args, &reply)
  221. if err != nil {
  222. fmt.Println(err)
  223. } else {
  224. fmt.Println(reply)
  225. }
  226. }
  227. return nil
  228. } else {
  229. if len(c.Args()) != 2 {
  230. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  231. return nil
  232. }
  233. rname := c.Args()[0]
  234. rjson := c.Args()[1]
  235. var reply string
  236. args := &model.RPCArgDesc{Name: rname, Json: rjson}
  237. err = client.Call("Server.CreateRule", args, &reply)
  238. if err != nil {
  239. fmt.Println(err)
  240. } else {
  241. fmt.Println(reply)
  242. }
  243. return nil
  244. }
  245. },
  246. },
  247. {
  248. Name: "plugin",
  249. Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  250. Flags: []cli.Flag{
  251. cli.StringFlag{
  252. Name: "file, f",
  253. Usage: "the location of plugin definition file",
  254. FilePath: "/home/myplugin.txt",
  255. },
  256. },
  257. Action: func(c *cli.Context) error {
  258. if len(c.Args()) < 2 {
  259. fmt.Printf("Expect plugin type and name.\n")
  260. return nil
  261. }
  262. ptype, err := getPluginType(c.Args()[0])
  263. if err != nil {
  264. fmt.Printf("%s\n", err)
  265. return nil
  266. }
  267. pname := c.Args()[1]
  268. sfile := c.String("file")
  269. args := &model.PluginDesc{
  270. RPCArgDesc: model.RPCArgDesc{
  271. Name: pname,
  272. },
  273. Type: ptype,
  274. }
  275. if sfile != "" {
  276. if len(c.Args()) != 2 {
  277. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  278. return nil
  279. }
  280. if p, err := readDef(sfile, "plugin"); err != nil {
  281. fmt.Printf("%s", err)
  282. return nil
  283. } else {
  284. args.Json = string(p)
  285. }
  286. } else {
  287. if len(c.Args()) != 3 {
  288. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  289. return nil
  290. }
  291. args.Json = c.Args()[2]
  292. }
  293. var reply string
  294. err = client.Call("Server.CreatePlugin", args, &reply)
  295. if err != nil {
  296. fmt.Println(err)
  297. } else {
  298. fmt.Println(reply)
  299. }
  300. return nil
  301. },
  302. },
  303. {
  304. Name: "service",
  305. Usage: "create service $service_name $service_json",
  306. Action: func(c *cli.Context) error {
  307. if len(c.Args()) < 2 {
  308. fmt.Printf("Expect service name and json.\n")
  309. return nil
  310. }
  311. var reply string
  312. err = client.Call("Server.CreateService", &model.RPCArgDesc{
  313. Name: c.Args()[0],
  314. Json: c.Args()[1],
  315. }, &reply)
  316. if err != nil {
  317. fmt.Println(err)
  318. } else {
  319. fmt.Println(reply)
  320. }
  321. return nil
  322. },
  323. },
  324. {
  325. Name: "schema",
  326. Usage: "create schema $schema_type $schema_name $schema_json",
  327. Action: func(c *cli.Context) error {
  328. if len(c.Args()) < 3 {
  329. fmt.Printf("Expect plugin type, name and json.\n")
  330. return nil
  331. }
  332. var reply string
  333. err = client.Call("Server.CreateSchema", &model.RPCTypedArgDesc{
  334. Type: c.Args()[0],
  335. Name: c.Args()[1],
  336. Json: c.Args()[2],
  337. }, &reply)
  338. if err != nil {
  339. fmt.Println(err)
  340. } else {
  341. fmt.Println(reply)
  342. }
  343. return nil
  344. },
  345. },
  346. },
  347. },
  348. {
  349. Name: "describe",
  350. Aliases: []string{"describe"},
  351. Usage: "describe stream $stream_name | describe table $table_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name | describe udf $udf_name | describe service $service_name | describe service_func $service_func_name | describe schema $schema_type $schema_name",
  352. Subcommands: []cli.Command{
  353. {
  354. Name: "stream",
  355. Usage: "describe stream $stream_name",
  356. //Flags: nflag,
  357. Action: func(c *cli.Context) error {
  358. streamProcess(client, "")
  359. return nil
  360. },
  361. },
  362. {
  363. Name: "table",
  364. Usage: "describe table $table_name",
  365. //Flags: nflag,
  366. Action: func(c *cli.Context) error {
  367. streamProcess(client, "")
  368. return nil
  369. },
  370. },
  371. {
  372. Name: "rule",
  373. Usage: "describe rule $rule_name",
  374. Action: func(c *cli.Context) error {
  375. if len(c.Args()) != 1 {
  376. fmt.Printf("Expect rule name.\n")
  377. return nil
  378. }
  379. rname := c.Args()[0]
  380. var reply string
  381. err = client.Call("Server.DescRule", rname, &reply)
  382. if err != nil {
  383. fmt.Println(err)
  384. } else {
  385. fmt.Println(reply)
  386. }
  387. return nil
  388. },
  389. },
  390. {
  391. Name: "plugin",
  392. Usage: "describe plugin $plugin_type $plugin_name",
  393. //Flags: nflag,
  394. Action: func(c *cli.Context) error {
  395. ptype, err := getPluginType(c.Args()[0])
  396. if err != nil {
  397. fmt.Printf("%s\n", err)
  398. return nil
  399. }
  400. if len(c.Args()) != 2 {
  401. fmt.Printf("Expect plugin name.\n")
  402. return nil
  403. }
  404. pname := c.Args()[1]
  405. args := &model.PluginDesc{
  406. RPCArgDesc: model.RPCArgDesc{
  407. Name: pname,
  408. },
  409. Type: ptype,
  410. }
  411. var reply string
  412. err = client.Call("Server.DescPlugin", args, &reply)
  413. if err != nil {
  414. fmt.Println(err)
  415. } else {
  416. fmt.Println(reply)
  417. }
  418. return nil
  419. },
  420. },
  421. {
  422. Name: "udf",
  423. Usage: "describe udf $udf_name",
  424. //Flags: nflag,
  425. Action: func(c *cli.Context) error {
  426. if len(c.Args()) != 1 {
  427. fmt.Printf("Expect udf name.\n")
  428. return nil
  429. }
  430. pname := c.Args()[0]
  431. var reply string
  432. err = client.Call("Server.DescUdf", pname, &reply)
  433. if err != nil {
  434. fmt.Println(err)
  435. } else {
  436. fmt.Println(reply)
  437. }
  438. return nil
  439. },
  440. },
  441. {
  442. Name: "service",
  443. Usage: "describe service $service_name",
  444. Action: func(c *cli.Context) error {
  445. if len(c.Args()) != 1 {
  446. fmt.Printf("Expect service name.\n")
  447. return nil
  448. }
  449. name := c.Args()[0]
  450. var reply string
  451. err = client.Call("Server.DescService", name, &reply)
  452. if err != nil {
  453. fmt.Println(err)
  454. } else {
  455. fmt.Println(reply)
  456. }
  457. return nil
  458. },
  459. },
  460. {
  461. Name: "service_func",
  462. Usage: "describe service_func $service_func_name",
  463. Action: func(c *cli.Context) error {
  464. if len(c.Args()) != 1 {
  465. fmt.Printf("Expect service func name.\n")
  466. return nil
  467. }
  468. name := c.Args()[0]
  469. var reply string
  470. err = client.Call("Server.DescServiceFunc", name, &reply)
  471. if err != nil {
  472. fmt.Println(err)
  473. } else {
  474. fmt.Println(reply)
  475. }
  476. return nil
  477. },
  478. },
  479. {
  480. Name: "schema",
  481. Usage: "describe schema $schema_type $schema_name",
  482. Action: func(c *cli.Context) error {
  483. if len(c.Args()) != 2 {
  484. fmt.Printf("Expect schema type and name.\n")
  485. return nil
  486. }
  487. args := &model.RPCTypedArgDesc{
  488. Type: c.Args()[0],
  489. Name: c.Args()[1],
  490. }
  491. var reply string
  492. err = client.Call("Server.DescSchema", args, &reply)
  493. if err != nil {
  494. fmt.Println(err)
  495. } else {
  496. fmt.Println(reply)
  497. }
  498. return nil
  499. },
  500. },
  501. },
  502. },
  503. {
  504. Name: "drop",
  505. Aliases: []string{"drop"},
  506. Usage: "drop stream $stream_name | drop table $table_name |drop rule $rule_name | drop plugin $plugin_type $plugin_name -s $stop | drop service $service_name | drop schema $schema_type $schema_name",
  507. Subcommands: []cli.Command{
  508. {
  509. Name: "stream",
  510. Usage: "drop stream $stream_name",
  511. //Flags: nflag,
  512. Action: func(c *cli.Context) error {
  513. streamProcess(client, "")
  514. return nil
  515. },
  516. },
  517. {
  518. Name: "table",
  519. Usage: "drop table $table_name",
  520. //Flags: nflag,
  521. Action: func(c *cli.Context) error {
  522. streamProcess(client, "")
  523. return nil
  524. },
  525. },
  526. {
  527. Name: "rule",
  528. Usage: "drop rule $rule_name",
  529. //Flags: nflag,
  530. Action: func(c *cli.Context) error {
  531. if len(c.Args()) != 1 {
  532. fmt.Printf("Expect rule name.\n")
  533. return nil
  534. }
  535. rname := c.Args()[0]
  536. var reply string
  537. err = client.Call("Server.DropRule", rname, &reply)
  538. if err != nil {
  539. fmt.Println(err)
  540. } else {
  541. fmt.Println(reply)
  542. }
  543. return nil
  544. },
  545. },
  546. {
  547. Name: "plugin",
  548. Usage: "drop plugin $plugin_type $plugin_name -s stop",
  549. Flags: []cli.Flag{
  550. cli.StringFlag{
  551. Name: "stop, s",
  552. Usage: "stop kuiper after the action",
  553. },
  554. },
  555. Action: func(c *cli.Context) error {
  556. r := c.String("stop")
  557. if r != "true" && r != "false" {
  558. fmt.Printf("Expect s flag to be a boolean value.\n")
  559. return nil
  560. }
  561. if len(c.Args()) < 2 || len(c.Args()) > 3 {
  562. fmt.Printf("Expect plugin type and name.\n")
  563. return nil
  564. }
  565. ptype, err := getPluginType(c.Args()[0])
  566. if err != nil {
  567. fmt.Printf("%s\n", err)
  568. return nil
  569. }
  570. pname := c.Args()[1]
  571. args := &model.PluginDesc{
  572. RPCArgDesc: model.RPCArgDesc{
  573. Name: pname,
  574. },
  575. Type: ptype,
  576. Stop: r == "true",
  577. }
  578. var reply string
  579. err = client.Call("Server.DropPlugin", args, &reply)
  580. if err != nil {
  581. fmt.Println(err)
  582. } else {
  583. fmt.Println(reply)
  584. }
  585. return nil
  586. },
  587. },
  588. {
  589. Name: "service",
  590. Usage: "drop service $service_name",
  591. Action: func(c *cli.Context) error {
  592. if len(c.Args()) != 1 {
  593. fmt.Printf("Expect service name.\n")
  594. return nil
  595. }
  596. name := c.Args()[0]
  597. var reply string
  598. err = client.Call("Server.DropService", name, &reply)
  599. if err != nil {
  600. fmt.Println(err)
  601. } else {
  602. fmt.Println(reply)
  603. }
  604. return nil
  605. },
  606. },
  607. {
  608. Name: "schema",
  609. Usage: "drop schema $schema_type $schema_name",
  610. Action: func(c *cli.Context) error {
  611. if len(c.Args()) != 2 {
  612. fmt.Printf("Expect schema type and name.\n")
  613. return nil
  614. }
  615. args := &model.RPCTypedArgDesc{
  616. Type: c.Args()[0],
  617. Name: c.Args()[1],
  618. }
  619. var reply string
  620. err = client.Call("Server.DropSchema", args, &reply)
  621. if err != nil {
  622. fmt.Println(err)
  623. } else {
  624. fmt.Println(reply)
  625. }
  626. return nil
  627. },
  628. },
  629. },
  630. },
  631. {
  632. Name: "show",
  633. Aliases: []string{"show"},
  634. Usage: "show streams | show tables | show rules | show plugins $plugin_type | show services | show service_funcs | show schemas $schema_type",
  635. Subcommands: []cli.Command{
  636. {
  637. Name: "streams",
  638. Usage: "show streams",
  639. Action: func(c *cli.Context) error {
  640. streamProcess(client, "")
  641. return nil
  642. },
  643. },
  644. {
  645. Name: "tables",
  646. Usage: "show tables",
  647. Action: func(c *cli.Context) error {
  648. streamProcess(client, "")
  649. return nil
  650. },
  651. },
  652. {
  653. Name: "rules",
  654. Usage: "show rules",
  655. Action: func(c *cli.Context) error {
  656. var reply string
  657. err = client.Call("Server.ShowRules", 0, &reply)
  658. if err != nil {
  659. fmt.Println(err)
  660. } else {
  661. fmt.Println(reply)
  662. }
  663. return nil
  664. },
  665. },
  666. {
  667. Name: "plugins",
  668. Usage: "show plugins $plugin_type",
  669. Action: func(c *cli.Context) error {
  670. if len(c.Args()) != 1 {
  671. fmt.Printf("Expect plugin type.\n")
  672. return nil
  673. }
  674. ptype, err := getPluginType(c.Args()[0])
  675. if err != nil {
  676. fmt.Printf("%s\n", err)
  677. return nil
  678. }
  679. var reply string
  680. err = client.Call("Server.ShowPlugins", ptype, &reply)
  681. if err != nil {
  682. fmt.Println(err)
  683. } else {
  684. fmt.Println(reply)
  685. }
  686. return nil
  687. },
  688. },
  689. {
  690. Name: "udfs",
  691. Usage: "show udfs",
  692. Action: func(c *cli.Context) error {
  693. var reply string
  694. err = client.Call("Server.ShowUdfs", 0, &reply)
  695. if err != nil {
  696. fmt.Println(err)
  697. } else {
  698. fmt.Println(reply)
  699. }
  700. return nil
  701. },
  702. }, {
  703. Name: "services",
  704. Usage: "show services",
  705. Action: func(c *cli.Context) error {
  706. var reply string
  707. err = client.Call("Server.ShowServices", 0, &reply)
  708. if err != nil {
  709. fmt.Println(err)
  710. } else {
  711. fmt.Println(reply)
  712. }
  713. return nil
  714. },
  715. }, {
  716. Name: "service_funcs",
  717. Usage: "show service_funcs",
  718. Action: func(c *cli.Context) error {
  719. var reply string
  720. err = client.Call("Server.ShowServiceFuncs", 0, &reply)
  721. if err != nil {
  722. fmt.Println(err)
  723. } else {
  724. fmt.Println(reply)
  725. }
  726. return nil
  727. },
  728. }, {
  729. Name: "schemas",
  730. Usage: "show schemas $schema_type",
  731. Action: func(c *cli.Context) error {
  732. if len(c.Args()) != 1 {
  733. fmt.Printf("Expect schema type.\n")
  734. return nil
  735. }
  736. var reply string
  737. err = client.Call("Server.ShowSchemas", c.Args()[0], &reply)
  738. if err != nil {
  739. fmt.Println(err)
  740. } else {
  741. fmt.Println(reply)
  742. }
  743. return nil
  744. },
  745. },
  746. },
  747. },
  748. {
  749. Name: "getstatus",
  750. Aliases: []string{"getstatus"},
  751. Usage: "getstatus rule $rule_name | import",
  752. Subcommands: []cli.Command{
  753. {
  754. Name: "rule",
  755. Usage: "getstatus rule $rule_name",
  756. //Flags: nflag,
  757. Action: func(c *cli.Context) error {
  758. if len(c.Args()) != 1 {
  759. fmt.Printf("Expect rule name.\n")
  760. return nil
  761. }
  762. rname := c.Args()[0]
  763. var reply string
  764. err = client.Call("Server.GetStatusRule", rname, &reply)
  765. if err != nil {
  766. fmt.Println(err)
  767. } else {
  768. fmt.Println(reply)
  769. }
  770. return nil
  771. },
  772. },
  773. {
  774. Name: "import",
  775. Usage: "getstatus import",
  776. //Flags: nflag,
  777. Action: func(c *cli.Context) error {
  778. var reply string
  779. err = client.Call("Server.GetStatusImport", 0, &reply)
  780. if err != nil {
  781. fmt.Println(err)
  782. } else {
  783. fmt.Println(reply)
  784. }
  785. return nil
  786. },
  787. },
  788. },
  789. },
  790. {
  791. Name: "gettopo",
  792. Aliases: []string{"gettopo"},
  793. Usage: "gettopo rule $rule_name",
  794. Subcommands: []cli.Command{
  795. {
  796. Name: "rule",
  797. Usage: "getstopo rule $rule_name",
  798. //Flags: nflag,
  799. Action: func(c *cli.Context) error {
  800. if len(c.Args()) != 1 {
  801. fmt.Printf("Expect rule name.\n")
  802. return nil
  803. }
  804. rname := c.Args()[0]
  805. var reply string
  806. err = client.Call("Server.GetTopoRule", rname, &reply)
  807. if err != nil {
  808. fmt.Println(err)
  809. } else {
  810. fmt.Println(reply)
  811. }
  812. return nil
  813. },
  814. },
  815. },
  816. },
  817. {
  818. Name: "start",
  819. Aliases: []string{"start"},
  820. Usage: "start rule $rule_name",
  821. Subcommands: []cli.Command{
  822. {
  823. Name: "rule",
  824. Usage: "start rule $rule_name",
  825. //Flags: nflag,
  826. Action: func(c *cli.Context) error {
  827. if len(c.Args()) != 1 {
  828. fmt.Printf("Expect rule name.\n")
  829. return nil
  830. }
  831. rname := c.Args()[0]
  832. var reply string
  833. err = client.Call("Server.StartRule", rname, &reply)
  834. if err != nil {
  835. fmt.Println(err)
  836. } else {
  837. fmt.Println(reply)
  838. }
  839. return nil
  840. },
  841. },
  842. },
  843. },
  844. {
  845. Name: "stop",
  846. Aliases: []string{"stop"},
  847. Usage: "stop rule $rule_name",
  848. Subcommands: []cli.Command{
  849. {
  850. Name: "rule",
  851. Usage: "stop rule $rule_name",
  852. //Flags: nflag,
  853. Action: func(c *cli.Context) error {
  854. if len(c.Args()) != 1 {
  855. fmt.Printf("Expect rule name.\n")
  856. return nil
  857. }
  858. rname := c.Args()[0]
  859. var reply string
  860. err = client.Call("Server.StopRule", rname, &reply)
  861. if err != nil {
  862. fmt.Println(err)
  863. } else {
  864. fmt.Println(reply)
  865. }
  866. return nil
  867. },
  868. },
  869. },
  870. },
  871. {
  872. Name: "restart",
  873. Aliases: []string{"restart"},
  874. Usage: "restart rule $rule_name",
  875. Subcommands: []cli.Command{
  876. {
  877. Name: "rule",
  878. Usage: "restart rule $rule_name",
  879. //Flags: nflag,
  880. Action: func(c *cli.Context) error {
  881. if len(c.Args()) != 1 {
  882. fmt.Printf("Expect rule name.\n")
  883. return nil
  884. }
  885. rname := c.Args()[0]
  886. var reply string
  887. err = client.Call("Server.RestartRule", rname, &reply)
  888. if err != nil {
  889. fmt.Println(err)
  890. } else {
  891. fmt.Println(reply)
  892. }
  893. return nil
  894. },
  895. },
  896. },
  897. },
  898. {
  899. Name: "register",
  900. Aliases: []string{"register"},
  901. Usage: "register plugin function $plugin_name [$plugin_json | -f plugin_def_file]",
  902. Subcommands: []cli.Command{
  903. {
  904. Name: "plugin",
  905. Usage: "register plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  906. Flags: []cli.Flag{
  907. cli.StringFlag{
  908. Name: "file, f",
  909. Usage: "the location of plugin functions definition file",
  910. FilePath: "/home/myplugin.txt",
  911. },
  912. },
  913. Action: func(c *cli.Context) error {
  914. if len(c.Args()) < 2 {
  915. fmt.Printf("Expect plugin type and name.\n")
  916. return nil
  917. }
  918. ptype := c.Args()[0]
  919. if !strings.EqualFold(ptype, "function") {
  920. fmt.Printf("Plugin type must be function.\n")
  921. return nil
  922. }
  923. pname := c.Args()[1]
  924. sfile := c.String("file")
  925. args := &model.PluginDesc{
  926. RPCArgDesc: model.RPCArgDesc{
  927. Name: pname,
  928. },
  929. }
  930. if sfile != "" {
  931. if len(c.Args()) != 2 {
  932. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  933. return nil
  934. }
  935. if p, err := readDef(sfile, "plugin"); err != nil {
  936. fmt.Printf("%s", err)
  937. return nil
  938. } else {
  939. args.Json = string(p)
  940. }
  941. } else {
  942. if len(c.Args()) != 3 {
  943. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  944. return nil
  945. }
  946. args.Json = c.Args()[2]
  947. }
  948. var reply string
  949. err = client.Call("Server.RegisterPlugin", args, &reply)
  950. if err != nil {
  951. fmt.Println(err)
  952. } else {
  953. fmt.Println(reply)
  954. }
  955. return nil
  956. },
  957. },
  958. },
  959. },
  960. {
  961. Name: "import",
  962. Aliases: []string{"import"},
  963. Usage: "import ruleset | data -f file -p partial -s stop",
  964. Subcommands: []cli.Command{
  965. {
  966. Name: "ruleset",
  967. Usage: "\"import ruleset -f ruleset_file",
  968. Flags: []cli.Flag{
  969. cli.StringFlag{
  970. Name: "file, f",
  971. Usage: "the location of the ruleset json file",
  972. FilePath: "/home/ekuiper_ruleset.json",
  973. },
  974. },
  975. Action: func(c *cli.Context) error {
  976. sfile := c.String("file")
  977. if sfile == "" {
  978. fmt.Print("Required ruleset json file to import")
  979. return nil
  980. }
  981. var reply string
  982. err = client.Call("Server.Import", sfile, &reply)
  983. if err != nil {
  984. fmt.Println(err)
  985. } else {
  986. fmt.Println(reply)
  987. }
  988. return nil
  989. },
  990. },
  991. {
  992. Name: "data",
  993. Usage: "\"import data -f configuration_file -p partial -s stop",
  994. Flags: []cli.Flag{
  995. cli.StringFlag{
  996. Name: "file, f",
  997. Usage: "the location of the configuration json file",
  998. FilePath: "/home/ekuiper_configuration.json",
  999. },
  1000. cli.StringFlag{
  1001. Name: "stop, s",
  1002. Usage: "stop kuiper after the action",
  1003. },
  1004. cli.StringFlag{
  1005. Name: "partial, p",
  1006. Usage: "import partial configuration",
  1007. },
  1008. },
  1009. Action: func(c *cli.Context) error {
  1010. sfile := c.String("file")
  1011. if sfile == "" {
  1012. fmt.Print("Required configuration json file to import")
  1013. return nil
  1014. }
  1015. r := c.String("stop")
  1016. if r != "true" && r != "false" {
  1017. fmt.Printf("Expect s flag to be a boolean value.\n")
  1018. return nil
  1019. }
  1020. p := c.String("partial")
  1021. if p != "true" && p != "false" {
  1022. fmt.Printf("Expect p flag to be a boolean value.\n")
  1023. return nil
  1024. }
  1025. args := &model.ImportDataDesc{
  1026. FileName: sfile,
  1027. Stop: r == "true",
  1028. Partial: p == "true",
  1029. }
  1030. var reply string
  1031. err = client.Call("Server.ImportConfiguration", args, &reply)
  1032. if err != nil {
  1033. fmt.Println(err)
  1034. } else {
  1035. fmt.Println(reply)
  1036. }
  1037. return nil
  1038. },
  1039. },
  1040. },
  1041. },
  1042. {
  1043. Name: "export",
  1044. Aliases: []string{"export"},
  1045. Usage: "export ruleset | data $ruleset_file [ -r rules ]",
  1046. Subcommands: []cli.Command{
  1047. {
  1048. Name: "ruleset",
  1049. Usage: "\"export ruleset $ruleset_file",
  1050. Action: func(c *cli.Context) error {
  1051. if len(c.Args()) < 1 {
  1052. fmt.Printf("Require exported file name.\n")
  1053. return nil
  1054. }
  1055. var reply string
  1056. err = client.Call("Server.Export", c.Args()[0], &reply)
  1057. if err != nil {
  1058. fmt.Println(err)
  1059. } else {
  1060. fmt.Println(reply)
  1061. }
  1062. return nil
  1063. },
  1064. },
  1065. {
  1066. Name: "data",
  1067. Usage: "export data $configuration_file [ -r rules ]",
  1068. Flags: []cli.Flag{
  1069. cli.StringFlag{
  1070. Name: "rules, r",
  1071. Usage: "the rules id in json array format",
  1072. },
  1073. },
  1074. Action: func(c *cli.Context) error {
  1075. args := model.ExportDataDesc{
  1076. Rules: []string{},
  1077. FileName: "",
  1078. }
  1079. rulesArray := c.String("rules")
  1080. if rulesArray != "" {
  1081. var rules []string
  1082. err := json.Unmarshal([]byte(rulesArray), &rules)
  1083. if err != nil {
  1084. fmt.Printf("rules %s unmarshal error %s", rules, err)
  1085. return nil
  1086. }
  1087. args.Rules = rules
  1088. if len(c.Args()) != 1 {
  1089. fmt.Printf("Expect configuration file.\n")
  1090. return nil
  1091. }
  1092. args.FileName = c.Args()[0]
  1093. var reply string
  1094. err = client.Call("Server.ExportConfiguration", args, &reply)
  1095. if err != nil {
  1096. fmt.Println(err)
  1097. } else {
  1098. fmt.Println(reply)
  1099. }
  1100. } else {
  1101. if len(c.Args()) != 1 {
  1102. fmt.Printf("Expect configuration file.\n")
  1103. return nil
  1104. }
  1105. args.FileName = c.Args()[0]
  1106. var reply string
  1107. err = client.Call("Server.ExportConfiguration", args, &reply)
  1108. if err != nil {
  1109. fmt.Println(err)
  1110. } else {
  1111. fmt.Println(reply)
  1112. }
  1113. }
  1114. return nil
  1115. },
  1116. },
  1117. },
  1118. },
  1119. }
  1120. app.Name = "Kuiper"
  1121. app.Usage = "The command line tool for EMQ X Kuiper."
  1122. app.Action = func(c *cli.Context) error {
  1123. cli.ShowSubcommandHelp(c)
  1124. //cli.ShowVersion(c)
  1125. return nil
  1126. }
  1127. sort.Sort(cli.FlagsByName(app.Flags))
  1128. sort.Sort(cli.CommandsByName(app.Commands))
  1129. err = app.Run(os.Args)
  1130. if err != nil {
  1131. fmt.Printf("%v", err)
  1132. }
  1133. }
  1134. func getPluginType(arg string) (ptype int, err error) {
  1135. switch arg {
  1136. case "source":
  1137. ptype = 0
  1138. case "sink":
  1139. ptype = 1
  1140. case "function":
  1141. ptype = 2
  1142. case "portable":
  1143. ptype = 3
  1144. case "wasm":
  1145. ptype = 4
  1146. default:
  1147. err = fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\", \"function\" or \"portable\" or \"wasm\".\n", arg)
  1148. }
  1149. return
  1150. }
  1151. func readDef(sfile string, t string) ([]byte, error) {
  1152. if _, err := os.Stat(sfile); os.IsNotExist(err) {
  1153. return nil, fmt.Errorf("The specified %s defenition file %s is not existed.\n", t, sfile)
  1154. }
  1155. fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
  1156. if rule, err := os.ReadFile(sfile); err != nil {
  1157. return nil, fmt.Errorf("Failed to read from %s definition file %s.\n", t, sfile)
  1158. } else {
  1159. return rule, nil
  1160. }
  1161. }