Преглед изворни кода

print err info for query tools

RockyJin пре 5 година
родитељ
комит
ec62fcca0b

+ 1 - 1
common/util.go

@@ -259,7 +259,7 @@ func GetLoc(subdir string)(string, error) {
 		return confDir, nil
 	}
 
-	return "", fmt.Errorf("conf dir not found")
+	return "", fmt.Errorf("conf dir not found, please set KuiperBaseKey program environment variable correctly.")
 }
 
 func GetAndCreateDataLoc(dir string) (string, error) {

+ 1 - 0
xsql/processors/xsql_processor.go

@@ -236,6 +236,7 @@ func (p *RuleProcessor) ExecQuery(ruleid, sql string) (*xstream.TopologyNew, err
 			select {
 			case err := <-tp.Open():
 				log.Infof("closing query for error: %v", err)
+				tp.GetContext().SetError(err)
 				tp.Cancel()
 			}
 		}()

+ 2 - 0
xstream/api/stream.go

@@ -72,6 +72,8 @@ type StreamContext interface {
 	WithMeta(ruleId string, opId string) StreamContext
 	WithInstance(instanceId int) StreamContext
 	WithCancel() (StreamContext, context.CancelFunc)
+	GetError() error
+	SetError(e error)
 }
 
 type Operator interface {

+ 10 - 0
xstream/contexts/default.go

@@ -15,6 +15,7 @@ type DefaultContext struct {
 	opId       string
 	instanceId int
 	ctx        context.Context
+	err        error
 }
 
 func Background() *DefaultContext {
@@ -71,6 +72,15 @@ func (c *DefaultContext) GetInstanceId() int {
 	return c.instanceId
 }
 
+func (c *DefaultContext) GetError() error {
+	return c.err
+}
+
+func (c *DefaultContext) SetError(err error) {
+	c.err = err
+}
+
+
 func (c *DefaultContext) WithMeta(ruleId string, opId string) api.StreamContext {
 	return &DefaultContext{
 		ruleId:     ruleId,

+ 3 - 5
xstream/server/server/server.go

@@ -66,10 +66,8 @@ func stopQuery() {
 func (t *Server) GetQueryResult(qid string, reply *string) error {
 	if rs, ok := registry[QUERY_RULE_ID]; ok {
 		c := (*rs.Topology).GetContext()
-		if c != nil {
-			if err := c.Err(); err != nil {
-				return err
-			}
+		if c != nil && c.GetError() != nil{
+			return c.GetError()
 		}
 	}
 
@@ -102,7 +100,7 @@ func (t *Server) CreateRule(rule *common.Rule, reply *string) error {
 	if err != nil {
 		return fmt.Errorf("Create rule error : %s.", err)
 	} else {
-		*reply = fmt.Sprintf("Rule %s was created.", rule.Name)
+		*reply = fmt.Sprintf("Rule %s was created, please use 'cli getstatus rule $rule_name' command to get rule status.", rule.Name)
 	}
 	//Start the rule
 	rs, err := t.createRuleState(r)

+ 1 - 1
xstream/streams.go

@@ -27,7 +27,7 @@ func NewWithName(name string) *TopologyNew {
 	return tp
 }
 
-func (s *TopologyNew) GetContext() context.Context {
+func (s *TopologyNew) GetContext() api.StreamContext {
 	return s.ctx
 }