main.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. package main
  2. import (
  3. "context"
  4. "engine/common"
  5. "engine/xsql/processors"
  6. "engine/xstream"
  7. "engine/xstream/api"
  8. "engine/xstream/sinks"
  9. "fmt"
  10. "net"
  11. "net/http"
  12. "net/rpc"
  13. "path"
  14. "strings"
  15. "time"
  16. )
  17. var dataDir string
  18. var log = common.Log
  19. type RuleState struct{
  20. Name string
  21. Topology *xstream.TopologyNew
  22. Triggered bool
  23. }
  24. type RuleRegistry map[string]*RuleState
  25. var registry RuleRegistry
  26. var processor *processors.RuleProcessor
  27. type Server int
  28. var QUERY_RULE_ID = "internal-xstream_query_rule"
  29. func (t *Server) CreateQuery(sql string, reply *string) error {
  30. if _, ok := registry[QUERY_RULE_ID]; ok {
  31. stopQuery()
  32. }
  33. tp, err := processors.NewRuleProcessor(path.Dir(dataDir)).ExecQuery(QUERY_RULE_ID, sql)
  34. if err != nil {
  35. msg := fmt.Sprintf("Failed to create query: %s.", err)
  36. log.Println(msg)
  37. return fmt.Errorf(msg)
  38. } else {
  39. rs := &RuleState{Name: QUERY_RULE_ID, Topology: tp, Triggered: true}
  40. registry[QUERY_RULE_ID] = rs
  41. msg := fmt.Sprintf("Query was submit successfully.")
  42. log.Println(msg)
  43. *reply = fmt.Sprintf(msg)
  44. }
  45. return nil
  46. }
  47. func stopQuery() {
  48. if rs, ok := registry[QUERY_RULE_ID]; ok {
  49. log.Printf("stop the query.")
  50. (*rs.Topology).Cancel()
  51. delete(registry, QUERY_RULE_ID)
  52. }
  53. }
  54. /**
  55. * qid is not currently used.
  56. */
  57. func (t *Server) GetQueryResult(qid string, reply *string) error {
  58. sinks.QR.LastFetch = time.Now()
  59. sinks.QR.Mux.Lock()
  60. if len(sinks.QR.Results) > 0 {
  61. *reply = strings.Join(sinks.QR.Results, "")
  62. sinks.QR.Results = make([]string, 10)
  63. } else {
  64. *reply = ""
  65. }
  66. sinks.QR.Mux.Unlock()
  67. return nil
  68. }
  69. func (t *Server) Stream(stream string, reply *string) error{
  70. content, err := processors.NewStreamProcessor(stream, path.Join(path.Dir(dataDir), "stream")).Exec()
  71. if err != nil {
  72. return fmt.Errorf("Stream command error: %s", err)
  73. } else {
  74. for _, c := range content{
  75. *reply = *reply + fmt.Sprintln(c)
  76. }
  77. }
  78. return nil
  79. }
  80. func (t *Server) CreateRule(rule *common.Rule, reply *string) error{
  81. r, err := processor.ExecCreate(rule.Name, rule.Json)
  82. if err != nil {
  83. return fmt.Errorf("Create rule error : %s.", err)
  84. } else {
  85. *reply = fmt.Sprintf("Rule %s was created.", rule.Name)
  86. }
  87. //Start the rule
  88. rs, err := t.createRuleState(r)
  89. if err != nil {
  90. return err
  91. }
  92. err = t.doStartRule(rs)
  93. if err != nil {
  94. return err
  95. }
  96. return nil
  97. }
  98. func (t *Server) createRuleState(rule *api.Rule) (*RuleState, error){
  99. if tp, err := processor.ExecInitRule(rule); err != nil{
  100. return nil, err
  101. }else{
  102. rs := &RuleState{
  103. Name: rule.Id,
  104. Topology: tp,
  105. Triggered: true,
  106. }
  107. registry[rule.Id] = rs
  108. return rs, nil
  109. }
  110. }
  111. func (t *Server) GetStatusRule(name string, reply *string) error{
  112. if rs, ok := registry[name]; ok{
  113. if !rs.Triggered {
  114. *reply = "Stopped: canceled manually."
  115. return nil
  116. }
  117. c := (*rs.Topology).GetContext()
  118. if c != nil{
  119. err := c.Err()
  120. switch err{
  121. case nil:
  122. *reply = "Running\n"
  123. case context.Canceled:
  124. *reply = "Stopped: canceled by error."
  125. case context.DeadlineExceeded:
  126. *reply = "Stopped: deadline exceed."
  127. default:
  128. *reply = "Stopped: unknown reason."
  129. }
  130. }else{
  131. *reply = "Stopped: no context found."
  132. }
  133. }else{
  134. return fmt.Errorf("Rule %s is not found", name)
  135. }
  136. return nil
  137. }
  138. func (t *Server) StartRule(name string, reply *string) error{
  139. var rs *RuleState
  140. rs, ok := registry[name]
  141. if !ok{
  142. r, err := processor.GetRuleByName(name)
  143. if err != nil{
  144. return err
  145. }
  146. rs, err = t.createRuleState(r)
  147. if err != nil {
  148. return err
  149. }
  150. }
  151. err := t.doStartRule(rs)
  152. if err != nil{
  153. return err
  154. }
  155. *reply = fmt.Sprintf("Rule %s was started", name)
  156. return nil
  157. }
  158. func (t *Server) doStartRule(rs *RuleState) error{
  159. rs.Triggered = true
  160. go func() {
  161. tp := rs.Topology
  162. select {
  163. case err := <-tp.Open():
  164. log.Println(err)
  165. tp.Cancel()
  166. }
  167. }()
  168. return nil
  169. }
  170. func (t *Server) StopRule(name string, reply *string) error{
  171. if rs, ok := registry[name]; ok{
  172. (*rs.Topology).Cancel()
  173. rs.Triggered = false
  174. *reply = fmt.Sprintf("Rule %s was stopped.", name)
  175. }else{
  176. *reply = fmt.Sprintf("Rule %s was not found.", name)
  177. }
  178. return nil
  179. }
  180. func (t *Server) RestartRule(name string, reply *string) error{
  181. err := t.StopRule(name, reply)
  182. if err != nil{
  183. return err
  184. }
  185. err = t.StartRule(name, reply)
  186. if err != nil{
  187. return err
  188. }
  189. *reply = fmt.Sprintf("Rule %s was restarted.", name)
  190. return nil
  191. }
  192. func (t *Server) DescRule(name string, reply *string) error{
  193. r, err := processor.ExecDesc(name)
  194. if err != nil {
  195. return fmt.Errorf("Desc rule error : %s.", err)
  196. } else {
  197. *reply = r
  198. }
  199. return nil
  200. }
  201. func (t *Server) ShowRules(_ int, reply *string) error{
  202. r, err := processor.ExecShow()
  203. if err != nil {
  204. return fmt.Errorf("Show rule error : %s.", err)
  205. } else {
  206. *reply = r
  207. }
  208. return nil
  209. }
  210. func (t *Server) DropRule(name string, reply *string) error{
  211. r, err := processor.ExecDrop(name)
  212. if err != nil {
  213. return fmt.Errorf("Drop rule error : %s.", err)
  214. } else {
  215. err := t.StopRule(name, reply)
  216. if err != nil{
  217. return err
  218. }
  219. }
  220. *reply = r
  221. return nil
  222. }
  223. func init(){
  224. var err error
  225. dataDir, err = common.GetDataLoc()
  226. if err != nil {
  227. log.Panic(err)
  228. }else{
  229. log.Infof("db location is %s", dataDir)
  230. }
  231. processor = processors.NewRuleProcessor(path.Dir(dataDir))
  232. registry = make(RuleRegistry)
  233. ticker := time.NewTicker(time.Second * 5)
  234. go func() {
  235. for {
  236. <-ticker.C
  237. if _, ok := registry[QUERY_RULE_ID]; !ok {
  238. continue
  239. }
  240. n := time.Now()
  241. w := 10 * time.Second
  242. if v := n.Sub(sinks.QR.LastFetch); v >= w {
  243. log.Printf("The client seems no longer fetch the query result, stop the query now.")
  244. stopQuery()
  245. }
  246. }
  247. //defer ticker.Stop()
  248. }()
  249. }
  250. var Version string = "unknown"
  251. func main() {
  252. server := new(Server)
  253. //Start rules
  254. if rules, err := processor.GetAllRules(); err != nil{
  255. log.Infof("Start rules error: %s", err)
  256. }else{
  257. log.Info("Starting rules")
  258. var reply string
  259. for _, rule := range rules{
  260. err = server.StartRule(rule, &reply)
  261. if err != nil {
  262. log.Info(err)
  263. }else{
  264. log.Info(reply)
  265. }
  266. }
  267. }
  268. //Start server
  269. err := rpc.Register(server)
  270. if err != nil {
  271. log.Fatal("Format of service Server isn't correct. ", err)
  272. }
  273. // Register a HTTP handler
  274. rpc.HandleHTTP()
  275. // Listen to TPC connections on port 1234
  276. listener, e := net.Listen("tcp", fmt.Sprintf(":%d", common.Config.Port))
  277. if e != nil {
  278. log.Fatal("Listen error: ", e)
  279. }
  280. msg := fmt.Sprintf("Serving kuiper (version - %s) on port %d... \n", Version, common.Config.Port)
  281. log.Info(msg)
  282. fmt.Printf(msg)
  283. // Start accept incoming HTTP connections
  284. err = http.Serve(listener, nil)
  285. if err != nil {
  286. log.Fatal("Error serving: ", err)
  287. }
  288. }