main.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. package main
  2. import (
  3. "context"
  4. "engine/common"
  5. "engine/xsql/processors"
  6. "engine/xstream"
  7. "engine/xstream/api"
  8. "engine/xstream/sinks"
  9. "fmt"
  10. "net"
  11. "net/http"
  12. "net/rpc"
  13. "path"
  14. "strings"
  15. "time"
  16. )
  17. var dataDir string
  18. var log = common.Log
  19. type RuleState struct{
  20. Name string
  21. Topology *xstream.TopologyNew
  22. Triggered bool
  23. }
  24. type RuleRegistry map[string]*RuleState
  25. var registry RuleRegistry
  26. var processor *processors.RuleProcessor
  27. type Server int
  28. var QUERY_RULE_ID = "internal-xstream_query_rule"
  29. func (t *Server) CreateQuery(sql string, reply *string) error {
  30. if _, ok := registry[QUERY_RULE_ID]; ok {
  31. stopQuery()
  32. }
  33. tp, err := processors.NewRuleProcessor(path.Dir(dataDir)).ExecQuery(QUERY_RULE_ID, sql)
  34. if err != nil {
  35. msg := fmt.Sprintf("failed to create query: %s.", err)
  36. log.Println(msg)
  37. return fmt.Errorf(msg)
  38. } else {
  39. rs := &RuleState{Name: QUERY_RULE_ID, Topology: tp, Triggered: true}
  40. registry[QUERY_RULE_ID] = rs
  41. msg := fmt.Sprintf("query is submit successfully.")
  42. log.Println(msg)
  43. *reply = fmt.Sprintf(msg)
  44. }
  45. return nil
  46. }
  47. func stopQuery() {
  48. if rs, ok := registry[QUERY_RULE_ID]; ok {
  49. log.Printf("stop the query.")
  50. (*rs.Topology).Cancel()
  51. delete(registry, QUERY_RULE_ID)
  52. }
  53. }
  54. /**
  55. * qid is not currently used.
  56. */
  57. func (t *Server) GetQueryResult(qid string, reply *string) error {
  58. sinks.QR.LastFetch = time.Now()
  59. sinks.QR.Mux.Lock()
  60. if len(sinks.QR.Results) > 0 {
  61. *reply = strings.Join(sinks.QR.Results, "")
  62. sinks.QR.Results = make([]string, 10)
  63. } else {
  64. *reply = ""
  65. }
  66. sinks.QR.Mux.Unlock()
  67. return nil
  68. }
  69. func (t *Server) Stream(stream string, reply *string) error{
  70. content, err := processors.NewStreamProcessor(stream, path.Join(path.Dir(dataDir), "stream")).Exec()
  71. if err != nil {
  72. fmt.Printf("stream command error: %s\n", err)
  73. return err
  74. } else {
  75. for _, c := range content{
  76. *reply = *reply + fmt.Sprintln(c)
  77. }
  78. }
  79. return nil
  80. }
  81. func (t *Server) CreateRule(rule *common.Rule, reply *string) error{
  82. r, err := processor.ExecCreate(rule.Name, rule.Json)
  83. if err != nil {
  84. return fmt.Errorf("create rule error : %s\n", err)
  85. } else {
  86. *reply = fmt.Sprintf("rule %s created", rule.Name)
  87. }
  88. //Start the rule
  89. rs, err := t.createRuleState(r)
  90. if err != nil {
  91. return err
  92. }
  93. err = t.doStartRule(rs)
  94. if err != nil {
  95. return err
  96. }
  97. return nil
  98. }
  99. func (t *Server) createRuleState(rule *api.Rule) (*RuleState, error){
  100. if tp, err := processor.ExecInitRule(rule); err != nil{
  101. return nil, err
  102. }else{
  103. rs := &RuleState{
  104. Name: rule.Id,
  105. Topology: tp,
  106. Triggered: true,
  107. }
  108. registry[rule.Id] = rs
  109. return rs, nil
  110. }
  111. }
  112. func (t *Server) GetStatusRule(name string, reply *string) error{
  113. if rs, ok := registry[name]; ok{
  114. if !rs.Triggered {
  115. *reply = "stopped: canceled manually"
  116. return nil
  117. }
  118. c := (*rs.Topology).GetContext()
  119. if c != nil{
  120. err := c.Err()
  121. switch err{
  122. case nil:
  123. *reply = "running"
  124. case context.Canceled:
  125. *reply = "stopped: canceled by error"
  126. case context.DeadlineExceeded:
  127. *reply = "stopped: deadline exceed"
  128. default:
  129. *reply = "stopped: unknown reason"
  130. }
  131. }else{
  132. *reply = "stopped: no context found"
  133. }
  134. }else{
  135. return fmt.Errorf("rule %s not found", name)
  136. }
  137. return nil
  138. }
  139. func (t *Server) StartRule(name string, reply *string) error{
  140. var rs *RuleState
  141. rs, ok := registry[name]
  142. if !ok{
  143. r, err := processor.GetRuleByName(name)
  144. if err != nil{
  145. return err
  146. }
  147. rs, err = t.createRuleState(r)
  148. if err != nil {
  149. return err
  150. }
  151. }
  152. err := t.doStartRule(rs)
  153. if err != nil{
  154. return err
  155. }
  156. *reply = fmt.Sprintf("rule %s started", name)
  157. return nil
  158. }
  159. func (t *Server) doStartRule(rs *RuleState) error{
  160. rs.Triggered = true
  161. go func() {
  162. tp := rs.Topology
  163. select {
  164. case err := <-tp.Open():
  165. log.Println(err)
  166. tp.Cancel()
  167. }
  168. }()
  169. return nil
  170. }
  171. func (t *Server) StopRule(name string, reply *string) error{
  172. if rs, ok := registry[name]; ok{
  173. (*rs.Topology).Cancel()
  174. rs.Triggered = false
  175. *reply = fmt.Sprintf("rule %s stopped", name)
  176. }else{
  177. *reply = fmt.Sprintf("rule %s not found", name)
  178. }
  179. return nil
  180. }
  181. func (t *Server) RestartRule(name string, reply *string) error{
  182. err := t.StopRule(name, reply)
  183. if err != nil{
  184. return err
  185. }
  186. err = t.StartRule(name, reply)
  187. if err != nil{
  188. return err
  189. }
  190. *reply = fmt.Sprintf("rule %s restarted", name)
  191. return nil
  192. }
  193. func (t *Server) DescRule(name string, reply *string) error{
  194. r, err := processor.ExecDesc(name)
  195. if err != nil {
  196. return fmt.Errorf("desc rule error : %s\n", err)
  197. } else {
  198. *reply = r
  199. }
  200. return nil
  201. }
  202. func (t *Server) ShowRules(_ int, reply *string) error{
  203. r, err := processor.ExecShow()
  204. if err != nil {
  205. return fmt.Errorf("show rule error : %s\n", err)
  206. } else {
  207. *reply = r
  208. }
  209. return nil
  210. }
  211. func (t *Server) DropRule(name string, reply *string) error{
  212. r, err := processor.ExecDrop(name)
  213. if err != nil {
  214. return fmt.Errorf("drop rule error : %s\n", err)
  215. } else {
  216. err := t.StopRule(name, reply)
  217. if err != nil{
  218. return err
  219. }
  220. }
  221. *reply = r
  222. return nil
  223. }
  224. func init(){
  225. var err error
  226. dataDir, err = common.GetDataLoc()
  227. if err != nil {
  228. log.Panic(err)
  229. }else{
  230. log.Infof("db location is %s", dataDir)
  231. }
  232. processor = processors.NewRuleProcessor(path.Dir(dataDir))
  233. registry = make(RuleRegistry)
  234. ticker := time.NewTicker(time.Second * 5)
  235. go func() {
  236. for {
  237. <-ticker.C
  238. if _, ok := registry[QUERY_RULE_ID]; !ok {
  239. continue
  240. }
  241. n := time.Now()
  242. w := 10 * time.Second
  243. if v := n.Sub(sinks.QR.LastFetch); v >= w {
  244. log.Printf("The client seems no longer fetch the query result, stop the query now.")
  245. stopQuery()
  246. }
  247. }
  248. //defer ticker.Stop()
  249. }()
  250. }
  251. func main() {
  252. server := new(Server)
  253. //Start rules
  254. if rules, err := processor.GetAllRules(); err != nil{
  255. log.Infof("Start rules error: %s", err)
  256. }else{
  257. log.Info("Starting rules")
  258. var reply string
  259. for _, rule := range rules{
  260. err = server.StartRule(rule, &reply)
  261. if err != nil {
  262. log.Info(err)
  263. }else{
  264. log.Info(reply)
  265. }
  266. }
  267. }
  268. //Start server
  269. err := rpc.Register(server)
  270. if err != nil {
  271. log.Fatal("Format of service Server isn't correct. ", err)
  272. }
  273. // Register a HTTP handler
  274. rpc.HandleHTTP()
  275. // Listen to TPC connections on port 1234
  276. listener, e := net.Listen("tcp", fmt.Sprintf(":%d", common.Config.Port))
  277. if e != nil {
  278. log.Fatal("Listen error: ", e)
  279. }
  280. msg := fmt.Sprintf("Serving Kuiper server on port %d", common.Config.Port)
  281. log.Info(msg)
  282. fmt.Println(msg)
  283. // Start accept incoming HTTP connections
  284. err = http.Serve(listener, nil)
  285. if err != nil {
  286. log.Fatal("Error serving: ", err)
  287. }
  288. }