Prechádzať zdrojové kódy

fix(source httppull): Memory leak when use http source

Should close http connection after read out the body
Add ctx.Done() check in select statement, else will lead to goroutine leak

Close #888 #925
Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran 3 rokov pred
rodič
commit
48bd01429b
1 zmenil súbory, kde vykonal 3 pridanie a 1 odobranie
  1. 3 1
      internal/topo/source/httppull_source.go

+ 3 - 1
internal/topo/source/httppull_source.go

@@ -176,11 +176,11 @@ func (hps *HTTPPullSource) initTimerPull(ctx api.StreamContext, consumer chan<-
 					logger.Warnf("Found error http return code: %d when trying to reach %v ", resp.StatusCode, hps)
 					logger.Warnf("Found error http return code: %d when trying to reach %v ", resp.StatusCode, hps)
 					break
 					break
 				}
 				}
-				defer resp.Body.Close()
 				c, err := ioutil.ReadAll(resp.Body)
 				c, err := ioutil.ReadAll(resp.Body)
 				if err != nil {
 				if err != nil {
 					logger.Warnf("Found error %s when trying to reach %v ", err, hps)
 					logger.Warnf("Found error %s when trying to reach %v ", err, hps)
 				}
 				}
+				resp.Body.Close()
 				if hps.incremental {
 				if hps.incremental {
 					nmd5 := getMD5Hash(c)
 					nmd5 := getMD5Hash(c)
 					if omd5 == nmd5 {
 					if omd5 == nmd5 {
@@ -201,6 +201,8 @@ func (hps *HTTPPullSource) initTimerPull(ctx api.StreamContext, consumer chan<-
 				select {
 				select {
 				case consumer <- api.NewDefaultSourceTuple(result, meta):
 				case consumer <- api.NewDefaultSourceTuple(result, meta):
 					logger.Debugf("send data to device node")
 					logger.Debugf("send data to device node")
+				case <-ctx.Done():
+					return
 				}
 				}
 			}
 			}
 		case <-ctx.Done():
 		case <-ctx.Done():