main.go 29 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bufio"
  17. "encoding/json"
  18. "fmt"
  19. "net/rpc"
  20. "os"
  21. "sort"
  22. "strings"
  23. "time"
  24. "github.com/urfave/cli"
  25. "github.com/lf-edge/ekuiper/internal/conf"
  26. "github.com/lf-edge/ekuiper/internal/pkg/model"
  27. "github.com/lf-edge/ekuiper/pkg/cast"
  28. "github.com/lf-edge/ekuiper/pkg/infra"
  29. )
  30. type clientConf struct {
  31. Host string `yaml:"host"`
  32. Port int `yaml:"port"`
  33. }
  34. const ClientYaml = "client.yaml"
  35. func streamProcess(client *rpc.Client, args string) {
  36. var reply string
  37. if args == "" {
  38. args = strings.Join(os.Args[1:], " ")
  39. }
  40. err := client.Call("Server.Stream", args, &reply)
  41. if err != nil {
  42. fmt.Println(err)
  43. } else {
  44. fmt.Println(reply)
  45. }
  46. }
  47. var (
  48. Version = "unknown"
  49. LoadFileType = "relative"
  50. )
  51. func main() {
  52. conf.LoadFileType = LoadFileType
  53. app := cli.NewApp()
  54. app.Version = Version
  55. // nflag := []cli.Flag { cli.StringFlag{
  56. // Name: "name, n",
  57. // Usage: "the name of stream",
  58. // }}
  59. var cfg map[string]clientConf
  60. err := conf.LoadConfigByName(ClientYaml, &cfg)
  61. if err != nil {
  62. conf.Log.Fatal(err)
  63. fmt.Printf("Failed to load config file with error %s.\n", err)
  64. }
  65. var config *clientConf
  66. c, ok := cfg["basic"]
  67. if !ok {
  68. fmt.Printf("No basic config in client.yaml, will use the default configuration.\n")
  69. } else {
  70. config = &c
  71. }
  72. if config == nil {
  73. config = &clientConf{
  74. Host: "127.0.0.1",
  75. Port: 20498,
  76. }
  77. }
  78. fmt.Printf("Connecting to %s... \n", cast.JoinHostPortInt(config.Host, config.Port))
  79. // Create a TCP connection to localhost on port 1234
  80. client, err := rpc.DialHTTP("tcp", cast.JoinHostPortInt(config.Host, config.Port))
  81. if err != nil {
  82. fmt.Printf("Failed to connect the server, please start the server.\n")
  83. return
  84. }
  85. app.Commands = []cli.Command{
  86. {
  87. Name: "query",
  88. Aliases: []string{"query"},
  89. Usage: "query command line",
  90. Action: func(c *cli.Context) error {
  91. reader := bufio.NewReader(os.Stdin)
  92. ticker := time.NewTicker(time.Millisecond * 300)
  93. defer ticker.Stop()
  94. for {
  95. fmt.Print("kuiper > ")
  96. text, _ := reader.ReadString('\n')
  97. // convert CRLF to LF
  98. text = strings.Replace(text, "\n", "", -1)
  99. if strings.EqualFold(text, "quit") || strings.EqualFold(text, "exit") {
  100. break
  101. } else if strings.Trim(text, " ") == "" {
  102. continue
  103. } else {
  104. var reply string
  105. err := client.Call("Server.CreateQuery", text, &reply)
  106. if err != nil {
  107. fmt.Println(err)
  108. continue
  109. }
  110. fmt.Println(reply)
  111. go func() {
  112. err := infra.SafeRun(func() error {
  113. for {
  114. <-ticker.C
  115. var result string
  116. e := client.Call("Server.GetQueryResult", "", &result)
  117. if e != nil {
  118. return e
  119. }
  120. if result != "" {
  121. fmt.Println(result)
  122. }
  123. }
  124. })
  125. if err != nil {
  126. fmt.Println(err)
  127. fmt.Print("kuiper > ")
  128. }
  129. }()
  130. }
  131. }
  132. return nil
  133. },
  134. },
  135. {
  136. Name: "create",
  137. Aliases: []string{"create"},
  138. Usage: "create stream $stream_name | create stream $stream_name -f $stream_def_file | create table $table_name | create table $table_name -f $table_def_file| create rule $rule_name $rule_json | create rule $rule_name -f $rule_def_file | create plugin $plugin_type $plugin_name $plugin_json | create plugin $plugin_type $plugin_name -f $plugin_def_file | create service $service_name $service_json | create schema $schema_type $schema_name $schema_json",
  139. Subcommands: []cli.Command{
  140. {
  141. Name: "stream",
  142. Usage: "create stream $stream_name [-f stream_def_file]",
  143. Flags: []cli.Flag{
  144. cli.StringFlag{
  145. Name: "file, f",
  146. Usage: "the location of stream definition file",
  147. FilePath: "/home/mystream.txt",
  148. },
  149. },
  150. Action: func(c *cli.Context) error {
  151. sfile := c.String("file")
  152. if sfile != "" {
  153. if stream, err := readDef(sfile, "stream"); err != nil {
  154. fmt.Printf("%s", err)
  155. return nil
  156. } else {
  157. args := strings.Join([]string{"CREATE STREAM ", string(stream)}, " ")
  158. streamProcess(client, args)
  159. return nil
  160. }
  161. } else {
  162. streamProcess(client, "")
  163. return nil
  164. }
  165. },
  166. },
  167. {
  168. Name: "table",
  169. Usage: "create table $table_name [-f table_def_file]",
  170. Flags: []cli.Flag{
  171. cli.StringFlag{
  172. Name: "file, f",
  173. Usage: "the location of table definition file",
  174. FilePath: "/home/mytable.txt",
  175. },
  176. },
  177. Action: func(c *cli.Context) error {
  178. sfile := c.String("file")
  179. if sfile != "" {
  180. if stream, err := readDef(sfile, "table"); err != nil {
  181. fmt.Printf("%s", err)
  182. return nil
  183. } else {
  184. args := strings.Join([]string{"CREATE TABLE ", string(stream)}, " ")
  185. streamProcess(client, args)
  186. return nil
  187. }
  188. } else {
  189. streamProcess(client, "")
  190. return nil
  191. }
  192. },
  193. },
  194. {
  195. Name: "rule",
  196. Usage: "create rule $rule_name [$rule_json | -f rule_def_file]",
  197. Flags: []cli.Flag{
  198. cli.StringFlag{
  199. Name: "file, f",
  200. Usage: "the location of rule definition file",
  201. FilePath: "/home/myrule.txt",
  202. },
  203. },
  204. Action: func(c *cli.Context) error {
  205. sfile := c.String("file")
  206. if sfile != "" {
  207. if rule, err := readDef(sfile, "rule"); err != nil {
  208. fmt.Printf("%s", err)
  209. return nil
  210. } else {
  211. if len(c.Args()) != 1 {
  212. fmt.Printf("Expect rule name.\n")
  213. return nil
  214. }
  215. rname := c.Args()[0]
  216. var reply string
  217. args := &model.RPCArgDesc{Name: rname, Json: string(rule)}
  218. err = client.Call("Server.CreateRule", args, &reply)
  219. if err != nil {
  220. fmt.Println(err)
  221. } else {
  222. fmt.Println(reply)
  223. }
  224. }
  225. return nil
  226. } else {
  227. if len(c.Args()) != 2 {
  228. fmt.Printf("Expect rule name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  229. return nil
  230. }
  231. rname := c.Args()[0]
  232. rjson := c.Args()[1]
  233. var reply string
  234. args := &model.RPCArgDesc{Name: rname, Json: rjson}
  235. err = client.Call("Server.CreateRule", args, &reply)
  236. if err != nil {
  237. fmt.Println(err)
  238. } else {
  239. fmt.Println(reply)
  240. }
  241. return nil
  242. }
  243. },
  244. },
  245. {
  246. Name: "plugin",
  247. Usage: "create plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  248. Flags: []cli.Flag{
  249. cli.StringFlag{
  250. Name: "file, f",
  251. Usage: "the location of plugin definition file",
  252. FilePath: "/home/myplugin.txt",
  253. },
  254. },
  255. Action: func(c *cli.Context) error {
  256. if len(c.Args()) < 2 {
  257. fmt.Printf("Expect plugin type and name.\n")
  258. return nil
  259. }
  260. ptype, err := getPluginType(c.Args()[0])
  261. if err != nil {
  262. fmt.Printf("%s\n", err)
  263. return nil
  264. }
  265. pname := c.Args()[1]
  266. sfile := c.String("file")
  267. args := &model.PluginDesc{
  268. RPCArgDesc: model.RPCArgDesc{
  269. Name: pname,
  270. },
  271. Type: ptype,
  272. }
  273. if sfile != "" {
  274. if len(c.Args()) != 2 {
  275. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  276. return nil
  277. }
  278. if p, err := readDef(sfile, "plugin"); err != nil {
  279. fmt.Printf("%s", err)
  280. return nil
  281. } else {
  282. args.Json = string(p)
  283. }
  284. } else {
  285. if len(c.Args()) != 3 {
  286. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  287. return nil
  288. }
  289. args.Json = c.Args()[2]
  290. }
  291. var reply string
  292. err = client.Call("Server.CreatePlugin", args, &reply)
  293. if err != nil {
  294. fmt.Println(err)
  295. } else {
  296. fmt.Println(reply)
  297. }
  298. return nil
  299. },
  300. },
  301. {
  302. Name: "service",
  303. Usage: "create service $service_name $service_json",
  304. Action: func(c *cli.Context) error {
  305. if len(c.Args()) < 2 {
  306. fmt.Printf("Expect service name and json.\n")
  307. return nil
  308. }
  309. var reply string
  310. err = client.Call("Server.CreateService", &model.RPCArgDesc{
  311. Name: c.Args()[0],
  312. Json: c.Args()[1],
  313. }, &reply)
  314. if err != nil {
  315. fmt.Println(err)
  316. } else {
  317. fmt.Println(reply)
  318. }
  319. return nil
  320. },
  321. },
  322. {
  323. Name: "schema",
  324. Usage: "create schema $schema_type $schema_name $schema_json",
  325. Action: func(c *cli.Context) error {
  326. if len(c.Args()) < 3 {
  327. fmt.Printf("Expect plugin type, name and json.\n")
  328. return nil
  329. }
  330. var reply string
  331. err = client.Call("Server.CreateSchema", &model.RPCTypedArgDesc{
  332. Type: c.Args()[0],
  333. Name: c.Args()[1],
  334. Json: c.Args()[2],
  335. }, &reply)
  336. if err != nil {
  337. fmt.Println(err)
  338. } else {
  339. fmt.Println(reply)
  340. }
  341. return nil
  342. },
  343. },
  344. },
  345. },
  346. {
  347. Name: "describe",
  348. Aliases: []string{"describe"},
  349. Usage: "describe stream $stream_name | describe table $table_name | describe rule $rule_name | describe plugin $plugin_type $plugin_name | describe udf $udf_name | describe service $service_name | describe service_func $service_func_name | describe schema $schema_type $schema_name",
  350. Subcommands: []cli.Command{
  351. {
  352. Name: "stream",
  353. Usage: "describe stream $stream_name",
  354. // Flags: nflag,
  355. Action: func(c *cli.Context) error {
  356. streamProcess(client, "")
  357. return nil
  358. },
  359. },
  360. {
  361. Name: "table",
  362. Usage: "describe table $table_name",
  363. // Flags: nflag,
  364. Action: func(c *cli.Context) error {
  365. streamProcess(client, "")
  366. return nil
  367. },
  368. },
  369. {
  370. Name: "rule",
  371. Usage: "describe rule $rule_name",
  372. Action: func(c *cli.Context) error {
  373. if len(c.Args()) != 1 {
  374. fmt.Printf("Expect rule name.\n")
  375. return nil
  376. }
  377. rname := c.Args()[0]
  378. var reply string
  379. err = client.Call("Server.DescRule", rname, &reply)
  380. if err != nil {
  381. fmt.Println(err)
  382. } else {
  383. fmt.Println(reply)
  384. }
  385. return nil
  386. },
  387. },
  388. {
  389. Name: "plugin",
  390. Usage: "describe plugin $plugin_type $plugin_name",
  391. // Flags: nflag,
  392. Action: func(c *cli.Context) error {
  393. ptype, err := getPluginType(c.Args()[0])
  394. if err != nil {
  395. fmt.Printf("%s\n", err)
  396. return nil
  397. }
  398. if len(c.Args()) != 2 {
  399. fmt.Printf("Expect plugin name.\n")
  400. return nil
  401. }
  402. pname := c.Args()[1]
  403. args := &model.PluginDesc{
  404. RPCArgDesc: model.RPCArgDesc{
  405. Name: pname,
  406. },
  407. Type: ptype,
  408. }
  409. var reply string
  410. err = client.Call("Server.DescPlugin", args, &reply)
  411. if err != nil {
  412. fmt.Println(err)
  413. } else {
  414. fmt.Println(reply)
  415. }
  416. return nil
  417. },
  418. },
  419. {
  420. Name: "udf",
  421. Usage: "describe udf $udf_name",
  422. // Flags: nflag,
  423. Action: func(c *cli.Context) error {
  424. if len(c.Args()) != 1 {
  425. fmt.Printf("Expect udf name.\n")
  426. return nil
  427. }
  428. pname := c.Args()[0]
  429. var reply string
  430. err = client.Call("Server.DescUdf", pname, &reply)
  431. if err != nil {
  432. fmt.Println(err)
  433. } else {
  434. fmt.Println(reply)
  435. }
  436. return nil
  437. },
  438. },
  439. {
  440. Name: "service",
  441. Usage: "describe service $service_name",
  442. Action: func(c *cli.Context) error {
  443. if len(c.Args()) != 1 {
  444. fmt.Printf("Expect service name.\n")
  445. return nil
  446. }
  447. name := c.Args()[0]
  448. var reply string
  449. err = client.Call("Server.DescService", name, &reply)
  450. if err != nil {
  451. fmt.Println(err)
  452. } else {
  453. fmt.Println(reply)
  454. }
  455. return nil
  456. },
  457. },
  458. {
  459. Name: "service_func",
  460. Usage: "describe service_func $service_func_name",
  461. Action: func(c *cli.Context) error {
  462. if len(c.Args()) != 1 {
  463. fmt.Printf("Expect service func name.\n")
  464. return nil
  465. }
  466. name := c.Args()[0]
  467. var reply string
  468. err = client.Call("Server.DescServiceFunc", name, &reply)
  469. if err != nil {
  470. fmt.Println(err)
  471. } else {
  472. fmt.Println(reply)
  473. }
  474. return nil
  475. },
  476. },
  477. {
  478. Name: "schema",
  479. Usage: "describe schema $schema_type $schema_name",
  480. Action: func(c *cli.Context) error {
  481. if len(c.Args()) != 2 {
  482. fmt.Printf("Expect schema type and name.\n")
  483. return nil
  484. }
  485. args := &model.RPCTypedArgDesc{
  486. Type: c.Args()[0],
  487. Name: c.Args()[1],
  488. }
  489. var reply string
  490. err = client.Call("Server.DescSchema", args, &reply)
  491. if err != nil {
  492. fmt.Println(err)
  493. } else {
  494. fmt.Println(reply)
  495. }
  496. return nil
  497. },
  498. },
  499. },
  500. },
  501. {
  502. Name: "drop",
  503. Aliases: []string{"drop"},
  504. Usage: "drop stream $stream_name | drop table $table_name |drop rule $rule_name | drop plugin $plugin_type $plugin_name -s $stop | drop service $service_name | drop schema $schema_type $schema_name",
  505. Subcommands: []cli.Command{
  506. {
  507. Name: "stream",
  508. Usage: "drop stream $stream_name",
  509. // Flags: nflag,
  510. Action: func(c *cli.Context) error {
  511. streamProcess(client, "")
  512. return nil
  513. },
  514. },
  515. {
  516. Name: "table",
  517. Usage: "drop table $table_name",
  518. // Flags: nflag,
  519. Action: func(c *cli.Context) error {
  520. streamProcess(client, "")
  521. return nil
  522. },
  523. },
  524. {
  525. Name: "rule",
  526. Usage: "drop rule $rule_name",
  527. // Flags: nflag,
  528. Action: func(c *cli.Context) error {
  529. if len(c.Args()) != 1 {
  530. fmt.Printf("Expect rule name.\n")
  531. return nil
  532. }
  533. rname := c.Args()[0]
  534. var reply string
  535. err = client.Call("Server.DropRule", rname, &reply)
  536. if err != nil {
  537. fmt.Println(err)
  538. } else {
  539. fmt.Println(reply)
  540. }
  541. return nil
  542. },
  543. },
  544. {
  545. Name: "plugin",
  546. Usage: "drop plugin $plugin_type $plugin_name -s stop",
  547. Flags: []cli.Flag{
  548. cli.StringFlag{
  549. Name: "stop, s",
  550. Usage: "stop kuiper after the action",
  551. },
  552. },
  553. Action: func(c *cli.Context) error {
  554. r := c.String("stop")
  555. if r != "true" && r != "false" {
  556. fmt.Printf("Expect s flag to be a boolean value.\n")
  557. return nil
  558. }
  559. if len(c.Args()) < 2 || len(c.Args()) > 3 {
  560. fmt.Printf("Expect plugin type and name.\n")
  561. return nil
  562. }
  563. ptype, err := getPluginType(c.Args()[0])
  564. if err != nil {
  565. fmt.Printf("%s\n", err)
  566. return nil
  567. }
  568. pname := c.Args()[1]
  569. args := &model.PluginDesc{
  570. RPCArgDesc: model.RPCArgDesc{
  571. Name: pname,
  572. },
  573. Type: ptype,
  574. Stop: r == "true",
  575. }
  576. var reply string
  577. err = client.Call("Server.DropPlugin", args, &reply)
  578. if err != nil {
  579. fmt.Println(err)
  580. } else {
  581. fmt.Println(reply)
  582. }
  583. return nil
  584. },
  585. },
  586. {
  587. Name: "service",
  588. Usage: "drop service $service_name",
  589. Action: func(c *cli.Context) error {
  590. if len(c.Args()) != 1 {
  591. fmt.Printf("Expect service name.\n")
  592. return nil
  593. }
  594. name := c.Args()[0]
  595. var reply string
  596. err = client.Call("Server.DropService", name, &reply)
  597. if err != nil {
  598. fmt.Println(err)
  599. } else {
  600. fmt.Println(reply)
  601. }
  602. return nil
  603. },
  604. },
  605. {
  606. Name: "schema",
  607. Usage: "drop schema $schema_type $schema_name",
  608. Action: func(c *cli.Context) error {
  609. if len(c.Args()) != 2 {
  610. fmt.Printf("Expect schema type and name.\n")
  611. return nil
  612. }
  613. args := &model.RPCTypedArgDesc{
  614. Type: c.Args()[0],
  615. Name: c.Args()[1],
  616. }
  617. var reply string
  618. err = client.Call("Server.DropSchema", args, &reply)
  619. if err != nil {
  620. fmt.Println(err)
  621. } else {
  622. fmt.Println(reply)
  623. }
  624. return nil
  625. },
  626. },
  627. },
  628. },
  629. {
  630. Name: "show",
  631. Aliases: []string{"show"},
  632. Usage: "show streams | show tables | show rules | show plugins $plugin_type | show services | show service_funcs | show schemas $schema_type",
  633. Subcommands: []cli.Command{
  634. {
  635. Name: "streams",
  636. Usage: "show streams",
  637. Action: func(c *cli.Context) error {
  638. streamProcess(client, "")
  639. return nil
  640. },
  641. },
  642. {
  643. Name: "tables",
  644. Usage: "show tables",
  645. Action: func(c *cli.Context) error {
  646. streamProcess(client, "")
  647. return nil
  648. },
  649. },
  650. {
  651. Name: "rules",
  652. Usage: "show rules",
  653. Action: func(c *cli.Context) error {
  654. var reply string
  655. err = client.Call("Server.ShowRules", 0, &reply)
  656. if err != nil {
  657. fmt.Println(err)
  658. } else {
  659. fmt.Println(reply)
  660. }
  661. return nil
  662. },
  663. },
  664. {
  665. Name: "plugins",
  666. Usage: "show plugins $plugin_type",
  667. Action: func(c *cli.Context) error {
  668. if len(c.Args()) != 1 {
  669. fmt.Printf("Expect plugin type.\n")
  670. return nil
  671. }
  672. ptype, err := getPluginType(c.Args()[0])
  673. if err != nil {
  674. fmt.Printf("%s\n", err)
  675. return nil
  676. }
  677. var reply string
  678. err = client.Call("Server.ShowPlugins", ptype, &reply)
  679. if err != nil {
  680. fmt.Println(err)
  681. } else {
  682. fmt.Println(reply)
  683. }
  684. return nil
  685. },
  686. },
  687. {
  688. Name: "udfs",
  689. Usage: "show udfs",
  690. Action: func(c *cli.Context) error {
  691. var reply string
  692. err = client.Call("Server.ShowUdfs", 0, &reply)
  693. if err != nil {
  694. fmt.Println(err)
  695. } else {
  696. fmt.Println(reply)
  697. }
  698. return nil
  699. },
  700. },
  701. {
  702. Name: "services",
  703. Usage: "show services",
  704. Action: func(c *cli.Context) error {
  705. var reply string
  706. err = client.Call("Server.ShowServices", 0, &reply)
  707. if err != nil {
  708. fmt.Println(err)
  709. } else {
  710. fmt.Println(reply)
  711. }
  712. return nil
  713. },
  714. },
  715. {
  716. Name: "service_funcs",
  717. Usage: "show service_funcs",
  718. Action: func(c *cli.Context) error {
  719. var reply string
  720. err = client.Call("Server.ShowServiceFuncs", 0, &reply)
  721. if err != nil {
  722. fmt.Println(err)
  723. } else {
  724. fmt.Println(reply)
  725. }
  726. return nil
  727. },
  728. },
  729. {
  730. Name: "schemas",
  731. Usage: "show schemas $schema_type",
  732. Action: func(c *cli.Context) error {
  733. if len(c.Args()) != 1 {
  734. fmt.Printf("Expect schema type.\n")
  735. return nil
  736. }
  737. var reply string
  738. err = client.Call("Server.ShowSchemas", c.Args()[0], &reply)
  739. if err != nil {
  740. fmt.Println(err)
  741. } else {
  742. fmt.Println(reply)
  743. }
  744. return nil
  745. },
  746. },
  747. },
  748. },
  749. {
  750. Name: "getstatus",
  751. Aliases: []string{"getstatus"},
  752. Usage: "getstatus rule $rule_name | import",
  753. Subcommands: []cli.Command{
  754. {
  755. Name: "rule",
  756. Usage: "getstatus rule $rule_name",
  757. // Flags: nflag,
  758. Action: func(c *cli.Context) error {
  759. if len(c.Args()) != 1 {
  760. fmt.Printf("Expect rule name.\n")
  761. return nil
  762. }
  763. rname := c.Args()[0]
  764. var reply string
  765. err = client.Call("Server.GetStatusRule", rname, &reply)
  766. if err != nil {
  767. fmt.Println(err)
  768. } else {
  769. fmt.Println(reply)
  770. }
  771. return nil
  772. },
  773. },
  774. {
  775. Name: "import",
  776. Usage: "getstatus import",
  777. // Flags: nflag,
  778. Action: func(c *cli.Context) error {
  779. var reply string
  780. err = client.Call("Server.GetStatusImport", 0, &reply)
  781. if err != nil {
  782. fmt.Println(err)
  783. } else {
  784. fmt.Println(reply)
  785. }
  786. return nil
  787. },
  788. },
  789. },
  790. },
  791. {
  792. Name: "gettopo",
  793. Aliases: []string{"gettopo"},
  794. Usage: "gettopo rule $rule_name",
  795. Subcommands: []cli.Command{
  796. {
  797. Name: "rule",
  798. Usage: "getstopo rule $rule_name",
  799. // Flags: nflag,
  800. Action: func(c *cli.Context) error {
  801. if len(c.Args()) != 1 {
  802. fmt.Printf("Expect rule name.\n")
  803. return nil
  804. }
  805. rname := c.Args()[0]
  806. var reply string
  807. err = client.Call("Server.GetTopoRule", rname, &reply)
  808. if err != nil {
  809. fmt.Println(err)
  810. } else {
  811. fmt.Println(reply)
  812. }
  813. return nil
  814. },
  815. },
  816. },
  817. },
  818. {
  819. Name: "start",
  820. Aliases: []string{"start"},
  821. Usage: "start rule $rule_name",
  822. Subcommands: []cli.Command{
  823. {
  824. Name: "rule",
  825. Usage: "start rule $rule_name",
  826. // Flags: nflag,
  827. Action: func(c *cli.Context) error {
  828. if len(c.Args()) != 1 {
  829. fmt.Printf("Expect rule name.\n")
  830. return nil
  831. }
  832. rname := c.Args()[0]
  833. var reply string
  834. err = client.Call("Server.StartRule", rname, &reply)
  835. if err != nil {
  836. fmt.Println(err)
  837. } else {
  838. fmt.Println(reply)
  839. }
  840. return nil
  841. },
  842. },
  843. },
  844. },
  845. {
  846. Name: "stop",
  847. Aliases: []string{"stop"},
  848. Usage: "stop rule $rule_name",
  849. Subcommands: []cli.Command{
  850. {
  851. Name: "rule",
  852. Usage: "stop rule $rule_name",
  853. // Flags: nflag,
  854. Action: func(c *cli.Context) error {
  855. if len(c.Args()) != 1 {
  856. fmt.Printf("Expect rule name.\n")
  857. return nil
  858. }
  859. rname := c.Args()[0]
  860. var reply string
  861. err = client.Call("Server.StopRule", rname, &reply)
  862. if err != nil {
  863. fmt.Println(err)
  864. } else {
  865. fmt.Println(reply)
  866. }
  867. return nil
  868. },
  869. },
  870. },
  871. },
  872. {
  873. Name: "restart",
  874. Aliases: []string{"restart"},
  875. Usage: "restart rule $rule_name",
  876. Subcommands: []cli.Command{
  877. {
  878. Name: "rule",
  879. Usage: "restart rule $rule_name",
  880. // Flags: nflag,
  881. Action: func(c *cli.Context) error {
  882. if len(c.Args()) != 1 {
  883. fmt.Printf("Expect rule name.\n")
  884. return nil
  885. }
  886. rname := c.Args()[0]
  887. var reply string
  888. err = client.Call("Server.RestartRule", rname, &reply)
  889. if err != nil {
  890. fmt.Println(err)
  891. } else {
  892. fmt.Println(reply)
  893. }
  894. return nil
  895. },
  896. },
  897. },
  898. },
  899. {
  900. Name: "register",
  901. Aliases: []string{"register"},
  902. Usage: "register plugin function $plugin_name [$plugin_json | -f plugin_def_file]",
  903. Subcommands: []cli.Command{
  904. {
  905. Name: "plugin",
  906. Usage: "register plugin $plugin_type $plugin_name [$plugin_json | -f plugin_def_file]",
  907. Flags: []cli.Flag{
  908. cli.StringFlag{
  909. Name: "file, f",
  910. Usage: "the location of plugin functions definition file",
  911. FilePath: "/home/myplugin.txt",
  912. },
  913. },
  914. Action: func(c *cli.Context) error {
  915. if len(c.Args()) < 2 {
  916. fmt.Printf("Expect plugin type and name.\n")
  917. return nil
  918. }
  919. ptype := c.Args()[0]
  920. if !strings.EqualFold(ptype, "function") {
  921. fmt.Printf("Plugin type must be function.\n")
  922. return nil
  923. }
  924. pname := c.Args()[1]
  925. sfile := c.String("file")
  926. args := &model.PluginDesc{
  927. RPCArgDesc: model.RPCArgDesc{
  928. Name: pname,
  929. },
  930. }
  931. if sfile != "" {
  932. if len(c.Args()) != 2 {
  933. fmt.Printf("Expect plugin type, name.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  934. return nil
  935. }
  936. if p, err := readDef(sfile, "plugin"); err != nil {
  937. fmt.Printf("%s", err)
  938. return nil
  939. } else {
  940. args.Json = string(p)
  941. }
  942. } else {
  943. if len(c.Args()) != 3 {
  944. fmt.Printf("Expect plugin type, name and json.\nBut found %d args:%s.\n", len(c.Args()), c.Args())
  945. return nil
  946. }
  947. args.Json = c.Args()[2]
  948. }
  949. var reply string
  950. err = client.Call("Server.RegisterPlugin", args, &reply)
  951. if err != nil {
  952. fmt.Println(err)
  953. } else {
  954. fmt.Println(reply)
  955. }
  956. return nil
  957. },
  958. },
  959. },
  960. },
  961. {
  962. Name: "import",
  963. Aliases: []string{"import"},
  964. Usage: "import ruleset | data -f file -p partial -s stop",
  965. Subcommands: []cli.Command{
  966. {
  967. Name: "ruleset",
  968. Usage: "\"import ruleset -f ruleset_file",
  969. Flags: []cli.Flag{
  970. cli.StringFlag{
  971. Name: "file, f",
  972. Usage: "the location of the ruleset json file",
  973. FilePath: "/home/ekuiper_ruleset.json",
  974. },
  975. },
  976. Action: func(c *cli.Context) error {
  977. sfile := c.String("file")
  978. if sfile == "" {
  979. fmt.Print("Required ruleset json file to import")
  980. return nil
  981. }
  982. var reply string
  983. err = client.Call("Server.Import", sfile, &reply)
  984. if err != nil {
  985. fmt.Println(err)
  986. } else {
  987. fmt.Println(reply)
  988. }
  989. return nil
  990. },
  991. },
  992. {
  993. Name: "data",
  994. Usage: "\"import data -f configuration_file -p partial -s stop",
  995. Flags: []cli.Flag{
  996. cli.StringFlag{
  997. Name: "file, f",
  998. Usage: "the location of the configuration json file",
  999. FilePath: "/home/ekuiper_configuration.json",
  1000. },
  1001. cli.StringFlag{
  1002. Name: "stop, s",
  1003. Usage: "stop kuiper after the action",
  1004. },
  1005. cli.StringFlag{
  1006. Name: "partial, p",
  1007. Usage: "import partial configuration",
  1008. },
  1009. },
  1010. Action: func(c *cli.Context) error {
  1011. sfile := c.String("file")
  1012. if sfile == "" {
  1013. fmt.Print("Required configuration json file to import")
  1014. return nil
  1015. }
  1016. r := c.String("stop")
  1017. if r != "true" && r != "false" {
  1018. fmt.Printf("Expect s flag to be a boolean value.\n")
  1019. return nil
  1020. }
  1021. p := c.String("partial")
  1022. if p != "true" && p != "false" {
  1023. fmt.Printf("Expect p flag to be a boolean value.\n")
  1024. return nil
  1025. }
  1026. args := &model.ImportDataDesc{
  1027. FileName: sfile,
  1028. Stop: r == "true",
  1029. Partial: p == "true",
  1030. }
  1031. var reply string
  1032. err = client.Call("Server.ImportConfiguration", args, &reply)
  1033. if err != nil {
  1034. fmt.Println(err)
  1035. } else {
  1036. fmt.Println(reply)
  1037. }
  1038. return nil
  1039. },
  1040. },
  1041. },
  1042. },
  1043. {
  1044. Name: "export",
  1045. Aliases: []string{"export"},
  1046. Usage: "export ruleset | data $ruleset_file [ -r rules ]",
  1047. Subcommands: []cli.Command{
  1048. {
  1049. Name: "ruleset",
  1050. Usage: "\"export ruleset $ruleset_file",
  1051. Action: func(c *cli.Context) error {
  1052. if len(c.Args()) < 1 {
  1053. fmt.Printf("Require exported file name.\n")
  1054. return nil
  1055. }
  1056. var reply string
  1057. err = client.Call("Server.Export", c.Args()[0], &reply)
  1058. if err != nil {
  1059. fmt.Println(err)
  1060. } else {
  1061. fmt.Println(reply)
  1062. }
  1063. return nil
  1064. },
  1065. },
  1066. {
  1067. Name: "data",
  1068. Usage: "export data $configuration_file [ -r rules ]",
  1069. Flags: []cli.Flag{
  1070. cli.StringFlag{
  1071. Name: "rules, r",
  1072. Usage: "the rules id in json array format",
  1073. },
  1074. },
  1075. Action: func(c *cli.Context) error {
  1076. args := model.ExportDataDesc{
  1077. Rules: []string{},
  1078. FileName: "",
  1079. }
  1080. rulesArray := c.String("rules")
  1081. if rulesArray != "" {
  1082. var rules []string
  1083. err := json.Unmarshal(cast.StringToBytes(rulesArray), &rules)
  1084. if err != nil {
  1085. fmt.Printf("rules %s unmarshal error %s", rules, err)
  1086. return nil
  1087. }
  1088. args.Rules = rules
  1089. if len(c.Args()) != 1 {
  1090. fmt.Printf("Expect configuration file.\n")
  1091. return nil
  1092. }
  1093. args.FileName = c.Args()[0]
  1094. var reply string
  1095. err = client.Call("Server.ExportConfiguration", args, &reply)
  1096. if err != nil {
  1097. fmt.Println(err)
  1098. } else {
  1099. fmt.Println(reply)
  1100. }
  1101. } else {
  1102. if len(c.Args()) != 1 {
  1103. fmt.Printf("Expect configuration file.\n")
  1104. return nil
  1105. }
  1106. args.FileName = c.Args()[0]
  1107. var reply string
  1108. err = client.Call("Server.ExportConfiguration", args, &reply)
  1109. if err != nil {
  1110. fmt.Println(err)
  1111. } else {
  1112. fmt.Println(reply)
  1113. }
  1114. }
  1115. return nil
  1116. },
  1117. },
  1118. },
  1119. },
  1120. }
  1121. app.Name = "Kuiper"
  1122. app.Usage = "The command line tool for EMQ X Kuiper."
  1123. app.Action = func(c *cli.Context) error {
  1124. cli.ShowSubcommandHelp(c)
  1125. // cli.ShowVersion(c)
  1126. return nil
  1127. }
  1128. sort.Sort(cli.FlagsByName(app.Flags))
  1129. sort.Sort(cli.CommandsByName(app.Commands))
  1130. err = app.Run(os.Args)
  1131. if err != nil {
  1132. fmt.Printf("%v", err)
  1133. }
  1134. }
  1135. func getPluginType(arg string) (ptype int, err error) {
  1136. switch arg {
  1137. case "source":
  1138. ptype = 0
  1139. case "sink":
  1140. ptype = 1
  1141. case "function":
  1142. ptype = 2
  1143. case "portable":
  1144. ptype = 3
  1145. case "wasm":
  1146. ptype = 4
  1147. default:
  1148. err = fmt.Errorf("Invalid plugin type %s, should be \"source\", \"sink\", \"function\" or \"portable\" or \"wasm\".\n", arg)
  1149. }
  1150. return
  1151. }
  1152. func readDef(sfile string, t string) ([]byte, error) {
  1153. if _, err := os.Stat(sfile); os.IsNotExist(err) {
  1154. return nil, fmt.Errorf("The specified %s defenition file %s is not existed.\n", t, sfile)
  1155. }
  1156. fmt.Printf("Creating a new %s from file %s.\n", t, sfile)
  1157. if rule, err := os.ReadFile(sfile); err != nil {
  1158. return nil, fmt.Errorf("Failed to read from %s definition file %s.\n", t, sfile)
  1159. } else {
  1160. return rule, nil
  1161. }
  1162. }