rpc.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build rpc || !core
  15. // +build rpc !core
  16. package server
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/json"
  21. "fmt"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/io/sink"
  24. "github.com/lf-edge/ekuiper/internal/pkg/model"
  25. "github.com/lf-edge/ekuiper/internal/topo/rule"
  26. "github.com/lf-edge/ekuiper/pkg/infra"
  27. "io"
  28. "net/http"
  29. "net/rpc"
  30. "os"
  31. "strings"
  32. "time"
  33. )
  34. const QueryRuleId = "internal-ekuiper_query_rule"
  35. func init() {
  36. servers["rpc"] = rpcComp{}
  37. }
  38. type rpcComp struct {
  39. s *http.Server
  40. }
  41. func (r rpcComp) register() {}
  42. func (r rpcComp) serve() {
  43. // Start rpc service
  44. server := new(Server)
  45. portRpc := conf.Config.Basic.Port
  46. ipRpc := conf.Config.Basic.Ip
  47. rpcSrv := rpc.NewServer()
  48. err := rpcSrv.Register(server)
  49. if err != nil {
  50. logger.Fatal("Format of service Server isn'restHttpType correct. ", err)
  51. }
  52. srvRpc := &http.Server{
  53. Addr: fmt.Sprintf("%s:%d", ipRpc, portRpc),
  54. WriteTimeout: time.Second * 15,
  55. ReadTimeout: time.Second * 15,
  56. IdleTimeout: time.Second * 60,
  57. Handler: rpcSrv,
  58. }
  59. r.s = srvRpc
  60. go func() {
  61. if err = srvRpc.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  62. logger.Fatal("Error serving rpc service:", err)
  63. }
  64. }()
  65. initQuery()
  66. }
  67. func (r rpcComp) close() {
  68. if r.s != nil {
  69. if err := r.s.Shutdown(context.TODO()); err != nil {
  70. logger.Errorf("rpc server shutdown error: %v", err)
  71. }
  72. logger.Info("rpc server shutdown.")
  73. }
  74. }
  75. type Server int
  76. func (t *Server) CreateQuery(sql string, reply *string) error {
  77. if _, ok := registry.Load(QueryRuleId); ok {
  78. stopQuery()
  79. }
  80. tp, err := ruleProcessor.ExecQuery(QueryRuleId, sql)
  81. if err != nil {
  82. return err
  83. } else {
  84. rs := &rule.RuleState{RuleId: QueryRuleId, Topology: tp}
  85. registry.Store(QueryRuleId, rs)
  86. msg := fmt.Sprintf("Query was submit successfully.")
  87. logger.Println(msg)
  88. *reply = fmt.Sprintf(msg)
  89. }
  90. return nil
  91. }
  92. func stopQuery() {
  93. if rs, ok := registry.Load(QueryRuleId); ok {
  94. logger.Printf("stop the query.")
  95. (*rs.Topology).Cancel()
  96. registry.Delete(QueryRuleId)
  97. }
  98. }
  99. /**
  100. * qid is not currently used.
  101. */
  102. func (t *Server) GetQueryResult(_ string, reply *string) error {
  103. if rs, ok := registry.Load(QueryRuleId); ok {
  104. c := (*rs.Topology).GetContext()
  105. if c != nil && c.Err() != nil {
  106. return c.Err()
  107. }
  108. }
  109. sink.QR.LastFetch = time.Now()
  110. sink.QR.Mux.Lock()
  111. if len(sink.QR.Results) > 0 {
  112. *reply = strings.Join(sink.QR.Results, "\n")
  113. sink.QR.Results = make([]string, 0, 10)
  114. } else {
  115. *reply = ""
  116. }
  117. sink.QR.Mux.Unlock()
  118. return nil
  119. }
  120. func (t *Server) Stream(stream string, reply *string) error {
  121. content, err := streamProcessor.ExecStmt(stream)
  122. if err != nil {
  123. return fmt.Errorf("Stream command error: %s", err)
  124. } else {
  125. for _, c := range content {
  126. *reply = *reply + fmt.Sprintln(c)
  127. }
  128. }
  129. return nil
  130. }
  131. func (t *Server) CreateRule(rule *model.RPCArgDesc, reply *string) error {
  132. id, err := createRule(rule.Name, rule.Json)
  133. if err != nil {
  134. return fmt.Errorf("Create rule %s error : %s.", id, err)
  135. } else {
  136. *reply = fmt.Sprintf("Rule %s was created successfully, please use 'bin/kuiper getstatus rule %s' command to get rule status.", rule.Name, rule.Name)
  137. }
  138. return nil
  139. }
  140. func (t *Server) GetStatusRule(name string, reply *string) error {
  141. if r, err := getRuleStatus(name); err != nil {
  142. return err
  143. } else {
  144. *reply = r
  145. }
  146. return nil
  147. }
  148. func (t *Server) GetTopoRule(name string, reply *string) error {
  149. if r, err := getRuleTopo(name); err != nil {
  150. return err
  151. } else {
  152. dst := &bytes.Buffer{}
  153. if err = json.Indent(dst, []byte(r), "", " "); err != nil {
  154. *reply = r
  155. } else {
  156. *reply = dst.String()
  157. }
  158. }
  159. return nil
  160. }
  161. func (t *Server) StartRule(name string, reply *string) error {
  162. if err := startRule(name); err != nil {
  163. return err
  164. } else {
  165. *reply = fmt.Sprintf("Rule %s was started", name)
  166. }
  167. return nil
  168. }
  169. func (t *Server) StopRule(name string, reply *string) error {
  170. *reply = stopRule(name)
  171. return nil
  172. }
  173. func (t *Server) RestartRule(name string, reply *string) error {
  174. err := restartRule(name)
  175. if err != nil {
  176. return err
  177. }
  178. *reply = fmt.Sprintf("Rule %s was restarted.", name)
  179. return nil
  180. }
  181. func (t *Server) DescRule(name string, reply *string) error {
  182. r, err := ruleProcessor.ExecDesc(name)
  183. if err != nil {
  184. return fmt.Errorf("Desc rule error : %s.", err)
  185. } else {
  186. *reply = r
  187. }
  188. return nil
  189. }
  190. func (t *Server) ShowRules(_ int, reply *string) error {
  191. r, err := getAllRulesWithStatus()
  192. if err != nil {
  193. return fmt.Errorf("Show rule error : %s.", err)
  194. }
  195. if len(r) == 0 {
  196. *reply = "No rule definitions are found."
  197. } else {
  198. result, err := json.Marshal(r)
  199. if err != nil {
  200. return fmt.Errorf("Show rule error : %s.", err)
  201. }
  202. dst := &bytes.Buffer{}
  203. if err := json.Indent(dst, result, "", " "); err != nil {
  204. return fmt.Errorf("Show rule error : %s.", err)
  205. }
  206. *reply = dst.String()
  207. }
  208. return nil
  209. }
  210. func (t *Server) DropRule(name string, reply *string) error {
  211. deleteRule(name)
  212. r, err := ruleProcessor.ExecDrop(name)
  213. if err != nil {
  214. return fmt.Errorf("Drop rule error : %s.", err)
  215. } else {
  216. err := t.StopRule(name, reply)
  217. if err != nil {
  218. return err
  219. }
  220. }
  221. *reply = r
  222. return nil
  223. }
  224. func (t *Server) Import(file string, reply *string) error {
  225. f, err := os.Open(file)
  226. if err != nil {
  227. return fmt.Errorf("fail to read file %s: %v", file, err)
  228. }
  229. defer f.Close()
  230. buf := new(bytes.Buffer)
  231. _, err = io.Copy(buf, f)
  232. if err != nil {
  233. return fmt.Errorf("fail to convert file %s: %v", file, err)
  234. }
  235. content := buf.Bytes()
  236. rules, counts, err := rulesetProcessor.Import(content)
  237. if err != nil {
  238. return fmt.Errorf("import ruleset error: %v", err)
  239. }
  240. infra.SafeRun(func() error {
  241. for _, name := range rules {
  242. rul, ee := ruleProcessor.GetRuleById(name)
  243. if ee != nil {
  244. logger.Error(ee)
  245. continue
  246. }
  247. reply := recoverRule(rul)
  248. if reply != "" {
  249. logger.Error(reply)
  250. }
  251. }
  252. return nil
  253. })
  254. *reply = fmt.Sprintf("imported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])
  255. return nil
  256. }
  257. func (t *Server) Export(file string, reply *string) error {
  258. f, err := os.Create(file)
  259. if err != nil {
  260. return err
  261. }
  262. exported, counts, err := rulesetProcessor.Export()
  263. if err != nil {
  264. return err
  265. }
  266. _, err = io.Copy(f, exported)
  267. if err != nil {
  268. return fmt.Errorf("fail to save to file %s:%v", file, err)
  269. }
  270. *reply = fmt.Sprintf("exported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])
  271. return nil
  272. }
  273. func (t *Server) ImportConfiguration(arg *model.ImportDataDesc, reply *string) error {
  274. file := arg.FileName
  275. f, err := os.Open(file)
  276. if err != nil {
  277. return fmt.Errorf("fail to read file %s: %v", file, err)
  278. }
  279. defer f.Close()
  280. buf := new(bytes.Buffer)
  281. _, err = io.Copy(buf, f)
  282. if err != nil {
  283. return fmt.Errorf("fail to convert file %s: %v", file, err)
  284. }
  285. content := buf.Bytes()
  286. configurationReset()
  287. err = configurationImport(content, arg.Stop)
  288. if err != nil {
  289. return fmt.Errorf("import configuration error: %v", err)
  290. }
  291. *reply = fmt.Sprintf("import configuration success")
  292. return nil
  293. }
  294. func (t *Server) GetStatusImport(_ int, reply *string) error {
  295. jsonRsp := configurationStatusExport()
  296. result, err := json.Marshal(jsonRsp)
  297. if err != nil {
  298. return fmt.Errorf("Show rule error : %s.", err)
  299. }
  300. dst := &bytes.Buffer{}
  301. if err := json.Indent(dst, result, "", " "); err != nil {
  302. return fmt.Errorf("Show rule error : %s.", err)
  303. }
  304. *reply = dst.String()
  305. return nil
  306. }
  307. func (t *Server) ExportConfiguration(file string, reply *string) error {
  308. f, err := os.Create(file)
  309. if err != nil {
  310. return err
  311. }
  312. jsonBytes, err := configurationExport()
  313. _, err = io.Copy(f, bytes.NewReader(jsonBytes))
  314. if err != nil {
  315. return fmt.Errorf("fail to save to file %s:%v", file, err)
  316. }
  317. *reply = fmt.Sprintf("export configuration success")
  318. return nil
  319. }
  320. func marshalDesc(m interface{}) (string, error) {
  321. s, err := json.Marshal(m)
  322. if err != nil {
  323. return "", fmt.Errorf("invalid json %v", m)
  324. }
  325. dst := &bytes.Buffer{}
  326. if err := json.Indent(dst, s, "", " "); err != nil {
  327. return "", fmt.Errorf("indent json error %v", err)
  328. }
  329. return dst.String(), nil
  330. }
  331. func initQuery() {
  332. ticker := time.NewTicker(time.Second * 5)
  333. go infra.SafeRun(func() error {
  334. for {
  335. <-ticker.C
  336. if registry != nil {
  337. if _, ok := registry.Load(QueryRuleId); !ok {
  338. continue
  339. }
  340. n := time.Now()
  341. w := 10 * time.Second
  342. if v := n.Sub(sink.QR.LastFetch); v >= w {
  343. logger.Printf("The client seems no longer fetch the query result, stop the query now.")
  344. stopQuery()
  345. }
  346. }
  347. }
  348. })
  349. }