浏览代码

feat<stream>: return the error message for topo

ngjaying 5 年之前
父节点
当前提交
2ca3ddf264
共有 3 个文件被更改,包括 7 次插入8 次删除
  1. 0 1
      xstream/api/stream.go
  2. 3 4
      xstream/contexts/default.go
  3. 4 3
      xstream/server/server/server.go

+ 0 - 1
xstream/api/stream.go

@@ -72,7 +72,6 @@ type StreamContext interface {
 	WithMeta(ruleId string, opId string) StreamContext
 	WithInstance(instanceId int) StreamContext
 	WithCancel() (StreamContext, context.CancelFunc)
-	GetError() error
 	SetError(e error)
 }
 

+ 3 - 4
xstream/contexts/default.go

@@ -40,6 +40,9 @@ func (c *DefaultContext) Done() <-chan struct{} {
 }
 
 func (c *DefaultContext) Err() error {
+	if c.err != nil{
+		return c.err
+	}
 	return c.ctx.Err()
 }
 
@@ -72,10 +75,6 @@ func (c *DefaultContext) GetInstanceId() int {
 	return c.instanceId
 }
 
-func (c *DefaultContext) GetError() error {
-	return c.err
-}
-
 func (c *DefaultContext) SetError(err error) {
 	c.err = err
 }

+ 4 - 3
xstream/server/server/server.go

@@ -66,8 +66,8 @@ func stopQuery() {
 func (t *Server) GetQueryResult(qid string, reply *string) error {
 	if rs, ok := registry[QUERY_RULE_ID]; ok {
 		c := (*rs.Topology).GetContext()
-		if c != nil && c.GetError() != nil{
-			return c.GetError()
+		if c != nil && c.Err() != nil{
+			return c.Err()
 		}
 	}
 
@@ -155,7 +155,7 @@ func (t *Server) GetStatusRule(name string, reply *string) error {
 			case context.DeadlineExceeded:
 				*reply = "Stopped: deadline exceed."
 			default:
-				*reply = "Stopped: unknown reason."
+				*reply = fmt.Sprintf("Stopped: %v.", err)
 			}
 		} else {
 			*reply = "Stopped: no context found."
@@ -193,6 +193,7 @@ func (t *Server) doStartRule(rs *RuleState) error {
 		tp := rs.Topology
 		select {
 		case err := <-tp.Open():
+			tp.GetContext().SetError(err)
 			log.Printf("closing rule %s for error: %v", rs.Name, err)
 			tp.Cancel()
 		}