rpc.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build rpc || !core
  15. // +build rpc !core
  16. package server
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/json"
  21. "fmt"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/pkg/model"
  24. "github.com/lf-edge/ekuiper/internal/topo/rule"
  25. "github.com/lf-edge/ekuiper/internal/topo/sink"
  26. "github.com/lf-edge/ekuiper/pkg/infra"
  27. "io"
  28. "net/http"
  29. "net/rpc"
  30. "os"
  31. "strings"
  32. "time"
  33. )
  34. const QueryRuleId = "internal-ekuiper_query_rule"
  35. func init() {
  36. servers["rpc"] = rpcComp{}
  37. }
  38. type rpcComp struct {
  39. s *http.Server
  40. }
  41. func (r rpcComp) register() {}
  42. func (r rpcComp) serve() {
  43. // Start rpc service
  44. server := new(Server)
  45. portRpc := conf.Config.Basic.Port
  46. ipRpc := conf.Config.Basic.Ip
  47. rpcSrv := rpc.NewServer()
  48. err := rpcSrv.Register(server)
  49. if err != nil {
  50. logger.Fatal("Format of service Server isn'restHttpType correct. ", err)
  51. }
  52. srvRpc := &http.Server{
  53. Addr: fmt.Sprintf("%s:%d", ipRpc, portRpc),
  54. WriteTimeout: time.Second * 15,
  55. ReadTimeout: time.Second * 15,
  56. IdleTimeout: time.Second * 60,
  57. Handler: rpcSrv,
  58. }
  59. r.s = srvRpc
  60. go func() {
  61. if err = srvRpc.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  62. logger.Fatal("Error serving rpc service:", err)
  63. }
  64. }()
  65. }
  66. func (r rpcComp) close() {
  67. if r.s != nil {
  68. if err := r.s.Shutdown(context.TODO()); err != nil {
  69. logger.Errorf("rpc server shutdown error: %v", err)
  70. }
  71. logger.Info("rpc server shutdown.")
  72. }
  73. }
  74. type Server int
  75. func (t *Server) CreateQuery(sql string, reply *string) error {
  76. if _, ok := registry.Load(QueryRuleId); ok {
  77. stopQuery()
  78. }
  79. tp, err := ruleProcessor.ExecQuery(QueryRuleId, sql)
  80. if err != nil {
  81. return err
  82. } else {
  83. rs := &rule.RuleState{RuleId: QueryRuleId, Topology: tp}
  84. registry.Store(QueryRuleId, rs)
  85. msg := fmt.Sprintf("Query was submit successfully.")
  86. logger.Println(msg)
  87. *reply = fmt.Sprintf(msg)
  88. }
  89. return nil
  90. }
  91. func stopQuery() {
  92. if rs, ok := registry.Load(QueryRuleId); ok {
  93. logger.Printf("stop the query.")
  94. (*rs.Topology).Cancel()
  95. registry.Delete(QueryRuleId)
  96. }
  97. }
  98. /**
  99. * qid is not currently used.
  100. */
  101. func (t *Server) GetQueryResult(_ string, reply *string) error {
  102. if rs, ok := registry.Load(QueryRuleId); ok {
  103. c := (*rs.Topology).GetContext()
  104. if c != nil && c.Err() != nil {
  105. return c.Err()
  106. }
  107. }
  108. sink.QR.LastFetch = time.Now()
  109. sink.QR.Mux.Lock()
  110. if len(sink.QR.Results) > 0 {
  111. *reply = strings.Join(sink.QR.Results, "\n")
  112. sink.QR.Results = make([]string, 0, 10)
  113. } else {
  114. *reply = ""
  115. }
  116. sink.QR.Mux.Unlock()
  117. return nil
  118. }
  119. func (t *Server) Stream(stream string, reply *string) error {
  120. content, err := streamProcessor.ExecStmt(stream)
  121. if err != nil {
  122. return fmt.Errorf("Stream command error: %s", err)
  123. } else {
  124. for _, c := range content {
  125. *reply = *reply + fmt.Sprintln(c)
  126. }
  127. }
  128. return nil
  129. }
  130. func (t *Server) CreateRule(rule *model.RPCArgDesc, reply *string) error {
  131. id, err := createRule(rule.Name, rule.Json)
  132. if err != nil {
  133. return fmt.Errorf("Create rule %s error : %s.", id, err)
  134. } else {
  135. *reply = fmt.Sprintf("Rule %s was created successfully, please use 'bin/kuiper getstatus rule %s' command to get rule status.", rule.Name, rule.Name)
  136. }
  137. return nil
  138. }
  139. func (t *Server) GetStatusRule(name string, reply *string) error {
  140. if r, err := getRuleStatus(name); err != nil {
  141. return err
  142. } else {
  143. *reply = r
  144. }
  145. return nil
  146. }
  147. func (t *Server) GetTopoRule(name string, reply *string) error {
  148. if r, err := getRuleTopo(name); err != nil {
  149. return err
  150. } else {
  151. dst := &bytes.Buffer{}
  152. if err = json.Indent(dst, []byte(r), "", " "); err != nil {
  153. *reply = r
  154. } else {
  155. *reply = dst.String()
  156. }
  157. }
  158. return nil
  159. }
  160. func (t *Server) StartRule(name string, reply *string) error {
  161. if err := startRule(name); err != nil {
  162. return err
  163. } else {
  164. *reply = fmt.Sprintf("Rule %s was started", name)
  165. }
  166. return nil
  167. }
  168. func (t *Server) StopRule(name string, reply *string) error {
  169. *reply = stopRule(name)
  170. return nil
  171. }
  172. func (t *Server) RestartRule(name string, reply *string) error {
  173. err := restartRule(name)
  174. if err != nil {
  175. return err
  176. }
  177. *reply = fmt.Sprintf("Rule %s was restarted.", name)
  178. return nil
  179. }
  180. func (t *Server) DescRule(name string, reply *string) error {
  181. r, err := ruleProcessor.ExecDesc(name)
  182. if err != nil {
  183. return fmt.Errorf("Desc rule error : %s.", err)
  184. } else {
  185. *reply = r
  186. }
  187. return nil
  188. }
  189. func (t *Server) ShowRules(_ int, reply *string) error {
  190. r, err := getAllRulesWithStatus()
  191. if err != nil {
  192. return fmt.Errorf("Show rule error : %s.", err)
  193. }
  194. if len(r) == 0 {
  195. *reply = "No rule definitions are found."
  196. } else {
  197. result, err := json.Marshal(r)
  198. if err != nil {
  199. return fmt.Errorf("Show rule error : %s.", err)
  200. }
  201. dst := &bytes.Buffer{}
  202. if err := json.Indent(dst, result, "", " "); err != nil {
  203. return fmt.Errorf("Show rule error : %s.", err)
  204. }
  205. *reply = dst.String()
  206. }
  207. return nil
  208. }
  209. func (t *Server) DropRule(name string, reply *string) error {
  210. deleteRule(name)
  211. r, err := ruleProcessor.ExecDrop(name)
  212. if err != nil {
  213. return fmt.Errorf("Drop rule error : %s.", err)
  214. } else {
  215. err := t.StopRule(name, reply)
  216. if err != nil {
  217. return err
  218. }
  219. }
  220. *reply = r
  221. return nil
  222. }
  223. func (t *Server) Import(file string, reply *string) error {
  224. f, err := os.Open(file)
  225. if err != nil {
  226. return fmt.Errorf("fail to read file %s: %v", file, err)
  227. }
  228. defer f.Close()
  229. buf := new(bytes.Buffer)
  230. _, err = io.Copy(buf, f)
  231. if err != nil {
  232. return fmt.Errorf("fail to convert file %s: %v", file, err)
  233. }
  234. content := buf.Bytes()
  235. rules, counts, err := rulesetProcessor.Import(content)
  236. if err != nil {
  237. return fmt.Errorf("import ruleset error: %v", err)
  238. }
  239. infra.SafeRun(func() error {
  240. for _, name := range rules {
  241. rul, ee := ruleProcessor.GetRuleById(name)
  242. if ee != nil {
  243. logger.Error(ee)
  244. continue
  245. }
  246. reply := recoverRule(rul)
  247. if reply != "" {
  248. logger.Error(reply)
  249. }
  250. }
  251. return nil
  252. })
  253. *reply = fmt.Sprintf("imported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])
  254. return nil
  255. }
  256. func (t *Server) Export(file string, reply *string) error {
  257. f, err := os.Create(file)
  258. if err != nil {
  259. return err
  260. }
  261. exported, counts, err := rulesetProcessor.Export()
  262. if err != nil {
  263. return err
  264. }
  265. _, err = io.Copy(f, exported)
  266. if err != nil {
  267. return fmt.Errorf("fail to save to file %s:%v", file, err)
  268. }
  269. *reply = fmt.Sprintf("exported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])
  270. return nil
  271. }
  272. func marshalDesc(m interface{}) (string, error) {
  273. s, err := json.Marshal(m)
  274. if err != nil {
  275. return "", fmt.Errorf("invalid json %v", m)
  276. }
  277. dst := &bytes.Buffer{}
  278. if err := json.Indent(dst, s, "", " "); err != nil {
  279. return "", fmt.Errorf("indent json error %v", err)
  280. }
  281. return dst.String(), nil
  282. }
  283. func init() {
  284. ticker := time.NewTicker(time.Second * 5)
  285. go infra.SafeRun(func() error {
  286. for {
  287. <-ticker.C
  288. if registry != nil {
  289. if _, ok := registry.Load(QueryRuleId); !ok {
  290. continue
  291. }
  292. n := time.Now()
  293. w := 10 * time.Second
  294. if v := n.Sub(sink.QR.LastFetch); v >= w {
  295. logger.Printf("The client seems no longer fetch the query result, stop the query now.")
  296. stopQuery()
  297. }
  298. }
  299. }
  300. })
  301. }