rpc.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package server
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xstream/sinks"
  6. "strings"
  7. "time"
  8. )
  9. const QUERY_RULE_ID = "internal-xstream_query_rule"
  10. type Server int
  11. func (t *Server) CreateQuery(sql string, reply *string) error {
  12. if _, ok := registry.Load(QUERY_RULE_ID); ok {
  13. stopQuery()
  14. }
  15. tp, err := ruleProcessor.ExecQuery(QUERY_RULE_ID, sql)
  16. if err != nil {
  17. return err
  18. } else {
  19. rs := &RuleState{Name: QUERY_RULE_ID, Topology: tp, Triggered: true}
  20. registry.Store(QUERY_RULE_ID, rs)
  21. msg := fmt.Sprintf("Query was submit successfully.")
  22. logger.Println(msg)
  23. *reply = fmt.Sprintf(msg)
  24. }
  25. return nil
  26. }
  27. func stopQuery() {
  28. if rs, ok := registry.Load(QUERY_RULE_ID); ok {
  29. logger.Printf("stop the query.")
  30. (*rs.Topology).Cancel()
  31. registry.Delete(QUERY_RULE_ID)
  32. }
  33. }
  34. /**
  35. * qid is not currently used.
  36. */
  37. func (t *Server) GetQueryResult(qid string, reply *string) error {
  38. if rs, ok := registry.Load(QUERY_RULE_ID); ok {
  39. c := (*rs.Topology).GetContext()
  40. if c != nil && c.Err() != nil {
  41. return c.Err()
  42. }
  43. }
  44. sinks.QR.LastFetch = time.Now()
  45. sinks.QR.Mux.Lock()
  46. if len(sinks.QR.Results) > 0 {
  47. *reply = strings.Join(sinks.QR.Results, "")
  48. sinks.QR.Results = make([]string, 10)
  49. } else {
  50. *reply = ""
  51. }
  52. sinks.QR.Mux.Unlock()
  53. return nil
  54. }
  55. func (t *Server) Stream(stream string, reply *string) error {
  56. content, err := streamProcessor.ExecStmt(stream)
  57. if err != nil {
  58. return fmt.Errorf("Stream command error: %s", err)
  59. } else {
  60. for _, c := range content {
  61. *reply = *reply + fmt.Sprintln(c)
  62. }
  63. }
  64. return nil
  65. }
  66. func (t *Server) CreateRule(rule *common.Rule, reply *string) error {
  67. r, err := ruleProcessor.ExecCreate(rule.Name, rule.Json)
  68. if err != nil {
  69. return fmt.Errorf("Create rule error : %s.", err)
  70. } else {
  71. *reply = fmt.Sprintf("Rule %s was created, please use 'cli getstatus rule $rule_name' command to get rule status.", rule.Name)
  72. }
  73. //Start the rule
  74. rs, err := createRuleState(r)
  75. if err != nil {
  76. return err
  77. }
  78. err = doStartRule(rs)
  79. if err != nil {
  80. return err
  81. }
  82. return nil
  83. }
  84. func (t *Server) GetStatusRule(name string, reply *string) error {
  85. if r, err := getRuleStatus(name); err != nil {
  86. return err
  87. } else {
  88. *reply = r
  89. }
  90. return nil
  91. }
  92. func (t *Server) StartRule(name string, reply *string) error {
  93. if err := startRule(name); err != nil {
  94. return err
  95. } else {
  96. *reply = fmt.Sprintf("Rule %s was started", name)
  97. }
  98. return nil
  99. }
  100. func (t *Server) StopRule(name string, reply *string) error {
  101. *reply = stopRule(name)
  102. return nil
  103. }
  104. func (t *Server) RestartRule(name string, reply *string) error {
  105. err := restartRule(name)
  106. if err != nil {
  107. return err
  108. }
  109. *reply = fmt.Sprintf("Rule %s was restarted.", name)
  110. return nil
  111. }
  112. func (t *Server) DescRule(name string, reply *string) error {
  113. r, err := ruleProcessor.ExecDesc(name)
  114. if err != nil {
  115. return fmt.Errorf("Desc rule error : %s.", err)
  116. } else {
  117. *reply = r
  118. }
  119. return nil
  120. }
  121. func (t *Server) ShowRules(_ int, reply *string) error {
  122. r, err := ruleProcessor.ExecShow()
  123. if err != nil {
  124. return fmt.Errorf("Show rule error : %s.", err)
  125. } else {
  126. *reply = r
  127. }
  128. return nil
  129. }
  130. func (t *Server) DropRule(name string, reply *string) error {
  131. stopRule(name)
  132. r, err := ruleProcessor.ExecDrop(name)
  133. if err != nil {
  134. return fmt.Errorf("Drop rule error : %s.", err)
  135. } else {
  136. err := t.StopRule(name, reply)
  137. if err != nil {
  138. return err
  139. }
  140. }
  141. *reply = r
  142. return nil
  143. }
  144. func init() {
  145. ticker := time.NewTicker(time.Second * 5)
  146. go func() {
  147. for {
  148. <-ticker.C
  149. if _, ok := registry.Load(QUERY_RULE_ID); !ok {
  150. continue
  151. }
  152. n := time.Now()
  153. w := 10 * time.Second
  154. if v := n.Sub(sinks.QR.LastFetch); v >= w {
  155. logger.Printf("The client seems no longer fetch the query result, stop the query now.")
  156. stopQuery()
  157. ticker.Stop()
  158. return
  159. }
  160. }
  161. }()
  162. }