Browse Source

get status for query

RockyJin 5 years atrás
parent
commit
2bddcf2a37
2 changed files with 17 additions and 3 deletions
  1. 7 2
      xstream/cli/main.go
  2. 10 1
      xstream/server/server/server.go

+ 7 - 2
xstream/cli/main.go

@@ -103,14 +103,19 @@ func main() {
 						err := client.Call("Server.CreateQuery", text, &reply)
 						if err != nil{
 							fmt.Println(err)
-							return err
+							continue
 						} else {
 							fmt.Println(reply)
 							go func() {
 								for {
 									<-ticker.C
 									var result string
-									_ = client.Call("Server.GetQueryResult", "", &result)
+									e := client.Call("Server.GetQueryResult", "", &result)
+									if e != nil {
+										fmt.Println(e)
+										fmt.Print("kuiper > ")
+										return
+									}
 									if result != "" {
 										fmt.Println(result)
 									}

+ 10 - 1
xstream/server/server/server.go

@@ -41,7 +41,7 @@ func (t *Server) CreateQuery(sql string, reply *string) error {
 	}
 	tp, err := processors.NewRuleProcessor(path.Dir(dataDir)).ExecQuery(QUERY_RULE_ID, sql)
 	if err != nil {
-		return fmt.Errorf("failed to create query: %s", err)
+		return err
 	} else {
 		rs := &RuleState{Name: QUERY_RULE_ID, Topology: tp, Triggered: true}
 		registry[QUERY_RULE_ID] = rs
@@ -64,6 +64,15 @@ func stopQuery() {
  * qid is not currently used.
  */
 func (t *Server) GetQueryResult(qid string, reply *string) error {
+	if rs, ok := registry[QUERY_RULE_ID]; ok {
+		c := (*rs.Topology).GetContext()
+		if c != nil {
+			if err := c.Err(); err != nil {
+				return err
+			}
+		}
+	}
+
 	sinks.QR.LastFetch = time.Now()
 	sinks.QR.Mux.Lock()
 	if len(sinks.QR.Results) > 0 {