|
@@ -18,7 +18,7 @@ import (
|
|
|
"fmt"
|
|
|
"net/http"
|
|
|
"net/url"
|
|
|
- "strings"
|
|
|
+ "time"
|
|
|
|
|
|
"github.com/lf-edge/ekuiper/internal/conf"
|
|
|
"github.com/lf-edge/ekuiper/internal/pkg/httpx"
|
|
@@ -40,21 +40,6 @@ func (ms *RestSink) Open(ctx api.StreamContext) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-type MultiErrors []error
|
|
|
-
|
|
|
-func (me MultiErrors) AddError(err error) MultiErrors {
|
|
|
- me = append(me, err)
|
|
|
- return me
|
|
|
-}
|
|
|
-
|
|
|
-func (me MultiErrors) Error() string {
|
|
|
- s := make([]string, len(me))
|
|
|
- for i, v := range me {
|
|
|
- s = append(s, fmt.Sprintf("Error %d with info %s. \n", i, v))
|
|
|
- }
|
|
|
- return strings.Join(s, " ")
|
|
|
-}
|
|
|
-
|
|
|
func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
|
|
|
logger := ctx.GetLogger()
|
|
|
logger.Debugf("rest sink receive %s", item)
|
|
@@ -100,8 +85,14 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
|
|
|
}
|
|
|
|
|
|
func (ms *RestSink) Send(ctx api.StreamContext, decodedData []byte, v interface{}, logger api.Logger) (*http.Response, error) {
|
|
|
- // Allow to use tokens in headers
|
|
|
- // TODO optimization: only do this if tokens are used in template
|
|
|
+ // Allow to use tokens in headers and check oAuth token expiration
|
|
|
+ if ms.accessConf != nil && ms.accessConf.ExpireInSecond > 0 &&
|
|
|
+ int(time.Now().Sub(ms.tokenLastUpdateAt).Abs().Seconds()) >= ms.accessConf.ExpireInSecond {
|
|
|
+ ctx.GetLogger().Debugf("Refreshing token for REST sink")
|
|
|
+ if err := ms.refresh(ctx); err != nil {
|
|
|
+ ctx.GetLogger().Warnf("Refresh REST sink token error: %v", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
if ms.tokens != nil {
|
|
|
switch dt := v.(type) {
|
|
|
case map[string]interface{}:
|