Commit 08410532 by Oleksandr Barabash

grabber added

parent 79a14e65
""" App """
import asyncio
import logging
import signal
import sys
from asyncio import AbstractEventLoop
from typing import Optional
import numpy as np
from marshmallow import EXCLUDE
from utils.ether_service.entities.data.kr_packet import KrPacketV2
from utils.ether_service.utils.json_func import json_loads
from utils.ws_client import WSClient
logger = logging.getLogger(__name__)
def init_logging(filename=None, level=None):
""" init logging on the app level """
logging_config = {"format": "%(message)s",
"level": level or logging.DEBUG}
if filename is not None:
logging_config["filename"] = filename
logging.getLogger().handlers = []
logging.basicConfig(**logging_config)
def stop_server(server: WSClient, io_loop: AbstractEventLoop):
""" Stop server and execute finish gracefully """
def wr(_signum, _frame):
""" We need a wraper to dynamicaly set server """
server.stop()
io_loop.stop()
return wr
# noinspection PyShadowingNames
def try_to_log(kr_packet: KrPacketV2,
log_name: Optional[str] = None) -> None:
""" try to save log into the file """
if log_name is not None:
try:
with open(log_name, "a") as f:
f.write(f"{kr_packet}\r\n")
except IOError:
logging.error("save to file error!", exc_info=sys.exc_info())
# noinspection PyShadowingNames
def handle_message(station_name: Optional[str] = None,
log_name: Optional[str] = None) -> "(_: bytes) -> None":
""" Handle Message """
def wrapped(data: bytes) -> None:
""" Handle Message """
message = json_loads(data)
if message is None:
logger.warning(f"Failed to parse message!")
logger.debug(f"Failed to parse message!: {repr(data)}")
return
# noinspection PyBroadException
try:
kr_packet = KrPacketV2.load(message, unknown=EXCLUDE)
if station_name is None or \
station_name.lower() == kr_packet.station_id.lower():
logging.info(f"{kr_packet}")
try_to_log(kr_packet, log_name)
except Exception:
logging.error("handle_message: Error!", exc_info=sys.exc_info())
return wrapped
# noinspection PyShadowingNames
def main(io_loop: AbstractEventLoop,
station_name: Optional[str] = None,
log_name: Optional[str] = None) -> None:
""" Main """
ws_client = WSClient(io_loop=io_loop, scheme="https", host="cds.s1z.info",
path="/api/v1/ws-agg",
user="kraken", password="kraken", ca_file="./ca.pem",
on_data_handler=handle_message(station_name,
log_name))
ws_client.start()
signal.signal(signal.SIGINT, stop_server(ws_client, io_loop))
signal.signal(signal.SIGTERM, stop_server(ws_client, io_loop))
io_loop.run_forever()
if __name__ == "__main__":
logging.getLogger("geventwebsocket").level = logging.WARNING
logging.getLogger("asyncio").level = logging.INFO
logging_level = logging.INFO
init_logging(level=logging_level)
loop = asyncio.get_event_loop()
station_name, file_name = None, None
if len(sys.argv) > 1:
station_name = sys.argv[1]
if len(sys.argv) > 2:
file_name = sys.argv[2]
main(loop, station_name, file_name)
""" Kraken data packet implementation """ """ Kraken data packet implementation """
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta
from ..basic_dataclass import BasicDataclass from ..basic_dataclass import BasicDataclass
example_packet = {
"tStamp": 1681082256511,
"latitude": "46.6280265",
"longitude": "32.4401186",
"gpsBearing": "0",
"radioBearing": "249",
"conf": "0.69",
"power": "-69.9",
"freq": 441875000,
"antType": "ULA",
"latency": 100,
"doaArray
"id": "Fake"
}
@dataclass @dataclass
class KrPacket(BasicDataclass): class KrPacket(BasicDataclass):
""" Kraken Packet """ """ Kraken Packet """
...@@ -39,3 +24,43 @@ class KrPacket(BasicDataclass): ...@@ -39,3 +24,43 @@ class KrPacket(BasicDataclass):
def __repr__(self): def __repr__(self):
return f"<KrP: {self.id}::{self.radio_bearing}>" return f"<KrP: {self.id}::{self.radio_bearing}>"
@dataclass
class KrPacketV2(BasicDataclass):
""" Kraken Packet V2 """
adc_overdrive: int
num_corr_sources: str
snr_db: float
serial: str
# doa_array: str
station_id: str
t_stamp: int
gps_timestamp: int
latitude: float
longitude: float
gps_bearing: float
radio_bearing: float # str(int)
conf: float
power: float
freq: int
ant_type: str
latency: int
processing_time: int
def __repr__(self):
dt = (
datetime.fromtimestamp(self.t_stamp / 1000)
+ timedelta(hours=3) # this is a hack, fix it pls!
).strftime('%Y-%m-%d %H:%M:%S')
absolute_bearing = self.gps_bearing + (360 - self.radio_bearing)
if absolute_bearing < 0:
absolute_bearing += 360
elif absolute_bearing > 359:
absolute_bearing -= 360
return f"{dt}::{self.station_id}::" \
f"frequency: {self.freq / 1000000}, " \
f"DoA: {absolute_bearing}, " \
f"confidence: {self.conf}, " \
f"power: {self.power}"
...@@ -53,7 +53,7 @@ class ProcessService(Process): ...@@ -53,7 +53,7 @@ class ProcessService(Process):
self.handle_command(*command) self.handle_command(*command)
except Exception as ex: except Exception as ex:
exc_info = type(ex), ex, ex.__traceback__ exc_info = type(ex), ex, ex.__traceback__
# logger.error("main loop execution error", exc_info=exc_info) logger.error("main loop execution error", exc_info=exc_info)
now = time() now = time()
if time() - last_refresh >= 1: if time() - last_refresh >= 1:
last_refresh = now last_refresh = now
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment