main.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package main
  2. import (
  3. "context"
  4. "engine/common"
  5. "engine/xsql/processors"
  6. "engine/xstream"
  7. "engine/xstream/sinks"
  8. "fmt"
  9. "net"
  10. "net/http"
  11. "net/rpc"
  12. "path"
  13. "strings"
  14. "time"
  15. )
  16. var dataDir string
  17. var log = common.Log
  18. type RuleState struct{
  19. Name string
  20. Topology *xstream.TopologyNew
  21. Triggered bool
  22. }
  23. type RuleRegistry map[string]*RuleState
  24. var registry RuleRegistry
  25. var processor *processors.RuleProcessor
  26. type Server int
  27. var QUERY_RULE_ID string = "internal-xstream_query_rule"
  28. func (t *Server) CreateQuery(sql string, reply *string) error {
  29. if _, ok := registry[QUERY_RULE_ID]; ok {
  30. stopQuery()
  31. }
  32. tp, err := processors.NewRuleProcessor(path.Dir(dataDir)).ExecQuery(QUERY_RULE_ID, sql)
  33. if err != nil {
  34. msg := fmt.Sprintf("failed to create query: %s.", err)
  35. log.Println(msg)
  36. return fmt.Errorf(msg)
  37. } else {
  38. rs := &RuleState{Name: QUERY_RULE_ID, Topology: tp, Triggered: true}
  39. registry[QUERY_RULE_ID] = rs
  40. msg := fmt.Sprintf("query is submit successfully.")
  41. log.Println(msg)
  42. *reply = fmt.Sprintf(msg)
  43. }
  44. return nil
  45. }
  46. func stopQuery() {
  47. if rs, ok := registry[QUERY_RULE_ID]; ok {
  48. log.Printf("stop the query.")
  49. (*rs.Topology).Cancel()
  50. delete(registry, QUERY_RULE_ID)
  51. }
  52. }
  53. /**
  54. * qid is not currently used.
  55. */
  56. func (t *Server) GetQueryResult(qid string, reply *string) error {
  57. sinks.QR.LastFetch = time.Now()
  58. sinks.QR.Mux.Lock()
  59. if len(sinks.QR.Results) > 0 {
  60. *reply = strings.Join(sinks.QR.Results, "")
  61. sinks.QR.Results = make([]string, 10)
  62. } else {
  63. *reply = ""
  64. }
  65. sinks.QR.Mux.Unlock()
  66. return nil
  67. }
  68. func (t *Server) Stream(stream string, reply *string) error{
  69. content, err := processors.NewStreamProcessor(stream, path.Join(path.Dir(dataDir), "stream")).Exec()
  70. if err != nil {
  71. fmt.Printf("stream command error: %s\n", err)
  72. return err
  73. } else {
  74. for _, c := range content{
  75. *reply = *reply + fmt.Sprintln(c)
  76. }
  77. }
  78. return nil
  79. }
  80. func (t *Server) CreateRule(rule *common.Rule, reply *string) error{
  81. r, err := processor.ExecCreate(rule.Name, rule.Json)
  82. if err != nil {
  83. return fmt.Errorf("create rule error : %s\n", err)
  84. } else {
  85. *reply = fmt.Sprintf("rule %s created", rule.Name)
  86. }
  87. //Start the rule
  88. rs, err := t.createRuleState(r)
  89. if err != nil {
  90. return err
  91. }
  92. err = t.doStartRule(rs)
  93. if err != nil {
  94. return err
  95. }
  96. return nil
  97. }
  98. func (t *Server) createRuleState(rule *xstream.Rule) (*RuleState, error){
  99. if tp, err := processor.ExecInitRule(rule); err != nil{
  100. return nil, err
  101. }else{
  102. rs := &RuleState{
  103. Name: rule.Id,
  104. Topology: tp,
  105. Triggered: true,
  106. }
  107. registry[rule.Id] = rs
  108. return rs, nil
  109. }
  110. }
  111. func (t *Server) GetStatusRule(name string, reply *string) error{
  112. if rs, ok := registry[name]; ok{
  113. if !rs.Triggered {
  114. *reply = "stopped: canceled manually"
  115. return nil
  116. }
  117. c := (*rs.Topology).GetContext()
  118. if c != nil{
  119. err := c.Err()
  120. switch err{
  121. case nil:
  122. *reply = "running"
  123. case context.Canceled:
  124. *reply = "stopped: canceled by error"
  125. case context.DeadlineExceeded:
  126. *reply = "stopped: deadline exceed"
  127. default:
  128. *reply = "stopped: unknown reason"
  129. }
  130. }else{
  131. *reply = "stopped: no context found"
  132. }
  133. }else{
  134. return fmt.Errorf("rule %s not found", name)
  135. }
  136. return nil
  137. }
  138. func (t *Server) StartRule(name string, reply *string) error{
  139. var rs *RuleState
  140. rs, ok := registry[name]
  141. if !ok{
  142. r, err := processor.GetRuleByName(name)
  143. if err != nil{
  144. return err
  145. }
  146. rs, err = t.createRuleState(r)
  147. if err != nil {
  148. return err
  149. }
  150. }
  151. err := t.doStartRule(rs)
  152. if err != nil{
  153. return err
  154. }
  155. *reply = fmt.Sprintf("rule %s started", name)
  156. return nil
  157. }
  158. func (t *Server) doStartRule(rs *RuleState) error{
  159. rs.Triggered = true
  160. go func() {
  161. tp := rs.Topology
  162. select {
  163. case err := <-tp.Open():
  164. log.Println(err)
  165. tp.Cancel()
  166. }
  167. }()
  168. return nil
  169. }
  170. func (t *Server) StopRule(name string, reply *string) error{
  171. if rs, ok := registry[name]; ok{
  172. (*rs.Topology).Cancel()
  173. rs.Triggered = false
  174. *reply = fmt.Sprintf("rule %s stopped", name)
  175. }else{
  176. *reply = fmt.Sprintf("rule %s not found", name)
  177. }
  178. return nil
  179. }
  180. func (t *Server) RestartRule(name string, reply *string) error{
  181. err := t.StopRule(name, reply)
  182. if err != nil{
  183. return err
  184. }
  185. err = t.StartRule(name, reply)
  186. if err != nil{
  187. return err
  188. }
  189. *reply = fmt.Sprintf("rule %s restarted", name)
  190. return nil
  191. }
  192. func (t *Server) DescRule(name string, reply *string) error{
  193. r, err := processor.ExecDesc(name)
  194. if err != nil {
  195. return fmt.Errorf("desc rule error : %s\n", err)
  196. } else {
  197. *reply = r
  198. }
  199. return nil
  200. }
  201. func (t *Server) ShowRules(_ int, reply *string) error{
  202. r, err := processor.ExecShow()
  203. if err != nil {
  204. return fmt.Errorf("show rule error : %s\n", err)
  205. } else {
  206. *reply = r
  207. }
  208. return nil
  209. }
  210. func (t *Server) DropRule(name string, reply *string) error{
  211. r, err := processor.ExecDrop(name)
  212. if err != nil {
  213. return fmt.Errorf("drop rule error : %s\n", err)
  214. } else {
  215. err := t.StopRule(name, reply)
  216. if err != nil{
  217. return err
  218. }
  219. }
  220. *reply = r
  221. return nil
  222. }
  223. func init(){
  224. var err error
  225. dataDir, err = common.GetDataLoc()
  226. if err != nil {
  227. log.Panic(err)
  228. }else{
  229. log.Infof("db location is %s", dataDir)
  230. }
  231. processor = processors.NewRuleProcessor(path.Dir(dataDir))
  232. registry = make(RuleRegistry)
  233. ticker := time.NewTicker(time.Second * 5)
  234. go func() {
  235. for {
  236. <-ticker.C
  237. if _, ok := registry[QUERY_RULE_ID]; !ok {
  238. continue
  239. }
  240. n := time.Now()
  241. w := 10 * time.Second
  242. if v := n.Sub(sinks.QR.LastFetch); v >= w {
  243. log.Printf("The client seems no longer fetch the query result, stop the query now.")
  244. stopQuery()
  245. }
  246. }
  247. //defer ticker.Stop()
  248. }()
  249. }
  250. func main() {
  251. server := new(Server)
  252. //Start rules
  253. if rules, err := processor.GetAllRules(); err != nil{
  254. log.Infof("Start rules error: %s", err)
  255. }else{
  256. log.Info("Starting rules")
  257. var reply string
  258. for _, rule := range rules{
  259. err = server.StartRule(rule, &reply)
  260. if err != nil {
  261. log.Info(err)
  262. }else{
  263. log.Info(reply)
  264. }
  265. }
  266. }
  267. //Start server
  268. err := rpc.Register(server)
  269. if err != nil {
  270. log.Fatal("Format of service Server isn't correct. ", err)
  271. }
  272. // Register a HTTP handler
  273. rpc.HandleHTTP()
  274. // Listen to TPC connections on port 1234
  275. listener, e := net.Listen("tcp", fmt.Sprintf(":%d", common.Config.Port))
  276. if e != nil {
  277. log.Fatal("Listen error: ", e)
  278. }
  279. msg := fmt.Sprintf("Serving Rule server on port %d", common.Config.Port)
  280. log.Info(msg)
  281. fmt.Println(msg)
  282. // Start accept incoming HTTP connections
  283. err = http.Serve(listener, nil)
  284. if err != nil {
  285. log.Fatal("Error serving: ", err)
  286. }
  287. }