Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
C
cake-bot
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Oleksandr Barabash
cake-bot
Commits
8297969f
Commit
8297969f
authored
Oct 25, 2022
by
Oleksandr Barabash
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
is_auth added
parent
6386fc33
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
106 additions
and
12 deletions
+106
-12
app.py
app.py
+3
-0
config.py
config.py
+6
-0
azure_key_vault_client.py
utils/azure_key_vault_client.py
+5
-1
functions.py
utils/functions.py
+13
-1
token_helper.py
utils/token_helper.py
+79
-10
No files found.
app.py
View file @
8297969f
...
...
@@ -74,6 +74,7 @@ ADAPTER.on_turn_error = on_error
BOT
=
TeamsMessagingExtensionsActionPreviewBot
(
app_settings
,
ADAPTER
)
@TOKEN_HELPER.is_auth
async
def
v1_get_initiations
(
request
:
Request
)
->
Response
:
""" Get Initiations by Notification ID """
# noinspection PyBroadException
...
...
@@ -92,6 +93,7 @@ async def v1_get_initiations(request: Request) -> Response:
return
Response
(
status
=
HTTPStatus
.
BAD_REQUEST
)
@TOKEN_HELPER.is_auth
async
def
v1_get_notification
(
request
:
Request
)
->
Response
:
""" Get Notification by ID """
# noinspection PyBroadException
...
...
@@ -114,6 +116,7 @@ async def v1_get_notification(request: Request) -> Response:
return
Response
(
status
=
HTTPStatus
.
BAD_REQUEST
)
@TOKEN_HELPER.is_auth
async
def
v1_notification
(
request
:
Request
)
->
Response
:
""" Notify channel with the link """
# todo(s1z): add auth
...
...
config.py
View file @
8297969f
...
...
@@ -14,10 +14,16 @@ CARDS_PATH = os.path.join(ASSETS_PATH, "cards")
class
Auth
:
""" Auth type """
TYPE
=
"jwt"
RS256
=
"RS256"
HS256
=
"HS256"
CURRENT
=
RS256
BEARER
=
"Bearer"
ALG
=
RS256
ADMIN_LOGIN_SECRET
=
"adminLogin"
ADMIN_PASSW_SECRET
=
"adminPassword"
...
...
utils/azure_key_vault_client.py
View file @
8297969f
...
...
@@ -53,8 +53,12 @@ class AzureKeyVaultClient:
""" Async get secret """
return
self
.
execute_blocking
(
self
.
secret_client
.
get_secret
,
name
)
def
get_key_bl
(
self
,
name
:
str
)
->
"KeyVaultKey"
:
""" Get key, blocking """
return
self
.
key_client
.
get_key
(
name
)
def
get_key
(
self
,
name
:
str
)
->
Awaitable
[
"KeyVaultKey"
]:
"""
Async get key
"""
"""
Get key, Async
"""
return
self
.
execute_blocking
(
self
.
key_client
.
get_key
,
name
)
def
create_key
(
self
,
name
:
str
)
->
Awaitable
[
"KeyVaultKey"
]:
...
...
utils/functions.py
View file @
8297969f
""" Handy Functions """
from
base64
import
b64encode
,
b64decode
from
typing
import
List
,
Optional
,
Dict
from
typing
import
List
,
Optional
,
Dict
,
Tuple
def
get_first_or_none
(
items
:
List
)
->
Optional
[
Dict
[
str
,
any
]]:
...
...
@@ -10,6 +10,18 @@ def get_first_or_none(items: List) -> Optional[Dict[str, any]]:
return
None
def
parse_auth_header
(
header
:
Optional
[
str
])
->
Tuple
[
Optional
[
str
],
Optional
[
str
]]:
""" Parse Authorization header and return Type, Value or None """
if
isinstance
(
header
,
str
):
try
:
t
,
v
=
header
.
split
(
":"
)[:
2
]
return
t
,
v
except
ValueError
:
pass
return
None
,
None
def
b64encode_str
(
data
:
str
,
encoding
=
"utf-8"
)
->
str
:
""" Decode base64 str and return decoded string """
return
b64encode_np
(
data
.
encode
(
encoding
))
.
decode
(
encoding
)
...
...
utils/token_helper.py
View file @
8297969f
""" Token Helper """
import
asyncio
import
sys
from
calendar
import
timegm
from
concurrent.futures.thread
import
ThreadPoolExecutor
from
datetime
import
datetime
,
timedelta
from
http
import
HTTPStatus
from
typing
import
Dict
,
Union
from
Crypto.Hash
import
SHA256
from
aiohttp.web
import
Request
,
Response
from
azure.core.exceptions
import
ResourceNotFoundError
,
HttpResponseError
from
config
import
Auth
from
entities.json.admin_user
import
AdminUser
from
utils.azure_key_vault_client
import
AzureKeyVaultClient
from
utils.functions
import
b64encode_str
,
b64encode_np
from
utils.json_func
import
json_dumps
from
utils.functions
import
b64encode_str
,
b64encode_np
,
parse_auth_header
,
\
b64decode_str
from
utils.json_func
import
json_dumps
,
json_loads
from
utils.log
import
Log
class
MimeTypes
:
...
...
@@ -21,8 +28,8 @@ class MimeTypes:
class
TokenHelper
:
""" Token Helper implementation """
def
__init__
(
self
,
azure_
cli
:
AzureKeyVaultClient
):
self
.
azure_
cli
=
azure_cli
def
__init__
(
self
,
azure_
kv
:
AzureKeyVaultClient
):
self
.
azure_
kv
=
azure_kv
self
.
executor
=
ThreadPoolExecutor
(
10
)
self
.
io_loop
=
asyncio
.
get_event_loop
()
...
...
@@ -34,13 +41,13 @@ class TokenHelper:
if
alg
==
Auth
.
RS256
:
""" RSA signature with SHA-256 """
key
=
self
.
azure_
cli
.
get_or_create_random_key_bl
()
key
=
self
.
azure_
kv
.
get_or_create_random_key_bl
()
header
.
update
(
dict
(
kid
=
key
.
name
))
token_unsigned
=
"{}.{}"
.
format
(
b64encode_str
(
json_dumps
(
header
)),
b64encode_str
(
json_dumps
(
body
)))
signature
=
SHA256
.
new
(
token_unsigned
.
encode
(
"utf-8"
))
.
digest
()
signature_encrypted
=
self
.
azure_
cli
.
encrypt_bl
(
key
,
signature
)
signature_encrypted
=
self
.
azure_
kv
.
encrypt_bl
(
key
,
signature
)
signature_b64
=
b64encode_np
(
signature_encrypted
)
.
decode
(
"utf-8"
)
return
"{}.{}"
.
format
(
token_unsigned
,
signature_b64
)
elif
alg
==
Auth
.
HS256
:
...
...
@@ -64,12 +71,12 @@ class TokenHelper:
""" Perform Auth blocking """
from
config
import
Auth
kv_login
=
self
.
azure_cli
.
get_secret_bl
(
Auth
.
ADMIN_LOGIN_SECRET
)
.
value
kv_passw
=
self
.
azure_cli
.
get_secret_bl
(
Auth
.
ADMIN_PASSW_SECRET
)
.
value
if
user
.
login
==
kv_login
and
user
.
password
==
kv_
passw
:
login
=
self
.
azure_kv
.
get_secret_bl
(
Auth
.
ADMIN_LOGIN_SECRET
)
.
value
passw
=
self
.
azure_kv
.
get_secret_bl
(
Auth
.
ADMIN_PASSW_SECRET
)
.
value
if
user
.
login
==
login
and
user
.
password
==
passw
:
ttl
=
3600
token
=
self
.
create_token_bl
(
user
.
login
,
ttl
)
return
dict
(
tokenType
=
"Bearer"
,
return
dict
(
tokenType
=
Auth
.
BEARER
,
expiresIn
=
ttl
,
accessToken
=
token
)
return
None
...
...
@@ -78,3 +85,65 @@ class TokenHelper:
""" Perform auth async """
return
self
.
io_loop
.
run_in_executor
(
self
.
executor
,
self
.
do_auth_bl
,
user
)
def
is_token_valid
(
self
,
token
:
str
)
->
bool
:
""" Check if token is Valid """
# split first
header_b64_str
,
body_b64_str
,
signature
=
token
.
split
(
"."
)
token_unsigned
=
"{}.{}"
.
format
(
header_b64_str
,
body_b64_str
)
# parse
header
=
json_loads
(
b64decode_str
(
header_b64_str
))
body
=
json_loads
(
b64decode_str
(
body_b64_str
))
# check fields
token_typ
=
header
.
get
(
"typ"
,
None
)
token_alg
=
header
.
get
(
"alg"
,
None
)
token_kid
=
header
.
get
(
"kid"
,
None
)
token_sub
=
body
.
get
(
"sub"
,
None
)
token_exp
=
body
.
get
(
"exp"
,
None
)
if
None
in
[
token_typ
,
token_alg
,
token_kid
,
token_sub
,
token_exp
]:
return
False
# check type
if
token_typ
!=
Auth
.
TYPE
:
return
False
# check alg
if
token_alg
!=
Auth
.
ALG
:
return
False
# check expiration
if
datetime
.
utcnow
()
.
timestamp
()
>
token_exp
:
return
False
# TODO(s1z): Cache this please
# check sub
login
=
self
.
azure_kv
.
get_secret_bl
(
Auth
.
ADMIN_LOGIN_SECRET
)
.
value
if
token_sub
!=
login
:
return
False
# check signature
try
:
key
=
self
.
azure_kv
.
get_key_bl
(
token_kid
)
except
(
ResourceNotFoundError
,
HttpResponseError
):
Log
.
e
(
__name__
,
"Key not found: '{}'"
.
format
(
token_kid
))
return
False
# signature_gen = SHA256.new(token_unsigned.encode("utf-8")).digest()
# signature_encrypted = self.azure_kv.encrypt_bl(key, signature_gen)
# signature_b64 = b64encode_np(signature_encrypted).decode("utf-8")
# Log.e(__name__, exc_info=sys.exc_info())
return
True
def
is_auth
(
self
,
f
):
""" Is auth decorator """
async
def
wr
(
request
:
Request
)
->
Response
:
""" Wrapper """
a_type
,
a_value
=
parse_auth_header
(
request
.
headers
.
get
(
"Authorization"
)
)
if
a_type
==
Auth
.
BEARER
and
self
.
is_token_valid
(
a_value
):
return
await
f
(
request
)
return
Response
(
status
=
HTTPStatus
.
FORBIDDEN
)
return
wr
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment