Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear orphan coverage objects #891

Merged
merged 5 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
ALTER TABLE
hexes DROP CONSTRAINT IF EXISTS hexes_uuid_fkey;

ALTER TABLE
hexes
ADD
CONSTRAINT hexes_uuid_fkey FOREIGN KEY (uuid) REFERENCES coverage_objects(uuid) ON DELETE CASCADE;
71 changes: 58 additions & 13 deletions mobile_verifier/src/coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,33 @@ impl CoverageObject {
}
}

pub async fn set_invalidated_at(
exec: &mut Transaction<'_, Postgres>,
invalidated_at: DateTime<Utc>,
inserted_at: Option<DateTime<Utc>>,
radio_key: KeyType<'_>,
uuid: Option<Uuid>,
) -> anyhow::Result<()> {
sqlx::query(
r#"
UPDATE coverage_objects
SET invalidated_at = $1
WHERE inserted_at < $2
AND invalidated_at IS NULL
AND radio_key = $3
AND uuid != $4
"#,
)
.bind(invalidated_at)
.bind(inserted_at)
.bind(radio_key)
.bind(uuid)
.execute(&mut *exec)
.await?;

Ok(())
}

#[derive(Debug, Clone, FromRow)]
pub struct HexCoverage {
pub uuid: Uuid,
Expand Down Expand Up @@ -457,24 +484,42 @@ pub async fn clear_coverage_objects(
tx: &mut Transaction<'_, Postgres>,
timestamp: &DateTime<Utc>,
) -> Result<(), sqlx::Error> {
// Delete any hex coverage objects that were invalidated before the given timestamp
sqlx::query(
r#"
DELETE FROM hexes WHERE uuid IN (
SELECT uuid
FROM coverage_objects
WHERE invalidated_at < $1
)
"#,
)
.bind(timestamp)
.execute(&mut *tx)
.await?;
// We do not delete hexes here anymore because `38_coverage_objects_cascade_delete.sql` add CONSTRAINT ON DELETE CASCADE

// Remove all invalidated objects before timestamp
sqlx::query("DELETE FROM coverage_objects WHERE invalidated_at < $1")
.bind(timestamp)
.execute(&mut *tx)
.await?;

// Delete all but the last 10 valid coverage_objects entry per radio_key before a given timestamp
sqlx::query(
r#"
WITH orphan_coverage_objects AS (
SELECT
uuid
FROM (
SELECT
uuid,
radio_key,
inserted_at,
ROW_NUMBER() OVER (PARTITION BY radio_key ORDER BY inserted_at DESC) AS row_num
FROM
coverage_objects
WHERE
invalidated_at IS NULL AND inserted_at < $1
) AS ranked_rows
WHERE row_num > 10
)
DELETE FROM coverage_objects
USING orphan_coverage_objects
WHERE coverage_objects.uuid = orphan_coverage_objects.uuid;
"#,
)
.bind(timestamp)
.execute(&mut *tx)
.await?;

Ok(())
}

Expand Down
22 changes: 7 additions & 15 deletions mobile_verifier/src/heartbeats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod wifi;

use crate::{
cell_type::{CellType, CellTypeLabel},
coverage::{CoverageClaimTimeCache, CoverageObjectCache, CoverageObjectMeta},
coverage::{self, CoverageClaimTimeCache, CoverageObjectCache, CoverageObjectMeta},
geofence::GeofenceValidator,
seniority::{Seniority, SeniorityUpdate},
GatewayResolution, GatewayResolver,
Expand Down Expand Up @@ -650,21 +650,13 @@ impl ValidatedHeartbeat {
}

pub async fn save(self, exec: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
sqlx::query(
r#"
UPDATE coverage_objects
SET invalidated_at = $1
WHERE inserted_at < $2
AND invalidated_at IS NULL
AND radio_key = $3
AND uuid != $4
"#,
coverage::set_invalidated_at(
exec,
self.heartbeat.timestamp,
self.coverage_meta.as_ref().map(|x| x.inserted_at),
self.heartbeat.key(),
self.heartbeat.coverage_object,
)
.bind(self.heartbeat.timestamp)
.bind(self.coverage_meta.as_ref().map(|x| x.inserted_at)) // Guaranteed not to be NULL
.bind(self.heartbeat.key())
.bind(self.heartbeat.coverage_object)
.execute(&mut *exec)
.await?;
// Save the heartbeat
match self.heartbeat.hb_type {
Expand Down