rpc.go 7.6 KB


  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build rpc || !core
  15. // +build rpc !core
  16. package server
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/json"
  21. "fmt"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/pkg/model"
  24. "github.com/lf-edge/ekuiper/internal/topo/sink"
  25. "github.com/lf-edge/ekuiper/pkg/infra"
  26. "io"
  27. "net/http"
  28. "net/rpc"
  29. "os"
  30. "strings"
  31. "time"
  32. )
  33. const QueryRuleId = "internal-ekuiper_query_rule"
  34. func init() {
  35. servers["rpc"] = rpcComp{}
  36. }
  37. type rpcComp struct {
  38. s *http.Server
  39. }
  40. func (r rpcComp) register() {}
  41. func (r rpcComp) serve() {
  42. // Start rpc service
  43. server := new(Server)
  44. portRpc := conf.Config.Basic.Port
  45. ipRpc := conf.Config.Basic.Ip
  46. rpcSrv := rpc.NewServer()
  47. err := rpcSrv.Register(server)
  48. if err != nil {
  49. logger.Fatal("Format of service Server isn'restHttpType correct. ", err)
  50. }
  51. srvRpc := &http.Server{
  52. Addr: fmt.Sprintf("%s:%d", ipRpc, portRpc),
  53. WriteTimeout: time.Second * 15,
  54. ReadTimeout: time.Second * 15,
  55. IdleTimeout: time.Second * 60,
  56. Handler: rpcSrv,
  57. }
  58. r.s = srvRpc
  59. go func() {
  60. if err = srvRpc.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  61. logger.Fatal("Error serving rpc service:", err)
  62. }
  63. }()
  64. }
  65. func (r rpcComp) close() {
  66. if r.s != nil {
  67. if err := r.s.Shutdown(context.TODO()); err != nil {
  68. logger.Errorf("rpc server shutdown error: %v", err)
  69. }
  70. logger.Info("rpc server shutdown.")
  71. }
  72. }
  73. type Server int
  74. func (t *Server) CreateQuery(sql string, reply *string) error {
  75. if _, ok := registry.Load(QueryRuleId); ok {
  76. stopQuery()
  77. }
  78. tp, err := ruleProcessor.ExecQuery(QueryRuleId, sql)
  79. if err != nil {
  80. return err
  81. } else {
  82. rs := &RuleState{RuleId: QueryRuleId, Topology: tp, Triggered: true}
  83. registry.Store(QueryRuleId, rs)
  84. msg := fmt.Sprintf("Query was submit successfully.")
  85. logger.Println(msg)
  86. *reply = fmt.Sprintf(msg)
  87. }
  88. return nil
  89. }
  90. func stopQuery() {
  91. if rs, ok := registry.Load(QueryRuleId); ok {
  92. logger.Printf("stop the query.")
  93. (*rs.Topology).Cancel()
  94. registry.Delete(QueryRuleId)
  95. }
  96. }
  97. /**
  98. * qid is not currently used.
  99. */
  100. func (t *Server) GetQueryResult(_ string, reply *string) error {
  101. if rs, ok := registry.Load(QueryRuleId); ok {
  102. c := (*rs.Topology).GetContext()
  103. if c != nil && c.Err() != nil {
  104. return c.Err()
  105. }
  106. }
  107. sink.QR.LastFetch = time.Now()
  108. sink.QR.Mux.Lock()
  109. if len(sink.QR.Results) > 0 {
  110. *reply = strings.Join(sink.QR.Results, "\n")
  111. sink.QR.Results = make([]string, 0, 10)
  112. } else {
  113. *reply = ""
  114. }
  115. sink.QR.Mux.Unlock()
  116. return nil
  117. }
  118. func (t *Server) Stream(stream string, reply *string) error {
  119. content, err := streamProcessor.ExecStmt(stream)
  120. if err != nil {
  121. return fmt.Errorf("Stream command error: %s", err)
  122. } else {
  123. for _, c := range content {
  124. *reply = *reply + fmt.Sprintln(c)
  125. }
  126. }
  127. return nil
  128. }
  129. func (t *Server) CreateRule(rule *model.RPCArgDesc, reply *string) error {
  130. r, err := ruleProcessor.ExecCreate(rule.Name, rule.Json)
  131. if err != nil {
  132. return fmt.Errorf("Create rule error : %s.", err)
  133. } else {
  134. *reply = fmt.Sprintf("Rule %s was created successfully, please use 'bin/kuiper getstatus rule %s' command to get rule status.", rule.Name, rule.Name)
  135. }
  136. //Start the rule
  137. rs, err := createRuleState(r)
  138. if err != nil {
  139. return err
  140. }
  141. err = doStartRule(rs)
  142. if err != nil {
  143. return err
  144. }
  145. return nil
  146. }
  147. func (t *Server) GetStatusRule(name string, reply *string) error {
  148. if r, err := getRuleStatus(name); err != nil {
  149. return err
  150. } else {
  151. *reply = r
  152. }
  153. return nil
  154. }
  155. func (t *Server) GetTopoRule(name string, reply *string) error {
  156. if r, err := getRuleTopo(name); err != nil {
  157. return err
  158. } else {
  159. dst := &bytes.Buffer{}
  160. if err = json.Indent(dst, []byte(r), "", " "); err != nil {
  161. *reply = r
  162. } else {
  163. *reply = dst.String()
  164. }
  165. }
  166. return nil
  167. }
  168. func (t *Server) StartRule(name string, reply *string) error {
  169. if err := startRule(name); err != nil {
  170. return err
  171. } else {
  172. *reply = fmt.Sprintf("Rule %s was started", name)
  173. }
  174. return nil
  175. }
  176. func (t *Server) StopRule(name string, reply *string) error {
  177. *reply = stopRule(name)
  178. return nil
  179. }
  180. func (t *Server) RestartRule(name string, reply *string) error {
  181. err := restartRule(name)
  182. if err != nil {
  183. return err
  184. }
  185. *reply = fmt.Sprintf("Rule %s was restarted.", name)
  186. return nil
  187. }
  188. func (t *Server) DescRule(name string, reply *string) error {
  189. r, err := ruleProcessor.ExecDesc(name)
  190. if err != nil {
  191. return fmt.Errorf("Desc rule error : %s.", err)
  192. } else {
  193. *reply = r
  194. }
  195. return nil
  196. }
  197. func (t *Server) ShowRules(_ int, reply *string) error {
  198. r, err := getAllRulesWithStatus()
  199. if err != nil {
  200. return fmt.Errorf("Show rule error : %s.", err)
  201. }
  202. if len(r) == 0 {
  203. *reply = "No rule definitions are found."
  204. } else {
  205. result, err := json.Marshal(r)
  206. if err != nil {
  207. return fmt.Errorf("Show rule error : %s.", err)
  208. }
  209. dst := &bytes.Buffer{}
  210. if err := json.Indent(dst, result, "", " "); err != nil {
  211. return fmt.Errorf("Show rule error : %s.", err)
  212. }
  213. *reply = dst.String()
  214. }
  215. return nil
  216. }
  217. func (t *Server) DropRule(name string, reply *string) error {
  218. deleteRule(name)
  219. r, err := ruleProcessor.ExecDrop(name)
  220. if err != nil {
  221. return fmt.Errorf("Drop rule error : %s.", err)
  222. } else {
  223. err := t.StopRule(name, reply)
  224. if err != nil {
  225. return err
  226. }
  227. }
  228. *reply = r
  229. return nil
  230. }
  231. func (t *Server) Import(file string, reply *string) error {
  232. f, err := os.Open(file)
  233. if err != nil {
  234. return fmt.Errorf("fail to read file %s: %v", file, err)
  235. }
  236. defer f.Close()
  237. buf := new(bytes.Buffer)
  238. _, err = io.Copy(buf, f)
  239. if err != nil {
  240. return fmt.Errorf("fail to convert file %s: %v", file, err)
  241. }
  242. content := buf.Bytes()
  243. rules, counts, err := rulesetProcessor.Import(content)
  244. if err != nil {
  245. return fmt.Errorf("import ruleset error: %v", err)
  246. }
  247. infra.SafeRun(func() error {
  248. for _, name := range rules {
  249. err := startRule(name)
  250. if err != nil {
  251. logger.Error(err)
  252. }
  253. }
  254. return nil
  255. })
  256. *reply = fmt.Sprintf("imported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])
  257. return nil
  258. }
  259. func (t *Server) Export(file string, reply *string) error {
  260. f, err := os.Create(file)
  261. if err != nil {
  262. return err
  263. }
  264. exported, counts, err := rulesetProcessor.Export()
  265. if err != nil {
  266. return err
  267. }
  268. _, err = io.Copy(f, exported)
  269. if err != nil {
  270. return fmt.Errorf("fail to save to file %s:%v", file, err)
  271. }
  272. *reply = fmt.Sprintf("exported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])
  273. return nil
  274. }
  // marshalDesc renders m as JSON indented with a single space per level.
  //
  // It returns an "invalid json" error when m cannot be marshalled and an
  // "indent json error" when the marshalled bytes cannot be indented; in both
  // cases the returned string is empty.
  func marshalDesc(m interface{}) (string, error) {
  	raw, marshalErr := json.Marshal(m)
  	if marshalErr != nil {
  		return "", fmt.Errorf("invalid json %v", m)
  	}
  	var pretty bytes.Buffer
  	indentErr := json.Indent(&pretty, raw, "", " ")
  	if indentErr != nil {
  		return "", fmt.Errorf("indent json error %v", indentErr)
  	}
  	return pretty.String(), nil
  }
  286. func init() {
  287. ticker := time.NewTicker(time.Second * 5)
  288. go infra.SafeRun(func() error {
  289. for {
  290. <-ticker.C
  291. if registry != nil {
  292. if _, ok := registry.Load(QueryRuleId); !ok {
  293. continue
  294. }
  295. n := time.Now()
  296. w := 10 * time.Second
  297. if v := n.Sub(sink.QR.LastFetch); v >= w {
  298. logger.Printf("The client seems no longer fetch the query result, stop the query now.")
  299. stopQuery()
  300. }
  301. }
  302. }
  303. })
  304. }