Skip to content

Commit

Permalink
Clear orphan coverage objects (#891)
Browse files Browse the repository at this point in the history
* Add clear_orphan_coverage_objects

* add CONSTRAINT ON DELETE CASCADE on coverage_objects and hexes

* Move coverage_objects update to its own module (set_invalidated_at)
Update clear_coverage_objects to also delete old valid clear_coverage_objects

* Fix later table and drop olf foreign key

* Keep last 10
  • Loading branch information
macpie authored Nov 18, 2024
1 parent d903cf7 commit 6742f2a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
ALTER TABLE
hexes DROP CONSTRAINT IF EXISTS hexes_uuid_fkey;

ALTER TABLE
hexes
ADD
CONSTRAINT hexes_uuid_fkey FOREIGN KEY (uuid) REFERENCES coverage_objects(uuid) ON DELETE CASCADE;
71 changes: 58 additions & 13 deletions mobile_verifier/src/coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,33 @@ impl CoverageObject {
}
}

pub async fn set_invalidated_at(
exec: &mut Transaction<'_, Postgres>,
invalidated_at: DateTime<Utc>,
inserted_at: Option<DateTime<Utc>>,
radio_key: KeyType<'_>,
uuid: Option<Uuid>,
) -> anyhow::Result<()> {
sqlx::query(
r#"
UPDATE coverage_objects
SET invalidated_at = $1
WHERE inserted_at < $2
AND invalidated_at IS NULL
AND radio_key = $3
AND uuid != $4
"#,
)
.bind(invalidated_at)
.bind(inserted_at)
.bind(radio_key)
.bind(uuid)
.execute(&mut *exec)
.await?;

Ok(())
}

#[derive(Debug, Clone, FromRow)]
pub struct HexCoverage {
pub uuid: Uuid,
Expand Down Expand Up @@ -457,24 +484,42 @@ pub async fn clear_coverage_objects(
tx: &mut Transaction<'_, Postgres>,
timestamp: &DateTime<Utc>,
) -> Result<(), sqlx::Error> {
// Delete any hex coverage objects that were invalidated before the given timestamp
sqlx::query(
r#"
DELETE FROM hexes WHERE uuid IN (
SELECT uuid
FROM coverage_objects
WHERE invalidated_at < $1
)
"#,
)
.bind(timestamp)
.execute(&mut *tx)
.await?;
// We do not delete hexes here anymore because `38_coverage_objects_cascade_delete.sql` add CONSTRAINT ON DELETE CASCADE

// Remove all invalidated objects before timestamp
sqlx::query("DELETE FROM coverage_objects WHERE invalidated_at < $1")
.bind(timestamp)
.execute(&mut *tx)
.await?;

// Delete all but the last 10 valid coverage_objects entry per radio_key before a given timestamp
sqlx::query(
r#"
WITH orphan_coverage_objects AS (
SELECT
uuid
FROM (
SELECT
uuid,
radio_key,
inserted_at,
ROW_NUMBER() OVER (PARTITION BY radio_key ORDER BY inserted_at DESC) AS row_num
FROM
coverage_objects
WHERE
invalidated_at IS NULL AND inserted_at < $1
) AS ranked_rows
WHERE row_num > 10
)
DELETE FROM coverage_objects
USING orphan_coverage_objects
WHERE coverage_objects.uuid = orphan_coverage_objects.uuid;
"#,
)
.bind(timestamp)
.execute(&mut *tx)
.await?;

Ok(())
}

Expand Down
22 changes: 7 additions & 15 deletions mobile_verifier/src/heartbeats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod wifi;

use crate::{
cell_type::{CellType, CellTypeLabel},
coverage::{CoverageClaimTimeCache, CoverageObjectCache, CoverageObjectMeta},
coverage::{self, CoverageClaimTimeCache, CoverageObjectCache, CoverageObjectMeta},
geofence::GeofenceValidator,
seniority::{Seniority, SeniorityUpdate},
GatewayResolution, GatewayResolver,
Expand Down Expand Up @@ -650,21 +650,13 @@ impl ValidatedHeartbeat {
}

pub async fn save(self, exec: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
sqlx::query(
r#"
UPDATE coverage_objects
SET invalidated_at = $1
WHERE inserted_at < $2
AND invalidated_at IS NULL
AND radio_key = $3
AND uuid != $4
"#,
coverage::set_invalidated_at(
exec,
self.heartbeat.timestamp,
self.coverage_meta.as_ref().map(|x| x.inserted_at),
self.heartbeat.key(),
self.heartbeat.coverage_object,
)
.bind(self.heartbeat.timestamp)
.bind(self.coverage_meta.as_ref().map(|x| x.inserted_at)) // Guaranteed not to be NULL
.bind(self.heartbeat.key())
.bind(self.heartbeat.coverage_object)
.execute(&mut *exec)
.await?;
// Save the heartbeat
match self.heartbeat.hb_type {
Expand Down

0 comments on commit 6742f2a

Please sign in to comment.