Commit 582b1db9 by Oleksandr Barabash

import fixes

parent 8297969f
......@@ -14,15 +14,19 @@ CARDS_PATH = os.path.join(ASSETS_PATH, "cards")
class Auth:
""" Auth type """
TYPE = "jwt"
RS256 = "RS256"
HS256 = "HS256"
CURRENT = RS256
BEARER = "Bearer"
ALG = RS256
class Types:
""" Auth types """
BEARER = "Bearer"
BASIC = "Basic"
class Algorithms:
""" Auth Algorithms """
RS256 = "RS256"
HS256 = "HS256"
TYPE = Types.BEARER
ALGORITHM = Algorithms.RS256
TOKEN_TYPE = "jwt"
ADMIN_LOGIN_SECRET = "adminLogin"
ADMIN_PASSW_SECRET = "adminPassword"
......
""" Token Helper """
import asyncio
import sys
from calendar import timegm
from concurrent.futures.thread import ThreadPoolExecutor
from datetime import datetime, timedelta
......@@ -11,7 +10,6 @@ from Crypto.Hash import SHA256
from aiohttp.web import Request, Response
from azure.core.exceptions import ResourceNotFoundError, HttpResponseError
from config import Auth
from entities.json.admin_user import AdminUser
from utils.azure_key_vault_client import AzureKeyVaultClient
from utils.functions import b64encode_str, b64encode_np, parse_auth_header, \
......@@ -39,7 +37,7 @@ class TokenHelper:
""" Sign token and return "{token}.{signature}" """
from config import Auth
if alg == Auth.RS256:
if alg == Auth.Algorithms.RS256:
""" RSA signature with SHA-256 """
key = self.azure_kv.get_or_create_random_key_bl()
header.update(dict(kid=key.name))
......@@ -50,7 +48,7 @@ class TokenHelper:
signature_encrypted = self.azure_kv.encrypt_bl(key, signature)
signature_b64 = b64encode_np(signature_encrypted).decode("utf-8")
return "{}.{}".format(token_unsigned, signature_b64)
elif alg == Auth.HS256:
elif alg == Auth.Algorithms.HS256:
""" HMAC with SHA-256 (HS256) """
pass
raise NotImplementedError("'{}' ALGORITHM ISN'T SUPPORTED".format(alg))
......@@ -61,7 +59,7 @@ class TokenHelper:
date = datetime.utcnow() + timedelta(seconds=ttl_seconds)
exp = timegm(date.utctimetuple())
alg = Auth.CURRENT
alg = Auth.ALGORITHM
jwt_head = dict(typ=MimeTypes.JWT, alg=alg)
jwt_body = dict(sub=login, exp=exp)
token_signed = self.sign_token_bl(jwt_head, jwt_body, alg)
......@@ -76,7 +74,7 @@ class TokenHelper:
if user.login == login and user.password == passw:
ttl = 3600
token = self.create_token_bl(user.login, ttl)
return dict(tokenType=Auth.BEARER,
return dict(tokenType=Auth.TYPE,
expiresIn=ttl,
accessToken=token)
return None
......@@ -88,6 +86,8 @@ class TokenHelper:
def is_token_valid(self, token: str) -> bool:
""" Check if token is Valid """
from config import Auth
# split first
header_b64_str, body_b64_str, signature = token.split(".")
token_unsigned = "{}.{}".format(header_b64_str, body_b64_str)
......@@ -106,11 +106,11 @@ class TokenHelper:
return False
# check type
if token_typ != Auth.TYPE:
if token_typ != Auth.TOKEN_TYPE:
return False
# check alg
if token_alg != Auth.ALG:
if token_alg != Auth.ALGORITHM:
return False
# check expiration
......@@ -130,20 +130,23 @@ class TokenHelper:
Log.e(__name__, "Key not found: '{}'".format(token_kid))
return False
# signature_gen = SHA256.new(token_unsigned.encode("utf-8")).digest()
# signature_encrypted = self.azure_kv.encrypt_bl(key, signature_gen)
# signature_b64 = b64encode_np(signature_encrypted).decode("utf-8")
# Log.e(__name__, exc_info=sys.exc_info())
signature_gen = SHA256.new(token_unsigned.encode("utf-8")).digest()
signature_encrypted = self.azure_kv.encrypt_bl(key, signature_gen)
signature_gen = b64encode_np(signature_encrypted).decode("utf-8")
if signature != signature_gen:
return False
return True
def is_auth(self, f):
""" Is auth decorator """
from config import Auth
async def wr(request: Request) -> Response:
""" Wrapper """
a_type, a_value = parse_auth_header(
request.headers.get("Authorization")
)
if a_type == Auth.BEARER and self.is_token_valid(a_value):
if a_type == Auth.TYPE and self.is_token_valid(a_value):
return await f(request)
return Response(status=HTTPStatus.FORBIDDEN)
return wr
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment