Commit 777fc1e2 by Oleksandr Barabash

ether v0.1.0 commit

parent 5b9c96dc
-----BEGIN CERTIFICATE-----
MIIFYDCCBEigAwIBAgIQQAF3ITfU6UK47naqPGQKtzANBgkqhkiG9w0BAQsFADA/
MSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT
DkRTVCBSb290IENBIFgzMB4XDTIxMDEyMDE5MTQwM1oXDTI0MDkzMDE4MTQwM1ow
TzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2Vh
cmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwggIiMA0GCSqGSIb3DQEB
AQUAA4ICDwAwggIKAoICAQCt6CRz9BQ385ueK1coHIe+3LffOJCMbjzmV6B493XC
ov71am72AE8o295ohmxEk7axY/0UEmu/H9LqMZshftEzPLpI9d1537O4/xLxIZpL
wYqGcWlKZmZsj348cL+tKSIG8+TA5oCu4kuPt5l+lAOf00eXfJlII1PoOK5PCm+D
LtFJV4yAdLbaL9A4jXsDcCEbdfIwPPqPrt3aY6vrFk/CjhFLfs8L6P+1dy70sntK
4EwSJQxwjQMpoOFTJOwT2e4ZvxCzSow/iaNhUd6shweU9GNx7C7ib1uYgeGJXDR5
bHbvO5BieebbpJovJsXQEOEO3tkQjhb7t/eo98flAgeYjzYIlefiN5YNNnWe+w5y
sR2bvAP5SQXYgd0FtCrWQemsAXaVCg/Y39W9Eh81LygXbNKYwagJZHduRze6zqxZ
Xmidf3LWicUGQSk+WT7dJvUkyRGnWqNMQB9GoZm1pzpRboY7nn1ypxIFeFntPlF4
FQsDj43QLwWyPntKHEtzBRL8xurgUBN8Q5N0s8p0544fAQjQMNRbcTa0B7rBMDBc
SLeCO5imfWCKoqMpgsy6vYMEG6KDA0Gh1gXxG8K28Kh8hjtGqEgqiNx2mna/H2ql
PRmP6zjzZN7IKw0KKP/32+IVQtQi0Cdd4Xn+GOdwiK1O5tmLOsbdJ1Fu/7xk9TND
TwIDAQABo4IBRjCCAUIwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMCAQYw
SwYIKwYBBQUHAQEEPzA9MDsGCCsGAQUFBzAChi9odHRwOi8vYXBwcy5pZGVudHJ1
c3QuY29tL3Jvb3RzL2RzdHJvb3RjYXgzLnA3YzAfBgNVHSMEGDAWgBTEp7Gkeyxx
+tvhS5B1/8QVYIWJEDBUBgNVHSAETTBLMAgGBmeBDAECATA/BgsrBgEEAYLfEwEB
ATAwMC4GCCsGAQUFBwIBFiJodHRwOi8vY3BzLnJvb3QteDEubGV0c2VuY3J5cHQu
b3JnMDwGA1UdHwQ1MDMwMaAvoC2GK2h0dHA6Ly9jcmwuaWRlbnRydXN0LmNvbS9E
U1RST09UQ0FYM0NSTC5jcmwwHQYDVR0OBBYEFHm0WeZ7tuXkAXOACIjIGlj26Ztu
MA0GCSqGSIb3DQEBCwUAA4IBAQAKcwBslm7/DlLQrt2M51oGrS+o44+/yQoDFVDC
5WxCu2+b9LRPwkSICHXM6webFGJueN7sJ7o5XPWioW5WlHAQU7G75K/QosMrAdSW
9MUgNTP52GE24HGNtLi1qoJFlcDyqSMo59ahy2cI2qBDLKobkx/J3vWraV0T9VuG
WCLKTVXkcGdtwlfFRjlBz4pYg1htmf5X6DYO8A4jqv2Il9DjXA6USbW1FzXSLr9O
he8Y4IWS6wY7bCkjCWDcRQJMEhg76fsO3txE+FiYruq9RUWhiF1myv4Q6W+CyBFC
Dfvp7OOGAN6dEOM4+qR9sdjoSYKEBpsr6GtPAQw4dy753ec5
-----END CERTIFICATE-----
@@ -9,4 +9,6 @@ scikit_learn>=0.23.2
# ether requirements
absl-py==1.4.0
aiohttp==3.8.4
simplejson==3.18.4
simplejson==3.19.1
marshmallow-dataclass==8.5.12
stringcase==1.2.0
@@ -40,7 +40,7 @@ function editReceivers(rx_json, id) {
if (receivers[id].single) isSingle = "checked";
var stationIDhtml =
`Station ID: <a href="${receivers[id].station_url}" target="_blank">${receivers[id].station_id}</a>`;
`Station ID: <p>${receivers[id].stationId}</p>`;
var singleModeHtml = `&emsp;Single Receiver Mode: <input ${isSingle} id="singlerx_toggle_${id}" type="checkbox" />`;
@@ -51,7 +51,7 @@ function editReceivers(rx_json, id) {
var freqHtml = `Tuned to ${receivers[id].frequency} MHz`;
var edit_stationIDhtml =
`Station ID:<input style="width: 105px;" type="text" value="${receivers[id].station_id}" name="station_id_${id}" />`;
`Station ID:<input style="width: 105px;" type="text" value="${receivers[id].stationId}" name="station_id_${id}" />`;
var edit_locationHtml =
`Latitude:<input style="width: 105px;" type="text" value="${receivers[id].latitude}" name="station_lat_${id}" />
@@ -142,8 +142,8 @@ function editReceivers(rx_json, id) {
// ****************************************************
// * Sends Rx station URL to backend and refreshes map
// ****************************************************
function makeNewRx(url) {
const new_rx = { "station_url": url };
function makeNewRx(stationId) {
const new_rx = { "stationId": stationId };
// console.log(new_rx);
const otherParams = {
headers: {
@@ -178,8 +178,8 @@ function destroyRxCards() {
// *******************************************
// * Removes Rx from Backend and Reloads Map
// *******************************************
function deleteReceiver(uid) {
const del_rx = { "uid": uid };
function deleteReceiver(stationId) {
const del_rx = { "stationId": stationId };
// console.log(new_rx);
const otherParams = {
headers: {
@@ -200,8 +200,8 @@ function deleteReceiver(uid) {
// *******************************************************
// * Updates Rx active state from Backend and Reloads Map
// *******************************************************
function activateReceiver(uid, state) {
const activate_rx = { "uid": uid, "state": state };
function activateReceiver(stationId, state) {
const activate_rx = { "stationId": stationId, "active": state };
const otherParams = {
headers: {
"content-type": "application/json"
@@ -224,7 +224,7 @@ function showReceivers(rx_json, id) {
const receivers = rx_json['receivers'];
var stationIDhtml =
`Station ID: <a href="${receivers[id].station_url}" target="_blank">${receivers[id].station_id}</a>`;
`Station ID: <p>${receivers[id].stationId}</p>`;
var locationHtml =
`Location: ${receivers[id].latitude}&#176;, ${receivers[id].longitude}&#176;`;
@@ -244,7 +244,7 @@ function showReceivers(rx_json, id) {
const headingspan = document.getElementById(`${id}-heading`);
const freqspan = document.getElementById(`${id}-freq`);
document.getElementById(`${id}-activate`)
.setAttribute('onclick', `activateReceiver(${receivers[id].uid}, ${!receivers[id].active})`);
.setAttribute('onclick', `activateReceiver('${receivers[id].stationId}', ${!receivers[id].active})`);
if (receivers[id].active == true) {
document.getElementById(`${id}-activate`)
@@ -308,7 +308,7 @@ function createReceivers(rx_json, id) {
deletecheck.classList.add("edit-checkbox", "delete-icon");
deletecheck.type = 'checkbox';
deletecheck.id = `${receivers[i].uid}-delete`;
deletecheck.setAttribute('onclick', `deleteReceiver(${receivers[i].uid})`);
deletecheck.setAttribute('onclick', `deleteReceiver('${receivers[i].stationId}')`);
const activateiconspan = document.createElement('span');
activateiconspan.classList.add("material-icons", "activate-icon", "no-select");
""" Database writer. Obsolete code that was moved into the process and was
pushed out to this file """
import logging
import signal
import sqlite3
import time
from multiprocessing import Process, Value
from utils.ether_service.utils.log import init_logging
logger = logging.getLogger(__name__)
class DatabaseWriter(Process):
""" Database Writer thread """
def __init__(self, db_name, db_edit_queue, db_return_queue,
loglevel=logging.INFO):
self.database_name = db_name
self.db_edit_queue = db_edit_queue
self.db_return_queue = db_return_queue
self._is_running = Value('i', 0)
self._started = Value('i', 0)
self.loglevel = loglevel
super().__init__(name=self.__class__.__name__, daemon=True)
@property
def has_started(self) -> bool:
""" return _server """
with self._started.get_lock():
return self._started.value == 1
@has_started.setter
def has_started(self, value: bool) -> None:
""" set _started """
with self._started.get_lock():
self._started.value = 1 if value else 0
@property
def is_running(self):
""" Get _is_running safely """
with self._is_running.get_lock():
return self._is_running.value == 1
@is_running.setter
def is_running(self, value: bool):
""" set _is_running safely """
with self._is_running.get_lock():
self._is_running.value = 1 if value else 0
def start(self):
""" Start the process """
logger.warning("start")
self.is_running = True
super().start()
def stop(self):
""" Stop the process """
self.db_edit_queue.put(("close", None, True))
self.db_return_queue.get(timeout=1)
self.is_running = False
# noinspection SqlDialectInspection
def run(self):
""" One thread responsible for all database write operations. """
init_logging(level=self.loglevel)
signal.signal(signal.SIGINT, lambda x, y: None)
signal.signal(signal.SIGTERM, lambda x, y: None)
conn = sqlite3.connect(self.database_name)
c = conn.cursor()
c.execute("""CREATE TABLE IF NOT EXISTS receivers (
station_id TEXT UNIQUE,
isAuto INTEGER,
isMobile INTEGER,
isSingle INTEGER,
latitude REAL,
longitude REAL)
""")
c.execute("""CREATE TABLE IF NOT EXISTS interest_areas (
uid INTEGER,
aoi_type TEXT,
latitude REAL,
longitude REAL,
radius INTEGER)
""")
c.execute("""CREATE TABLE IF NOT EXISTS intersects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
time INTEGER,
latitude REAL,
longitude REAL,
num_parents INTEGER,
confidence INTEGER,
aoi_id INTEGER)""")
c.execute("""CREATE TABLE IF NOT EXISTS lobs (time INTEGER,
station_id TEXT,
latitude REAL,
longitude REAL,
confidence INTEGER,
lob REAL)""")
conn.commit()
self.has_started = True
while self.is_running:
command, items, reply = self.db_edit_queue.get()
if command == "done":
conn.commit()
elif command == "close":
conn.commit()
conn.close()
if reply:
self.db_return_queue.put(True)
break
else:
c.executemany(command, items)
if reply:
self.db_return_queue.put(True)
time.sleep(0.001)
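# Usage sketch (illustrative, not from the commit): the writer consumes
# (command, items, reply) tuples from db_edit_queue -- an SQL string is fed to
# executemany(), "done" commits, and "close" commits and shuts the connection
# down. The database name and the example row are assumptions made for this sketch.
if __name__ == "__main__":
    from multiprocessing import Queue

    edit_q, return_q = Queue(), Queue()
    writer = DatabaseWriter("ether.db", edit_q, return_q)
    writer.start()
    rows = [(1681082256511, "Fake", 46.628, 32.440, 69, 249.0)]
    edit_q.put(("INSERT INTO lobs VALUES (?, ?, ?, ?, ?, ?)", rows, False))
    edit_q.put(("done", None, False))  # commit the batch
    writer.stop()                      # queues ("close", None, True) and waits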
""" Kraken data packet implementation """
from dataclasses import dataclass
from typing import Dict
import marshmallow_dataclass
from marshmallow import post_dump, pre_load
from stringcase import camelcase, snakecase
from ..utils.functions import capitalize_first_char
@dataclass
class BasicDataclass:
""" Kraken Packet """
@pre_load
def to_snake_case(self, data, **_kwargs):
""" to snake case pre load method.
Converts javaStyle parameters into the python_style syntax.
:type data: dict
:return str"""
return {snakecase(key): value for key, value in data.items()}
@post_dump
def to_camel_case(self, data, **_kwargs):
""" to camel case post load method """
return {camelcase(key): value for key, value in data.items()}
@classmethod
def get_schema(cls, *args, **kwargs):
""" Get schema """
return marshmallow_dataclass.class_schema(cls)(*args, **kwargs)
def dump(self, *args, **kwargs) -> Dict:
""" Serialize to JSON object """
return self.get_schema(*args, **kwargs).dump(self)
@classmethod
def load(cls, data, *args, **kwargs) -> "BasicDataclass":
""" Deserialize from JSON object """
return cls.get_schema(*args, **kwargs).load(data)
""" Kraken data packet implementation """
from dataclasses import dataclass
from ..basic_dataclass import BasicDataclass
example_packet = {
"tStamp": 1681082256511,
"latitude": "46.6280265",
"longitude": "32.4401186",
"gpsBearing": "0",
"radioBearing": "249",
"conf": "0.69",
"power": "-69.9",
"freq": 441875000,
"antType": "ULA",
"latency": 100,
"doaArray": "0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.33,0.33,0.33,0.33,0.33,0.32,0.32,0.32,0.32,0.32,0.31,0.31,0.31,0.30,0.30,0.30,0.29,0.29,0.28,0.28,0.27,0.27,0.26,0.26,0.25,0.25,0.24,0.24,0.23,0.23,0.22,0.22,0.21,0.21,0.20,0.19,0.19,0.18,0.18,0.17,0.17,0.16,0.16,0.15,0.15,0.14,0.14,0.13,0.13,0.12,0.12,0.11,0.11,0.10,0.10,0.09,0.08,0.08,0.07,0.07,0.06,0.06,0.05,0.05,0.04,0.04,0.03,0.03,0.02,0.02,0.01,0.01,0.01,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.01,0.01,0.02,0.02,0.03,0.04,0.05,0.06,0.07,0.08,0.09,0.11,0.12,0.14,0.16,0.18,0.20,0.22,0.25,0.27,0.30,0.32,0.35,0.38,0.41,0.44,0.47,0.50,0.53,0.55,0.58,0.61,0.64,0.67,0.69,0.72,0.74,0.76,0.79,0.80,0.82,0.84,0.85,0.86,0.87,0.88,0.88,0.89,0.89,0.89,0.88,0.88,0.88,0.87,0.86,0.85,0.85,0.83,0.82,0.81,0.80,0.79,0.77,0.76,0.75,0.73,0.72,0.71,0.69,0.68,0.67,0.66,0.64,0.63,0.62,0.61,0.60,0.59,0.58,0.57,0.56,0.55,0.55,0.54,0.53,0.53,0.52,0.52,0.51,0.51,0.50,0.50,0.49,0.49,0.49,0.49,0.49,0.49,0.48,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,",
"id": "Fake"
}
@dataclass
class KrPacket(BasicDataclass):
""" Kraken Packet """
t_stamp: int
latitude: float
longitude: float
gps_bearing: int
radio_bearing: int # int or float ???
conf: float
power: float
freq: int
ant_type: str
latency: int # useless shit
doa_array: str
id: str
def __repr__(self):
return f"<KrP: {self.id}::{self.radio_bearing}>"
""" This is a Protocol to wirk with obsolete DF-Aggregator objects """
from typing import Protocol
""" This is a receiver (KrRPI) representation """
import logging
import time
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import List, Dict
from marshmallow import post_dump, EXCLUDE
class Receiver(Protocol):
from .basic_dataclass import BasicDataclass
from .data.kr_packet import KrPacket
from ..utils.rb_tree import RBTree
logger = logging.getLogger(__name__)
load_only = dict(load_only=True)
@dataclass
class Bearing:
""" Bearing Param for calculation """
degrees: float
num: int
power: float
confidence: float
#
latitude: float = field(default=0.0)
longitude: float = field(default=0.0)
heading: float = field(default=0.0)
# noinspection PyPep8Naming
@dataclass
class Receiver(BasicDataclass):
""" Receiver object """
station_id: str
# TODO(s1z): obsolete fields. remove this as soon as possible
latitude: float = field(default=0.0)
longitude: float = field(default=0.0)
heading: float = field(default=0.0)
doa: float = field(default=0.0)
frequency: float = field(default=0.0)
power: float = field(default=0.0)
confidence: float = field(default=0.0)
doa_time: int = field(default=0)
mobile: bool = field(default=False)
single: bool = field(default=False)
active: bool = field(default=True)
auto: bool = field(default=True)
inverted: bool = field(default=True)
flipped: bool = field(default=False)
packets: List[KrPacket] = field(default_factory=list, metadata=load_only)
bearings: List[Bearing] = field(default_factory=list, metadata=load_only)
samplings: int = field(init=False, metadata=load_only)
tree: RBTree = field(init=False, metadata=load_only)
class Meta:
""" Meta """
unknown = EXCLUDE
@post_dump
def post_dump(self: "Receiver", data: Dict, **_kwargs) -> Dict:
""" Remove unwanted fields """
data.pop("packets", None)
data.pop("bearings", None)
data.pop("samplings", None)
data.pop("tree", None)
return data
def __post_init__(self):
self.tree = RBTree()
self.samplings = 10
def __eq__(self: "Receiver", other: "Receiver") -> bool:
""" compare two receivers """
return self.station_id == other.station_id
@property
def isActive(self: "Receiver") -> bool:
""" Always active """
return self.active
@property
def isSingle(self: "Receiver") -> bool:
""" link to the single parameter. Obsolete, remove me when fixed """
return self.single
@isSingle.setter
def isSingle(self: "Receiver", value: bool):
""" single setter """
self.single = value
@property
def isMobile(self: "Receiver") -> bool:
""" link to the mobile parameter. Obsolete, remove me when fixed """
return self.mobile
@isMobile.setter
def isMobile(self: "Receiver", value: bool) -> None:
""" mobile setter """
self.mobile = value
def receiver_dict(self: "Receiver") -> Dict[str, any]:
""" obsolete serializator """
return self.get_schema().dump(self)
def clear_bearings(self: "Receiver") -> None:
""" Remove all bearings """
nodes = self.tree.get_all()
for node in nodes:
node.items = []
self.tree.delete_node(node.val)
def update_config(self: "Receiver", packet: KrPacket) -> None:
""" Update latitude, longitude, frequency, heading """
# update position if needed
self.latitude = packet.latitude
self.longitude = packet.longitude
self.frequency = packet.freq
self.heading = packet.gps_bearing
@staticmethod
def get_median_bearing(bearing_one: Bearing,
bearing_two: Bearing) -> Bearing:
""" Get median bearing """
total_num = bearing_one.num + bearing_two.num
total_power = bearing_one.power + bearing_two.power
median_degrees = ((bearing_one.degrees * bearing_one.num) +
(bearing_two.degrees * bearing_two.num)) / total_num
total_confidence = bearing_one.confidence + bearing_two.confidence
return Bearing(median_degrees, total_num,
total_power, total_confidence)
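# Worked example for get_median_bearing (illustrative numbers): merging
# Bearing(degrees=10, num=2, power=-120, confidence=1.2) with
# Bearing(degrees=12, num=1, power=-70, confidence=0.7) yields
# degrees = (10*2 + 12*1) / 3 ≈ 10.67, num = 3, power = -190, confidence = 1.9;
# power and confidence stay summed here and are divided by num later in
# create_absolute_bearings().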
def create_absolute_bearings(self, bearings: List[Bearing],
bearings_len: int) -> List[Bearing]:
""" Create bearings """
i = 0
absolute_bearings = []
while i < bearings_len:
curr_bearing = bearings[i]
if i == 0:
absolute_bearings.append(curr_bearing)
i += 1
prev_bearing = absolute_bearings.pop()
if abs(prev_bearing.degrees - curr_bearing.degrees) <= 3:
absolute_bearings.append(self.get_median_bearing(prev_bearing,
curr_bearing))
else:
absolute_bearings.append(curr_bearing)
i += 1
for a_bearing in absolute_bearings:
a_bearing.power = a_bearing.power / a_bearing.num
a_bearing.confidence = a_bearing.confidence / a_bearing.num
# This code is just copied from the old DF-AGG
if self.inverted:
a_bearing.degrees = self.heading + (360 - a_bearing.degrees)
elif self.flipped:
a_bearing.degrees = self.heading + (180 - a_bearing.degrees)
else:
a_bearing.degrees = self.heading + a_bearing.degrees
if a_bearing.degrees < 0:
a_bearing.degrees += 360
elif a_bearing.degrees > 359:
a_bearing.degrees -= 360
return absolute_bearings
def add_packet(self: "Receiver", packet: KrPacket) -> None:
""" Add KrRpi packet to the receiver """
now = int(time.time() * 1000)
# insert packet
self.tree.insert_node(packet.radio_bearing, packet)
bearings = []
bearings_len = 0
# remove old packets
nodes = self.tree.get_all()
for node in nodes:
bearing = Bearing(node.val, 0, 0, 0)
slice_id = None
for idx, item in enumerate(node.items):
if now - item.t_stamp > 20000: # 20 seconds old
slice_id = idx
else:
bearing.num += 1
bearing.power += item.power
bearing.confidence += item.conf
# add bearings
if bearing.num > 0:
bearings_len += 1
bearings.append(bearing)
# slice the list
if slice_id is not None:
node.items = node.items[slice_id:]
# remove node if we have 0 records
if len(node.items) == 0:
self.tree.delete_node(node.val)
self.bearings = self.create_absolute_bearings(bearings, bearings_len)
@staticmethod
def lob_length() -> int:
""" lob_length """
return 40200
""" Ether web service """
import asyncio
import logging
from asyncio import AbstractEventLoop
from threading import Thread, Lock
from typing import List, Dict
from typing import List, Dict, Optional
from aiohttp import web
from utils.ether_service.entities.receiver import Receiver
from utils.ether_service.handlers.api.v1.ws import WSHandler
from utils.ether_service.utils import ProcessService
from utils.run_app_fixed import create_task
from utils.ws_client import WSClient
logger = logging.getLogger(__name__)
@@ -20,15 +23,20 @@ class EtherService(Thread):
2. Do math on the background.
3. Update global variables for the obsolete web service."""
def __init__(self, web_service: Thread, db_writer: Thread,
receivers: List[Receiver], shared_dict: Dict,
def __init__(self, web_service: "BottleWebServer",
db_writer: "DatabaseWriter",
shared_dict: Dict,
ws_client: WSClient,
receiver_controller: ProcessService,
*,
io_loop: Optional[AbstractEventLoop] = None,
interface="0.0.0.0", port=8090):
super().__init__(name=self.__class__.__name__, daemon=True)
self.web_service = web_service
self.db_writer = db_writer
self.receivers = receivers
self.shared_dict = shared_dict
self.ws_client = ws_client
self.receiver_controller = receiver_controller
self._is_running = False
self._is_running_lock = Lock()
@@ -36,7 +44,7 @@ class EtherService(Thread):
self.interface = interface
self.port = port
self.io_loop = asyncio.get_event_loop()
self.io_loop = io_loop or asyncio.get_event_loop()
self.web_app = web.Application()
self.web_app.add_routes([
WSHandler().compile(),
@@ -58,10 +66,10 @@ class EtherService(Thread):
def run(self):
""" Thread routine """
self.is_running = True
self.main_task = create_task(self.web_app, host=self.interface,
port=self.port, loop=self.io_loop,
handle_signals=False,
print=lambda x: x)
# self.main_task = create_task(self.web_app, host=self.interface,
# port=self.port, loop=self.io_loop,
# handle_signals=False,
# print=lambda x: x)
logger.info("Server has started")
self.io_loop.run_forever()
logger.info("Server has stopped")
@@ -75,7 +83,9 @@ class EtherService(Thread):
""" Stop the app """
logger.info("Server is stopping")
self.is_running = False
# add web_server.stop()
# add db_writer.stop()
# self.web_service.stop() # How do we stop this crap ?
self.db_writer.stop()
self.receiver_controller.stop()
if self.main_task is not None:
self.io_loop.call_soon_threadsafe(self.main_task.cancel)
self.io_loop.call_soon_threadsafe(self.io_loop.stop)
@@ -85,6 +85,11 @@ class WSHandler(BaseHandler):
ws = None
connection_id = None
async def handle_text(self, data: str) -> None:
""" handle web socket data """
logging.info(f"Binary data received: {data}")
await self.ws.send_str(data)
@staticmethod
async def handle_binary(data: bytes) -> None:
""" handle web socket data """
@@ -95,11 +100,11 @@ class WSHandler(BaseHandler):
if msg.type == aiohttp.WSMsgType.ERROR:
ex = self.ws.exception()
exc_info = (ex.__class__, ex, ex.__traceback__)
logging.error("Ws connection closed", exc_info=exc_info)
logging.error("Ws error", exc_info=exc_info)
elif msg.type == aiohttp.WSMsgType.BINARY:
await self.handle_binary(msg.data)
elif msg.type == aiohttp.WSMsgType.TEXT:
await self.ws.send_str(msg.data)
await self.handle_text(msg.data)
@auth_required
async def handler(self, request: Request) -> WebSocketResponse:
""" Thread Service implementation """
import logging
import signal
from multiprocessing import Process, Value, Queue
from time import sleep
from ..utils.log import init_logging
logger = logging.getLogger(__name__)
class ProcessService(Process):
""" Service that has start and stop methods and works as a daemon """
def __init__(self, loglevel=logging.INFO):
self.loglevel = loglevel
self._is_running = Value('i', 0)
self.cmd_queue = Queue()
super().__init__(name=self.__class__.__name__, daemon=True)
@property
def is_running(self):
""" Get _is_running safely """
with self._is_running.get_lock():
return self._is_running.value == 1
@is_running.setter
def is_running(self, value: bool):
""" set _is_running safely """
with self._is_running.get_lock():
self._is_running.value = 1 if value else 0
def start(self) -> None:
""" Start the process """
self.is_running = True
super().start()
def stop(self) -> None:
""" Stop the process """
self.is_running = False
self.cmd_queue.put((0, None))
def run(self) -> None:
""" Routine """
signal.signal(signal.SIGINT, lambda x, y: None)
signal.signal(signal.SIGTERM, lambda x, y: None)
init_logging(level=self.loglevel)
while self.is_running:
try:
command = self.cmd_queue.get()
self.handle_command(*command)
except Exception as ex:
exc_info = type(ex), ex, ex.__traceback__
logger.error("main loop execution error", exc_info=exc_info)
sleep(0.001) # sleep for 1ms to prevent 100% CPU load
self.cmd_queue.close()
logger.info(f"{self.name} has been stopped")
def handle_command(self, command: int, *params) -> None:
""" Handle command. Overwrite this in child classes """
raise NotImplementedError()
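# Subclass sketch (illustrative): concrete services override handle_command();
# stop() pushes the (0, None) sentinel through cmd_queue so the loop wakes up
# and exits. The command code 1 below is hypothetical, not part of ProcessService.
class _EchoService(ProcessService):
    """ Hypothetical service that just logs what it is told. """
    CMD_ECHO = 1

    def handle_command(self, command: int, *params) -> None:
        """ Log echo commands; ignore the stop sentinel (command == 0). """
        if command == self.CMD_ECHO:
            logger.info("echo: %s", params)

# svc = _EchoService()
# svc.start()
# svc.cmd_queue.put((_EchoService.CMD_ECHO, "hello"))
# svc.stop()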
""" Decorators """
import warnings
def deprecated(f):
""" Show deprecated warning """
def wr(*args, **kwargs):
""" wrapper """
warnings.warn(f"{f} is Deprecated", DeprecationWarning)
return f(*args, **kwargs)
return wr
""" Useful functions """
def capitalize_first_char(data: str) -> str:
""" Capitalize only the first char """
if isinstance(data, str):
if len(data) > 0:
return data[:1].upper() + data[1:]
return data
raise AttributeError(f'Data type "{type(data)}" is not supported')
""" JSON helper module """
import sys
from typing import Any, Union, Optional, Mapping, Iterable
import simplejson as json
import simplejson.scanner as json_scanner
from absl import logging
def json_loads(data: Union[str, bytes], default: Optional[Any] = None) -> \
Union[Mapping[str, Any], Iterable[Mapping[str, Any]]]:
""" Json.loads wrapper, tries to load data and prints errors if any """
try:
j_data = json.loads(data, strict=False)
if isinstance(j_data, dict) or isinstance(j_data, list):
return j_data
except TypeError:
logging.error("json_loads::TypeError:", exc_info=sys.exc_info())
except json_scanner.JSONDecodeError:
logging.error("json_loads::JSONDecodeError:", exc_info=sys.exc_info())
return default
def json_dumps(*args: Any, **kwargs: Mapping[str, Any]) -> str:
""" Json.dumps wrapper, tries to dump data and prints errors if any """
try:
return json.dumps(*args, **kwargs)
except Exception:
logging.error("json_dumps::error:", exc_info=sys.exc_info())
raise
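# Behaviour sketch (illustrative): both wrappers trade exceptions for logging,
# and json_loads falls back to the supplied default on bad input.
if __name__ == "__main__":
    assert json_loads('{"stationId": "Fake"}') == {"stationId": "Fake"}
    assert json_loads("not json", default={}) == {}
    assert json_dumps({"conf": 0.69}) == '{"conf": 0.69}'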
""" Web Socket Client """
import asyncio
import ssl
import sys
from asyncio import AbstractEventLoop
from typing import Callable, Optional
import aiohttp
from absl import logging
from yarl import URL
class WSClient:
""" WebSocket Client """
def __init__(self, io_loop: AbstractEventLoop, scheme: str, host: str,
path: str, user: str, password: str, ca_file: str = None,
on_data_handler: Optional[Callable] = None):
self.io_loop = io_loop
self.scheme = scheme
self.host = host
self.path = path
self.user = user
self.password = password
self.ca_file = ca_file
self.url = URL.build(scheme=self.scheme, host=self.host,
path=self.path, user=self.user,
password=self.password)
# Create SSL Context
self.ssl_context = ssl.create_default_context()
if self.ca_file is not None:
self.ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
self.ssl_context.load_verify_locations(cafile=self.ca_file)
self.is_running = False
self.ws = None
self.main_job = None
self.ping_job = None
self.send_job = None
self.queue = asyncio.Queue()
self.on_data_handlers = []
self.add_on_data_handler(on_data_handler)
def add_on_data_handler(self, handler: Callable) -> None:
""" Add on data handler """
if callable(handler):
self.io_loop.call_soon_threadsafe(self.on_data_handlers.append,
handler)
def del_on_data_handler(self, handler: Callable) -> None:
""" Del on data handler """
def delete():
""" real function """
try:
index = self.on_data_handlers.index(handler)
self.on_data_handlers.pop(index)
except ValueError:
pass
self.io_loop.call_soon_threadsafe(delete)
def on_data_received(self, data: bytes) -> None:
""" On data received via websocket """
for handler in self.on_data_handlers:
handler(data)
def send_data(self, data: str) -> None:
""" Send data to the server """
data_binary = data.encode("utf-8")
self.queue.put_nowait(data_binary)
async def send_worker(self):
""" Send worker """
data = None
while self.is_running:
if self.ws is not None:
if data is None:
data = await self.queue.get()
# noinspection PyBroadException
try:
await self.ws.send_bytes(data)
self.queue.task_done()
data = None
except Exception:
exc_info = sys.exc_info()
logging.warning("send worker failed with error",
exc_info=exc_info)
await asyncio.sleep(0)
async def ping_worker(self):
""" Ping worker """
while self.is_running:
await asyncio.sleep(10)
if self.ws is not None:
# noinspection PyBroadException
try:
await self.ws.ping()
except Exception:
exc_info = sys.exc_info()
logging.warning("ping worker failed with error",
exc_info=exc_info)
def start(self) -> None:
""" Start client """
self.stop()
self.is_running = True
self.main_job = self.io_loop.create_task(self.run())
self.ping_job = self.io_loop.create_task(self.ping_worker())
self.send_job = self.io_loop.create_task(self.send_worker())
def stop(self) -> None:
""" Stop client """
self.is_running = False
if self.main_job:
self.main_job.cancel()
self.main_job = None
if self.ping_job:
self.ping_job.cancel()
self.ping_job = None
if self.send_job:
self.send_job.cancel()
self.send_job = None
async def run(self):
""" start web socket connection """
while self.is_running:
session = aiohttp.ClientSession()
# noinspection PyBroadException
try:
async with session.ws_connect(self.url,
ssl=self.ssl_context) as ws:
logging.info("WS opened.")
self.ws = ws
async for msg in ws:
# noinspection PyUnresolvedReferences
if msg.type == aiohttp.WSMsgType.BINARY:
# noinspection PyUnresolvedReferences
self.on_data_received(msg.data)
elif msg.type == aiohttp.WSMsgType.TEXT:
logging.warning("WS received 'TEXT' data, "
"ignoring...")
elif msg.type == aiohttp.WSMsgType.CLOSED:
logging.warning("WS Closed, raising an IOError.")
raise IOError("WS Closed")
elif msg.type == aiohttp.WSMsgType.ERROR:
raise self.ws.exception()
except Exception as ex:
exc_info = (ex.__class__, ex, ex.__traceback__)
logging.warning("WS error.", exc_info=exc_info)
self.ws = None
await asyncio.sleep(0)
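# Wiring sketch (illustrative): the client is driven by an asyncio loop owned
# by the caller -- start() schedules run(), ping_worker() and send_worker() on
# io_loop, binary frames are fanned out to the registered handlers, and
# send_data() queues outgoing text as UTF-8 bytes. Host, credentials and the
# handler below are placeholders.
if __name__ == "__main__":
    def on_packet(data: bytes) -> None:
        """ Placeholder handler; real code would parse a packet here. """
        print(f"received {len(data)} bytes")

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    client = WSClient(loop, scheme="wss", host="example.invalid",
                      path="/api/v1/ws", user="user", password="secret",
                      on_data_handler=on_packet)
    client.start()                        # schedules the three worker tasks
    client.send_data('{"stationId": "Fake"}')
    loop.run_forever()                    # workers only run while the loop runs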