Commit 5b9c96dc by Oleksandr Barabash

init commit. Removed update on receivers as it's going to be obsolete

parent 5eccee51
......@@ -14,3 +14,7 @@ static/node_modules
static/cesium-kml-czml-editor
package-lock.json
package.json
*.pyc
**.pyc
.venv
.idea
......@@ -5,3 +5,8 @@ numpy>=1.13.3
lxml>=4.2.1
czml3>=0.5.4
scikit_learn>=0.23.2
# ether requirements
absl-py==1.4.0
aiohttp==3.8.4
simplejson==3.18.4
""" This is a Protocol to wirk with obsolete DF-Aggregator objects """
from typing import Protocol
class Receiver(Protocol):
""" Receiver object """
...
def update(self, first_run=False):
""" Some update logic """
...
def receiver_dict(self):
""" Receiver representation in json format """
...
def lob_length(self):
""" Some magic here, i suppose. We skip it for now.
There's a bit of hardcode with 200... """
...
""" Ether web service """
import asyncio
import logging
from threading import Thread, Lock
from typing import List, Dict
from aiohttp import web
from utils.ether_service.entities.receiver import Receiver
from utils.ether_service.handlers.api.v1.ws import WSHandler
from utils.run_app_fixed import create_task
logger = logging.getLogger(__name__)
class EtherService(Thread):
""" Ether Service. This is a thread that runs aiohttp.
Main purpose is:
1. Receive data from KrSPIs.
2. Do math on the background.
3. Update global variables for the obsolete web service."""
def __init__(self, web_service: Thread, db_writer: Thread,
receivers: List[Receiver], shared_dict: Dict,
*,
interface="0.0.0.0", port=8090):
super().__init__(name=self.__class__.__name__, daemon=True)
self.web_service = web_service
self.db_writer = db_writer
self.receivers = receivers
self.shared_dict = shared_dict
self._is_running = False
self._is_running_lock = Lock()
self.interface = interface
self.port = port
self.io_loop = asyncio.get_event_loop()
self.web_app = web.Application()
self.web_app.add_routes([
WSHandler().compile(),
])
self.main_task = None
@property
def is_running(self):
""" Get _is_running safely """
with self._is_running_lock:
return self._is_running
@is_running.setter
def is_running(self, value: bool):
""" set _is_running safely """
with self._is_running_lock:
self._is_running = value
def run(self):
""" Thread routine """
self.is_running = True
self.main_task = create_task(self.web_app, host=self.interface,
port=self.port, loop=self.io_loop,
handle_signals=False,
print=lambda x: x)
logger.info("Server has started")
self.io_loop.run_forever()
logger.info("Server has stopped")
def start(self):
""" Start the app """
logger.info("Server is starting")
super().start()
def stop(self):
""" Stop the app """
logger.info("Server is stopping")
self.is_running = False
# add web_server.stop()
# add db_writer.stop()
self.io_loop.call_soon_threadsafe(self.main_task.cancel)
self.io_loop.call_soon_threadsafe(self.io_loop.stop)
""" Websocket handler """
import base64
import sys
from typing import Any, Awaitable, Union, Tuple, Optional
import aiohttp
import uuid
from absl import logging
from aiohttp import WSMessage
from aiohttp.web_exceptions import HTTPForbidden
from aiohttp.web_request import Request
from aiohttp.web_response import Response
from aiohttp.web_ws import WebSocketResponse
from multidict import CIMultiDictProxy
from ...base_handler import BaseHandler
class AuthType:
""" Supported auth types """
BASIC = "basic"
BEARER = "bearer"
async def auth_basic(credentials: str) -> bool:
""" Auth using basic credentials """
# noinspection PyBroadException
try:
data = base64.b64decode(credentials.encode("utf-8")).decode("utf-8")
login, password = data.split(":")
if login == "kraken" and password == "kraken":
return True
except Exception:
logging.error("auth_basic error:", exc_info=sys.exc_info())
return False
def parse_auth_header(headers: "CIMultiDictProxy[str]")\
-> Tuple[Optional[str], Optional[str]]:
""" Parse auth header, return AuthType and AuthData """
auth_header = headers.get("Authorization") or None
if auth_header is not None:
try:
auth_type, auth_data = auth_header.split(" ")
return auth_type, auth_data
except ValueError:
logging.error(f"Could not parse Auth Header: '{auth_header}'")
return None, None
async def authenticate(request: Request) -> bool:
""" Authenticate request """
auth_type, auth_data = parse_auth_header(request.headers)
if auth_type is not None and auth_data is not None:
logging.info(f"auth_type and auth_data are fine, handling")
if auth_type.lower() == AuthType.BASIC:
logging.info(f"Handling Basic auth")
return await auth_basic(auth_data)
elif auth_data.lower() == AuthType.BEARER:
logging.warning(f"{AuthType.BEARER} is not implemented!")
return False
else:
logging.warning(f"{auth_type} is not supported!")
logging.warning(f"Either, auth_type or auth_data is null")
return False
def auth_required(f) -> Any:
""" Check if user is auth """
async def wr(self, request: Request, *args, **kwargs)\
-> Awaitable[Union[WebSocketResponse, Response]]:
""" wrapper """
is_auth = await authenticate(request) or self.PATH.find("?") >= 0
if is_auth:
return await f(self, request, *args, **kwargs)
raise HTTPForbidden()
return wr
class WSHandler(BaseHandler):
""" DoA Handler """
METHOD = "GET"
PATH = "/api/v1/ws"
ws = None
connection_id = None
@staticmethod
async def handle_binary(data: bytes) -> None:
""" handle web socket data """
logging.info(f"Binary data received: {data}")
async def handle_message(self, msg: WSMessage) -> None:
""" Handle incoming websocket packet """
if msg.type == aiohttp.WSMsgType.ERROR:
ex = self.ws.exception()
exc_info = (ex.__class__, ex, ex.__traceback__)
logging.error("Ws connection closed", exc_info=exc_info)
elif msg.type == aiohttp.WSMsgType.BINARY:
await self.handle_binary(msg.data)
elif msg.type == aiohttp.WSMsgType.TEXT:
await self.ws.send_str(msg.data)
@auth_required
async def handler(self, request: Request) -> WebSocketResponse:
""" The handler """
connection_id = uuid.uuid4()
logging.info(f"WS opened. connection_id: '{connection_id}'")
self.ws = ws = WebSocketResponse()
await ws.prepare(request)
logging.info(f"WS prepared. connection_id: '{connection_id}'")
async for msg in ws:
# noinspection PyTypeChecker
await self.handle_message(msg)
logging.info(f"WS closed. connection_id: '{connection_id}'")
return ws
""" This is a base handler """
import logging
from aiohttp import web
from aiohttp.web_request import Request
from aiohttp.web_response import Response
class BaseHandler:
""" API V1 Health Check """
METHOD = None
PATH = None
async def handler(self, _: Request) -> Response:
""" The handler """
raise NotImplementedError()
def compile(self):
""" compile into the route """
logger = logging.getLogger(self.__module__)
logger.info(f"{self.METHOD}:{self.PATH}")
return web.route(self.METHOD, self.PATH, self.handler)
def _get_name(self):
""" Get handler name """
return f"{self.__module__}.{self.__class__.__name__}"
from __future__ import absolute_import
from __future__ import unicode_literals
import logging
def init_logging(filename=None, level=None):
""" init logging on the app level """
logging_config = {"format": "%(asctime)-23s %(levelname)8s::"
"%(name)s::%(funcName)s:"
"%(message)s",
"level": level or logging.DEBUG}
if filename is not None:
logging_config["filename"] = filename
logging.getLogger().handlers = []
logging.basicConfig(**logging_config)
""" Default run_app uses method run_until_complete,
which is awful and wrongn """
import asyncio
import logging
import socket
from asyncio import all_tasks, Task
from ssl import SSLContext
from typing import Union, Awaitable, Optional
from aiohttp.abc import Application, AbstractAccessLogger
from aiohttp.log import access_logger
from aiohttp.web import HostSequence, _run_app, _cancel_tasks
from typing import (
Any,
Awaitable,
Callable,
Iterable as TypingIterable,
List,
Optional,
Set,
Type,
Union,
cast,
)
from aiohttp.web_log import AccessLogger
from aiohttp.web_runner import GracefulExit
def create_task(
app: Union[Application, Awaitable[Application]],
*,
host: Optional[Union[str, HostSequence]] = None,
port: Optional[int] = None,
path: Optional[str] = None,
sock: Optional[Union[socket.socket, TypingIterable[socket.socket]]] = None,
shutdown_timeout: float = 60.0,
keepalive_timeout: float = 75.0,
ssl_context: Optional[SSLContext] = None,
print: Callable[..., None] = print,
backlog: int = 128,
access_log_class: Type[AbstractAccessLogger] = AccessLogger,
access_log_format: str = AccessLogger.LOG_FORMAT,
access_log: Optional[logging.Logger] = access_logger,
handle_signals: bool = True,
reuse_address: Optional[bool] = None,
reuse_port: Optional[bool] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> Task:
"""Run an app locally"""
if loop is None:
loop = asyncio.new_event_loop()
# Configure if and only if in debugging mode and using the default logger
if loop.get_debug() and access_log and access_log.name == "aiohttp.access":
if access_log.level == logging.NOTSET:
access_log.setLevel(logging.DEBUG)
if not access_log.hasHandlers():
access_log.addHandler(logging.StreamHandler())
main_task = loop.create_task(
_run_app(
app,
host=host,
port=port,
path=path,
sock=sock,
shutdown_timeout=shutdown_timeout,
keepalive_timeout=keepalive_timeout,
ssl_context=ssl_context,
print=print,
backlog=backlog,
access_log_class=access_log_class,
access_log_format=access_log_format,
access_log=access_log,
handle_signals=handle_signals,
reuse_address=reuse_address,
reuse_port=reuse_port,
)
)
return main_task
def run_app(
app: Union[Application, Awaitable[Application]],
*,
host: Optional[Union[str, HostSequence]] = None,
port: Optional[int] = None,
path: Optional[str] = None,
sock: Optional[Union[socket.socket, TypingIterable[socket.socket]]] = None,
shutdown_timeout: float = 60.0,
keepalive_timeout: float = 75.0,
ssl_context: Optional[SSLContext] = None,
print: Callable[..., None] = print,
backlog: int = 128,
access_log_class: Type[AbstractAccessLogger] = AccessLogger,
access_log_format: str = AccessLogger.LOG_FORMAT,
access_log: Optional[logging.Logger] = access_logger,
handle_signals: bool = True,
reuse_address: Optional[bool] = None,
reuse_port: Optional[bool] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> None:
"""Run an app locally"""
main_task = create_task(app, host=host, port=port, path=path, sock=sock,
shutdown_timeout=shutdown_timeout,
keepalive_timeout=keepalive_timeout,
ssl_context=ssl_context, print=print,
backlog=backlog,
access_log_class=access_log_class,
access_log_format=access_log_format,
access_log=access_log,
handle_signals=handle_signals,
reuse_address=reuse_address,
reuse_port=reuse_port, loop=loop)
try:
asyncio.set_event_loop(loop)
loop.run_forever()
except (GracefulExit, KeyboardInterrupt): # pragma: no cover
pass
finally:
_cancel_tasks({main_task}, loop)
_cancel_tasks(all_tasks(loop), loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment