Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add implicit updates of channel records to update_user #734

Merged
merged 8 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions autoconnect/autoconnect-common/src/test_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ pub fn hello_db() -> MockDbClient {
pub fn hello_again_db(uaid: Uuid) -> MockDbClient {
let mut db = MockDbClient::new();
db.expect_get_user().times(1).return_once(move |_| {
Ok(Some(User {
uaid,
// Last connected 10 minutes ago
connected_at: ms_since_epoch() - (10 * 60 * 1000),
..Default::default()
}))
let user = User::builder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sad that we can't call ..Default::default() because the _channels item isn't technically writable, but very happy to have a builder method instead.

.uaid(uaid)
.connected_at(ms_since_epoch() - (10 * 60 * 1000))
.build()
.unwrap();
Ok(Some(user))
});

db.expect_update_user().times(1).return_once(|_| Ok(true));
Expand Down
10 changes: 5 additions & 5 deletions autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,11 @@ impl UnidentifiedClient {
// change from the previous state machine impl)
}

let user = User {
node_id: Some(self.app_state.router_url.to_owned()),
connected_at,
..Default::default()
};
let user = User::builder()
.node_id(self.app_state.router_url.to_owned())
.connected_at(connected_at)
.build()
.map_err(|e| SMErrorKind::Internal(format!("User::builder error: {e}")))?;
Ok(GetOrCreateUser {
user,
existing_user: false,
Expand Down
11 changes: 6 additions & 5 deletions autoendpoint/src/routers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,15 @@ pub mod tests {
data: Option<String>,
router_type: RouterType,
) -> Notification {
let user = User::builder()
.router_data(router_data)
.router_type(router_type.to_string())
.build()
.unwrap();
Notification {
message_id: "test-message-id".to_string(),
subscription: Subscription {
user: User {
router_data: Some(router_data),
router_type: router_type.to_string(),
..Default::default()
},
user,
channel_id: channel_id(),
vapid: None,
},
Expand Down
10 changes: 5 additions & 5 deletions autoendpoint/src/routes/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ pub async fn register_uaid_route(
incr_metric("ua.command.register", &app_state.metrics, &request);

// Register user and channel in database
let user = User {
router_type: path_args.router_type.to_string(),
router_data: Some(router_data),
..Default::default()
};
let user = User::builder()
.router_type(path_args.router_type.to_string())
.router_data(router_data)
.build()
.map_err(|e| ApiErrorKind::General(format!("User::builder error: {e}")))?;
let channel_id = router_data_input.channel_id.unwrap_or_else(Uuid::new_v4);
trace!("🌍 Creating user with UAID {}", user.uaid);
trace!("🌍 user = {:?}", user);
Expand Down
1 change: 1 addition & 0 deletions autopush-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ url.workspace = true

again = "0.1"
async-trait = "0.1"
derive_builder = "0.20"
gethostname = "0.4"
num_cpus = "1.16"
woothee = "0.13"
Expand Down
168 changes: 134 additions & 34 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt;
use std::fmt::Display;
Expand Down Expand Up @@ -28,7 +29,7 @@ use crate::db::{
};

pub use self::metadata::MetadataBuilder;
use self::row::Row;
use self::row::{Row, RowCells};
use super::pool::BigTablePool;
use super::BigTableDbSettings;

Expand Down Expand Up @@ -201,6 +202,46 @@ fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
})
}

/// Parse the "set" (see [DbClient::add_channels]) of channel ids in a bigtable Row.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on extracting these into functions. Thanks!

///
/// Cells should contain solely the set of channels; otherwise an error is returned.
fn channels_from_cells(cells: &RowCells) -> DbResult<HashSet<Uuid>> {
    cells
        .values()
        // Only the last cell stored under each qualifier is inspected;
        // qualifiers with no cells at all are skipped.
        .filter_map(|column| column.last())
        .map(|latest| -> DbResult<Uuid> {
            // The channel id is whatever follows the "chid:" marker in the
            // qualifier. A qualifier without the marker means the row held
            // something other than channel cells.
            let (_, chid) = latest.qualifier.split_once("chid:").ok_or_else(|| {
                DbError::Integrity("get_channels expected: chid:<chid>".to_owned(), None)
            })?;
            Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))
        })
        // Collecting into a Result stops at the first failing cell.
        .collect()
}

/// Convert the [HashSet] of channel ids to cell entries for a bigtable Row
fn channels_to_cells(channels: Cow<HashSet<Uuid>>, expiry: SystemTime) -> Vec<cell::Cell> {
let channels = channels.into_owned();
let mut cells = Vec::with_capacity(channels.len().min(100_000));
for (i, channel_id) in channels.into_iter().enumerate() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a desire to check the size of channels prior to iteration, or is it intended to be a processing limit at runtime while looping through? Just curious in case a check before iterating was valuable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we're going to begin enforcing a max limit of channels shortly, which will probably end up changing this check #733

// There is a limit of 100,000 mutations per batch for bigtable.
// https://cloud.google.com/bigtable/quotas
// If you have 100,000 channels, you have too many.
if i >= 100_000 {
break;
}
cells.push(cell::Cell {
qualifier: format!("chid:{}", channel_id.as_hyphenated()),
timestamp: expiry,
..Default::default()
});
}
cells
}

pub fn retry_policy(max: usize) -> RetryPolicy {
RetryPolicy::default()
.with_max_retries(max)
Expand Down Expand Up @@ -281,7 +322,7 @@ pub fn retryable_error(metrics: Arc<StatsdClient>) -> impl Fn(&grpcio::Error) ->
/// 2) When router TTLs are eventually enabled: `add_channel` and
/// `increment_storage` can write cells with later expiry times than the other
/// router cells
fn is_incomplete_router_record(cells: &HashMap<String, Vec<cell::Cell>>) -> bool {
fn is_incomplete_router_record(cells: &RowCells) -> bool {
cells
.keys()
.all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:"))
Expand Down Expand Up @@ -770,6 +811,11 @@ impl BigTableClientImpl {
});
};

cells.extend(channels_to_cells(
Cow::Borrowed(&user.priv_channels),
expiry,
));

row.add_cells(ROUTER_FAMILY, cells);
row
}
Expand Down Expand Up @@ -942,6 +988,9 @@ impl DbClient for BigTableClientImpl {
result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
}

// Read the channels last, after removal of all non channel cells
result.priv_channels = channels_from_cells(&row.cells)?;

Ok(Some(result))
}

Expand Down Expand Up @@ -976,24 +1025,13 @@ impl DbClient for BigTableClientImpl {
let mut row = Row::new(row_key);
let expiry = std::time::SystemTime::now() + Duration::from_secs(MAX_CHANNEL_TTL);

let mut cells = Vec::with_capacity(channels.len().min(100_000));
for (i, channel_id) in channels.into_iter().enumerate() {
// There is a limit of 100,000 mutations per batch for bigtable.
// https://cloud.google.com/bigtable/quotas
// If you have 100,000 channels, you have too many.
if i >= 100_000 {
break;
}
cells.push(cell::Cell {
qualifier: format!("chid:{}", channel_id.as_hyphenated()),
timestamp: expiry,
..Default::default()
});
}
// Note: updating the version column isn't necessary here because this
// write only adds a new (or updates an existing) column with a 0 byte
// value
row.add_cells(ROUTER_FAMILY, cells);
row.add_cells(
ROUTER_FAMILY,
channels_to_cells(Cow::Owned(channels), expiry),
);

self.write_row(row).await?;
Ok(())
Expand All @@ -1011,23 +1049,10 @@ impl DbClient for BigTableClientImpl {
cq_filter,
]));

let mut result = HashSet::new();
if let Some(record) = self.read_row(req).await? {
for mut cells in record.cells.into_values() {
let Some(cell) = cells.pop() else {
continue;
};
let Some((_, chid)) = cell.qualifier.split_once("chid:") else {
return Err(DbError::Integrity(
"get_channels expected: chid:<chid>".to_owned(),
None,
));
};
result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
}
}

Ok(result)
let Some(row) = self.read_row(req).await? else {
return Ok(Default::default());
};
channels_from_cells(&row.cells)
}

/// Delete the channel. Does not delete its associated pending messages.
Expand Down Expand Up @@ -1769,4 +1794,79 @@ mod tests {

client.remove_user(&uaid).await.unwrap();
}

#[actix_rt::test]
async fn channel_and_current_timestamp_ttl_updates() {
    let client = new_client().unwrap();
    let uaid = gen_test_uaid();
    let chid = Uuid::parse_str(TEST_CHID).unwrap();
    // Row key used for the direct row reads below.
    let row_key = uaid.as_simple().to_string();

    // Start from a clean slate for this uaid.
    client.remove_user(&uaid).await.unwrap();

    // Create the user, then give it two channels and a current_timestamp.
    let new_user = User {
        uaid,
        ..Default::default()
    };
    client.add_user(&new_user).await.unwrap();

    client.add_channel(&uaid, &chid).await.unwrap();
    client.add_channel(&uaid, &Uuid::new_v4()).await.unwrap();

    let now_secs = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_secs();
    client.increment_storage(&uaid, now_secs).await.unwrap();

    // First read: every cell's timestamp must be at least connected_at's.
    let first_req = client.read_row_request(&row_key);
    let Some(mut before) = client.read_row(first_req).await.unwrap() else {
        panic!("Expected row");
    };
    let expiry = before.take_required_cell("connected_at").unwrap().timestamp;
    for latest in before.cells.into_values().filter_map(|mut cells| cells.pop()) {
        assert!(
            latest.timestamp >= expiry,
            "{} cell timestamp should >= connected_at's",
            latest.qualifier
        );
    }

    // Round-trip the user through get_user/update_user.
    let mut fetched = client.get_user(&uaid).await.unwrap().unwrap();
    client.update_user(&mut fetched).await.unwrap();

    // Second read: update_user must have rewritten every cell in the row
    // with the same, newer timestamp.
    let second_req = client.read_row_request(&row_key);
    let Some(mut after) = client.read_row(second_req).await.unwrap() else {
        panic!("Expected row");
    };
    let expiry2 = after.take_required_cell("connected_at").unwrap().timestamp;
    assert!(expiry2 > expiry);

    for latest in after.cells.into_values().filter_map(|mut cells| cells.pop()) {
        assert_eq!(
            latest.timestamp, expiry2,
            "{} cell timestamp should match connected_at's",
            latest.qualifier
        );
    }

    client.remove_user(&uaid).await.unwrap();
}
}
4 changes: 3 additions & 1 deletion autopush-common/src/db/bigtable/bigtable_client/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use crate::db::error::{DbError, DbResult};

use super::{cell::Cell, RowKey};

pub type RowCells = HashMap<String, Vec<Cell>>;

/// A Bigtable storage row. Bigtable stores by Family ID which isn't
/// very useful for us later, so we overload this structure a bit.
/// When we read data back out of Bigtable, we index cells by
Expand All @@ -19,7 +21,7 @@ pub struct Row {
pub row_key: RowKey,
/// The row's collection of cells, indexed by either the
/// FamilyID (for write) or Qualifier (for read).
pub cells: HashMap<String, Vec<Cell>>,
pub cells: RowCells,
}

impl Row {
Expand Down
Loading