Commit ca440e27 by Oleksandr Barabash

basic auth added

parent c3b303ae
""" Handy Functions """
import binascii
import logging
import sys
import urllib.parse
from base64 import b64encode, b64decode
......@@ -12,6 +13,8 @@ from utils.log import Log
TAG = __name__
log = logging.getLogger()
DEFAULT_LOCALE = "en"
......@@ -59,6 +62,17 @@ def parse_auth_header(header: Optional[str]) -> Tuple[Optional[str],
return None, None
def parse_basic_auth(data: str, divider=":") -> Tuple[Optional[str],
Optional[str]]:
""" Split Value with divider and return login and password """
try:
login, password = data.split(divider)
return login, password
except ValueError:
log.error("Failed to parse basic auth, data: '{}'".format(data))
return None, None
def b64encode_str(data: str, encoding="utf-8") -> str:
""" Decode base64 str and return decoded string """
return b64encode_np(data.encode(encoding)).decode(encoding)
......
......@@ -13,7 +13,7 @@ from azure.core.exceptions import ResourceNotFoundError, HttpResponseError
from entities.json.admin_user import AdminUser
from utils.azure_key_vault_client import AzureKeyVaultClient
from utils.functions import b64encode_str, b64encode_np, parse_auth_header, \
b64decode_str, b64decode_np
b64decode_str, b64decode_np, parse_basic_auth
from utils.json_func import json_dumps, json_loads
from utils.log import Log
......@@ -25,6 +25,17 @@ class TokenHelper:
self.azure_kv = azure_kv
self.executor = ThreadPoolExecutor(10)
self.io_loop = asyncio.get_event_loop()
self.login = None
self.password = None
def get_admin_login_password(self):
""" get admin login and password """
if None in [self.login, self.password]:
from config import Auth
login = self.azure_kv.get_secret_bl(Auth.ADMIN_LOGIN_SECRET).value
passwd = self.azure_kv.get_secret_bl(Auth.ADMIN_PASSW_SECRET).value
self.login, self.password = login, passwd
return self.login, self.password
def sign_token_bl(self, header: Dict[str, Union[str, int]],
body: Dict[str, Union[str, int]],
......@@ -65,14 +76,11 @@ class TokenHelper:
""" Perform Auth blocking """
from config import Auth
login = self.azure_kv.get_secret_bl(Auth.ADMIN_LOGIN_SECRET).value
passw = self.azure_kv.get_secret_bl(Auth.ADMIN_PASSW_SECRET).value
if user.login == login and user.password == passw:
login, password = self.get_admin_login_password()
if user.login == login and user.password == password:
ttl = 3600
token = self.create_token_bl(user.login, ttl)
return dict(tokenType=Auth.TYPE,
expiresIn=ttl,
accessToken=token)
return dict(tokenType=Auth.TYPE, expiresIn=ttl, accessToken=token)
return None
def do_auth(self, user: AdminUser):
......@@ -80,6 +88,16 @@ class TokenHelper:
return self.io_loop.run_in_executor(self.executor, self.do_auth_bl,
user)
def is_basic_valid(self, credentials: str) -> bool:
""" Check if credentials are valid """
from config import Auth
basic_login, basic_password = parse_basic_auth(credentials)
login, password = self.get_admin_login_password()
if None not in [basic_login, basic_password, login, password] and \
basic_login == login and basic_password == password:
return True
return False
def is_token_valid(self, token: str) -> bool:
""" Check if token is Valid """
from config import Auth
......@@ -162,7 +180,9 @@ class TokenHelper:
request.headers.get("Authorization")
)
Log.i(__name__, "auth_headers:: type: '{}'".format(a_type))
if a_type == Auth.TYPE and self.is_token_valid(a_value):
if a_type == Auth.Types.BEARER and self.is_token_valid(a_value):
return await f(request)
elif a_type == Auth.Types.BASIC and self.is_basic_valid(a_value):
return await f(request)
return Response(status=HTTPStatus.FORBIDDEN)
return wr
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment