rpc.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build rpc || !core
  15. // +build rpc !core
  16. package server
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/json"
  21. "fmt"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/pkg/model"
  24. "github.com/lf-edge/ekuiper/internal/topo/sink"
  25. "github.com/lf-edge/ekuiper/pkg/infra"
  26. "net/http"
  27. "net/rpc"
  28. "strings"
  29. "time"
  30. )
// QueryRuleId is the fixed rule id under which the ad-hoc query started by
// Server.CreateQuery is stored in, looked up from, and removed from the
// registry.
const QueryRuleId = "internal-ekuiper_query_rule"
  32. func init() {
  33. servers["rpc"] = rpcComp{}
  34. }
  35. type rpcComp struct {
  36. s *http.Server
  37. }
  38. func (r rpcComp) register() {}
  39. func (r rpcComp) serve() {
  40. // Start rpc service
  41. server := new(Server)
  42. portRpc := conf.Config.Basic.Port
  43. ipRpc := conf.Config.Basic.Ip
  44. rpcSrv := rpc.NewServer()
  45. err := rpcSrv.Register(server)
  46. if err != nil {
  47. logger.Fatal("Format of service Server isn'restHttpType correct. ", err)
  48. }
  49. srvRpc := &http.Server{
  50. Addr: fmt.Sprintf("%s:%d", ipRpc, portRpc),
  51. WriteTimeout: time.Second * 15,
  52. ReadTimeout: time.Second * 15,
  53. IdleTimeout: time.Second * 60,
  54. Handler: rpcSrv,
  55. }
  56. r.s = srvRpc
  57. go func() {
  58. if err = srvRpc.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  59. logger.Fatal("Error serving rpc service:", err)
  60. }
  61. }()
  62. }
  63. func (r rpcComp) close() {
  64. if r.s != nil {
  65. if err := r.s.Shutdown(context.TODO()); err != nil {
  66. logger.Errorf("rpc server shutdown error: %v", err)
  67. }
  68. logger.Info("rpc server shutdown.")
  69. }
  70. }
// Server is the receiver type whose exported methods are registered with
// net/rpc by rpcComp.serve. Its methods in this file do not read the receiver
// value.
type Server int
  72. func (t *Server) CreateQuery(sql string, reply *string) error {
  73. if _, ok := registry.Load(QueryRuleId); ok {
  74. stopQuery()
  75. }
  76. tp, err := ruleProcessor.ExecQuery(QueryRuleId, sql)
  77. if err != nil {
  78. return err
  79. } else {
  80. rs := &RuleState{RuleId: QueryRuleId, Topology: tp, Triggered: true}
  81. registry.Store(QueryRuleId, rs)
  82. msg := fmt.Sprintf("Query was submit successfully.")
  83. logger.Println(msg)
  84. *reply = fmt.Sprintf(msg)
  85. }
  86. return nil
  87. }
  88. func stopQuery() {
  89. if rs, ok := registry.Load(QueryRuleId); ok {
  90. logger.Printf("stop the query.")
  91. (*rs.Topology).Cancel()
  92. registry.Delete(QueryRuleId)
  93. }
  94. }
  95. /**
  96. * qid is not currently used.
  97. */
  98. func (t *Server) GetQueryResult(_ string, reply *string) error {
  99. if rs, ok := registry.Load(QueryRuleId); ok {
  100. c := (*rs.Topology).GetContext()
  101. if c != nil && c.Err() != nil {
  102. return c.Err()
  103. }
  104. }
  105. sink.QR.LastFetch = time.Now()
  106. sink.QR.Mux.Lock()
  107. if len(sink.QR.Results) > 0 {
  108. *reply = strings.Join(sink.QR.Results, "\n")
  109. sink.QR.Results = make([]string, 0, 10)
  110. } else {
  111. *reply = ""
  112. }
  113. sink.QR.Mux.Unlock()
  114. return nil
  115. }
  116. func (t *Server) Stream(stream string, reply *string) error {
  117. content, err := streamProcessor.ExecStmt(stream)
  118. if err != nil {
  119. return fmt.Errorf("Stream command error: %s", err)
  120. } else {
  121. for _, c := range content {
  122. *reply = *reply + fmt.Sprintln(c)
  123. }
  124. }
  125. return nil
  126. }
  127. func (t *Server) CreateRule(rule *model.RPCArgDesc, reply *string) error {
  128. r, err := ruleProcessor.ExecCreate(rule.Name, rule.Json)
  129. if err != nil {
  130. return fmt.Errorf("Create rule error : %s.", err)
  131. } else {
  132. *reply = fmt.Sprintf("Rule %s was created successfully, please use 'bin/kuiper getstatus rule %s' command to get rule status.", rule.Name, rule.Name)
  133. }
  134. //Start the rule
  135. rs, err := createRuleState(r)
  136. if err != nil {
  137. return err
  138. }
  139. err = doStartRule(rs)
  140. if err != nil {
  141. return err
  142. }
  143. return nil
  144. }
  145. func (t *Server) GetStatusRule(name string, reply *string) error {
  146. if r, err := getRuleStatus(name); err != nil {
  147. return err
  148. } else {
  149. *reply = r
  150. }
  151. return nil
  152. }
  153. func (t *Server) GetTopoRule(name string, reply *string) error {
  154. if r, err := getRuleTopo(name); err != nil {
  155. return err
  156. } else {
  157. dst := &bytes.Buffer{}
  158. if err = json.Indent(dst, []byte(r), "", " "); err != nil {
  159. *reply = r
  160. } else {
  161. *reply = dst.String()
  162. }
  163. }
  164. return nil
  165. }
  166. func (t *Server) StartRule(name string, reply *string) error {
  167. if err := startRule(name); err != nil {
  168. return err
  169. } else {
  170. *reply = fmt.Sprintf("Rule %s was started", name)
  171. }
  172. return nil
  173. }
  174. func (t *Server) StopRule(name string, reply *string) error {
  175. *reply = stopRule(name)
  176. return nil
  177. }
  178. func (t *Server) RestartRule(name string, reply *string) error {
  179. err := restartRule(name)
  180. if err != nil {
  181. return err
  182. }
  183. *reply = fmt.Sprintf("Rule %s was restarted.", name)
  184. return nil
  185. }
  186. func (t *Server) DescRule(name string, reply *string) error {
  187. r, err := ruleProcessor.ExecDesc(name)
  188. if err != nil {
  189. return fmt.Errorf("Desc rule error : %s.", err)
  190. } else {
  191. *reply = r
  192. }
  193. return nil
  194. }
  195. func (t *Server) ShowRules(_ int, reply *string) error {
  196. r, err := getAllRulesWithStatus()
  197. if err != nil {
  198. return fmt.Errorf("Show rule error : %s.", err)
  199. }
  200. if len(r) == 0 {
  201. *reply = "No rule definitions are found."
  202. } else {
  203. result, err := json.Marshal(r)
  204. if err != nil {
  205. return fmt.Errorf("Show rule error : %s.", err)
  206. }
  207. dst := &bytes.Buffer{}
  208. if err := json.Indent(dst, result, "", " "); err != nil {
  209. return fmt.Errorf("Show rule error : %s.", err)
  210. }
  211. *reply = dst.String()
  212. }
  213. return nil
  214. }
  215. func (t *Server) DropRule(name string, reply *string) error {
  216. deleteRule(name)
  217. r, err := ruleProcessor.ExecDrop(name)
  218. if err != nil {
  219. return fmt.Errorf("Drop rule error : %s.", err)
  220. } else {
  221. err := t.StopRule(name, reply)
  222. if err != nil {
  223. return err
  224. }
  225. }
  226. *reply = r
  227. return nil
  228. }
  229. func marshalDesc(m interface{}) (string, error) {
  230. s, err := json.Marshal(m)
  231. if err != nil {
  232. return "", fmt.Errorf("invalid json %v", m)
  233. }
  234. dst := &bytes.Buffer{}
  235. if err := json.Indent(dst, s, "", " "); err != nil {
  236. return "", fmt.Errorf("indent json error %v", err)
  237. }
  238. return dst.String(), nil
  239. }
  240. func init() {
  241. ticker := time.NewTicker(time.Second * 5)
  242. go infra.SafeRun(func() error {
  243. for {
  244. <-ticker.C
  245. if registry != nil {
  246. if _, ok := registry.Load(QueryRuleId); !ok {
  247. continue
  248. }
  249. n := time.Now()
  250. w := 10 * time.Second
  251. if v := n.Sub(sink.QR.LastFetch); v >= w {
  252. logger.Printf("The client seems no longer fetch the query result, stop the query now.")
  253. stopQuery()
  254. }
  255. }
  256. }
  257. })
  258. }