123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- package extensions
- import (
- "crypto/md5"
- "encoding/hex"
- "encoding/json"
- "fmt"
- "github.com/emqx/kuiper/common"
- "github.com/emqx/kuiper/xstream/api"
- "io/ioutil"
- "net/http"
- "net/url"
- "strings"
- "time"
- )
// Fallback settings applied by Configure when the corresponding property is
// absent. Both values are interpreted as milliseconds by the source.
const (
	// DEFAULT_INTERVAL is the pause between two consecutive pulls.
	DEFAULT_INTERVAL = 10000
	// DEFAULT_TIMEOUT is the HTTP client timeout for a single request.
	DEFAULT_TIMEOUT = 5000
)
// HTTPPullSource is a source that periodically requests an HTTP endpoint and
// forwards each JSON response body to the stream as one tuple.
type HTTPPullSource struct {
	// url is the endpoint that is requested on every tick.
	url string
	// method is the upper-cased HTTP verb used for the request.
	method string
	// interval is the delay between two pulls, in milliseconds.
	interval int
	// timeout is the HTTP client timeout, in milliseconds.
	timeout int
	// incremental, when true, suppresses a response whose MD5 digest equals
	// the digest of the previously fetched response.
	incremental bool
	// body is the request payload sent with every pull.
	body string
	// bodyType is one of the keys of bodyTypeMap.
	bodyType string
	// headers are the extra request headers passed along with every pull.
	headers map[string]string
	// client is created in Open and reused for all requests.
	client *http.Client
}
// bodyTypeMap lists the accepted "bodyType" settings and the MIME type paired
// with each of them. "none" and "form" map to an empty string.
var bodyTypeMap = map[string]string{
	"none":       "",
	"text":       "text/plain",
	"json":       "application/json",
	"html":       "text/html",
	"xml":        "application/xml",
	"javascript": "application/javascript",
	"form":       "",
}
- func (hps *HTTPPullSource) Configure(device string, props map[string]interface{}) error {
- hps.url = "http://localhost"
- if u, ok := props["url"]; ok {
- if p, ok := u.(string); ok {
- hps.url = p
- }
- }
- hps.method = http.MethodGet
- if m, ok := props["method"]; ok {
- if m1, ok1 := m.(string); ok1 {
- switch strings.ToUpper(m1) {
- case http.MethodGet, http.MethodPost, http.MethodPut, http.MethodDelete:
- hps.method = strings.ToUpper(m1)
- default:
- return fmt.Errorf("Not supported HTTP method %s.", m1)
- }
- }
- }
- hps.interval = DEFAULT_INTERVAL
- if i, ok := props["interval"]; ok {
- if i1, ok1 := i.(int); ok1 {
- hps.interval = i1
- } else {
- return fmt.Errorf("Not valid interval value %v.", i1)
- }
- }
- hps.timeout = DEFAULT_TIMEOUT
- if i, ok := props["timeout"]; ok {
- if i1, ok1 := i.(int); ok1 {
- hps.timeout = i1
- } else {
- return fmt.Errorf("Not valid timeout value %v.", i1)
- }
- }
- hps.incremental = false
- if i, ok := props["incremental"]; ok {
- if i1, ok1 := i.(bool); ok1 {
- hps.incremental = i1
- } else {
- return fmt.Errorf("Not valid incremental value %v.", i1)
- }
- }
- hps.bodyType = "json"
- if c, ok := props["bodyType"]; ok {
- if c1, ok1 := c.(string); ok1 {
- if _, ok2 := bodyTypeMap[strings.ToLower(c1)]; ok2 {
- hps.bodyType = strings.ToLower(c1)
- } else {
- return fmt.Errorf("Not valid body type value %v.", c)
- }
- } else {
- return fmt.Errorf("Not valid body type value %v.", c)
- }
- }
- if b, ok := props["body"]; ok {
- if b1, ok1 := b.(string); ok1 {
- hps.body = b1
- } else {
- return fmt.Errorf("Not valid incremental value %v, expect string.", b1)
- }
- }
- hps.headers = make(map[string]string)
- if h, ok := props["headers"]; ok {
- if h1, ok1 := h.(map[string]interface{}); ok1 {
- for k, v := range h1 {
- if v1, ok2 := CastToString(v); ok2 {
- hps.headers[k] = v1
- }
- }
- } else {
- return fmt.Errorf("Not valid header value %v.", h1)
- }
- }
- common.Log.Infof("Initialized with configurations %#v.", hps)
- return nil
- }
- func (hps *HTTPPullSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
- _, e := url.Parse(hps.url)
- if e != nil {
- errCh <- e
- return
- }
- hps.client = &http.Client{Timeout: time.Duration(hps.timeout) * time.Millisecond}
- hps.initTimerPull(ctx, consumer, errCh)
- }
- func (hps *HTTPPullSource) Close(ctx api.StreamContext) error {
- logger := ctx.GetLogger()
- logger.Infof("Closing HTTP pull source")
- return nil
- }
- func (hps *HTTPPullSource) initTimerPull(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
- ticker := time.NewTicker(time.Millisecond * time.Duration(hps.interval))
- logger := ctx.GetLogger()
- defer ticker.Stop()
- var omd5 = ""
- for {
- select {
- case <-ticker.C:
- if resp, e := common.Send(logger, hps.client, hps.bodyType, hps.method, hps.url, hps.headers, true, []byte(hps.body)); e != nil {
- logger.Warnf("Found error %s when trying to reach %v ", e, hps)
- } else {
- logger.Debugf("rest sink got response %v", resp)
- if resp.StatusCode < 200 || resp.StatusCode > 299 {
- logger.Warnf("Found error http return code: %d when trying to reach %v ", resp.StatusCode, hps)
- break
- }
- defer resp.Body.Close()
- c, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- logger.Warnf("Found error %s when trying to reach %v ", err, hps)
- }
- if hps.incremental {
- nmd5 := getMD5Hash(c)
- if omd5 == nmd5 {
- logger.Infof("Content has not changed since last fetch, so skip processing.")
- continue
- } else {
- omd5 = nmd5
- }
- }
- result := make(map[string]interface{})
- meta := make(map[string]interface{})
- if e := json.Unmarshal(c, &result); e != nil {
- logger.Errorf("Invalid data format, cannot convert %s into JSON with error %s", string(c), e)
- return
- }
- select {
- case consumer <- api.NewDefaultSourceTuple(result, meta):
- logger.Debugf("send data to device node")
- }
- }
- case <-ctx.Done():
- return
- }
- }
- }
// getMD5Hash returns the MD5 digest of text as a lowercase hexadecimal
// string.
func getMD5Hash(text []byte) string {
	digest := md5.Sum(text)
	encoded := make([]byte, hex.EncodedLen(len(digest)))
	hex.Encode(encoded, digest[:])
	return string(encoded)
}
|