Commit 3b16ace6 by Oleksandr Barabash

moved from xlsx to csv

parent b0b757a5
......@@ -18,3 +18,4 @@ package.json
**.pyc
.venv
.idea
.DS_Store
""" App """
""" Eter-cli implementation """
import abc
import asyncio
import logging
import os
import signal
import sys
from asyncio import AbstractEventLoop
from typing import Optional
from concurrent.futures import ThreadPoolExecutor
from typing import Any
from absl import app, flags
from absl.flags import FLAGS
from marshmallow import EXCLUDE
from openpyxl import Workbook
from openpyxl.utils import get_column_letter
from utils.ether_service.entities.data.kr_packet import KrPacketV2
from utils.ether_service.utils.json_func import json_loads
......@@ -24,7 +25,74 @@ flags.DEFINE_string('station_id', None, "Station Id")
flags.DEFINE_string('file', None, "File where to save the history")
def init_logging(filename=None, level=None):
class PacketLogger(abc.ABC):
""" Abstract Logger """
@abc.abstractmethod
def log(self, kr_packet: KrPacketV2) -> None:
""" Log packet """
raise NotImplementedError()
@abc.abstractmethod
def save(self) -> None:
""" Save data """
raise NotImplementedError()
class CSVLogger(PacketLogger):
""" CSVLogger """
BREAK_LINE = "\r\n"
def __init__(self, file_path: str):
self.file_path = file_path
is_file_exists = os.path.exists(self.file_path)
self.file = open(self.file_path, "a+")
if not is_file_exists:
self.write("; ".join(["Station ID", "Timestamp", "Frequency",
"DoA", "Confidence", "Power", "Serial",
"Radio Bearing",
"Compass Offset"]) + self.BREAK_LINE)
self.save()
@staticmethod
def serialize_data(data: Any) -> str:
""" serialize data for the csv format """
if isinstance(data, str):
return data
return str(data)
# noinspection PyBroadException
def write(self, data: str) -> None:
""" Try write """
while True:
try:
self.file.write(data)
return
except Exception:
self.file = open(self.file_path, "a+")
def log(self, kr_packet: KrPacketV2) -> None:
""" Log packet """
self.write("; ".join([
self.serialize_data(kr_packet.station_id),
self.serialize_data(kr_packet.get_timestamp()),
self.serialize_data(kr_packet.freq / 1000000),
self.serialize_data(kr_packet.get_absolute_doa()),
self.serialize_data(kr_packet.conf),
self.serialize_data(kr_packet.power),
self.serialize_data(kr_packet.serial),
self.serialize_data(kr_packet.radio_bearing),
self.serialize_data(kr_packet.compass_offset)
]) + self.BREAK_LINE)
def save(self) -> None:
""" Save data """
self.file.flush()
self.file.close()
def init_logging(filename: str = None, level: int = None) -> None:
""" init logging on the app level """
logging_config = {"format": "%(message)s",
"level": level or logging.DEBUG}
......@@ -34,82 +102,11 @@ def init_logging(filename=None, level=None):
logging.basicConfig(**logging_config)
def stop_server(server: WSClient, io_loop: AbstractEventLoop):
""" Stop server and execute finish gracefully """
def wr(_signum, _frame):
""" We need a wraper to dynamicaly set server """
server.stop()
io_loop.stop()
return wr
class SpreadSheetHelper:
""" SpreadSheetHelper implementation """
def __init__(self):
self.workbook = Workbook()
self.sheet = self.workbook.active
self.sheet.title = "DoA"
self.row = 1
self.column = 1
self.create_headers()
self.format_columns()
def create_headers(self) -> None:
""" Create headers """
self.sheet.cell(self.row, self.column, "Station ID")
self.sheet.cell(self.row, self.column + 1, "Timestamp")
self.sheet.cell(self.row, self.column + 2, "Frequency")
self.sheet.cell(self.row, self.column + 3, "DoA")
self.sheet.cell(self.row, self.column + 4, "Confidence")
self.sheet.cell(self.row, self.column + 5, "Power")
self.sheet.cell(self.row, self.column + 6, "Serial")
self.sheet.cell(self.row, self.column + 7, "Radio Bearing")
self.sheet.cell(self.row, self.column + 8, "Compass Offset")
self.row += 1
def log_packet(self, kr_packet: KrPacketV2) -> None:
""" Log packet """
self.sheet.cell(self.row, self.column, kr_packet.station_id)
self.sheet.cell(self.row, self.column + 1, kr_packet.get_timestamp())
self.sheet.cell(self.row, self.column + 2, kr_packet.freq / 1000000)
self.sheet.cell(self.row, self.column + 3,
kr_packet.get_absolute_doa())
self.sheet.cell(self.row, self.column + 4, kr_packet.conf)
self.sheet.cell(self.row, self.column + 5, kr_packet.power)
self.sheet.cell(self.row, self.column + 6, kr_packet.serial)
self.sheet.cell(self.row, self.column + 7, kr_packet.radio_bearing)
self.sheet.cell(self.row, self.column + 8, kr_packet.compass_offset)
self.row += 1
def save(self, filename: str) -> None:
""" save workbook """
self.workbook.save(filename)
def format_columns(self) -> None:
""" set width """
for column_cells in self.sheet.columns:
new_column_letter = get_column_letter(column_cells[0].column)
self.sheet.column_dimensions[new_column_letter].width = 20
# noinspection PyShadowingNames
def try_to_log(helper: SpreadSheetHelper, kr_packet: KrPacketV2) -> None:
""" try to save log into the file """
if FLAGS.file is not None:
try:
helper.log_packet(kr_packet)
helper.save(FLAGS.file)
except IOError:
logging.error("save to file error!", exc_info=sys.exc_info())
def handle_message(helper: SpreadSheetHelper) -> None:
""" Handle message wrapper """
def on_data_handler(packet_logger: PacketLogger):
""" Handle data """
def wr(data: bytes) -> None:
""" Handle Message """
""" On Data handler """
message = json_loads(data)
if message is None:
logger.warning(f"Failed to parse message!")
......@@ -118,35 +115,63 @@ def handle_message(helper: SpreadSheetHelper) -> None:
# noinspection PyBroadException
try:
kr_packet = KrPacketV2.load(message, unknown=EXCLUDE)
station_id = FLAGS.station_id
station_id = flags.FLAGS.station_id
if station_id is None or \
station_id.lower() == kr_packet.station_id.lower():
logging.info(f"{kr_packet}")
try_to_log(helper, kr_packet)
logger.info(f"{kr_packet}")
packet_logger.log(kr_packet)
except Exception:
logging.error("handle_message: Error!", exc_info=sys.exc_info())
return wr
# noinspection PyShadowingNames
def main(_argv) -> None:
""" Main """
def stop_server(server: WSClient, packet_logger: PacketLogger,
io_loop: AbstractEventLoop):
""" Stop server and execute finish gracefully """
def wr(_signum, _frame):
""" We need a wraper to dynamicaly set server """
def callback():
""" callback """
if server.is_running:
logging.info("Stopping the service!")
packet_logger.save()
server.stop()
else:
logging.info("Service is already stopping!")
io_loop.call_soon_threadsafe(callback)
return wr
def main(_argv) -> int:
""" main """
# When main is called, means logging has been changed on the abseil level
# Change geventwebsocket and asyncio log levels
logging.getLogger("geventwebsocket").level = logging.WARNING
logging.getLogger("asyncio").level = logging.INFO
logging_level = logging.INFO
init_logging(level=logging_level)
helper = SpreadSheetHelper()
# change default level to INFO
init_logging(level=logging.INFO)
io_loop = asyncio.get_event_loop()
packet_logger = CSVLogger(flags.FLAGS.file)
ws_client = WSClient(io_loop=io_loop, scheme="https", host="cds.s1z.info",
path="/api/v1/ws-agg",
user="kraken", password="kraken", ca_file="./ca.pem",
on_data_handler=handle_message(helper))
on_data_handler=on_data_handler(packet_logger))
ws_client.start()
signal.signal(signal.SIGINT, stop_server(ws_client, io_loop))
signal.signal(signal.SIGTERM, stop_server(ws_client, io_loop))
signal.signal(signal.SIGINT, stop_server(ws_client, packet_logger,
io_loop))
signal.signal(signal.SIGTERM, stop_server(ws_client, packet_logger,
io_loop))
io_loop.run_forever()
return 0
if __name__ == "__main__":
......
......@@ -15,3 +15,4 @@ stringcase==1.2.0
# excel
openpyxl==3.1.2
aiofiles==23.1.0
\ No newline at end of file
......@@ -58,7 +58,7 @@ class KrPacketV2(BasicDataclass):
).strftime('%Y-%m-%d %H:%M:%S')
def get_absolute_doa(self):
""" get abolute doa """
""" get absolute doa """
absolute_bearing = self.gps_bearing + (360 - self.radio_bearing)
if absolute_bearing < 0:
absolute_bearing += 360
......
""" Web Socket Client """
import asyncio
import ssl
import sys
from asyncio import AbstractEventLoop
from typing import Callable, Optional
from typing import Callable, Optional, Awaitable
import aiohttp
from absl import logging
from yarl import URL
class WSClientSocketError(Exception):
""" Basic Socket error """
pass
class WSClient:
""" WebSocket Client """
def __init__(self, io_loop: AbstractEventLoop, scheme: str, host: str,
path: str, user: str, password: str, ca_file: str = None,
on_data_handler: Optional[Callable] = None):
path: str, user: str, password: str,
on_data_handler: Callable, ca_file: Optional[str] = None):
self.io_loop = io_loop
self.scheme = scheme
self.host = host
......@@ -22,7 +26,9 @@ class WSClient:
self.user = user
self.password = password
self.ca_file = ca_file
self.main_job: Optional[Awaitable] = None
self.on_data_handler = on_data_handler
self.url = URL.build(scheme=self.scheme, host=self.host,
path=self.path, user=self.user,
password=self.password)
......@@ -33,122 +39,76 @@ class WSClient:
self.ssl_context.load_verify_locations(cafile=self.ca_file)
self.is_running = False
self.ws = None
self.main_job = None
self.ping_job = None
self.send_job = None
self.queue = asyncio.Queue()
self.on_data_handlers = []
self.add_on_data_handler(on_data_handler)
def add_on_data_handler(self, handler: Callable) -> None:
""" Add on data handler """
if callable(handler):
self.io_loop.call_soon_threadsafe(self.on_data_handlers.append,
handler)
def del_on_data_handler(self, handler: Callable) -> None:
""" Del on data handler """
def delete():
""" real function """
try:
index = self.on_data_handlers.index(handler)
self.on_data_handlers.pop(index)
except ValueError:
pass
self.io_loop.call_soon_threadsafe(delete)
def start(self) -> None:
""" Start client """
def on_data_received(self, data: bytes) -> None:
""" On data received via websocket """
for handler in self.on_data_handlers:
handler(data)
def start_callback(o) -> None:
""" start callback """
o.is_running = True
o.main_job = o.io_loop.create_task(o.run())
def send_data(self, data: str) -> None:
""" Send data to the server """
data_binary = data.encode("utf-8")
self.queue.put_nowait(data_binary)
self.io_loop.call_soon_threadsafe(start_callback, self)
async def send_worker(self):
""" Send worker """
data = None
while self.is_running:
if self.ws is not None:
if data is None:
data = await self.queue.get()
# noinspection PyBroadException
try:
await self.ws.send_bytes(data)
self.queue.task_done()
data = None
except Exception:
exc_info = sys.exc_info()
logging.warning("send worker failed with error",
exc_info=exc_info)
await asyncio.sleep(0)
def stop(self) -> None:
""" Stop client """
async def ping_worker(self):
""" Ping worker """
while self.is_running:
await asyncio.sleep(10)
if self.ws is not None:
# noinspection PyBroadException
try:
await self.ws.ping()
except Exception:
exc_info = sys.exc_info()
logging.warning("ping worker failed with error",
exc_info=exc_info)
async def finisher(obj: "WSClient") -> None:
""" Finishes the execution and clears the objects params """
if obj.main_job is not None:
obj.main_job.cancel()
obj.io_loop.stop()
def start(self) -> None:
""" Start client """
self.stop()
self.is_running = True
self.main_job = self.io_loop.create_task(self.run())
self.ping_job = self.io_loop.create_task(self.ping_worker())
self.send_job = self.io_loop.create_task(self.send_worker())
def stop_callback(obj: "WSClient") -> None:
""" start callback """
obj.is_running = False
obj.io_loop.create_task(finisher(obj))
def stop(self) -> None:
""" Stop client """
self.is_running = False
if self.main_job:
self.main_job.cancel()
self.main_job = None
if self.ping_job:
self.ping_job.cancel()
self.ping_job = None
if self.send_job:
self.send_job.cancel()
self.send_job = None
self.io_loop.call_soon_threadsafe(stop_callback, self)
async def run(self):
""" start web socket connection """
while self.is_running:
ws = None
session = aiohttp.ClientSession()
# noinspection PyBroadException
try:
async with session.ws_connect(self.url,
ssl=self.ssl_context) as ws:
logging.info("WS opened.")
self.ws = ws
async for msg in ws:
# noinspection PyUnresolvedReferences
if msg.type == aiohttp.WSMsgType.BINARY:
# noinspection PyUnresolvedReferences
self.on_data_received(msg.data)
self.on_data_handler(msg.data)
elif msg.type == aiohttp.WSMsgType.CONTINUATION:
pass
elif msg.type == aiohttp.WSMsgType.TEXT:
logging.warning("WS received 'TEXT' data, "
"ignoring...")
pass # do nothing
elif msg.type == aiohttp.WSMsgType.PING:
# TODO(s1z): Check if it's handled on the low level
pass
elif msg.type == aiohttp.WSMsgType.PONG:
# TODO(s1z): Check if it's handled on the low level
pass
elif msg.type == aiohttp.WSMsgType.CLOSE:
raise WSClientSocketError("CLOSE message received")
elif msg.type == aiohttp.WSMsgType.CLOSING:
raise WSClientSocketError("Socket is CLOSING")
elif msg.type == aiohttp.WSMsgType.CLOSED:
logging.warning("WS Closed, raising an IOError.")
raise IOError("WS Closed")
raise WSClientSocketError("Socket is CLOSED")
elif msg.type == aiohttp.WSMsgType.ERROR:
raise self.ws.exception()
except Exception as ex:
raise WSClientSocketError("Socket ERROR")
await asyncio.sleep(0)
await asyncio.sleep(0)
except WSClientSocketError as ex:
exc_info = (ex.__class__, ex, ex.__traceback__)
logging.warning("WS error.", exc_info=exc_info)
self.ws = None
except Exception as ex:
exc_info = (ex.__class__, ex, ex.__traceback__)
logging.warning("Unknown Error received:", exc_info=exc_info)
finally:
if ws is not None:
await ws.close()
if session is not None:
await session.close()
await asyncio.sleep(0)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment