Detect columns used in generated columns #256

Merged
merged 11 commits on Aug 30, 2024
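In short: a subscription is only re-evaluated when an incoming change touches a column the parsed query is known to read (see `filter_matchable_change` below). A query selecting a generated column used to record only the generated column itself, so writes to the stored columns it derives from could fail to match; this PR records the generated column's source columns instead. Below is a minimal, self-contained sketch of the idea with invented types and names (the real logic is the new `insert_col` in `crates/corro-types/src/pubsub.rs`):

use std::collections::{HashMap, HashSet};

// Invented stand-ins for the schema types; illustration only.
struct Column {
    // Some(source column names) if this column is generated, None if stored.
    generated_from: Option<Vec<String>>,
}
type Table = HashMap<String, Column>;

// Record the stored columns a query actually depends on: a generated
// column contributes its source columns, recursively, so nested
// generated columns bottom out at stored ones.
fn record_col(deps: &mut HashSet<String>, table: &Table, name: &str) {
    match table.get(name).and_then(|col| col.generated_from.as_ref()) {
        Some(sources) => {
            for source in sources {
                record_col(deps, table, source);
            }
        }
        None => {
            deps.insert(name.to_owned());
        }
    }
}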
4 changes: 4 additions & 0 deletions crates/corro-agent/src/api/public/pubsub.rs
@@ -93,6 +93,7 @@ async fn sub_by_id(

let (evt_tx, evt_rx) = mpsc::channel(512);

let query_hash = matcher.hash().to_owned();
tokio::spawn(catch_up_sub(matcher, params, rx, evt_tx));

let (tx, body) = hyper::Body::channel();
@@ -102,6 +103,7 @@ async fn sub_by_id(
hyper::Response::builder()
.status(StatusCode::OK)
.header("corro-query-id", id.to_string())
.header("corro-query-hash", query_hash)
.body(body)
.expect("could not build query response body")
}
@@ -701,6 +703,7 @@ pub async fn api_v1_subs(
tripwire,
));

let query_hash = handle.hash().to_owned();
let matcher_id = match upsert_sub(
handle,
maybe_created,
@@ -718,6 +721,7 @@
hyper::Response::builder()
.status(StatusCode::OK)
.header("corro-query-id", matcher_id.to_string())
.header("corro-query-hash", query_hash)
.body(body)
.expect("could not generate ok http response for query request")
}
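With both endpoints setting it, a client can correlate its subscription with server-side observability: `corro-query-id` identifies the subscription instance, while the new `corro-query-hash` header carries the hash of the subscribed SQL, the same value used as the `sql_hash` label on the metrics introduced below.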
134 changes: 99 additions & 35 deletions crates/corro-types/src/pubsub.rs
@@ -14,7 +14,7 @@ use corro_api_types::{
use enquote::unquote;
use fallible_iterator::FallibleIterator;
use indexmap::{IndexMap, IndexSet};
use metrics::counter;
use metrics::{counter, histogram};
use parking_lot::{Condvar, Mutex, RwLock};
use rusqlite::{
params_from_iter,
@@ -282,10 +282,12 @@ impl SubsManager {
match e {
mpsc::error::TrySendError::Full(item) => {
warn!("channel is full, falling back to async send");

let changes_tx = handle.inner.changes_tx.clone();
tokio::spawn(async move {
_ = changes_tx.send(item).await;
});
counter!("corro.subs.changes.channel.async_fallbacks_count", "sql_hash" => handle.inner.hash.clone()).increment(1);
}
mpsc::error::TrySendError::Closed(_) => {
if let Some(handle) = self.remove(id) {
@@ -298,6 +300,7 @@
}
}

#[derive(Debug)]
struct MatchableChange<'a> {
table: &'a TableName,
pk: &'a [u8],
@@ -370,6 +373,10 @@ impl MatcherHandle {
self.inner.id
}

pub fn hash(&self) -> &str {
&self.inner.hash
}

pub fn parsed_columns(&self) -> &[ResultColumn] {
&self.inner.parsed.columns
}
@@ -527,12 +534,14 @@ impl MatcherHandle {
candidates: &mut MatchCandidates,
change: MatchableChange,
) -> bool {
trace!("filtering change {change:?}");
// don't double process the same pk
if candidates
.get(change.table)
.map(|pks| pks.contains(change.pk))
.unwrap_or_default()
{
trace!("already contained key");
return false;
}

Expand All @@ -545,6 +554,7 @@ impl MatcherHandle {
.map(|cols| change.column.is_crsql_sentinel() || cols.contains(change.column.as_str()))
.unwrap_or_default()
{
trace!("could not match against parsed query table and columns");
return false;
}

@@ -600,8 +610,9 @@ impl Matcher {
sql: &str,
) -> Result<(Matcher, MatcherHandle), MatcherError> {
let sub_path = Self::sub_path(subs_path.as_path(), id);
let sql_hash = hex::encode(seahash::hash(sql.as_bytes()).to_be_bytes());

info!("Initializing subscription at {sub_path}");
info!(%sql_hash, sub_id = %id, "Initializing subscription at {sub_path}");

std::fs::create_dir_all(&sub_path)?;
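
For context, the hash logged here (and returned in the new `corro-query-hash` header above) is derived from the raw SQL. A minimal sketch, assuming the `seahash` and `hex` crates already used in this file:

// seahash::hash yields a u64; its big-endian bytes hex-encode into a
// stable 16-character identifier for the query text.
fn sql_hash(sql: &str) -> String {
    hex::encode(seahash::hash(sql.as_bytes()).to_be_bytes())
}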

@@ -767,11 +778,13 @@ impl Matcher {
pks.get(tbl_name)
.cloned()
.ok_or(MatcherError::MissingPrimaryKeys)?
.to_vec()
.into_iter()
.map(|pk| format!("coalesce({pk}, \"\")"))
.collect::<Vec<_>>()
.join(","),
);

info!(sub_id = %id, "modified query for table '{tbl_name}': {new_query}");
info!(%sql_hash, sub_id = %id, "modified query for table '{tbl_name}': {new_query}");

statements.insert(
tbl_name.clone(),
@@ -791,8 +804,6 @@
// big channel to not miss anything
let (changes_tx, changes_rx) = mpsc::channel(20480);

let sql_hash = hex::encode(seahash::hash(sql.as_bytes()).to_be_bytes());

let handle = MatcherHandle {
inner: Arc::new(InnerMatcherHandle {
id,
@@ -1095,14 +1106,18 @@ impl Matcher {
tbl_name,
pks,
) {
info!(sub_id = %self.id, "query plan for table '{tbl_name}':\n{plan}");
info!(sub_id = %self.id, sql_hash = %self.hash, "query plan for table '{tbl_name}':\n{plan}");
}
}

Ok(())
}

async fn cmd_loop(mut self, mut state_conn: CrConn, mut tripwire: Tripwire) {
const PROCESS_CHANGES_THRESHOLD: usize = 1000;
const PROCESSING_WARN_THRESHOLD: Duration = Duration::from_secs(5);
const PROCESS_BUFFER_DEADLINE: Duration = Duration::from_millis(600);

info!(sub_id = %self.id, "Starting loop to run the subscription");
{
let (lock, cvar) = &*self.state;
@@ -1120,15 +1135,16 @@
let mut purge_changes_interval = tokio::time::interval(Duration::from_secs(300));

// max duration of aggregating candidates
let mut process_changes_interval = tokio::time::interval(Duration::from_millis(600));
let process_changes_deadline = tokio::time::sleep(PROCESS_BUFFER_DEADLINE);
tokio::pin!(process_changes_deadline);

loop {
enum Branch {
NewCandidates((MatchCandidates, CrsqlDbVersion)),
PurgeOldChanges,
}

trace!("looping...");
// trace!("looping...");

let branch = tokio::select! {
biased;
@@ -1147,7 +1163,7 @@
}
last_db_version = Some(db_version);

if buf_count >= 500 {
if buf_count >= PROCESS_CHANGES_THRESHOLD {
if let Some(db_version) = last_db_version.take() {
Branch::NewCandidates((std::mem::take(&mut buf), db_version))
} else {
@@ -1157,7 +1173,10 @@
continue;
}
},
_ = process_changes_interval.tick() => {
_ = process_changes_deadline.as_mut() => {
process_changes_deadline
.as_mut()
.reset((Instant::now() + PROCESS_BUFFER_DEADLINE).into());
if buf_count == 0 {
continue;
}
@@ -1189,10 +1208,24 @@
}
break;
}
debug!(sub_id = %self.id, "processed {buf_count} changes for subscription in {:?}", start.elapsed());
let elapsed = start.elapsed();

histogram!("corro.subs.changes.processing.duration.seconds", "sql_hash" => self.hash.clone()).record(elapsed);

if elapsed >= PROCESSING_WARN_THRESHOLD {
warn!(sub_id = %self.id, "processed {buf_count} changes (very slowly) for subscription in {elapsed:?}");
} else {
debug!(sub_id = %self.id, "processed {buf_count} changes for subscription in {elapsed:?}");
}
buf_count = 0;

// reset the deadline
process_changes_deadline
.as_mut()
.reset((Instant::now() + PROCESS_BUFFER_DEADLINE).into());
}
Branch::PurgeOldChanges => {
let start = Instant::now();
let res = block_in_place(|| {
let tx = self.conn.transaction()?;

Expand All @@ -1207,7 +1240,7 @@ impl Matcher {

match res {
Ok(deleted) => {
info!(sub_id = %self.id, "Deleted {deleted} old changes row")
info!(sub_id = %self.id, "Deleted {deleted} old changes row in {:?}", start.elapsed());
}
Err(e) => {
error!(sub_id = %self.id, "could not delete old changes: {e}");
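
Aside: the loop above also replaces the fixed `process_changes_interval` tick with a pinned, resettable `tokio::time::Sleep`, so the 600 ms flush window restarts after each flush instead of firing on a fixed schedule, and the magic numbers become named constants. A minimal, self-contained sketch of the resettable-deadline pattern, with invented names and the flush logic stubbed out:

use std::time::Duration;
use tokio::time::{sleep, Instant};

#[tokio::main]
async fn main() {
    const DEADLINE: Duration = Duration::from_millis(600);
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u64>(16);
    tokio::spawn(async move {
        for i in 0..5 {
            let _ = tx.send(i).await;
        }
    });

    let deadline = sleep(DEADLINE);
    tokio::pin!(deadline);
    let mut buf: Vec<u64> = Vec::new();

    loop {
        tokio::select! {
            biased;
            maybe = rx.recv() => match maybe {
                Some(item) => buf.push(item), // buffer until the deadline fires
                None => break,                // channel closed: stop
            },
            _ = deadline.as_mut() => {
                // restart the window, then flush whatever accumulated
                deadline.as_mut().reset(Instant::now() + DEADLINE);
                if !buf.is_empty() {
                    println!("flushing {} buffered items", buf.len());
                    buf.clear();
                }
            }
        }
    }

    if !buf.is_empty() {
        println!("flushing {} remaining items on shutdown", buf.len());
    }
}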
@@ -1491,7 +1524,10 @@ impl Matcher {
for pks in pks {
tx.prepare_cached(&format!(
"INSERT INTO {tmp_table_name} VALUES ({})",
(0..pks.len()).map(|_i| "?").collect::<Vec<_>>().join(",")
(0..pks.len())
.map(|_i| "coalesce(?, \"\")")
.collect::<Vec<_>>()
.join(",")
))?
.execute(params_from_iter(pks))?;
}
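
Note: the `coalesce(?, "")` here mirrors the `coalesce({pk}, "")` added to the per-table query earlier in this file; NULL never compares equal to NULL in SQL, so coalescing NULL primary-key components to empty strings on both sides keeps such rows matchable between the temporary pk table and the query results.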
@@ -1539,6 +1575,7 @@ impl Matcher {
))?;

for table in tables.iter() {
let start = Instant::now();
let stmt = match self.cached_statements.get(table.as_str()) {
Some(stmt) => stmt,
None => {
@@ -1690,6 +1727,9 @@ impl Matcher {
}
// clean that up
tx.execute_batch("DELETE FROM state_results")?;

let elapsed = start.elapsed();
histogram!("corro.subs.changes.processing.table.duration.seconds", "sql_hash" => self.hash.clone(), "table" => table.0.to_string()).record(elapsed);
}

// clean up temporary tables immediately
@@ -1892,7 +1932,7 @@ fn extract_select_columns(select: &Select, schema: &Schema) -> Result<ParsedSele
let entry =
parsed.table_columns.entry(tbl_name.0.clone()).or_default();
for name in names.iter() {
entry.insert(name.0.clone());
insert_col(entry, schema, &tbl_name.0, &name.0);
}
}
}
@@ -1913,6 +1953,20 @@
Ok(parsed)
}

fn insert_col(set: &mut HashSet<String>, schema: &Schema, tbl_name: &str, name: &str) {
let table = schema.tables.get(tbl_name);
if let Some(generated) =
table.and_then(|tbl| tbl.columns.get(name).and_then(|col| col.generated.as_ref()))
{
// recursively check for generated columns
for name in generated.from.iter() {
insert_col(set, schema, tbl_name, name);
}
} else {
set.insert(name.to_owned());
}
}

fn extract_expr_columns(
expr: &Expr,
schema: &Schema,
@@ -1923,21 +1977,29 @@
Expr::Qualified(tblname, colname) => {
let resolved_name = parsed.aliases.get(&tblname.0).unwrap_or(&tblname.0);
// println!("adding column: {resolved_name} => {colname:?}");
parsed
.table_columns
.entry(resolved_name.clone())
.or_default()
.insert(colname.0.clone());
insert_col(
parsed
.table_columns
.entry(resolved_name.clone())
.or_default(),
schema,
resolved_name,
&colname.0,
);
}
// simplest case but also mentioning the schema
Expr::DoublyQualified(schema_name, tblname, colname) if schema_name.0 == "main" => {
let resolved_name = parsed.aliases.get(&tblname.0).unwrap_or(&tblname.0);
// println!("adding column: {resolved_name} => {colname:?}");
parsed
.table_columns
.entry(resolved_name.clone())
.or_default()
.insert(colname.0.clone());
insert_col(
parsed
.table_columns
.entry(resolved_name.clone())
.or_default(),
schema,
resolved_name,
&colname.0,
);
}

Expr::Name(colname) => {
Expand All @@ -1958,11 +2020,12 @@ fn extract_expr_columns(
}

if let Some(found) = found {
parsed
.table_columns
.entry(found.to_owned())
.or_default()
.insert(check_col_name);
insert_col(
parsed.table_columns.entry(found.to_owned()).or_default(),
schema,
found,
&check_col_name,
);
} else {
return Err(MatcherError::TableForColumnNotFound {
col_name: check_col_name,
@@ -1988,11 +2051,12 @@
}

if let Some(found) = found {
parsed
.table_columns
.entry(found.to_owned())
.or_default()
.insert(colname.0.clone());
insert_col(
parsed.table_columns.entry(found.to_owned()).or_default(),
schema,
found,
&colname.0,
);
} else {
if colname.0.starts_with('"') {
return Ok(());