From 4c988e28689f1eb6ca0be3d5f044468a7d24fa76 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 3 Feb 2025 11:00:00 +0200 Subject: [PATCH] Rewrite the "sync_local" query. --- crates/core/src/sync_local.rs | 96 ++++++++++++++++++++++++++--------- 1 file changed, 72 insertions(+), 24 deletions(-) diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index 0e6602e..a9f3e3f 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -55,42 +55,90 @@ pub fn sync_local(db: *mut sqlite::sqlite3, _data: &str) -> Result?) + // | `--UNION ALL + // | `--SCAN ps_updated_rows + // |--SCAN b + // |--USE TEMP B-TREE FOR GROUP BY + // `--CORRELATED SCALAR SUBQUERY 3 + // `--SEARCH r USING INDEX ps_oplog_row (row_type=? AND row_id=?) + // language=SQLite let statement = db .prepare_v2( "\ --- 1. Filter oplog by the ops added but not applied yet (oplog b). --- SELECT DISTINCT / UNION is important for cases with many duplicate ids. WITH updated_rows AS ( - SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets - CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id - AND (b.op_id > buckets.last_applied_op) - UNION SELECT row_type, row_id FROM ps_updated_rows + SELECT b.row_type, b.row_id FROM ps_buckets AS buckets + CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id + AND (b.op_id > buckets.last_applied_op) + UNION ALL SELECT row_type, row_id FROM ps_updated_rows ) --- 3. Group the objects from different buckets together into a single one (ops). -SELECT b.row_type as type, - b.row_id as id, - r.data as data, - count(r.bucket) as buckets, - /* max() affects which row is used for 'data' */ - max(r.op_id) as op_id --- 2. Find *all* current ops over different buckets for those objects (oplog r). 
-FROM updated_rows b - LEFT OUTER JOIN ps_oplog AS r - ON r.row_type = b.row_type - AND r.row_id = b.row_id --- Group for (3) -GROUP BY b.row_type, b.row_id", +SELECT + b.row_type, + b.row_id, + ( + SELECT ifnull(r.data, max(r.op_id)) + FROM ps_oplog r + WHERE r.row_type = b.row_type + AND r.row_id = b.row_id + ) as data + FROM updated_rows b + GROUP BY b.row_type, b.row_id; + ", ) .into_db_result(db)?; - // TODO: cache statements + // An alternative form of the query is this: + // + // SELECT r.row_type as type, + // r.row_id as id, + // r.data as data, + // max(r.op_id) as op_id + // FROM ps_oplog r + // GROUP BY r.row_type, r.row_id; + // + // This form is simple and fast, but does not filter only on updated rows. It also ignores ps_updated_rows. + // We could later add heuristics to use this form on initial sync, or when a large number of rows have been re-synced. + // + // QUERY PLAN + // `--SCAN r USING INDEX ps_oplog_row + + // TODO: cache individual statements while statement.step().into_db_result(db)? == ResultCode::ROW { let type_name = statement.column_text(0)?; let id = statement.column_text(1)?; - let buckets = statement.column_int(3)?; let data = statement.column_text(2); let table_name = internal_table_name(type_name); @@ -98,7 +146,7 @@ GROUP BY b.row_type, b.row_id", if tables.contains(&table_name) { let quoted = quote_internal_name(type_name, false); - if buckets == 0 { + if data.is_err() { // DELETE let delete_statement = db .prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted)) @@ -115,7 +163,7 @@ GROUP BY b.row_type, b.row_id", insert_statement.exec()?; } } else { - if buckets == 0 { + if data.is_err() { // DELETE // language=SQLite let delete_statement = db