Jelajahi Sumber

fea(jwt auth): support JWT RSA256 auth for rest api (#1058)

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
superxan 3 tahun lalu
induk
melakukan
6b8160627e

+ 8 - 0
docs/en_US/operation/configuration_file.md

@@ -65,6 +65,14 @@ The port for the rest api http server to listen to.
 ### restTls
 The tls cert file path and key file path setting. If restTls is not set, the rest api server will listen on http. Otherwise, it will listen on https.
 
+## authentication 
+eKuiper will check the ``Token`` for rest api when ``authentication`` option is true. please check this file for [more info](./operations.md).
+
+```yaml
+basic:
+  authentication: false
+```
+
 ## Prometheus Configuration
 
 eKuiper can export metrics to prometheus if ``prometheus`` option is true. The prometheus will be served with the port specified by ``prometheusPort`` option.

+ 46 - 0
docs/en_US/operation/operations.md

@@ -7,3 +7,49 @@
 
 eKuiper provides some RESTful management APIs. Please refer to [Rest-API doc](../restapi/overview.md) for more detailed information.
 
+
+## Authentication
+
+eKuiper support ``JWT RSA256`` authentication for the RESTful management APIs since ``1.4.0`` if enabled . Users need put their Public Key in ``etc/mgmt`` folder and use the corresponding Private key to sign the JWT Tokens.
+When user request the RESTful apis, put the ``Token`` in http request headers in the following format:
+```go
+Authorization: XXXXXXXXXXXXXXX
+```
+If the token is correct, eKuiper will respond the result; otherwise, it will return http ``401``code.
+
+
+### JWT Header
+
+```json
+{
+  "typ": "JWT",
+  "alg": "RS256"
+}
+```
+
+
+### JWT payload
+The JWT Payload should use the following format
+
+|  field   | optional |  meaning  |
+|  ----  | ----  | ----  |
+| iss  | false| Issuer , must use the same name with the public key put in ``etc/mgmt``|
+| aud  | false |Audience , must be ``eKuiper`` |
+| exp  | true |Expiration Time |
+| jti  | true |JWT ID |
+| iat  | true |Issued At |
+| nbf  | true |Not Before |
+| sub  | true |Subject |
+
+There is an example in json format
+```json
+{
+  "iss": "sample_key.pub",
+  "adu": "eKuiper"
+}
+```
+When use this format, user must make sure the correct Public key file ``sample_key.pub`` are under ``etc/mgmt`` .
+
+### JWT Signature
+
+need use the Private key to sign the Tokens and put the corresponding Public Key in ``etc/mgmt`` .

+ 9 - 0
docs/zh_CN/operation/configuration_file.md

@@ -45,6 +45,15 @@ REST http 服务器监听端口
 ### restTls
 TLS 证书 cert 文件和 key 文件位置。如果 restTls 选项未配置,则 REST 服务器将启动为 http 服务器,否则启动为 https 服务器。
 
+## authentication 
+当 ``authentication`` 选项为 true 时,eKuiper 将为 rest api 请求检查 ``Token`` 。请检查此文件以获取 [更多信息](./operations.md)。
+
+```yaml
+basic:
+  authentication: false
+```
+
+
 ## Prometheus 配置
 
 如果 `prometheus` 参数设置为 true,eKuiper 将把运行指标暴露到 prometheus。Prometheus 将运行在 `prometheusPort` 参数指定的端口上。

+ 47 - 1
docs/zh_CN/operation/operations.md

@@ -5,4 +5,50 @@
 
 ## Restful APIs
 
-eKuiper 提供了一些 RESTful 管理 API。eKuiper 提供了一些 RESTful 管理 APIs。请参考 [Rest-API 文档](../restapi/overview.md)以获取更详细信息。
+eKuiper 提供了一些 RESTful 管理 API。eKuiper 提供了一些 RESTful 管理 APIs。请参考 [Rest-API 文档](../restapi/overview.md)以获取更详细信息。
+
+## Authentication
+
+如果使能的话, eKuiper 从 1.4.0 起将为 RESTful API 提供基于 ``JWT RSA256`` 的身份验证。用户需要将他们的公钥放在 ``etc/mgmt`` 文件夹中,并使用相应的私钥来签署 JWT 令牌。
+当用户请求 RESTful API 时,将 ``Token`` 放在 http 请求头中,格式如下:
+```
+Authorization:XXXXXXXXXXXXXXX
+```
+如果token正确,eKuiper会响应结果;否则,它将返回 http ``401`` 代码。
+
+
+### JWT Header
+
+```json
+{
+  "typ": "JWT",
+  "alg": "RS256"
+}
+```
+
+
+### JWT Payload
+JWT Payload 应使用以下格式
+
+|  字段   | 是否可选 |  意义  |
+|  ----  | ----  | ----  |
+| iss  | 否| 颁发者 ,  此字段必须与``etc/mgmt``目录中的相应公钥文件名字一致|
+| aud  | 否 |颁发对象 , 此字段必须是 ``eKuiper`` |
+| exp  | 是 |过期时间 |
+| jti  | 是 |JWT ID |
+| iat  | 是 |颁发时间 |
+| nbf  | 是 |Not Before |
+| sub  | 是 |主题 |
+
+这里有一个 json 格式的例子
+```json
+{
+  "iss": "sample_key.pub",
+  "adu": "eKuiper"
+}
+```
+使用此格式时,用户必须确保正确的公钥文件 ``sample_key.pub`` 位于 ``etc/mgmt`` 下。
+
+### JWT Signature
+
+需要使用私钥对令牌进行签名,并将相应的公钥放在 ``etc/mgmt`` 中。

+ 2 - 0
etc/kuiper.yaml

@@ -17,6 +17,8 @@ basic:
   restIp: 0.0.0.0
   # REST service port
   restPort: 9081
+  # true|false, when true, will check the RSA jwt token for rest api
+  authentication: false
   #  restTls:
   #    certfile: /var/https-server.crt
   #    keyfile: /var/https-server.key

+ 27 - 0
etc/mgmt/sample_key

@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEA4f5wg5l2hKsTeNem/V41fGnJm6gOdrj8ym3rFkEU/wT8RDtn
+SgFEZOQpHEgQ7JL38xUfU0Y3g6aYw9QT0hJ7mCpz9Er5qLaMXJwZxzHzAahlfA0i
+cqabvJOMvQtzD6uQv6wPEyZtDTWiQi9AXwBpHssPnpYGIn20ZZuNlX2BrClciHhC
+PUIIZOQn/MmqTD31jSyjoQoV7MhhMTATKJx2XrHhR+1DcKJzQBSTAGnpYVaqpsAR
+ap+nwRipr3nUTuxyGohBTSmjJ2usSeQXHI3bODIRe1AuTyHceAbewn8b462yEWKA
+Rdpd9AjQW5SIVPfdsz5B6GlYQ5LdYKtznTuy7wIDAQABAoIBAQCwia1k7+2oZ2d3
+n6agCAbqIE1QXfCmh41ZqJHbOY3oRQG3X1wpcGH4Gk+O+zDVTV2JszdcOt7E5dAy
+MaomETAhRxB7hlIOnEN7WKm+dGNrKRvV0wDU5ReFMRHg31/Lnu8c+5BvGjZX+ky9
+POIhFFYJqwCRlopGSUIxmVj5rSgtzk3iWOQXr+ah1bjEXvlxDOWkHN6YfpV5ThdE
+KdBIPGEVqa63r9n2h+qazKrtiRqJqGnOrHzOECYbRFYhexsNFz7YT02xdfSHn7gM
+IvabDDP/Qp0PjE1jdouiMaFHYnLBbgvlnZW9yuVf/rpXTUq/njxIXMmvmEyyvSDn
+FcFikB8pAoGBAPF77hK4m3/rdGT7X8a/gwvZ2R121aBcdPwEaUhvj/36dx596zvY
+mEOjrWfZhF083/nYWE2kVquj2wjs+otCLfifEEgXcVPTnEOPO9Zg3uNSL0nNQghj
+FuD3iGLTUBCtM66oTe0jLSslHe8gLGEQqyMzHOzYxNqibxcOZIe8Qt0NAoGBAO+U
+I5+XWjWEgDmvyC3TrOSf/KCGjtu0TSv30ipv27bDLMrpvPmD/5lpptTFwcxvVhCs
+2b+chCjlghFSWFbBULBrfci2FtliClOVMYrlNBdUSJhf3aYSG2Doe6Bgt1n2CpNn
+/iu37Y3NfemZBJA7hNl4dYe+f+uzM87cdQ214+jrAoGAXA0XxX8ll2+ToOLJsaNT
+OvNB9h9Uc5qK5X5w+7G7O998BN2PC/MWp8H+2fVqpXgNENpNXttkRm1hk1dych86
+EunfdPuqsX+as44oCyJGFHVBnWpm33eWQw9YqANRI+pCJzP08I5WK3osnPiwshd+
+hR54yjgfYhBFNI7B95PmEQkCgYBzFSz7h1+s34Ycr8SvxsOBWxymG5zaCsUbPsL0
+4aCgLScCHb9J+E86aVbbVFdglYa5Id7DPTL61ixhl7WZjujspeXZGSbmq0Kcnckb
+mDgqkLECiOJW2NHP/j0McAkDLL4tysF8TLDO8gvuvzNC+WQ6drO2ThrypLVZQ+ry
+eBIPmwKBgEZxhqa0gVvHQG/7Od69KWj4eJP28kq13RhKay8JOoN0vPmspXJo1HY3
+CKuHRG+AP579dncdUnOMvfXOtkdM4vk0+hWASBQzM9xzVcztCa+koAugjVaLS9A+
+9uQoqEeVNTckxx0S2bYevRy7hGQmUJTyQm3j1zEUR5jpdbL83Fbq
+-----END RSA PRIVATE KEY-----

+ 9 - 0
etc/mgmt/sample_key.pub

@@ -0,0 +1,9 @@
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4f5wg5l2hKsTeNem/V41
+fGnJm6gOdrj8ym3rFkEU/wT8RDtnSgFEZOQpHEgQ7JL38xUfU0Y3g6aYw9QT0hJ7
+mCpz9Er5qLaMXJwZxzHzAahlfA0icqabvJOMvQtzD6uQv6wPEyZtDTWiQi9AXwBp
+HssPnpYGIn20ZZuNlX2BrClciHhCPUIIZOQn/MmqTD31jSyjoQoV7MhhMTATKJx2
+XrHhR+1DcKJzQBSTAGnpYVaqpsARap+nwRipr3nUTuxyGohBTSmjJ2usSeQXHI3b
+ODIRe1AuTyHceAbewn8b462yEWKARdpd9AjQW5SIVPfdsz5B6GlYQ5LdYKtznTuy
+7wIDAQAB
+-----END PUBLIC KEY-----

+ 1 - 1
go.mod

@@ -7,6 +7,7 @@ require (
 	github.com/alicebob/miniredis/v2 v2.15.1
 	github.com/benbjohnson/clock v1.0.0
 	github.com/cespare/xxhash/v2 v2.1.1 // indirect
+	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/eclipse/paho.mqtt.golang v1.3.5
 	github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.0
 	github.com/edgexfoundry/go-mod-messaging/v2 v2.0.1
@@ -32,7 +33,6 @@ require (
 	github.com/msgpack-rpc/msgpack-rpc-go v0.0.0-20131026060856-c76397e1782b
 	github.com/msgpack/msgpack-go v0.0.0-20130625150338-8224460e6fa3 // indirect
 	github.com/pebbe/zmq4 v1.2.7
-	github.com/pkg/errors v0.8.1 // indirect
 	github.com/prometheus/client_golang v1.2.1
 	github.com/sirupsen/logrus v1.4.2
 	github.com/smartystreets/goconvey v1.6.4 // indirect

+ 2 - 0
go.sum

@@ -43,6 +43,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
 github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
 github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.0 h1:tvfovdyoHOb392L59hiuA90awiXLX5IR3HOgbcWZkVQ=

+ 1 - 0
internal/conf/conf.go

@@ -52,6 +52,7 @@ type KuiperConf struct {
 		Prometheus     bool     `yaml:"prometheus"`
 		PrometheusPort int      `yaml:"prometheusPort"`
 		PluginHosts    string   `yaml:"pluginHosts"`
+		Authentication bool     `yaml:"authentication"`
 	}
 	Rule api.RuleOption
 	Sink struct {

+ 85 - 0
internal/pkg/jwt/jwt_rsa.go

@@ -0,0 +1,85 @@
+package jwt
+
+import (
+	"errors"
+	"fmt"
+	"github.com/dgrijalva/jwt-go"
+	"time"
+)
+
+const ExpireTimeMinutes = 10
+
+type Token struct {
+	jwt.StandardClaims
+}
+
+type ErrorType int8
+
+const (
+	JWT_VALIDATE_ERROR ErrorType = 1
+)
+
+const JWT_VALIDATE_TEMP = "JWTVAL__ERRCODE:%d__ERRSTR:%s__TOKEN:%s"
+const JWT_OTHER_TEMP = "JWTOTH__ERRSTR:%s__TOKEN:%s"
+
+type Error struct {
+	errType   ErrorType
+	Inner     error
+	metaToken string
+}
+
+func (e Error) Error() string {
+	switch e.errType {
+	case JWT_VALIDATE_ERROR:
+		if ve, ok := e.Inner.(*jwt.ValidationError); ok {
+			return fmt.Sprintf(JWT_VALIDATE_TEMP, ve.Errors, ve.Error(), e.metaToken)
+		}
+	default:
+		return fmt.Sprintf(JWT_OTHER_TEMP, e.Inner.Error(), e.metaToken)
+	}
+	return "Invalid token " + e.metaToken
+}
+
+func CreateToken(signKeyName, issuer, aud string) (string, error) {
+	tk := &Token{}
+	tk.Issuer = issuer
+	tk.Audience = aud
+	tk.ExpiresAt = time.Now().Add(time.Duration(ExpireTimeMinutes) * time.Minute).Unix()
+	token := jwt.NewWithClaims(jwt.GetSigningMethod("RS256"), tk)
+	signKey, err := GetPrivateKeyWithKeyName(signKeyName)
+	if err != nil {
+		return "", err
+	}
+	return token.SignedString(signKey)
+}
+
+func ParseToken(th string) (*Token, error) {
+	tk := &Token{}
+	token, err := jwt.ParseWithClaims(th, tk, func(token *jwt.Token) (interface{}, error) {
+		jwtToken := token.Claims.(*Token)
+
+		if jwtToken.Issuer == "" {
+			return "", fmt.Errorf("issuer field not exist in jwt payload")
+		}
+		pubKey, err := GetPublicKey(jwtToken.Issuer)
+		if err != nil {
+			return "", err
+		}
+		return pubKey, nil
+	})
+	if ve, ok := err.(*jwt.ValidationError); ok {
+		return tk, Error{
+			errType:   JWT_VALIDATE_ERROR,
+			Inner:     ve,
+			metaToken: th,
+		}
+	}
+
+	if err != nil {
+		return nil, err
+	}
+	if !token.Valid {
+		return nil, errors.New("invalid token")
+	}
+	return tk, nil
+}

+ 76 - 0
internal/pkg/jwt/jwt_rsa_test.go

@@ -0,0 +1,76 @@
+package jwt
+
+import (
+	"fmt"
+	"testing"
+)
+
+var expiredToken = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJlS3VpcGVyIiwiZXhwIjoxNjM2MDExMzQxLCJpc3MiOiJzYW1wbGVfa2V5LnB1YiJ9.qm5Pq9VxDC10qbOM081U5NwScTOxYV_F5vyqbU9rXB2ebz4kDio_R2tgEgGyJ41lwD7gFl1quBjp_EgokPZNOoGRg5R1Ygf7iF8XJSDxYkspSCsBtZAuMCo3MCz3slQyvnr24qv3idUDhlwO6FPHGLaLHEyvrETSl1ZcECq2wvW01Tc2Jmg0-Kpp6TmEbH5aD-L0or5Bfy0ytBQ64nd2hKVaoADZZOXSt1iH2-1R35fEc_lBw7zs4QpCC2R--muoqYsYkESR08o6wIKAxRJvqeWab3C9k_g0zaPhwa7ZQ9wRzah-tc6PdotZkAyH7BCx-f7llO7UT47k0GnrhBe21g"
+var badFormatToken = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmb28iOiJiYXIiLCJleHAiOjE1MDAwLCJpc3MiOiJ0ZXN0In0"
+
+func genToken(signKeyName, issuer, aud string) string {
+	tkStr, _ := CreateToken(signKeyName, issuer, aud)
+	return tkStr
+}
+
+func TestParseToken(t *testing.T) {
+	type args struct {
+		th string
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "pass: have issuer public key",
+			args: args{
+				th: genToken("sample_key", "sample_key.pub", "eKuiper"),
+			},
+			wantErr: false,
+		},
+		{
+			name: "fail: token expired",
+			args: args{
+				th: expiredToken,
+			},
+			wantErr: true,
+		},
+		{
+			name: "fail: token sign error",
+			args: args{
+				th: genToken("sample_key", "sample_key.pub", "eKuiper") + "badSign",
+			},
+			wantErr: true,
+		},
+		{
+			name: "fail: do not have issuer's public key",
+			args: args{
+				th: genToken("sample_key", "notexist.pub", "eKuiper"),
+			},
+			wantErr: true,
+		},
+		{
+			name: "bad token format",
+			args: args{
+				th: badFormatToken,
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			_, err := ParseToken(tt.args.th)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("ParseToken() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+
+			if err != nil {
+				fmt.Printf("=====================\n")
+				fmt.Printf("Validate Error %s", err)
+			}
+
+		})
+	}
+}

+ 99 - 0
internal/pkg/jwt/rsa_keys.go

@@ -0,0 +1,99 @@
+package jwt
+
+import (
+	"crypto/rsa"
+	"fmt"
+	"github.com/dgrijalva/jwt-go"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"io/ioutil"
+	"path"
+	"strings"
+	"sync"
+)
+
+var privateKeyRepository = make(map[string]*rsa.PrivateKey)
+var repositoryLock sync.Mutex
+
+const RSAKeyDir = "mgmt"
+
+func GetPrivateKeyWithKeyName(keyName string) (*rsa.PrivateKey, error) {
+	repositoryLock.Lock()
+	defer repositoryLock.Unlock()
+
+	key, ok := privateKeyRepository[keyName]
+	if ok {
+		return key, nil
+	}
+
+	privateKey, err := privateKeyFromFile(keyName)
+	if err != nil {
+		return nil, err
+	}
+
+	privateKeyRepository[keyName] = privateKey
+
+	return privateKey, nil
+}
+
+func GetPublicKey(keyName string) (*rsa.PublicKey, error) {
+	publicKey, err := publicKeyFromFile(keyName)
+	if err != nil {
+		return nil, err
+	}
+
+	return publicKey, nil
+}
+
+func insensitiveGetFilePath(prikeyName string) (string, error) {
+	confDir, err := conf.GetConfLoc()
+	if nil != err {
+		return "", err
+	}
+
+	dir := path.Join(confDir, RSAKeyDir)
+	infos, err := ioutil.ReadDir(dir)
+	if nil != err {
+		return "", err
+	}
+
+	for _, info := range infos {
+		fileName := info.Name()
+		if strings.ToLower(fileName) == strings.ToLower(prikeyName) {
+			filePath := path.Join(dir, fileName)
+			return filePath, nil
+		}
+	}
+	return "", fmt.Errorf("not found target key file %s in /etc/%s", prikeyName, RSAKeyDir)
+}
+
+func privateKeyFromFile(keyName string) (*rsa.PrivateKey, error) {
+	keyPath, err := insensitiveGetFilePath(keyName)
+	if err != nil {
+		return nil, err
+	}
+	keyBytes, err := ioutil.ReadFile(keyPath)
+	if err != nil {
+		return nil, err
+	}
+	signKey, err := jwt.ParseRSAPrivateKeyFromPEM(keyBytes)
+	if err != nil {
+		return nil, err
+	}
+	return signKey, nil
+}
+
+func publicKeyFromFile(keyName string) (*rsa.PublicKey, error) {
+	keyPath, err := insensitiveGetFilePath(keyName)
+	if err != nil {
+		return nil, err
+	}
+	keyBytes, err := ioutil.ReadFile(keyPath)
+	if err != nil {
+		return nil, err
+	}
+	pubKey, err := jwt.ParseRSAPublicKeyFromPEM(keyBytes)
+	if err != nil {
+		return nil, err
+	}
+	return pubKey, nil
+}

+ 39 - 0
internal/server/middleware/auth.go

@@ -0,0 +1,39 @@
+package middleware
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/pkg/jwt"
+	"net/http"
+)
+
+var notAuth = []string{"/", "/ping"}
+
+var Auth = func(next http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		requestPath := r.URL.Path
+		for _, value := range notAuth {
+			if value == requestPath {
+				next.ServeHTTP(w, r)
+				return
+			}
+		}
+
+		tokenHeader := r.Header.Get("Authorization")
+
+		if tokenHeader == "" {
+			http.Error(w, "missing_token", http.StatusUnauthorized)
+			return
+		}
+		tk, err := jwt.ParseToken(tokenHeader)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusUnauthorized)
+			return
+		}
+		if tk.StandardClaims.Audience != "eKuiper" {
+			http.Error(w, fmt.Sprintf("audience field should be eKuiper, but got %s", tk.StandardClaims.Audience), http.StatusUnauthorized)
+			return
+		}
+
+		next.ServeHTTP(w, r)
+	})
+}

+ 84 - 0
internal/server/middleware/auth_test.go

@@ -0,0 +1,84 @@
+package middleware
+
+import (
+	"github.com/lf-edge/ekuiper/internal/pkg/jwt"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"reflect"
+	"testing"
+)
+
+func genToken(signKeyName, issuer, aud string) string {
+	tkStr, _ := jwt.CreateToken(signKeyName, issuer, aud)
+	return tkStr
+}
+
+func Test_AUTH(t *testing.T) {
+	nextHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(200)
+	})
+
+	handler := Auth(nextHandler)
+
+	type args struct {
+		th string
+	}
+	tests := []struct {
+		name     string
+		args     args
+		req      *http.Request
+		res      *httptest.ResponseRecorder
+		wantCode int
+	}{
+		{
+			name:     "token right",
+			args:     args{th: genToken("sample_key", "sample_key.pub", "eKuiper")},
+			req:      httptest.NewRequest("GET", "http://127.0.0.1:9081/streams", nil),
+			res:      httptest.NewRecorder(),
+			wantCode: 200,
+		},
+
+		{
+			name:     "audience not right",
+			args:     args{th: genToken("sample_key", "sample_key.pub", "Neuron")},
+			req:      httptest.NewRequest("GET", "http://127.0.0.1:9081/streams", nil),
+			res:      httptest.NewRecorder(),
+			wantCode: 401,
+		},
+		{
+			name:     "no token",
+			args:     args{th: ""},
+			req:      httptest.NewRequest("GET", "http://127.0.0.1:9081/streams", nil),
+			res:      httptest.NewRecorder(),
+			wantCode: 401,
+		},
+		{
+			name:     "no need token path",
+			args:     args{th: ""},
+			req:      httptest.NewRequest("GET", "http://127.0.0.1:9081/ping", nil),
+			res:      httptest.NewRecorder(),
+			wantCode: 200,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tt.req.Header.Set("Authorization", tt.args.th)
+			handler.ServeHTTP(tt.res, tt.req)
+
+			res := tt.res.Result()
+
+			data, err := ioutil.ReadAll(res.Body)
+			if err != nil {
+				t.Errorf("expected error to be nil got %v", err)
+			}
+
+			if !reflect.DeepEqual(tt.wantCode, tt.res.Code) {
+				t.Errorf("expect %d, actual %d, result %s", tt.wantCode, tt.res.Code, string(data))
+			}
+
+			_ = res.Body.Close()
+		})
+	}
+}

+ 6 - 1
internal/server/rest.go

@@ -25,6 +25,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/plugin"
 	"github.com/lf-edge/ekuiper/internal/plugin/native"
 	"github.com/lf-edge/ekuiper/internal/plugin/portable"
+	"github.com/lf-edge/ekuiper/internal/server/middleware"
 	"github.com/lf-edge/ekuiper/internal/service"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
@@ -91,7 +92,7 @@ func jsonResponse(i interface{}, w http.ResponseWriter, logger api.Logger) {
 	}
 }
 
-func createRestServer(ip string, port int) *http.Server {
+func createRestServer(ip string, port int, needToken bool) *http.Server {
 	r := mux.NewRouter()
 	r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
 	r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
@@ -138,6 +139,10 @@ func createRestServer(ip string, port int) *http.Server {
 	r.HandleFunc("/services/functions/{name}", serviceFunctionHandler).Methods(http.MethodGet)
 	r.HandleFunc("/services/{name}", serviceHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
 
+	if needToken {
+		r.Use(middleware.Auth)
+	}
+
 	server := &http.Server{
 		Addr: fmt.Sprintf("%s:%d", ip, port),
 		// Good practice to set timeouts to avoid Slowloris attacks.

+ 1 - 1
internal/server/server.go

@@ -133,7 +133,7 @@ func StartUp(Version, LoadFileType string) {
 	}
 
 	//Start rest service
-	srvRest := createRestServer(conf.Config.Basic.RestIp, conf.Config.Basic.RestPort)
+	srvRest := createRestServer(conf.Config.Basic.RestIp, conf.Config.Basic.RestPort, conf.Config.Basic.Authentication)
 	go func() {
 		var err error
 		if conf.Config.Basic.RestTls == nil {