rpc.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build rpc || !core
  15. // +build rpc !core
  16. package server
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/json"
  21. "fmt"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/pkg/model"
  24. "github.com/lf-edge/ekuiper/internal/topo/sink"
  25. "net/http"
  26. "net/rpc"
  27. "strings"
  28. "time"
  29. )
  30. const QueryRuleId = "internal-ekuiper_query_rule"
  31. func init() {
  32. servers["rpc"] = rpcComp{}
  33. }
  34. type rpcComp struct {
  35. s *http.Server
  36. }
  37. func (r rpcComp) register() {}
  38. func (r rpcComp) serve() {
  39. // Start rpc service
  40. server := new(Server)
  41. portRpc := conf.Config.Basic.Port
  42. ipRpc := conf.Config.Basic.Ip
  43. rpcSrv := rpc.NewServer()
  44. err := rpcSrv.Register(server)
  45. if err != nil {
  46. logger.Fatal("Format of service Server isn'restHttpType correct. ", err)
  47. }
  48. srvRpc := &http.Server{
  49. Addr: fmt.Sprintf("%s:%d", ipRpc, portRpc),
  50. WriteTimeout: time.Second * 15,
  51. ReadTimeout: time.Second * 15,
  52. IdleTimeout: time.Second * 60,
  53. Handler: rpcSrv,
  54. }
  55. r.s = srvRpc
  56. go func() {
  57. if err = srvRpc.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  58. logger.Fatal("Error serving rpc service:", err)
  59. }
  60. }()
  61. }
  62. func (r rpcComp) close() {
  63. if r.s != nil {
  64. if err := r.s.Shutdown(context.TODO()); err != nil {
  65. logger.Errorf("rpc server shutdown error: %v", err)
  66. }
  67. logger.Info("rpc server shutdown.")
  68. }
  69. }
  70. type Server int
  71. func (t *Server) CreateQuery(sql string, reply *string) error {
  72. if _, ok := registry.Load(QueryRuleId); ok {
  73. stopQuery()
  74. }
  75. tp, err := ruleProcessor.ExecQuery(QueryRuleId, sql)
  76. if err != nil {
  77. return err
  78. } else {
  79. rs := &RuleState{Name: QueryRuleId, Topology: tp, Triggered: true}
  80. registry.Store(QueryRuleId, rs)
  81. msg := fmt.Sprintf("Query was submit successfully.")
  82. logger.Println(msg)
  83. *reply = fmt.Sprintf(msg)
  84. }
  85. return nil
  86. }
  87. func stopQuery() {
  88. if rs, ok := registry.Load(QueryRuleId); ok {
  89. logger.Printf("stop the query.")
  90. (*rs.Topology).Cancel()
  91. registry.Delete(QueryRuleId)
  92. }
  93. }
  94. /**
  95. * qid is not currently used.
  96. */
  97. func (t *Server) GetQueryResult(_ string, reply *string) error {
  98. if rs, ok := registry.Load(QueryRuleId); ok {
  99. c := (*rs.Topology).GetContext()
  100. if c != nil && c.Err() != nil {
  101. return c.Err()
  102. }
  103. }
  104. sink.QR.LastFetch = time.Now()
  105. sink.QR.Mux.Lock()
  106. if len(sink.QR.Results) > 0 {
  107. *reply = strings.Join(sink.QR.Results, "")
  108. sink.QR.Results = make([]string, 10)
  109. } else {
  110. *reply = ""
  111. }
  112. sink.QR.Mux.Unlock()
  113. return nil
  114. }
  115. func (t *Server) Stream(stream string, reply *string) error {
  116. content, err := streamProcessor.ExecStmt(stream)
  117. if err != nil {
  118. return fmt.Errorf("Stream command error: %s", err)
  119. } else {
  120. for _, c := range content {
  121. *reply = *reply + fmt.Sprintln(c)
  122. }
  123. }
  124. return nil
  125. }
  126. func (t *Server) CreateRule(rule *model.RPCArgDesc, reply *string) error {
  127. r, err := ruleProcessor.ExecCreate(rule.Name, rule.Json)
  128. if err != nil {
  129. return fmt.Errorf("Create rule error : %s.", err)
  130. } else {
  131. *reply = fmt.Sprintf("Rule %s was created successfully, please use 'bin/kuiper getstatus rule %s' command to get rule status.", rule.Name, rule.Name)
  132. }
  133. //Start the rule
  134. rs, err := createRuleState(r)
  135. if err != nil {
  136. return err
  137. }
  138. err = doStartRule(rs)
  139. if err != nil {
  140. return err
  141. }
  142. return nil
  143. }
  144. func (t *Server) GetStatusRule(name string, reply *string) error {
  145. if r, err := getRuleStatus(name); err != nil {
  146. return err
  147. } else {
  148. *reply = r
  149. }
  150. return nil
  151. }
  152. func (t *Server) GetTopoRule(name string, reply *string) error {
  153. if r, err := getRuleTopo(name); err != nil {
  154. return err
  155. } else {
  156. dst := &bytes.Buffer{}
  157. if err = json.Indent(dst, []byte(r), "", " "); err != nil {
  158. *reply = r
  159. } else {
  160. *reply = dst.String()
  161. }
  162. }
  163. return nil
  164. }
  165. func (t *Server) StartRule(name string, reply *string) error {
  166. if err := startRule(name); err != nil {
  167. return err
  168. } else {
  169. *reply = fmt.Sprintf("Rule %s was started", name)
  170. }
  171. return nil
  172. }
  173. func (t *Server) StopRule(name string, reply *string) error {
  174. *reply = stopRule(name)
  175. return nil
  176. }
  177. func (t *Server) RestartRule(name string, reply *string) error {
  178. err := restartRule(name)
  179. if err != nil {
  180. return err
  181. }
  182. *reply = fmt.Sprintf("Rule %s was restarted.", name)
  183. return nil
  184. }
  185. func (t *Server) DescRule(name string, reply *string) error {
  186. r, err := ruleProcessor.ExecDesc(name)
  187. if err != nil {
  188. return fmt.Errorf("Desc rule error : %s.", err)
  189. } else {
  190. *reply = r
  191. }
  192. return nil
  193. }
  194. func (t *Server) ShowRules(_ int, reply *string) error {
  195. r, err := getAllRulesWithStatus()
  196. if err != nil {
  197. return fmt.Errorf("Show rule error : %s.", err)
  198. }
  199. if len(r) == 0 {
  200. *reply = "No rule definitions are found."
  201. } else {
  202. result, err := json.Marshal(r)
  203. if err != nil {
  204. return fmt.Errorf("Show rule error : %s.", err)
  205. }
  206. dst := &bytes.Buffer{}
  207. if err := json.Indent(dst, result, "", " "); err != nil {
  208. return fmt.Errorf("Show rule error : %s.", err)
  209. }
  210. *reply = dst.String()
  211. }
  212. return nil
  213. }
  214. func (t *Server) DropRule(name string, reply *string) error {
  215. deleteRule(name)
  216. r, err := ruleProcessor.ExecDrop(name)
  217. if err != nil {
  218. return fmt.Errorf("Drop rule error : %s.", err)
  219. } else {
  220. err := t.StopRule(name, reply)
  221. if err != nil {
  222. return err
  223. }
  224. }
  225. *reply = r
  226. return nil
  227. }
  228. func marshalDesc(m interface{}) (string, error) {
  229. s, err := json.Marshal(m)
  230. if err != nil {
  231. return "", fmt.Errorf("invalid json %v", m)
  232. }
  233. dst := &bytes.Buffer{}
  234. if err := json.Indent(dst, s, "", " "); err != nil {
  235. return "", fmt.Errorf("indent json error %v", err)
  236. }
  237. return dst.String(), nil
  238. }
  239. func init() {
  240. ticker := time.NewTicker(time.Second * 5)
  241. go func() {
  242. for {
  243. <-ticker.C
  244. if registry != nil {
  245. if _, ok := registry.Load(QueryRuleId); !ok {
  246. continue
  247. }
  248. n := time.Now()
  249. w := 10 * time.Second
  250. if v := n.Sub(sink.QR.LastFetch); v >= w {
  251. logger.Printf("The client seems no longer fetch the query result, stop the query now.")
  252. stopQuery()
  253. }
  254. }
  255. }
  256. }()
  257. }