Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
D
df-aggregator
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Oleksandr Barabash
df-aggregator
Commits
684d30e8
Commit
684d30e8
authored
Dec 19, 2020
by
Corey Koval
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Performance Improvements
parent
2b83094a
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
87 additions
and
57 deletions
+87
-57
df-aggregator.py
df-aggregator.py
+76
-56
vincenty.py
vincenty.py
+11
-1
No files found.
df-aggregator.py
View file @
684d30e8
...
...
@@ -21,6 +21,7 @@ from multiprocessing import Process, Queue
from
bottle
import
route
,
run
,
request
,
get
,
post
,
put
,
response
,
redirect
,
template
,
static_file
DBSCAN_Q
=
Queue
()
DBSCAN_WAIT_Q
=
Queue
()
DATABASE_EDIT_Q
=
Queue
()
DATABASE_RETURN
=
Queue
()
...
...
@@ -198,8 +199,11 @@ def plot_intersects(lat_a, lon_a, doa_a, lat_b, lon_b, doa_b, max_distance = 100
# This becomes noticable at over 10k intersections.
#######################################################################
def
do_dbscan
(
X
):
DBSCAN_WAIT_Q
.
put
(
True
)
db
=
DBSCAN
(
eps
=
ms
.
eps
,
min_samples
=
ms
.
min_samp
)
.
fit
(
X
)
DBSCAN_Q
.
put
(
db
.
labels_
)
if
not
DBSCAN_WAIT_Q
.
empty
():
DBSCAN_WAIT_Q
.
get
()
###############################################
# Computes DBSCAN Alorithm is applicable,
...
...
@@ -222,7 +226,7 @@ def process_data(database_name):
for
aoi
in
aoi_list
:
print
(
f
"Checking AOI {aoi}."
)
curs
.
execute
(
'''SELECT longitude, latitude, time FROM intersects
WHERE aoi_id=? ORDER BY confidence LIMIT
50
000'''
,
[
aoi
])
WHERE aoi_id=? ORDER BY confidence LIMIT
25
000'''
,
[
aoi
])
intersect_array
=
np
.
array
(
curs
.
fetchall
())
if
intersect_array
.
size
!=
0
:
if
ms
.
eps
>
0
:
...
...
@@ -231,12 +235,23 @@ def process_data(database_name):
size_x
=
sys
.
getsizeof
(
X
)
/
1024
print
(
f
"The dataset is {size_x} kilobytes"
)
print
(
f
"Computing Clusters from {n_points} intersections."
)
while
not
DBSCAN_WAIT_Q
.
empty
():
print
(
"Waiting for my turn..."
)
time
.
sleep
(
1
)
starttime
=
time
.
time
()
db
=
Process
(
target
=
do_dbscan
,
args
=
(
X
,))
db
.
daemon
=
True
db
.
start
()
labels
=
DBSCAN_Q
.
get
()
db
.
join
()
try
:
labels
=
DBSCAN_Q
.
get
(
timeout
=
10
)
db
.
join
()
except
:
print
(
"DBSCAN took took long, terminated."
)
if
not
DBSCAN_WAIT_Q
.
empty
():
DBSCAN_WAIT_Q
.
get
()
db
.
terminate
()
return
likely_location
,
intersect_list
,
ellipsedata
stoptime
=
time
.
time
()
print
(
f
"DBSCAN took {stoptime - starttime} seconds to compute the clusters."
)
...
...
@@ -289,7 +304,7 @@ def process_data(database_name):
intersect_list
.
append
(
x
.
tolist
())
else
:
print
(
"No Intersections
."
)
print
(
f
"No Intersections in AOI {aoi}
."
)
conn
.
close
()
return
likely_location
,
intersect_list
,
ellipsedata
...
...
@@ -311,12 +326,12 @@ def purge_database(type, lat, lon, radius):
distance
=
v
.
inverse
(
x
,
(
lat
,
lon
))[
0
]
if
distance
<
radius
:
command
=
"DELETE FROM intersects WHERE latitude=? AND longitude=?"
DATABASE_EDIT_Q
.
put
((
command
,
x
))
DATABASE_RETURN
.
get
(
timeout
=
1
)
DATABASE_EDIT_Q
.
put
((
command
,
x
,
False
))
#
DATABASE_RETURN.get(timeout=1)
purge_count
+=
1
elif
type
==
"aoi"
:
pass
DATABASE_EDIT_Q
.
put
((
"done"
,
None
))
DATABASE_EDIT_Q
.
put
((
"done"
,
None
,
False
))
print
(
f
"I purged {purge_count} intersects."
)
@get
(
"/run_all_aoi_rules"
)
...
...
@@ -334,31 +349,31 @@ def run_aoi_rules():
conn
.
close
()
if
n_aoi
==
0
:
command
=
"UPDATE intersects SET aoi_id=?"
DATABASE_EDIT_Q
.
put
((
command
,
(
-
1
,)))
DATABASE_EDIT_Q
.
put
((
"done"
,
None
))
DATABASE_EDIT_Q
.
put
((
command
,
(
-
1
,)
,
True
))
DATABASE_EDIT_Q
.
put
((
"done"
,
None
,
False
))
DATABASE_RETURN
.
get
(
timeout
=
1
)
else
:
for
point
in
intersect_list
:
keep_list
=
[]
lat
,
lon
=
point
for
x
in
aoi_list
:
aoi
=
{
'uid'
:
x
[
0
],
'aoi_type'
:
x
[
1
],
'latitude'
:
x
[
2
],
'longitude'
:
x
[
3
],
'radius'
:
x
[
4
]
}
distance
=
v
.
inverse
((
aoi
[
'latitude'
],
aoi
[
'longitude'
]),
(
lat
,
lon
))[
0
]
if
aoi
[
'aoi_type'
]
==
"exclusion"
:
if
distance
<
aoi
[
'radius'
]:
#
aoi = {
#
'uid': x[0],
#
'aoi_type': x[1],
#
'latitude': x[2],
#
'longitude': x[3],
#
'radius': x[4]
#
}
distance
=
v
.
haversine
(
x
[
2
],
x
[
3
],
lat
,
lon
)
if
x
[
1
]
==
"exclusion"
:
if
distance
<
x
[
4
]:
keep_list
=
[
False
]
break
elif
aoi
[
'aoi_type'
]
==
"aoi"
:
if
distance
<
aoi
[
'radius'
]:
elif
x
[
1
]
==
"aoi"
:
if
distance
<
x
[
4
]:
command
=
"UPDATE intersects SET aoi_id=? WHERE latitude=? AND longitude=?"
params
=
(
aoi
[
'uid'
],
lat
,
lon
)
DATABASE_EDIT_Q
.
put
((
command
,
params
))
params
=
(
x
[
0
],
lat
,
lon
)
DATABASE_EDIT_Q
.
put
((
command
,
params
,
True
))
DATABASE_RETURN
.
get
(
timeout
=
1
)
sorted
+=
1
keep_list
.
append
(
True
)
...
...
@@ -367,11 +382,11 @@ def run_aoi_rules():
if
not
any
(
keep_list
):
command
=
"DELETE from intersects WHERE latitude=? AND longitude=?"
DATABASE_EDIT_Q
.
put
((
command
,
point
))
DATABASE_EDIT_Q
.
put
((
command
,
point
,
True
))
DATABASE_RETURN
.
get
(
timeout
=
1
)
purged
+=
1
DATABASE_EDIT_Q
.
put
((
"done"
,
None
))
DATABASE_EDIT_Q
.
put
((
"done"
,
None
,
False
))
print
(
f
"Purged {purged} intersections and sorted {sorted} intersections into {n_aoi} AOIs."
)
return
"OK"
...
...
@@ -456,6 +471,8 @@ def write_czml(best_point, all_the_points, ellipsedata):
for
x
in
ellipsedata
:
# rotation = 2 * np.pi - x[2]
if
x
[
0
]
>=
x
[
1
]:
# rotation = x[2]
semiMajorAxis
=
x
[
0
]
semiMinorAxis
=
x
[
1
]
rotation
=
2
*
np
.
pi
-
x
[
2
]
...
...
@@ -690,7 +707,11 @@ def update_rx(action):
add_receiver
(
receiver_url
)
elif
action
==
"del"
:
index
=
int
(
data
[
'uid'
])
del_receiver
(
receivers
[
index
]
.
station_id
)
command
=
"DELETE FROM receivers WHERE station_id=?"
DATABASE_EDIT_Q
.
put
((
command
,
[
int
(
data
[
'uid'
])],
True
))
DATABASE_RETURN
.
get
(
timeout
=
1
)
DATABASE_EDIT_Q
.
put
((
"done"
,
None
,
False
))
# del_receiver(receivers[index].station_id)
del
receivers
[
index
]
elif
action
==
"activate"
:
index
=
int
(
data
[
'uid'
])
...
...
@@ -743,13 +764,13 @@ def handle_interest_areas(action):
add_aoi
(
aoi_type
,
lat
,
lon
,
radius
)
elif
action
==
"del"
:
command
=
"UPDATE intersects SET aoi_id=? WHERE aoi_id=?"
DATABASE_EDIT_Q
.
put
((
command
,
(
-
1
,
data
[
'uid'
])
))
DATABASE_EDIT_Q
.
put
((
command
,
(
-
1
,
data
[
'uid'
]),
True
))
DATABASE_RETURN
.
get
(
timeout
=
1
)
to_table
=
str
(
data
[
'uid'
])
command
=
"DELETE FROM interest_areas WHERE uid=?"
DATABASE_EDIT_Q
.
put
((
command
,
to_table
))
DATABASE_EDIT_Q
.
put
((
command
,
to_table
,
True
))
DATABASE_RETURN
.
get
(
timeout
=
1
)
DATABASE_EDIT_Q
.
put
((
"done"
,
None
))
DATABASE_EDIT_Q
.
put
((
"done"
,
None
,
False
))
elif
action
==
"purge"
:
conn
=
sqlite3
.
connect
(
database_name
)
c
=
conn
.
cursor
()
...
...
@@ -811,7 +832,7 @@ def run_receiver(receivers):
if
keep
:
to_table
=
[
receivers
[
x
]
.
doa_time
,
avg_coord
[
0
],
avg_coord
[
1
],
len
(
intersect_list
),
avg_coord
[
2
],
in_aoi
]
command
=
"INSERT INTO intersects VALUES (?,?,?,?,?,?)"
DATABASE_EDIT_Q
.
put
((
command
,
to_table
))
DATABASE_EDIT_Q
.
put
((
command
,
to_table
,
True
))
DATABASE_RETURN
.
get
(
timeout
=
1
)
# Loop to compute intersections for a single receiver and update all receivers
...
...
@@ -831,6 +852,7 @@ def run_receiver(receivers):
lon_rxa
=
current_doa
[
3
]
conf_rxa
=
current_doa
[
4
]
doa_rxa
=
current_doa
[
5
]
keep_count
=
0
if
len
(
lob_array
)
>
1
:
for
previous
in
lob_array
:
lat_rxb
=
previous
[
0
]
...
...
@@ -850,17 +872,19 @@ def run_receiver(receivers):
# if intersection:
keep
,
in_aoi
=
check_aoi
(
*
intersection
[
0
:
2
])
if
keep
:
print
(
intersection
)
# print(intersection)
keep_count
+=
1
to_table
=
[
current_time
,
intersection
[
0
],
intersection
[
1
],
1
,
intersection
[
2
],
in_aoi
]
command
=
"INSERT INTO intersects VALUES (?,?,?,?,?,?)"
DATABASE_EDIT_Q
.
put
((
command
,
to_table
))
DATABASE_EDIT_Q
.
put
((
command
,
to_table
,
True
))
DATABASE_RETURN
.
get
(
timeout
=
1
)
print
(
f
"Computed and kept {keep_count} intersections."
)
command
=
"INSERT INTO lobs VALUES (?,?,?,?,?,?)"
DATABASE_EDIT_Q
.
put
((
command
,
current_doa
))
DATABASE_EDIT_Q
.
put
((
command
,
current_doa
,
True
))
DATABASE_RETURN
.
get
(
timeout
=
1
)
DATABASE_EDIT_Q
.
put
((
"done"
,
None
))
DATABASE_EDIT_Q
.
put
((
"done"
,
None
,
False
))
try
:
if
rx
.
isActive
:
rx
.
update
()
except
IOError
:
...
...
@@ -898,7 +922,7 @@ def check_aoi(lat, lon):
'longitude'
:
x
[
3
],
'radius'
:
x
[
4
]
}
distance
=
v
.
inverse
((
aoi
[
'latitude'
],
aoi
[
'longitude'
]),
(
lat
,
lon
))[
0
]
distance
=
v
.
haversine
(
aoi
[
'latitude'
],
aoi
[
'longitude'
],
lat
,
lon
)
if
aoi
[
'aoi_type'
]
==
"exclusion"
:
if
distance
<
aoi
[
'radius'
]:
keep
=
False
...
...
@@ -930,9 +954,9 @@ def add_receiver(receiver_url):
new_rx
[
'mobile'
],
new_rx
[
'single'
],
new_rx
[
'latitude'
],
new_rx
[
'longitude'
]]
command
=
"INSERT OR IGNORE INTO receivers VALUES (?,?,?,?,?,?,?)"
DATABASE_EDIT_Q
.
put
((
command
,
to_table
))
DATABASE_EDIT_Q
.
put
((
command
,
to_table
,
True
))
DATABASE_RETURN
.
get
(
timeout
=
1
)
DATABASE_EDIT_Q
.
put
((
"done"
,
None
))
DATABASE_EDIT_Q
.
put
((
"done"
,
None
,
False
))
mobile
=
c
.
execute
(
"SELECT isMobile FROM receivers WHERE station_id = ?"
,
[
new_rx
[
'station_id'
]])
.
fetchone
()[
0
]
single
=
c
.
execute
(
"SELECT isSingle FROM receivers WHERE station_id = ?"
,
...
...
@@ -977,22 +1001,12 @@ def update_rx_table():
latitude=?,
longitude=?
WHERE station_id = ?'''
DATABASE_EDIT_Q
.
put
((
command
,
to_table
))
DATABASE_EDIT_Q
.
put
((
command
,
to_table
,
True
))
# try:
DATABASE_RETURN
.
get
(
timeout
=
1
)
# except:
# pass
DATABASE_EDIT_Q
.
put
((
"done"
,
None
))
###############################################
# Removes a receiver from the program and
# database upon request.
###############################################
def
del_receiver
(
del_rx
):
command
=
"DELETE FROM receivers WHERE station_id=?"
DATABASE_EDIT_Q
.
put
((
command
,
[
del_rx
]))
DATABASE_RETURN
.
get
(
timeout
=
1
)
DATABASE_EDIT_Q
.
put
((
"done"
,
None
))
DATABASE_EDIT_Q
.
put
((
"done"
,
None
,
False
))
###############################################
# Updates the database with new interest areas.
...
...
@@ -1006,9 +1020,9 @@ def add_aoi(aoi_type, lat, lon, radius):
uid
=
(
prev_uid
+
1
)
if
prev_uid
!=
None
else
0
to_table
=
[
uid
,
aoi_type
,
lat
,
lon
,
radius
]
command
=
'INSERT INTO interest_areas VALUES (?,?,?,?,?)'
DATABASE_EDIT_Q
.
put
((
command
,
to_table
))
DATABASE_EDIT_Q
.
put
((
command
,
to_table
,
True
))
DATABASE_RETURN
.
get
(
timeout
=
1
)
DATABASE_EDIT_Q
.
put
((
"done"
,
None
))
DATABASE_EDIT_Q
.
put
((
"done"
,
None
,
False
))
#########################################
# Read all the AOIs from the DB
...
...
@@ -1021,6 +1035,10 @@ def fetch_aoi_data():
conn
.
close
()
return
aoi_list
###############################################
# One thread responsible for all database write
# operations.
###############################################
def
database_writer
():
conn
=
sqlite3
.
connect
(
database_name
)
c
=
conn
.
cursor
()
...
...
@@ -1055,17 +1073,19 @@ def database_writer():
lob REAL)'''
)
conn
.
commit
()
while
True
:
command
,
items
=
DATABASE_EDIT_Q
.
get
()
command
,
items
,
reply
=
DATABASE_EDIT_Q
.
get
()
if
command
==
"done"
:
conn
.
commit
()
elif
command
==
"close"
:
conn
.
commit
()
conn
.
close
()
DATABASE_RETURN
.
put
(
True
)
if
reply
:
DATABASE_RETURN
.
put
(
True
)
break
else
:
c
.
execute
(
command
,
items
)
DATABASE_RETURN
.
put
(
True
)
if
reply
:
DATABASE_RETURN
.
put
(
True
)
###############################################
# Thangs to do before closing the program.
...
...
@@ -1075,7 +1095,7 @@ def finish():
print
(
"Processing, please wait."
)
ms
.
receiving
=
False
update_rx_table
()
DATABASE_EDIT_Q
.
put
((
"close"
,
None
))
DATABASE_EDIT_Q
.
put
((
"close"
,
None
,
True
))
DATABASE_RETURN
.
get
(
timeout
=
1
)
if
geofile
!=
None
:
write_geojson
(
*
process_data
(
database_name
)[:
2
])
...
...
vincenty.py
View file @
684d30e8
...
...
@@ -15,7 +15,7 @@ from math import atan2
from
math
import
cos
from
math
import
radians
from
math
import
degrees
from
math
import
sin
from
math
import
sin
,
asin
from
math
import
sqrt
from
math
import
tan
from
math
import
pow
...
...
@@ -24,6 +24,16 @@ a=6378137.0 # radius at equator in meters (WGS-84)
f
=
1
/
298.257223563
# flattening of the ellipsoid (WGS-84)
b
=
(
1
-
f
)
*
a
def
haversine
(
lat1
,
lon1
,
lat2
,
lon2
):
# convert decimal degrees to radians
lon1
,
lat1
,
lon2
,
lat2
=
map
(
radians
,
[
lon1
,
lat1
,
lon2
,
lat2
])
# haversine formula
dlon
=
lon2
-
lon1
dlat
=
lat2
-
lat1
c
=
2
*
asin
(
sqrt
(
sin
(
dlat
/
2
)
**
2
+
cos
(
lat1
)
*
cos
(
lat2
)
*
sin
(
dlon
/
2
)
**
2
))
return
c
*
a
def
get_heading
(
coord1
,
coord2
):
lat1
=
radians
(
coord1
[
0
])
lon1
=
radians
(
coord1
[
1
])
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment