rpc.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build rpc || !core
  15. // +build rpc !core
  16. package server
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/json"
  21. "fmt"
  22. "io"
  23. "net/http"
  24. "net/rpc"
  25. "os"
  26. "strings"
  27. "time"
  28. "github.com/lf-edge/ekuiper/internal/conf"
  29. "github.com/lf-edge/ekuiper/internal/io/sink"
  30. "github.com/lf-edge/ekuiper/internal/pkg/model"
  31. "github.com/lf-edge/ekuiper/internal/topo/rule"
  32. "github.com/lf-edge/ekuiper/pkg/cast"
  33. "github.com/lf-edge/ekuiper/pkg/infra"
  34. )
// QueryRuleId is the fixed id under which the ad-hoc query created through the
// RPC service is stored in the rule registry.
const QueryRuleId = "internal-ekuiper_query_rule"
  36. func init() {
  37. servers["rpc"] = &rpcComp{}
  38. }
// rpcComp is the server component that exposes the RPC service.
type rpcComp struct {
	// s is the HTTP server carrying the RPC handler. It is assigned by serve
	// and is nil before that.
	s *http.Server
}
// register is a no-op for the RPC component.
func (r *rpcComp) register() {}
  43. func (r *rpcComp) serve() {
  44. // Start rpc service
  45. server := new(Server)
  46. portRpc := conf.Config.Basic.Port
  47. ipRpc := conf.Config.Basic.Ip
  48. rpcSrv := rpc.NewServer()
  49. err := rpcSrv.Register(server)
  50. if err != nil {
  51. logger.Fatal("Format of service Server isn'restHttpType correct. ", err)
  52. }
  53. srvRpc := &http.Server{
  54. Addr: cast.JoinHostPortInt(ipRpc, portRpc),
  55. WriteTimeout: time.Second * 15,
  56. ReadTimeout: time.Second * 15,
  57. IdleTimeout: time.Second * 60,
  58. Handler: rpcSrv,
  59. }
  60. r.s = srvRpc
  61. go func() {
  62. if err = srvRpc.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  63. logger.Fatal("Error serving rpc service:", err)
  64. }
  65. }()
  66. initQuery()
  67. }
  68. func (r *rpcComp) close() {
  69. if r.s != nil {
  70. if err := r.s.Shutdown(context.TODO()); err != nil {
  71. logger.Errorf("rpc server shutdown error: %v", err)
  72. }
  73. logger.Info("rpc server shutdown.")
  74. }
  75. }
// Server is the receiver type registered with the net/rpc server; its exported
// methods are the RPC endpoints.
type Server int
  77. func (t *Server) CreateQuery(sql string, reply *string) error {
  78. if _, ok := registry.Load(QueryRuleId); ok {
  79. stopQuery()
  80. }
  81. tp, err := ruleProcessor.ExecQuery(QueryRuleId, sql)
  82. if err != nil {
  83. return err
  84. } else {
  85. rs := &rule.RuleState{RuleId: QueryRuleId, Topology: tp}
  86. registry.Store(QueryRuleId, rs)
  87. msg := fmt.Sprintf("Query was submit successfully.")
  88. logger.Println(msg)
  89. *reply = fmt.Sprint(msg)
  90. }
  91. return nil
  92. }
  93. func stopQuery() {
  94. if rs, ok := registry.Load(QueryRuleId); ok {
  95. logger.Printf("stop the query.")
  96. (*rs.Topology).Cancel()
  97. registry.Delete(QueryRuleId)
  98. }
  99. }
  100. /**
  101. * qid is not currently used.
  102. */
  103. func (t *Server) GetQueryResult(_ string, reply *string) error {
  104. if rs, ok := registry.Load(QueryRuleId); ok {
  105. c := (*rs.Topology).GetContext()
  106. if c != nil && c.Err() != nil {
  107. return c.Err()
  108. }
  109. }
  110. sink.QR.LastFetch = time.Now()
  111. sink.QR.Mux.Lock()
  112. if len(sink.QR.Results) > 0 {
  113. *reply = strings.Join(sink.QR.Results, "\n")
  114. sink.QR.Results = make([]string, 0, 10)
  115. } else {
  116. *reply = ""
  117. }
  118. sink.QR.Mux.Unlock()
  119. return nil
  120. }
  121. func (t *Server) Stream(stream string, reply *string) error {
  122. content, err := streamProcessor.ExecStmt(stream)
  123. if err != nil {
  124. return fmt.Errorf("Stream command error: %s", err)
  125. } else {
  126. for _, c := range content {
  127. *reply = *reply + fmt.Sprintln(c)
  128. }
  129. }
  130. return nil
  131. }
  132. func (t *Server) CreateRule(rule *model.RPCArgDesc, reply *string) error {
  133. id, err := createRule(rule.Name, rule.Json)
  134. if err != nil {
  135. return fmt.Errorf("Create rule %s error : %s.", id, err)
  136. } else {
  137. *reply = fmt.Sprintf("Rule %s was created successfully, please use 'bin/kuiper getstatus rule %s' command to get rule status.", rule.Name, rule.Name)
  138. }
  139. return nil
  140. }
  141. func (t *Server) GetStatusRule(name string, reply *string) error {
  142. if r, err := getRuleStatus(name); err != nil {
  143. return err
  144. } else {
  145. *reply = r
  146. }
  147. return nil
  148. }
  149. func (t *Server) GetTopoRule(name string, reply *string) error {
  150. if r, err := getRuleTopo(name); err != nil {
  151. return err
  152. } else {
  153. dst := &bytes.Buffer{}
  154. if err = json.Indent(dst, cast.StringToBytes(r), "", " "); err != nil {
  155. *reply = r
  156. } else {
  157. *reply = dst.String()
  158. }
  159. }
  160. return nil
  161. }
  162. func (t *Server) StartRule(name string, reply *string) error {
  163. if err := startRule(name); err != nil {
  164. return err
  165. } else {
  166. *reply = fmt.Sprintf("Rule %s was started", name)
  167. }
  168. return nil
  169. }
  170. func (t *Server) StopRule(name string, reply *string) error {
  171. *reply = stopRule(name)
  172. return nil
  173. }
  174. func (t *Server) RestartRule(name string, reply *string) error {
  175. err := restartRule(name)
  176. if err != nil {
  177. return err
  178. }
  179. *reply = fmt.Sprintf("Rule %s was restarted.", name)
  180. return nil
  181. }
  182. func (t *Server) DescRule(name string, reply *string) error {
  183. r, err := ruleProcessor.ExecDesc(name)
  184. if err != nil {
  185. return fmt.Errorf("Desc rule error : %s.", err)
  186. } else {
  187. *reply = r
  188. }
  189. return nil
  190. }
  191. func (t *Server) ShowRules(_ int, reply *string) error {
  192. r, err := getAllRulesWithStatus()
  193. if err != nil {
  194. return fmt.Errorf("Show rule error : %s.", err)
  195. }
  196. if len(r) == 0 {
  197. *reply = "No rule definitions are found."
  198. } else {
  199. result, err := json.Marshal(r)
  200. if err != nil {
  201. return fmt.Errorf("Show rule error : %s.", err)
  202. }
  203. dst := &bytes.Buffer{}
  204. if err := json.Indent(dst, result, "", " "); err != nil {
  205. return fmt.Errorf("Show rule error : %s.", err)
  206. }
  207. *reply = dst.String()
  208. }
  209. return nil
  210. }
  211. func (t *Server) DropRule(name string, reply *string) error {
  212. deleteRule(name)
  213. r, err := ruleProcessor.ExecDrop(name)
  214. if err != nil {
  215. return fmt.Errorf("Drop rule error : %s.", err)
  216. } else {
  217. err := t.StopRule(name, reply)
  218. if err != nil {
  219. return err
  220. }
  221. }
  222. *reply = r
  223. return nil
  224. }
  225. func (t *Server) ValidateRule(rule *model.RPCArgDesc, reply *string) error {
  226. s, err := validateRule(rule.Name, rule.Json)
  227. if s {
  228. *reply = "The rule has been successfully validated and is confirmed to be correct."
  229. } else {
  230. *reply = err.Error()
  231. }
  232. return nil
  233. }
  234. func (t *Server) Import(file string, reply *string) error {
  235. f, err := os.Open(file)
  236. if err != nil {
  237. return fmt.Errorf("fail to read file %s: %v", file, err)
  238. }
  239. defer f.Close()
  240. buf := new(bytes.Buffer)
  241. _, err = io.Copy(buf, f)
  242. if err != nil {
  243. return fmt.Errorf("fail to convert file %s: %v", file, err)
  244. }
  245. content := buf.Bytes()
  246. rules, counts, err := rulesetProcessor.Import(content)
  247. if err != nil {
  248. return fmt.Errorf("import ruleset error: %v", err)
  249. }
  250. infra.SafeRun(func() error {
  251. for _, name := range rules {
  252. rul, ee := ruleProcessor.GetRuleById(name)
  253. if ee != nil {
  254. logger.Error(ee)
  255. continue
  256. }
  257. reply := recoverRule(rul)
  258. if reply != "" {
  259. logger.Error(reply)
  260. }
  261. }
  262. return nil
  263. })
  264. *reply = fmt.Sprintf("imported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])
  265. return nil
  266. }
  267. func (t *Server) Export(file string, reply *string) error {
  268. f, err := os.Create(file)
  269. if err != nil {
  270. return err
  271. }
  272. exported, counts, err := rulesetProcessor.Export()
  273. if err != nil {
  274. return err
  275. }
  276. _, err = io.Copy(f, exported)
  277. if err != nil {
  278. return fmt.Errorf("fail to save to file %s:%v", file, err)
  279. }
  280. *reply = fmt.Sprintf("exported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])
  281. return nil
  282. }
  283. func (t *Server) ImportConfiguration(arg *model.ImportDataDesc, reply *string) error {
  284. file := arg.FileName
  285. f, err := os.Open(file)
  286. if err != nil {
  287. return fmt.Errorf("fail to read file %s: %v", file, err)
  288. }
  289. defer f.Close()
  290. buf := new(bytes.Buffer)
  291. _, err = io.Copy(buf, f)
  292. if err != nil {
  293. return fmt.Errorf("fail to convert file %s: %v", file, err)
  294. }
  295. content := buf.Bytes()
  296. partial := arg.Partial
  297. var result ImportConfigurationStatus
  298. if !partial {
  299. configurationReset()
  300. result = configurationImport(content, arg.Stop)
  301. } else {
  302. result = configurationPartialImport(content)
  303. }
  304. marshal, _ := json.Marshal(result)
  305. dst := &bytes.Buffer{}
  306. if err := json.Indent(dst, marshal, "", " "); err != nil {
  307. return fmt.Errorf("import configuration error: %v", err)
  308. }
  309. *reply = dst.String()
  310. return nil
  311. }
  312. func (t *Server) GetStatusImport(_ int, reply *string) error {
  313. jsonRsp := configurationStatusExport()
  314. result, err := json.Marshal(jsonRsp)
  315. if err != nil {
  316. return fmt.Errorf("Show rule error : %s.", err)
  317. }
  318. dst := &bytes.Buffer{}
  319. if err := json.Indent(dst, result, "", " "); err != nil {
  320. return fmt.Errorf("Show rule error : %s.", err)
  321. }
  322. *reply = dst.String()
  323. return nil
  324. }
  325. func (t *Server) ExportConfiguration(arg *model.ExportDataDesc, reply *string) error {
  326. rules := arg.Rules
  327. file := arg.FileName
  328. f, err := os.Create(file)
  329. if err != nil {
  330. return err
  331. }
  332. var jsonBytes []byte
  333. // do not specify rules, export all
  334. if len(rules) == 0 {
  335. jsonBytes, err = configurationExport()
  336. } else {
  337. jsonBytes, err = ruleMigrationProcessor.ConfigurationPartialExport(rules)
  338. }
  339. if err != nil {
  340. return err
  341. }
  342. _, err = io.Copy(f, bytes.NewReader(jsonBytes))
  343. if err != nil {
  344. return fmt.Errorf("fail to save to file %s:%v", file, err)
  345. }
  346. *reply = fmt.Sprintf("export configuration success")
  347. return nil
  348. }
  349. func marshalDesc(m interface{}) (string, error) {
  350. s, err := json.Marshal(m)
  351. if err != nil {
  352. return "", fmt.Errorf("invalid json %v", m)
  353. }
  354. dst := &bytes.Buffer{}
  355. if err := json.Indent(dst, s, "", " "); err != nil {
  356. return "", fmt.Errorf("indent json error %v", err)
  357. }
  358. return dst.String(), nil
  359. }
  360. func initQuery() {
  361. ticker := time.NewTicker(time.Second * 5)
  362. go infra.SafeRun(func() error {
  363. for {
  364. <-ticker.C
  365. if registry != nil {
  366. if _, ok := registry.Load(QueryRuleId); !ok {
  367. continue
  368. }
  369. n := time.Now()
  370. w := 10 * time.Second
  371. if v := n.Sub(sink.QR.LastFetch); v >= w {
  372. logger.Printf("The client seems no longer fetch the query result, stop the query now.")
  373. stopQuery()
  374. }
  375. }
  376. }
  377. })
  378. }